Pengantar RxJava

1. Ikhtisar

Pada artikel ini, kita akan fokus menggunakan Reactive Extensions (Rx) di Java untuk membuat dan menggunakan urutan data.

Sekilas, API ini mungkin terlihat mirip dengan Java 8 Streams, tetapi kenyataannya, jauh lebih fleksibel dan lancar, menjadikannya paradigma pemrograman yang hebat.

Jika Anda ingin membaca lebih lanjut tentang RxJava, lihat artikel ini.

2. Penyiapan

Untuk menggunakan RxJava dalam proyek Maven kita, kita perlu menambahkan dependensi berikut ke pom.xml kita :

 io.reactivex rxjava ${rx.java.version} 

Atau, untuk proyek Gradle:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3. Konsep Reaktif Fungsional

Di satu sisi, pemrograman fungsional adalah proses membangun perangkat lunak dengan menyusun fungsi murni, menghindari status bersama, data yang dapat berubah, dan efek samping.

Di sisi lain, pemrograman reaktif adalah paradigma pemrograman asinkron yang berkaitan dengan aliran data dan penyebaran perubahan.

Bersama-sama, pemrograman reaktif fungsional membentuk kombinasi teknik fungsional dan reaktif yang dapat mewakili pendekatan elegan untuk pemrograman berbasis peristiwa - dengan nilai yang berubah dari waktu ke waktu dan di mana konsumen bereaksi terhadap data saat masuk.

Teknologi ini menyatukan implementasi yang berbeda dari prinsip intinya, beberapa penulis menghasilkan dokumen yang mendefinisikan kosakata umum untuk mendeskripsikan jenis aplikasi baru.

3.1. Manifesto Reaktif

Manifesto Reaktif adalah dokumen online yang menjabarkan standar tinggi untuk aplikasi dalam industri pengembangan perangkat lunak. Sederhananya, sistem reaktif adalah:

  • Responsif - sistem harus merespons secara tepat waktu
  • Message Driven - sistem harus menggunakan async message-passing antar komponen untuk memastikan kopling longgar
  • Elastis - sistem harus tetap responsif di bawah beban tinggi
  • Tangguh - sistem harus tetap responsif saat beberapa komponen gagal

4. Dapat diamati

Ada dua jenis kunci yang perlu dipahami saat bekerja dengan Rx:

  • Dapat diamati mewakili objek apa pun yang bisa mendapatkan data dari sumber data dan yang statusnya mungkin menarik sehingga objek lain dapat mendaftarkan minat
  • Sebuah pengamat adalah setiap objek yang ingin diberitahu ketika keadaan perubahan obyek lain

Seorang pengamat berlangganan urutan Observable . Urutan mengirimkan item ke pengamat satu per satu.

The pengamat menangani masing-masing sebelum memproses berikutnya. Jika banyak event datang secara asynchronous, mereka harus disimpan dalam antrian atau dibuang.

Di Rx , pengamat tidak akan pernah dipanggil dengan item yang rusak atau dipanggil sebelum callback dikembalikan untuk item sebelumnya.

4.1. Jenis yang Dapat Diamati

Ada dua jenis:

  • Non-Blocking - eksekusi asynchronous didukung dan diizinkan untuk berhenti berlangganan di titik mana pun dalam aliran acara. Pada artikel ini, kami akan lebih fokus pada jenis jenis ini
  • Pemblokiran - semua panggilan pengamat onNext akan sinkron, dan tidak mungkin berhenti berlangganan di tengah aliran acara. Kami selalu dapat mengubah Observable menjadi Blocking Observable , menggunakan metode toBlocking:
BlockingObservable blockingObservable = observable.toBlocking();

4.2. Operator

Sebuah Operator adalah fungsi yang mengambil satu O bservable (sumber) sebagai argumen pertama dan mengembalikan lain diamati (tujuan). Kemudian untuk setiap item yang dipancarkan sumber yang dapat diamati, itu akan menerapkan fungsi ke item itu, dan kemudian memancarkan hasilnya ke tujuan yang dapat diamati .

Operator dapat dirangkai bersama untuk membuat aliran data kompleks yang memfilter peristiwa berdasarkan kriteria tertentu. Beberapa operator dapat diterapkan pada observasi yang sama .

Tidaklah sulit untuk masuk ke situasi di mana Observable memancarkan item lebih cepat daripada yang dapat dikonsumsi oleh operator atau pengamat . Anda dapat membaca lebih lanjut tentang tekanan balik di sini.

4.3. Buat Observable

Operator dasar hanya menghasilkan Observable yang memancarkan satu instance generik sebelum menyelesaikan, String “Hello”. Ketika kami ingin mendapatkan informasi dari Observable , kami mengimplementasikan antarmuka pengamat dan kemudian memanggil subscribe pada Observable yang diinginkan :

Observable observable = Observable.just("Hello"); observable.subscribe(s -> result = s); assertTrue(result.equals("Hello"));

4.4. OnNext, OnError, dan OnCompleted

Ada tiga metode pada antarmuka pengamat yang ingin kita ketahui:

  1. OnNext dipanggil ke pengamat kami setiap kali acara baru dipublikasikan ke Observable terlampir . Ini adalah metode di mana kita akan melakukan beberapa tindakan pada setiap kejadian
  2. OnCompleted dipanggil ketika urutan kejadian yang terkait dengan Observable selesai, menunjukkan bahwa kita tidak boleh mengharapkan panggilan onNext lagi pada pengamat kita
  3. OnError dipanggil ketika pengecualian yang tidak tertangani dilemparkan selama kode kerangka kerja RxJava atau kode penanganan kejadian kami

