WebSockets dengan Play Framework dan Akka

1. Ikhtisar

Ketika kami ingin klien web kami mempertahankan dialog dengan server kami, maka WebSockets dapat menjadi solusi yang berguna. WebSockets mempertahankan koneksi dupleks penuh yang persisten. Ini memberi kami kemampuan untuk mengirim pesan dua arah antara server dan klien kami.

Dalam tutorial ini, kita akan mempelajari cara menggunakan WebSockets dengan Akka di Play Framework.

2. Penyiapan

Mari kita siapkan aplikasi obrolan sederhana. Pengguna akan mengirim pesan ke server, dan server akan merespon dengan pesan dari JSONPlaceholder.

2.1. Menyiapkan Aplikasi Play Framework

Kami akan membangun aplikasi ini menggunakan Play Framework.

Mari ikuti instruksi dari Introduction to Play in Java untuk menyiapkan dan menjalankan aplikasi Play Framework sederhana.

2.2. Menambahkan File JavaScript yang Diperlukan

Selain itu, kita perlu bekerja dengan JavaScript untuk pembuatan skrip sisi klien. Ini akan memungkinkan kami menerima pesan baru yang didorong dari server. Kami akan menggunakan perpustakaan jQuery untuk ini.

Mari tambahkan jQuery ke bagian bawah file app / views / i ndex.scala.html :

2.3. Menyiapkan Akka

Terakhir, kami akan menggunakan Akka untuk menangani koneksi WebSocket di sisi server.

Mari arahkan ke file build.sbt dan tambahkan dependensi.

Kita perlu menambahkan dependensi akka-actor dan akka-testkit :

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Kami membutuhkan ini untuk dapat menggunakan dan menguji kode Akka Framework.

Selanjutnya, kami akan menggunakan aliran Akka. Jadi, mari tambahkan dependensi akka-stream :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Terakhir, kita perlu memanggil titik akhir istirahat dari aktor Akka. Untuk ini, kita membutuhkan ketergantungan akka-http . Saat kita melakukannya, endpoint akan mengembalikan data JSON yang harus kita deserialisasi, jadi kita perlu menambahkan dependensi akka-http-jackson juga:

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

Dan sekarang kita sudah siap. Mari kita lihat bagaimana WebSockets berfungsi!

3. Menangani WebSockets Dengan Akka Actors

Mekanisme penanganan WebSocket Play dibangun di sekitar aliran Akka. Sebuah WebSocket dimodelkan sebagai Flow. Jadi, pesan WebSocket yang masuk dimasukkan ke dalam aliran, dan pesan yang dihasilkan oleh aliran tersebut dikirim ke klien.

Untuk menangani WebSocket menggunakan Actor, kita memerlukan utilitas Play ActorFlow yang mengonversi ActorRef menjadi aliran. Ini terutama membutuhkan beberapa kode Java, dengan sedikit konfigurasi.

3.1. Metode Pengontrol WebSocket

Pertama, kita membutuhkan instance Materializer . Materializer adalah pabrik untuk mesin eksekusi aliran.

Kita perlu memasukkan ActorSystem dan Materializer ke dalam aplikasi controller / controllers / HomeController.java :

private ActorSystem actorSystem; private Materializer materializer; @Inject public HomeController( ActorSystem actorSystem, Materializer materializer) { this.actorSystem = actorSystem; this.materializer = materializer; }

Sekarang mari tambahkan metode pengontrol soket:

public WebSocket socket() { return WebSocket.Json .acceptOrResult(this::createActorFlow); }

Di sini kita memanggil fungsi acceptOrResult yang mengambil header permintaan dan mengembalikan masa depan. Masa depan yang dikembalikan adalah aliran untuk menangani pesan WebSocket.

Sebagai gantinya, kami dapat menolak permintaan dan mengembalikan hasil penolakan.

Sekarang, mari buat alurnya:

private CompletionStage
    
     > createActorFlow(Http.RequestHeader request) { return CompletableFuture.completedFuture( F.Either.Right(createFlowForActor())); }
    

Kelas F di Play Framework mendefinisikan satu set pembantu gaya pemrograman fungsional. Dalam kasus ini, kami menggunakan F. Either.Right untuk menerima koneksi dan mengembalikan aliran.

Katakanlah kita ingin menolak koneksi ketika klien tidak diautentikasi.

Untuk ini, kami dapat memeriksa apakah nama pengguna disetel dalam sesi. Dan jika tidak, kami menolak koneksi dengan HTTP 403 Forbidden:

private CompletionStage
    
     > createActorFlow2(Http.RequestHeader request) { return CompletableFuture.completedFuture( request.session() .getOptional("username") .map(username -> F.Either.
     
      Right( createFlowForActor())) .orElseGet(() -> F.Either.Left(forbidden()))); }
     
    

Kami menggunakan F.Either.Left untuk menolak koneksi dengan cara yang sama seperti kami menyediakan aliran dengan F.Either.Right .

