Menggabungkan Observable di RxJava

1. Perkenalan

Dalam tutorial singkat ini, kita akan membahas berbagai cara menggabungkan Observable di RxJava.

Jika Anda baru mengenal RxJava, lihat tutorial intro ini terlebih dahulu.

Sekarang, mari kita langsung masuk.

2. Dapat diamati

Urutan yang dapat diamati , atau hanya Dapat diamati , adalah representasi dari aliran data asinkron.

Ini didasarkan pada pola Observer dimana sebuah objek yang disebut Observer , berlangganan item yang dipancarkan oleh Observable .

Langganan ini tidak memblokir karena Pengamat berdiri untuk bereaksi terhadap apa pun yang akan dipancarkan Observable di masa mendatang. Ini, pada gilirannya, memfasilitasi konkurensi.

Berikut demonstrasi sederhana di RxJava:

Observable .from(new String[] { "John", "Doe" }) .subscribe(name -> System.out.println("Hello " + name))

3. Menggabungkan Observable

Saat memprogram menggunakan kerangka kerja reaktif, itu adalah kasus penggunaan umum untuk menggabungkan berbagai Observable .

Dalam aplikasi web, misalnya, kita mungkin perlu mendapatkan dua kumpulan aliran data asinkron yang tidak bergantung satu sama lain.

Alih-alih menunggu streaming sebelumnya selesai sebelum meminta streaming berikutnya, kita dapat memanggil keduanya secara bersamaan dan berlangganan streaming gabungan.

Di bagian ini, kita akan membahas beberapa cara berbeda untuk menggabungkan beberapa Observable di RxJava dan kasus penggunaan berbeda yang diterapkan setiap metode.

3.1. Menggabungkan

Kita bisa menggunakan operator penggabungan untuk menggabungkan output dari beberapa Observable sehingga berfungsi seperti:

@Test public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.merge( Observable.from(new String[] {"Hello", "World"}), Observable.from(new String[] {"I love", "RxJava"}) ).subscribe(testSubscriber); testSubscriber.assertValues("Hello", "World", "I love", "RxJava"); }

3.2. MergeDelayError

The mergeDelayError metode adalah sama dengan merge karena menggabungkan beberapa diamati menjadi satu, tetapi jika kesalahan terjadi selama penggabungan, memungkinkan bebas dari kesalahan item untuk melanjutkan sebelum menyebarkan kesalahan :

@Test public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.mergeDelayError( Observable.from(new String[] { "hello", "world" }), Observable.error(new RuntimeException("Some exception")), Observable.from(new String[] { "rxjava" }) ).subscribe(testSubscriber); testSubscriber.assertValues("hello", "world", "rxjava"); testSubscriber.assertError(RuntimeException.class); }

Contoh di atas memancarkan semua nilai bebas kesalahan :

hello world rxjava

Perhatikan bahwa jika kita menggunakan merge daripada mergeDelayError , Stringrxjava” tidak akan dipancarkan karena merge segera menghentikan aliran data dari Observables saat terjadi kesalahan.

3.3. Zip

The zip metode penyuluhan menyatukan dua urutan nilai-nilai sebagai pasangan :

@Test public void givenTwoObservables_whenZipped_thenReturnCombinedResults() { List zippedStrings = new ArrayList(); Observable.zip( Observable.from(new String[] { "Simple", "Moderate", "Complex" }), Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add); assertThat(zippedStrings).isNotEmpty(); assertThat(zippedStrings.size()).isEqualTo(3); assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy"); }

3.4. Zip Dengan Interval

Dalam contoh ini, kami akan membuat zip aliran dengan interval yang pada dasarnya akan menunda emisi elemen aliran pertama:

@Test public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() { TestSubscriber testSubscriber = new TestSubscriber(); Observable data = Observable.just("one", "two", "three", "four", "five"); Observable interval = Observable.interval(1L, TimeUnit.SECONDS); Observable .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData)) .toBlocking().subscribe(testSubscriber); testSubscriber.assertCompleted(); testSubscriber.assertValueCount(5); testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five"); }

4. Ringkasan

Pada artikel ini, kami telah melihat beberapa metode untuk menggabungkan Observable dengan RxJava. Anda dapat mempelajari metode lain seperti combLatest , join , groupJoin , switchOnNext , dalam dokumentasi resmi RxJava.

Seperti biasa, kode sumber untuk artikel ini tersedia di repo GitHub kami.