Pengantar Bus Reaktor Proyek

1. Ikhtisar

Dalam artikel singkat ini, kami akan memperkenalkan bus reaktor dengan menyiapkan skenario kehidupan nyata untuk aplikasi reaktif berbasis peristiwa.

2. Dasar-dasar Reaktor Proyek

2.1. Mengapa Reaktor?

Aplikasi modern perlu menangani sejumlah besar permintaan bersamaan dan memproses sejumlah besar data. Standar, kode pemblokiran tidak lagi cukup untuk memenuhi persyaratan ini.

Pola desain reaktif adalah pendekatan arsitektur berbasis peristiwa untuk penanganan asinkron dari sejumlah besar permintaan layanan bersamaan yang berasal dari satu atau beberapa penangan layanan.

Project Reactor didasarkan pada pola ini dan memiliki tujuan yang jelas dan ambisius untuk membangun aplikasi non-pemblokiran dan reaktif di JVM .

2.2. Contoh Skenario

Sebelum kita mulai, berikut adalah beberapa skenario menarik di mana memanfaatkan gaya arsitektur reaktif akan masuk akal, hanya untuk mendapatkan gambaran tentang di mana kita dapat menerapkannya:

  • Layanan notifikasi untuk platform belanja online besar seperti Amazon
  • Layanan pemrosesan transaksi besar untuk sektor perbankan
  • Bisnis perdagangan saham di mana harga saham berubah secara bersamaan

3. Ketergantungan Maven

Mari mulai menggunakan Project Reactor Bus dengan menambahkan dependensi berikut ke pom.xml kita :

 io.projectreactor reactor-bus 2.0.8.RELEASE 

Kami dapat memeriksa bus reaktor versi terbaru di Maven Central.

4. Membangun Aplikasi Demo

Untuk lebih memahami manfaat dari pendekatan berbasis reaktor, mari kita lihat contoh praktis.

Kami akan membangun aplikasi sederhana yang bertanggung jawab untuk mengirimkan pemberitahuan kepada pengguna platform belanja online. Misalnya, jika pengguna melakukan pemesanan baru, aplikasi akan mengirimkan konfirmasi pesanan melalui email atau SMS.

Penerapan sinkron yang khas secara alami akan dibatasi oleh throughput layanan email atau SMS. Oleh karena itu, lonjakan lalu lintas, seperti hari libur umumnya akan menjadi masalah.

Dengan pendekatan reaktif, kami dapat merancang sistem kami agar lebih fleksibel dan beradaptasi lebih baik terhadap kegagalan atau batas waktu yang mungkin terjadi di sistem eksternal, seperti server gateway.

Mari kita lihat aplikasinya - dimulai dengan aspek yang lebih tradisional dan beralih ke konstruksi yang lebih reaktif.

4.1. POJO sederhana

Pertama, mari buat kelas POJO untuk mewakili data notifikasi:

public class NotificationData { private long id; private String name; private String email; private String mobile; // getter and setter methods }

4.2. Lapisan Layanan

Sekarang mari kita tentukan lapisan layanan sederhana:

public interface NotificationService { void initiateNotification(NotificationData notificationData) throws InterruptedException; }

Dan implementasinya, simulasi operasi yang berjalan lama:

@Service public class NotificationServiceimpl implements NotificationService { @Override public void initiateNotification(NotificationData notificationData) throws InterruptedException { System.out.println("Notification service started for " + "Notification ID: " + notificationData.getId()); Thread.sleep(5000); System.out.println("Notification service ended for " + "Notification ID: " + notificationData.getId()); } }

Perhatikan bahwa untuk menggambarkan skenario kehidupan nyata dari pengiriman pesan melalui SMS atau email gateway, kami sengaja memperkenalkan lima detik delay di initiateNotification metode dengan Thread.sleep (5000).

Akibatnya, ketika sebuah utas mencapai layanan, itu akan diblokir selama lima detik.

4.3. Konsumen

Sekarang mari beralih ke aspek yang lebih reaktif dari aplikasi kita dan menerapkan konsumen - yang kemudian akan kita petakan ke bus kejadian reaktor:

@Service public class NotificationConsumer implements Consumer
    