Terakhir, kami menautkan aliran ke aktor yang akan menangani pesan:

private Flow createFlowForActor() { return ActorFlow.actorRef(out -> Messenger.props(out), actorSystem, materializer); }

The ActorFlow.actorRef menciptakan aliran yang ditangani oleh Messenger aktor .

3.2. The -rute Berkas

Sekarang, mari tambahkan definisi routes untuk metode controller di conf / routes :

GET / controllers.HomeController.index(request: Request) GET /chat controllers.HomeController.socket GET /chat/with/streams controllers.HomeController.akkaStreamsSocket GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

Definisi rute ini memetakan permintaan HTTP yang masuk ke metode tindakan pengontrol seperti yang dijelaskan dalam Perutean di Aplikasi Play di Java.

3.3. Pelaksanaan Pelaku

Bagian terpenting dari kelas aktor adalah metode createReceive yang menentukan pesan mana yang dapat ditangani aktor:

@Override public Receive createReceive() { return receiveBuilder() .match(JsonNode.class, this::onSendMessage) .matchAny(o -> log.error("Received unknown message: {}", o.getClass())) .build(); }

Aktor akan meneruskan semua pesan yang cocok dengan kelas JsonNode ke metode penanganan onSendMessage :

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); //.. processMessage(requestDTO); }

Kemudian handler akan menanggapi setiap pesan menggunakan metode processMessage :

private void processMessage(RequestDTO requestDTO) { CompletionStage responseFuture = getRandomMessage(); responseFuture.thenCompose(this::consumeHttpResponse) .thenAccept(messageDTO -> out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf())); }

3.4. Mengkonsumsi Rest API dengan Akka HTTP

Kami akan mengirimkan permintaan HTTP ke generator pesan palsu di JSONPlaceholder Posts. Ketika respon tiba, kami mengirim respon ke klien dengan menulis itu keluar .

Mari kita memiliki metode yang memanggil titik akhir dengan id posting acak:

private CompletionStage getRandomMessage() { int postId = ThreadLocalRandom.current().nextInt(0, 100); return Http.get(getContext().getSystem()) .singleRequest(HttpRequest.create( "//jsonplaceholder.typicode.com/posts/" + postId)); }

Kami juga memproses HttpResponse yang kami dapat dari memanggil layanan untuk mendapatkan respons JSON:

private CompletionStage consumeHttpResponse( HttpResponse httpResponse) { Materializer materializer = Materializer.matFromSystem(getContext().getSystem()); return Jackson.unmarshaller(MessageDTO.class) .unmarshal(httpResponse.entity(), materializer) .thenApply(messageDTO -> { log.info("Received message: {}", messageDTO); discardEntity(httpResponse, materializer); return messageDTO; }); }

Kelas MessageConverter adalah utilitas untuk mengkonversi antara JsonNode dan DTO:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) { ObjectMapper mapper = new ObjectMapper(); return mapper.convertValue(jsonNode, MessageDTO.class); }

Selanjutnya, kita perlu membuang entitas tersebut. The discardEntityBytes metode convenience melayani tujuan dengan mudah membuang entitas jika tidak memiliki tujuan bagi kita.

Mari kita lihat cara membuang byte:

private void discardEntity( HttpResponse httpResponse, Materializer materializer) { HttpMessage.DiscardedEntity discarded = httpResponse.discardEntityBytes(materializer); discarded.completionStage() .whenComplete((done, ex) -> log.info("Entity discarded completely!")); }

Sekarang setelah menyelesaikan penanganan WebSocket, mari kita lihat bagaimana kita dapat menyiapkan klien untuk ini menggunakan HTML5 WebSockets.

4. Menyiapkan Klien WebSocket

Untuk klien kami, mari buat aplikasi obrolan berbasis web sederhana.

4.1. Tindakan Pengontrol

Kita perlu mendefinisikan aksi pengontrol yang menampilkan halaman indeks. Kami akan meletakkan ini di kelas pengontrol app.controllers.HomeController :

public Result index(Http.Request request) { String url = routes.HomeController.socket() .webSocketURL(request); return ok(views.html.index.render(url)); } 

4.2. Halaman Template

Sekarang, mari menuju ke halaman app / views / ndex.scala.html dan menambahkan wadah untuk pesan yang diterima dan formulir untuk menangkap pesan baru:

 F   Send 

We'll also need to pass in the URL for the WebSocket controller action by declaring this parameter at the top of the app/views/index.scala.htmlpage:

@(url: String)

4.3. WebSocket Event Handlers in JavaScript

And now, we can add the JavaScript to handle the WebSocket events. For simplicity, we'll add the JavaScript functions at the bottom of the app/views/index.scala.html page.

Let's declare the event handlers:

var webSocket; var messageInput; function init() { initWebSocket(); } function initWebSocket() { webSocket = new WebSocket("@url"); webSocket.onopen = onOpen; webSocket.onclose = onClose; webSocket.onmessage = onMessage; webSocket.onerror = onError; }