Nilai kembali untuk metode berlangganan Observables adalah antarmuka berlangganan :

String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( i -> result += i, //OnNext Throwable::printStackTrace, //OnError () -> result += "_Completed" //OnCompleted ); assertTrue(result.equals("abcdefg_Completed"));

5. Transformasi yang Dapat Diamati dan Operator Bersyarat

5.1. Peta

M Operator ap mengubah item yang dipancarkan oleh diamati dengan menerapkan fungsi untuk setiap item.

Mari kita asumsikan ada larik string yang dideklarasikan yang berisi beberapa huruf dari alfabet dan kami ingin mencetaknya dalam mode kapital:

Observable.from(letters) .map(String::toUpperCase) .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG"));

FlatMap dapat digunakan untuk meratakan Observable setiap kali kita berakhir dengan Observable bersarang .

Detail lebih lanjut tentang perbedaan antara map dan flatMap dapat ditemukan di sini.

Dengan asumsi kita memiliki metode yang mengembalikan Observable dari daftar string. Sekarang kami akan mencetak untuk setiap string dari Observable baru daftar judul berdasarkan apa yang dilihat Pelanggan :

Observable getTitle() { return Observable.from(titleList); } Observable.just("book1", "book2") .flatMap(s -> getTitle()) .subscribe(l -> result += l); assertTrue(result.equals("titletitle"));

5.2. Pindai

The scan operator applies a function to each item emitted by an Observable sequentially and emits each successive value.

It allows us to carry forward state from event to event:

String[] letters = {"a", "b", "c"}; Observable.from(letters) .scan(new StringBuilder(), StringBuilder::append) .subscribe(total -> result += total.toString()); assertTrue(result.equals("aababc"));

5.3. GroupBy

Group by operator allows us to classify the events in the input Observable into output categories.

Let's assume that we created an array of integers from 0 to 10, then apply group by that will divide them into the categories even and odd:

Observable.from(numbers) .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") .subscribe(group -> group.subscribe((number) -> { if (group.getKey().toString().equals("EVEN")) { EVEN[0] += number; } else { ODD[0] += number; } }) ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579"));

5.4. Filter

The operator filter emits only those items from an observable that pass a predicate test.

So let's filter in an integer array for the odd numbers:

Observable.from(numbers) .filter(i -> (i % 2 == 1)) .subscribe(i -> result += i); assertTrue(result.equals("13579"));

5.5. Conditional Operators

DefaultIfEmpty emits item from the source Observable, or a default item if the source Observable is empty:

Observable.empty() .defaultIfEmpty("Observable is empty") .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty"));

The following code emits the first letter of the alphabet ‘a' because the array letters is not empty and this is what it contains in the first position:

Observable.from(letters) .defaultIfEmpty("Observable is empty") .first() .subscribe(s -> result += s); assertTrue(result.equals("a"));

TakeWhile operator discards items emitted by an Observable after a specified condition becomes false:

Observable.from(numbers) .takeWhile(i -> i  sum[0] += s); assertTrue(sum[0] == 10);

Of course, there more others operators that could cover our needs like Contain, SkipWhile, SkipUntil, TakeUntil, etc.

6. Connectable Observables

A ConnectableObservable resembles an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to, but only when the connect operator is applied to it.

In this way, we can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items:

String[] result = {""}; ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); assertFalse(result[0].equals("01")); connectable.connect(); Thread.sleep(500); assertTrue(result[0].equals("01"));

7. Single

Single is like an Observable who, instead of emitting a series of values, emits one value or an error notification.

With this source of data, we can only use two methods to subscribe:

  • OnSuccess returns a Single that also calls a method we specify
  • OnError also returns a Single that immediately notifies subscribers of an error
String[] result = {""}; Single single = Observable.just("Hello") .toSingle() .doOnSuccess(i -> result[0] += i) .doOnError(error -> { throw new RuntimeException(error.getMessage()); }); single.subscribe(); assertTrue(result[0].equals("Hello"));

8. Subjects

A Subject is simultaneously two elements, a subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable.

And because it's also observable, the events from multiple subscribers can be reemitted as its events to anyone observing it.

In the next example, we'll look at how the observers will be able to see the events that occur after they subscribe:

Integer subscriber1 = 0; Integer subscriber2 = 0; Observer getFirstObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber1 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber1 completed"); } }; } Observer getSecondObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber2 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber2 completed"); } }; } PublishSubject subject = PublishSubject.create(); subject.subscribe(getFirstObserver()); subject.onNext(1); subject.onNext(2); subject.onNext(3); subject.subscribe(getSecondObserver()); subject.onNext(4); subject.onCompleted(); assertTrue(subscriber1 + subscriber2 == 14)

9. Resource Management

Using operation allows us to associate resources, such as a JDBC database connection, a network connection, or open files to our observables.

Di sini kami menyajikan dalam komentar langkah-langkah yang perlu kami lakukan untuk mencapai tujuan ini dan juga contoh penerapan:

String[] result = {""}; Observable values = Observable.using( () -> "MyResource", r -> { return Observable.create(o -> { for (Character c : r.toCharArray()) { o.onNext(c); } o.onCompleted(); }); }, r -> System.out.println("Disposed: " + r) ); values.subscribe( v -> result[0] += v, e -> result[0] += e ); assertTrue(result[0].equals("MyResource"));

10. Kesimpulan

Pada artikel ini, kami telah membahas cara menggunakan pustaka RxJava dan juga cara menjelajahi fitur-fiturnya yang paling penting.

Kode sumber lengkap untuk proyek termasuk semua contoh kode yang digunakan di sini dapat ditemukan di Github.