Java 9 Reaktif Stream

1. Ikhtisar

Pada artikel ini, kita akan melihat Java 9 Reactive Streams. Sederhananya, kita akan dapat menggunakan kelas Flow , yang menyertakan blok penyusun utama untuk membangun logika pemrosesan aliran reaktif.

Aliran Reaktif adalah standar untuk pemrosesan aliran asinkron dengan tekanan balik non-pemblokiran. Spesifikasi ini didefinisikan dalam Manifesto Reaktif, dan terdapat berbagai penerapannya, misalnya, RxJava atau Akka-Streams.

2. Ringkasan API Reaktif

Untuk membangun Arus , kita dapat menggunakan tiga abstraksi utama dan menyusunnya ke dalam logika pemrosesan asinkron.

Setiap Alur perlu memproses peristiwa yang dipublikasikan kepadanya oleh instance Penerbit ; yang Penerbit memiliki satu metode - berlangganan ().

Jika ada pelanggan yang ingin menerima acara yang diterbitkan olehnya, mereka harus berlangganan ke Penerbit tertentu .

Penerima pesan perlu mengimplementasikan antarmuka Subscriber . Biasanya ini adalah akhir untuk setiap pemrosesan Flow karena instansinya tidak mengirim pesan lebih lanjut.

Kita bisa menganggap Subscriber sebagai Sink. Ini memiliki empat metode yang perlu diganti - onSubscribe (), onNext (), onError (), dan onComplete (). Kami akan melihat itu di bagian selanjutnya.

Jika kita ingin mengubah pesan masuk dan meneruskannya ke Pelanggan berikutnya , kita perlu mengimplementasikan antarmuka Prosesor . Ini bertindak sebagai Pelanggan karena menerima pesan, dan sebagai Penerbit karena memproses pesan tersebut dan mengirimkannya untuk diproses lebih lanjut.

3. Menerbitkan dan Mengkonsumsi Pesan

Katakanlah kita ingin membuat Flow sederhana , di mana kita memiliki Publisher menerbitkan pesan, dan Subscriber sederhana yang memakan pesan saat mereka tiba - satu per satu.

Mari buat kelas EndSubscriber . Kita perlu mengimplementasikan antarmuka Subscriber . Selanjutnya, kami akan mengganti metode yang diperlukan.

Metode onSubscribe () dipanggil sebelum pemrosesan dimulai. Instance dari Subscription dilewatkan sebagai argumen. Ini adalah kelas yang digunakan untuk mengontrol aliran pesan antara Pelanggan dan Penerbit:

public class EndSubscriber implements Subscriber { private Subscription subscription; public List consumedElements = new LinkedList(); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

Kami juga diinisialisasi kosong Daftar dari consumedElements yang akan digunakan dalam tes.

Sekarang, kita perlu mengimplementasikan metode yang tersisa dari antarmuka Subscriber . Metode utama di sini adalah onNext () - ini dipanggil setiap kali Penerbit menerbitkan pesan baru:

@Override public void onNext(T item) { System.out.println("Got : " + item); subscription.request(1); }

Perhatikan bahwa saat kami memulai langganan dalam metode onSubscribe () dan saat kami memproses pesan, kami perlu memanggil metode request () pada Langganan untuk memberi sinyal bahwa Pelanggan saat ini siap untuk mengonsumsi lebih banyak pesan.

Terakhir, kita perlu mengimplementasikan onError () - yang dipanggil setiap kali beberapa pengecualian akan dilemparkan dalam pemrosesan, serta onComplete () - dipanggil saat Publisher ditutup:

@Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); }

Mari tulis tes untuk Arus Pemrosesan . Kami akan menggunakan kelas SubmissionPublisher - sebuah konstruksi dari java.util.concurrent - yang mengimplementasikan antarmuka Publisher .

Kami akan mengirimkan N elemen ke Publisher - yang akan diterima EndSubscriber kami :

@Test public void whenSubscribeToIt_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until( () -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(items) ); }

Perhatikan, bahwa kami memanggil metode close () pada instance EndSubscriber. Ini akan memanggil callback onComplete () di bawahnya pada setiap Pelanggan dari Penerbit yang diberikan .

Menjalankan program itu akan menghasilkan keluaran sebagai berikut:

Got : 1 Got : x Got : 2 Got : x Got : 3 Got : x Done

4. Transformasi Pesan

Katakanlah kita ingin membangun logika serupa antara Penerbit dan Pelanggan , tetapi juga menerapkan beberapa transformasi.

Kami akan membuat kelas TransformProcessor yang mengimplementasikan Processor dan memperluas SubmissionPublisher - karena ini akan menjadi P ublisher dan S ubscriber.

Kami akan meneruskan Fungsi yang akan mengubah input menjadi output:

public class TransformProcessor extends SubmissionPublisher implements Flow.Processor { private Function function; private Flow.Subscription subscription; public TransformProcessor(Function function) { super(); this.function = function; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { submit(function.apply(item)); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { close(); } }

Sekarang mari kita tulis pengujian cepat dengan alur pemrosesan di mana Penerbit menerbitkan elemen String .

TransformProcessor kami akan mengurai String sebagai Integer - yang berarti konversi perlu terjadi di sini:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); TransformProcessor transformProcessor = new TransformProcessor(Integer::parseInt); EndSubscriber subscriber = new EndSubscriber(); List items = List.of("1", "2", "3"); List expectedResult = List.of(1, 2, 3); // when publisher.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expectedResult) ); }

Perhatikan, bahwa memanggil metode close () di dasar Publisher akan menyebabkan metode onComplete () di TransformProcessor dipanggil.

Perlu diingat bahwa semua penerbit dalam rantai pemrosesan harus ditutup dengan cara ini.

5. Mengontrol Permintaan Pesan Menggunakan Langganan

Katakanlah kita hanya ingin menggunakan elemen pertama dari Subscription, menerapkan beberapa logika dan menyelesaikan pemrosesan. Kita bisa menggunakan metode request () untuk mencapai ini.

Mari kita memodifikasi EndSubscriber kita untuk menggunakan hanya sejumlah N pesan. Kami akan meneruskan angka itu sebagai argumen konstruktor howMuchMessagesConsume :

public class EndSubscriber implements Subscriber { private AtomicInteger howMuchMessagesConsume; private Subscription subscription; public List consumedElements = new LinkedList(); public EndSubscriber(Integer howMuchMessagesConsume) { this.howMuchMessagesConsume = new AtomicInteger(howMuchMessagesConsume); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { howMuchMessagesConsume.decrementAndGet(); System.out.println("Got : " + item); consumedElements.add(item); if (howMuchMessagesConsume.get() > 0) { subscription.request(1); } } //... }

Kita bisa meminta elemen selama kita mau.

Mari tulis tes di mana kita hanya ingin mengonsumsi satu elemen dari Langganan yang diberikan :

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(1); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); List expected = List.of("1"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expected) ); }

Although the publisher is publishing six elements, our EndSubscriber will consume only one element because it signals demand for processing only that single one.

By using the request() method on the Subscription, we can implement a more sophisticated back-pressure mechanism to control the speed of the message consumption.

6. Conclusion

In this article, we had a look at the Java 9 Reactive Streams.

We saw how to create a processing Flow consisting of a Publisher and a Subscriber. We created a more complex processing flow with the transformation of elements using Processors.

Terakhir, kami menggunakan Subscription untuk mengontrol permintaan elemen oleh Subscriber.

Penerapan semua contoh dan cuplikan kode ini dapat ditemukan di proyek GitHub - ini adalah proyek Maven, jadi semestinya mudah untuk mengimpor dan menjalankannya apa adanya.