Let's add the handlers themselves:

function onOpen(evt) { writeToScreen("CONNECTED"); } function onClose(evt) { writeToScreen("DISCONNECTED"); } function onError(evt) { writeToScreen("ERROR: " + JSON.stringify(evt)); } function onMessage(evt) { var receivedData = JSON.parse(evt.data); appendMessageToView("Server", receivedData.body); }

Then, to present the output, we'll use the functions appendMessageToView and writeToScreen:

function appendMessageToView(title, message) { $("#messageContent").append("

" + title + ": " + message + "

"); } function writeToScreen(message) { console.log("New message: ", message); }

4.4. Running and Testing the Application

We're ready to test the application, so let's run it:

cd websockets sbt run

With the application running, we can chat with the server by visiting //localhost:9000:

Every time we type a message and hit Send the server will immediately respond with some lorem ipsum from the JSON Placeholder service.

5. Handling WebSockets Directly with Akka Streams

If we are processing a stream of events from a source and sending these to the client, then we can model this around Akka streams.

Let's see how we can use Akka streams in an example where the server sends messages every two seconds.

We'll start with the WebSocket action in the HomeController:

public WebSocket akkaStreamsSocket() { return WebSocket.Json.accept(request -> { Sink in = Sink.foreach(System.out::println); MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body"); Source out = Source.tick( Duration.ofSeconds(2), Duration.ofSeconds(2), MessageConverter.messageToJsonNode(messageDTO) ); return Flow.fromSinkAndSource(in, out); }); }

The Source#tick method takes three parameters. The first is the initial delay before the first tick is processed, and the second is the interval between successive ticks. We've set both values to two seconds in the above snippet. The third parameter is an object that should be returned on each tick.

To see this in action, we need to modify the URL in the index action and make it point to the akkaStreamsSocket endpoint:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

And now refreshing the page, we'll see a new entry every two seconds:

6. Terminating the Actor

At some point, we'll need to shut down the chat, either through a user request or through a timeout.

6.1. Handling Actor Termination

How do we detect when a WebSocket has been closed?

Play will automatically close the WebSocket when the actor that handles the WebSocket terminates. So we can handle this scenario by implementing the Actor#postStop method:

@Override public void postStop() throws Exception { log.info("Messenger actor stopped at {}", OffsetDateTime.now() .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); }

6.2. Manually Terminating the Actor

Further, if we must stop the actor, we can send a PoisonPill to the actor. In our example application, we should be able to handle a “stop” request.

Let's see how to do this in the onSendMessage method:

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); if("stop".equals(message)) { MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor"); out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()); self().tell(PoisonPill.getInstance(), getSelf()); } else { log.info("Actor received. {}", requestDTO); processMessage(requestDTO); } }

When we receive a message, we check if it's a stop request. If it is, we send the PoisonPill. Otherwise, we process the request.

7. Configuration Options

We can configure several options in terms of how the WebSocket should be handled. Let's look at a few.

7.1. WebSocket Frame Length

WebSocket communication involves the exchange of data frames.

The WebSocket frame length is configurable. We have the option to adjust the frame length to our application requirements.

Configuring a shorter frame length may help reduce denial of service attacks that use long data frames. We can change the frame length for the application by specifying the max length in application.conf:

play.server.websocket.frame.maxLength = 64k

We can also set this configuration option by specifying the max length as a command-line parameter:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. Connection Idle Timeout

By default, the actor we use to handle the WebSocket is terminated after one minute. This is because the Play server in which our application is running has a default idle timeout of 60 seconds. This means that all connections that do not receive a request in sixty seconds are closed automatically.

We can change this through configuration options. Let's head over to our application.conf and change the server to have no idle timeout:

play.server.http.idleTimeout = "infinite"

Or we can pass in the option as command-line arguments:

sbt -Dhttp.idleTimeout=infinite run

We can also configure this by specifying devSettings in build.sbt.

Config options specified in build.sbt are only used in development, they will be ignored in production:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

Jika kami menjalankan ulang aplikasi, aktor tidak akan berhenti.

Kita dapat mengubah nilainya menjadi detik:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

Kami dapat mengetahui lebih lanjut tentang opsi konfigurasi yang tersedia di dokumentasi Play Framework.

8. Kesimpulan

Dalam tutorial ini, kami mengimplementasikan WebSockets di Play Framework dengan Akka actor dan Akka Streams.

Kami kemudian melanjutkan untuk melihat cara menggunakan aktor Akka secara langsung dan kemudian melihat bagaimana Aliran Akka dapat disiapkan untuk menangani koneksi WebSocket.

Di sisi klien, kami menggunakan JavaScript untuk menangani acara WebSocket kami.

Terakhir, kami melihat beberapa opsi konfigurasi yang dapat kami gunakan.

Seperti biasa, kode sumber untuk tutorial ini tersedia di GitHub.