Pengiriman Pesan RabbitMQ dengan Spring AMQP

1. Perkenalan

Dalam tutorial ini, kita akan menjelajahi konsep fanout dan pertukaran topik dengan Spring AMQP dan RabbitMQ.

Pada tingkat tinggi, pertukaran fanout akan menyiarkan pesan yang sama ke semua antrian terikat , sementara pertukaran topik menggunakan kunci perutean untuk meneruskan pesan ke antrian atau antrian terikat tertentu .

Sebelum membaca Messaging With Spring AMQP direkomendasikan untuk tutorial ini.

2. Menyiapkan Fanout Exchange

Mari kita siapkan satu pertukaran fanout dengan dua antrian terikat padanya. Ketika kami mengirim pesan ke pertukaran ini, kedua antrian akan menerima pesan tersebut. Pertukaran fanout kami mengabaikan kunci perutean apa pun yang disertakan dengan pesan.

Spring AMQP memungkinkan kita untuk mengumpulkan semua deklarasi antrian, pertukaran, dan binding dalam objek Declarables :

@Bean public Declarables fanoutBindings() { Queue fanoutQueue1 = new Queue("fanout.queue1", false); Queue fanoutQueue2 = new Queue("fanout.queue2", false); FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); return new Declarables( fanoutQueue1, fanoutQueue2, fanoutExchange, bind(fanoutQueue1).to(fanoutExchange), BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)); }

3. Menyiapkan Pertukaran Topik

Sekarang, kami juga akan menyiapkan pertukaran topik dengan dua antrean, masing-masing dengan pola pengikatan yang berbeda:

@Bean public Declarables topicBindings() { Queue topicQueue1 = new Queue(topicQueue1Name, false); Queue topicQueue2 = new Queue(topicQueue2Name, false); TopicExchange topicExchange = new TopicExchange(topicExchangeName); return new Declarables( topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind(topicQueue1) .to(topicExchange).with("*.important.*"), BindingBuilder .bind(topicQueue2) .to(topicExchange).with("#.error")); }

Pertukaran topik memungkinkan kita untuk mengikat antrian ke sana dengan pola kunci yang berbeda. Ini sangat fleksibel dan memungkinkan kita untuk mengikat banyak antrian dengan pola yang sama atau bahkan beberapa pola ke antrian yang sama.

Ketika kunci perutean pesan cocok dengan polanya, itu akan ditempatkan dalam antrian. Jika antrean memiliki beberapa binding yang cocok dengan kunci perutean pesan, hanya satu salinan pesan yang ditempatkan di antrean.

Pola penjilidan kami dapat menggunakan tanda bintang ("*") untuk mencocokkan kata di posisi tertentu atau tanda pagar ("#") untuk mencocokkan nol atau lebih kata.

Jadi, topicQueue1 kita akan menerima pesan yang memiliki kunci perutean yang memiliki pola tiga kata dengan kata tengah menjadi "penting" - misalnya: "pengguna.important.error" atau "blog.important.notification".

Dan, topicQueue2 kami akan menerima pesan yang memiliki kunci perutean yang diakhiri dengan kesalahan kata; Contoh yang cocok adalah "error" , "user.important.error" atau "blog.post.save.error".

4. Menyiapkan Produser

Kami akan menggunakan metode convertAndSend dari RabbitTemplate untuk mengirim pesan sampel kami:

 String message = " payload is broadcast"; return args -> { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); };

The RabbitTemplate menyediakan banyak kelebihan beban convertAndSend () metode untuk jenis pertukaran yang berbeda.

Saat kami mengirim pesan ke pertukaran fanout, kunci perutean diabaikan, dan pesan tersebut diteruskan ke semua antrean terikat.

Saat kami mengirim pesan ke pertukaran topik, kami perlu meneruskan kunci perutean. Berdasarkan kunci perutean ini, pesan akan dikirim ke antrean tertentu.

5. Konfigurasi Konsumen

Terakhir, mari kita siapkan empat konsumen - satu untuk setiap antrian - untuk mengambil pesan yang dihasilkan:

 @RabbitListener(queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1(String message) { System.out.println("Received fanout 1 message: " + message); } @RabbitListener(queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2(String message) { System.out.println("Received fanout 2 message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1(String message) { System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2(String message) { System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); }

Kami mengonfigurasi konsumen menggunakan anotasi @RabbitListener . Satu-satunya argumen yang diberikan di sini adalah nama antrian. Di sini, konsumen tidak mengetahui tentang pertukaran atau kunci perutean.

6. Menjalankan Contoh

Proyek sampel kami adalah aplikasi Spring Boot, sehingga akan menginisialisasi aplikasi bersama dengan koneksi ke RabbitMQ dan mengatur semua antrian, pertukaran, dan binding.

Secara default, aplikasi kita mengharapkan instance RabbitMQ berjalan di localhost pada port 5672. Kita dapat memodifikasi ini dan default lainnya di application.yaml .

Proyek kami mengekspos titik akhir HTTP di URI - / broadcast - yang menerima POST dengan pesan di badan permintaan.

Saat kita mengirim permintaan ke URI ini dengan body "Test", kita akan melihat sesuatu yang mirip dengan ini di output:

Received fanout 1 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important warn payload is broadcast Received topic 2 (#.error) message: topic important error payload is broadcast Received fanout 2 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important error payload is broadcast

Urutan di mana kita akan melihat pesan-pesan ini, tentu saja, tidak dijamin.

7. Kesimpulan

Dalam tutorial singkat ini, kami membahas fanout dan pertukaran topik dengan Spring AMQP dan RabbitMQ.

Kode sumber lengkap dan semua cuplikan kode untuk tutorial ini tersedia di repositori GitHub.