Penjadwal di RxJava

1. Ikhtisar

Pada artikel ini, kita akan fokus pada berbagai jenis Penjadwal yang akan kita gunakan dalam menulis program multithreading berdasarkan metode subscribeOn dan observOn dari RxJava Observable .

Penjadwal memberikan kesempatan untuk menentukan di mana dan kemungkinan kapan harus melaksanakan tugas yang terkait dengan pengoperasian rantai yang Dapat Diamati .

Kami dapat memperoleh Penjadwal dari metode pabrik yang dijelaskan di Penjadwal kelas .

2. Perilaku Threading Default

Secara default, Rx adalah single-threaded yang menyiratkan bahwa Observable dan rantai operator yang dapat kita terapkan padanya akan memberi tahu pengamatnya pada thread yang sama tempat metode subscribe () dipanggil.

Metode observOn dan subscribeOn mengambil sebuah argumen Penjadwal, yang, seperti namanya, adalah alat yang dapat kita gunakan untuk menjadwalkan tindakan individu.

Kami akan membuat implementasi Scheduler kami dengan menggunakan metode create Worker , yang mengembalikan Scheduler.Worker. Seorang pekerja menerima tindakan dan menjalankannya secara berurutan pada satu utas.

Di satu sisi, seorang pekerja adalah S cheduler itu sendiri, tetapi kami tidak akan menyebutnya sebagai Penjadwal untuk menghindari kebingungan.

2.1. Menjadwalkan Tindakan

Kami dapat menjadwalkan pekerjaan di Penjadwal apa pun dengan membuat pekerja baru dan menjadwalkan beberapa tindakan:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> result += "action"); Assert.assertTrue(result.equals("action"));

Tindakan tersebut kemudian dimasukkan ke dalam antrean di utas tempat pekerja ditugaskan.

2.2. Membatalkan Tindakan

Scheduler.Worker memperluas Langganan . Memanggil metode berhenti berlangganan pada seorang pekerja akan mengakibatkan antrian dikosongkan dan semua tugas yang tertunda dibatalkan. Kita bisa melihatnya dengan contoh:

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += "First_Action"; worker.unsubscribe(); }); worker.schedule(() -> result += "Second_Action"); Assert.assertTrue(result.equals("First_Action"));

Tugas kedua tidak pernah dijalankan karena tugas sebelumnya membatalkan seluruh operasi. Tindakan yang sedang dalam proses dijalankan akan terhenti.

3. Schedulers.newThread

Penjadwal ini memulai utas baru setiap kali diminta melalui subscribeOn () atau observOn () .

Ini hampir tidak pernah menjadi pilihan yang baik, bukan hanya karena latensi yang terlibat saat memulai utas, tetapi juga karena utas ini tidak digunakan kembali:

Observable.just("Hello") .observeOn(Schedulers.newThread()) .doOnNext(s -> result2 += Thread.currentThread().getName() ) .observeOn(Schedulers.newThread()) .subscribe(s -> result1 += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result1.equals("RxNewThreadScheduler-1")); Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

Saat Pekerja selesai, utas berhenti begitu saja. Penjadwal ini hanya dapat digunakan jika tugas-tugasnya berbutir-butir kasar: perlu banyak waktu untuk menyelesaikannya, tetapi jumlahnya sangat sedikit sehingga utas tidak mungkin digunakan kembali sama sekali.

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(3000); Assert.assertTrue(result.equals( "RxNewThreadScheduler-1_Start_End_worker_"));

Saat kami menjadwalkan pekerja di NewThreadScheduler, kami melihat bahwa pekerja terikat ke utas tertentu.

4. Schedulers.immediate

Schedulers.immediate adalah penjadwal khusus yang memanggil tugas dalam thread klien dengan cara memblokir, bukan secara asinkron dan kembali ketika tindakan selesai:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(500); Assert.assertTrue(result.equals( "main_Start_worker__End"));

Faktanya, berlangganan Observable melalui Scheduler langsung biasanya memiliki efek yang sama dengan tidak berlangganan S cheduler tertentu sama sekali:

Observable.just("Hello") .subscribeOn(Schedulers.immediate()) .subscribe(s -> result += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result.equals("main"));

5. Schedulers.trampoline

The trampolin Scheduler sangat mirip dengan segera karena juga jadwal tugas di thread yang sama, secara efektif menghalangi.

Namun, tugas yang akan datang dijalankan ketika semua tugas yang dijadwalkan sebelumnya selesai:

Observable.just(2, 4, 6, 8) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Observable.just(1, 3, 5, 7, 9) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Thread.sleep(500); Assert.assertTrue(result.equals("246813579"));

Segera menjalankan tugas tertentu, sedangkan trampolin menunggu tugas saat ini selesai.

The trampolin 's pekerja mengeksekusi setiap tugas pada benang yang dijadwalkan tugas pertama. Panggilan pertama untuk menjadwalkan diblokir hingga antrian dikosongkan:

Scheduler scheduler = Schedulers.trampoline(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "Start"; worker.schedule(() -> { result += "_middleStart"; worker.schedule(() -> result += "_worker_" ); result += "_middleEnd"; }); result += "_mainEnd"; }); Thread.sleep(500); Assert.assertTrue(result .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Schedulers.from

Penjadwal secara internal lebih kompleks daripada Pelaksana dari java.util.concurrent - jadi diperlukan abstraksi terpisah.

Tetapi karena mereka secara konseptual sangat mirip, tidak mengherankan ada pembungkus yang dapat mengubah Pelaksana menjadi Penjadwal menggunakan metode dari pabrik:

private ThreadFactory threadFactory(String pattern) { return new ThreadFactoryBuilder() .setNameFormat(pattern) .build(); } @Test public void givenExecutors_whenSchedulerFrom_thenReturnElements() throws InterruptedException { ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched-A-%d")); Scheduler schedulerA = Schedulers.from(poolA); ExecutorService poolB = newFixedThreadPool( 10, threadFactory("Sched-B-%d")); Scheduler schedulerB = Schedulers.from(poolB); Observable observable = Observable.create(subscriber -> { subscriber.onNext("Alfa"); subscriber.onNext("Beta"); subscriber.onCompleted(); });; observable .subscribeOn(schedulerA) .subscribeOn(schedulerB) .subscribe( x -> result += Thread.currentThread().getName() + x + "_", Throwable::printStackTrace, () -> result += "_Completed" ); Thread.sleep(2000); Assert.assertTrue(result.equals( "Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

SchedulerB digunakan untuk waktu yang singkat, tetapi menjadwalkan tindakan baru pada schedulerA , yang melakukan semua pekerjaan. Dengan demikian, beberapa metode subscribeOn tidak hanya diabaikan, tetapi juga memperkenalkan overhead kecil.

7. Schedulers.io

This Scheduler is similar to the newThread except for the fact that already started threads are recycled and can possibly handle future requests.

This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new worker is requested, either a new thread is started (and later kept idle for some time) or the idle one is reused:

Observable.just("io") .subscribeOn(Schedulers.io()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxIoScheduler-2"));

We need to be careful with unbounded resources of any kind – in case of slow or unresponsive external dependencies like web services, ioscheduler might start an enormous number of threads, leading to our very own application becoming unresponsive.

In practice, following Schedulers.io is almost always a better choice.

8. Schedulers.computation

Computation Scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as found in the Runtime.getRuntime() utility class.

So we should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code.

It uses an unbounded queue in front of every thread, so if the task is scheduled, but all cores are occupied, it will be queued. However, the queue just before each thread will keep growing:

Observable.just("computation") .subscribeOn(Schedulers.computation()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxComputationScheduler-1"));

If for some reason, we need a different number of threads than the default, we can always use the rx.scheduler.max-computation-threads system property.

By taking fewer threads we can ensure that there is always one or more CPU cores idle, and even under heavy load, computation thread pool does not saturate the server. It's simply not possible to have more computation threads than cores.

9. Schedulers.test

This Scheduler is used only for testing purposes, and we'll never see it in production code. Its main advantage is the ability to advance the clock, simulating time passing by arbitrarily:

List letters = Arrays.asList("A", "B", "C"); TestScheduler scheduler = Schedulers.test(); TestSubscriber subscriber = new TestSubscriber(); Observable tick = Observable .interval(1, TimeUnit.SECONDS, scheduler); Observable.from(letters) .zipWith(tick, (string, index) -> index + "-" + string) .subscribeOn(scheduler) .subscribe(subscriber); subscriber.assertNoValues(); subscriber.assertNotCompleted(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); subscriber.assertNoErrors(); subscriber.assertValueCount(1); subscriber.assertValues("0-A"); scheduler.advanceTimeTo(3, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); subscriber.assertValueCount(3); assertThat( subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C"));

10. Default Schedulers

Some Observable operators in RxJava have alternate forms that allow us to set which Scheduler the operator will use for its operation. Others don't operate on any particular Scheduler or operate on a particular default Scheduler.

For example, the delay operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:

ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched1-")); Scheduler schedulerA = Schedulers.from(poolA); Observable.just('A', 'B') .delay(1, TimeUnit.SECONDS, schedulerA) .subscribe(i -> result+= Thread.currentThread().getName() + i + " "); Thread.sleep(2000); Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

Without supplying a custom schedulerA, all operators below delay would use the computation Scheduler.

Other important operators that support custom Schedulers are buffer, interval, range, timer, skip, take, timeout, and several others. If we don't provide a Scheduler to such operators, computation scheduler is utilized, which is a safe default in most cases.

11. Conclusion

In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed.

Penjadwal penguasaan sangat penting untuk menulis kode yang dapat diskalakan dan aman menggunakan RxJava. Perbedaan antara subscribeOn dan observOn sangat penting di bawah beban tinggi di mana setiap tugas harus dijalankan tepat seperti yang kita harapkan.

Last but not least, kita harus yakin bahwa Penjadwal yang digunakan di hilir dapat mengikuti iklan lo yang dihasilkan oleh Penjadwal upstrea m. Untuk informasi lebih lanjut, ada artikel tentang tekanan balik ini.

Penerapan semua contoh dan cuplikan kode ini dapat ditemukan di proyek GitHub - ini adalah proyek Maven, jadi semestinya mudah untuk mengimpor dan menjalankannya apa adanya.