      { @Autowired private NotificationService notificationService; @Override public void accept(Event notificationDataEvent) { NotificationData notificationData = notificationDataEvent.getData(); try { notificationService.initiateNotification(notificationData); } catch (InterruptedException e) { // ignore } } }
    

Seperti yang bisa kita lihat, konsumen yang kita buat mengimplementasikan antarmuka Konsumen . Logika utama berada di metode terima .

Ini adalah pendekatan serupa yang dapat kita temui dalam implementasi pendengar Spring pada umumnya.

4.4. Pengendali

Akhirnya, sekarang setelah kita dapat menggunakan event, mari kita buat juga.

Kami akan melakukannya dalam pengontrol sederhana:

@Controller public class NotificationController { @Autowired private EventBus eventBus; @GetMapping("/startNotification/{param}") public void startNotification(@PathVariable Integer param) { for (int i = 0; i < param; i++) { NotificationData data = new NotificationData(); data.setId(i); eventBus.notify("notificationConsumer", Event.wrap(data)); System.out.println( "Notification " + i + ": notification task submitted successfully"); } } }

Ini cukup jelas - kami memancarkan acara melalui EventBus di sini.

Misalnya, jika klien menekan URL dengan nilai param sepuluh, maka sepuluh peristiwa akan dikirim melalui bus peristiwa.

4.5. Konfigurasi Java

Sekarang mari kita gabungkan semuanya dan membuat aplikasi Spring Boot sederhana.

Pertama, kita perlu mengkonfigurasi kacang EventBus dan Environment :

@Configuration public class Config { @Bean public Environment env() { return Environment.initializeIfEmpty().assignErrorJournal(); } @Bean public EventBus createEventBus(Environment env) { return EventBus.create(env, Environment.THREAD_POOL); } }

Dalam kasus kami, kami membuat instance EventBus dengan kumpulan utas default yang tersedia di lingkungan .

Atau, kita dapat menggunakan contoh Dispatcher yang disesuaikan :

EventBus evBus = EventBus.create( env, Environment.newDispatcher( REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));

Sekarang, kami siap membuat kode aplikasi utama:

import static reactor.bus.selector.Selectors.$; @SpringBootApplication public class NotificationApplication implements CommandLineRunner { @Autowired private EventBus eventBus; @Autowired private NotificationConsumer notificationConsumer; @Override public void run(String... args) throws Exception { eventBus.on($("notificationConsumer"), notificationConsumer); } public static void main(String[] args) { SpringApplication.run(NotificationApplication.class, args); } }

Dalam metode run kami, kami mendaftarkan notificationConsumer untuk dipicu ketika notifikasi cocok dengan selektor yang diberikan .

Perhatikan bagaimana kami menggunakan impor statis dari atribut $ untuk membuat objek Selector .

5. Uji Aplikasi

Sekarang mari buat tes untuk melihat NotificationApplication kita beraksi:

@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class NotificationApplicationIntegrationTest { @LocalServerPort private int port; @Test public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getForObject("//localhost:" + port + "/startNotification/10", String.class); } }

Seperti yang bisa kita lihat, segera setelah permintaan dijalankan, kesepuluh tugas dikirimkan secara instan tanpa membuat pemblokiran apa pun . Dan setelah dikirim, acara notifikasi diproses secara paralel.

Notification 0: notification task submitted successfully Notification 1: notification task submitted successfully Notification 2: notification task submitted successfully Notification 3: notification task submitted successfully Notification 4: notification task submitted successfully Notification 5: notification task submitted successfully Notification 6: notification task submitted successfully Notification 7: notification task submitted successfully Notification 8: notification task submitted successfully Notification 9: notification task submitted successfully Notification service started for Notification ID: 1 Notification service started for Notification ID: 2 Notification service started for Notification ID: 3 Notification service started for Notification ID: 0 Notification service ended for Notification ID: 1 Notification service ended for Notification ID: 0 Notification service started for Notification ID: 4 Notification service ended for Notification ID: 3 Notification service ended for Notification ID: 2 Notification service started for Notification ID: 6 Notification service started for Notification ID: 5 Notification service started for Notification ID: 7 Notification service ended for Notification ID: 4 Notification service started for Notification ID: 8 Notification service ended for Notification ID: 6 Notification service ended for Notification ID: 5 Notification service started for Notification ID: 9 Notification service ended for Notification ID: 7 Notification service ended for Notification ID: 8 Notification service ended for Notification ID: 9

Penting untuk diingat bahwa dalam skenario kami, tidak perlu memproses peristiwa ini dalam urutan tertentu.

6. Kesimpulan

Dalam tutorial singkat ini, kami telah membuat aplikasi sederhana berbasis peristiwa . Kami juga telah melihat cara mulai menulis kode yang lebih reaktif dan non-pemblokiran.

Namun, skenario ini hanya menggores permukaan subjek dan hanya mewakili dasar yang baik untuk mulai bereksperimen dengan paradigma reaktif .

Seperti biasa, kode sumber tersedia di GitHub.