Pesan yang Andal dengan JGroups

1. Ikhtisar

JGroups adalah Java API untuk pertukaran pesan yang andal. Ini fitur antarmuka sederhana yang menyediakan:

  • tumpukan protokol yang fleksibel, termasuk TCP dan UDP
  • fragmentasi dan penyusunan ulang pesan besar
  • unicast dan multicast yang andal
  • deteksi kegagalan
  • Alur kontrol

Serta banyak fitur lainnya.

Dalam tutorial ini, kita akan membuat aplikasi sederhana untuk bertukar pesan String antar aplikasi dan menyediakan status bersama ke aplikasi baru saat mereka bergabung dalam jaringan.

2. Penyiapan

2.1. Ketergantungan Maven

Kita perlu menambahkan satu dependensi ke pom.xml kita :

 org.jgroups jgroups 4.0.10.Final  

Versi terbaru perpustakaan dapat diperiksa di Maven Central.

2.2. Jaringan

JGroups akan mencoba menggunakan IPV6 secara default. Bergantung pada konfigurasi sistem kami, ini dapat mengakibatkan aplikasi tidak dapat berkomunikasi.

Untuk menghindari ini, kami akan menyetel java.net.preferIPv4Stack ke properti true saat menjalankan aplikasi kami di sini:

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Koneksi kami ke jaringan JGroups adalah JChannel. Saluran tersebut bergabung dengan cluster dan mengirim serta menerima pesan, serta informasi tentang status jaringan.

3.1. Membuat Channel

Kami membuat JChannel dengan jalur ke file konfigurasi. Jika kita menghilangkan nama file, itu akan mencari udp.xml di direktori kerja saat ini.

Kami akan membuat saluran dengan file konfigurasi bernama secara eksplisit:

JChannel channel = new JChannel("src/main/resources/udp.xml"); 

Konfigurasi JGroups bisa sangat rumit, tetapi konfigurasi UDP dan TCP default sudah cukup untuk sebagian besar aplikasi. Kami telah menyertakan file untuk UDP dalam kode kami dan akan menggunakannya untuk tutorial ini.

Untuk informasi lebih lanjut tentang konfigurasi transport, lihat manual JGroups di sini.

3.2. Menghubungkan Saluran

Setelah kita membuat saluran kita, kita perlu bergabung dengan sebuah cluster. Sebuah cluster adalah sekelompok node yang bertukar pesan.

Bergabung dengan cluster membutuhkan nama cluster:

channel.connect("Baeldung"); 

Node pertama yang mencoba untuk bergabung dengan sebuah cluster akan membuatnya jika tidak ada. Kami akan melihat proses ini beraksi di bawah.

3.3. Menamai Saluran

Node diidentifikasi dengan nama sehingga rekan dapat mengirim pesan terarah dan menerima pemberitahuan tentang siapa yang masuk dan keluar dari cluster. JGroups akan memberikan nama secara otomatis, atau kita dapat mengaturnya sendiri:

channel.name("user1");

Kami akan menggunakan nama-nama ini di bawah, untuk melacak kapan node masuk dan keluar dari cluster.

3.4. Menutup Channel

Pembersihan saluran sangat penting jika kita ingin rekan-rekan menerima pemberitahuan tepat waktu bahwa kita telah keluar.

Kami menutup JChannel dengan metode tutupnya :

channel.close()

4. Perubahan Tampilan Cluster

Dengan JChannel yang dibuat, kami sekarang siap untuk melihat status peer di cluster dan bertukar pesan dengan mereka.

JGroups mempertahankan status cluster di dalam kelas View . Setiap saluran memiliki satu Tampilan jaringan. Saat tampilan berubah, ini dikirim melalui callback viewAccepted () .

Untuk tutorial ini, kami akan memperluas kelas ReceiverAdaptor API yang menerapkan semua metode antarmuka yang diperlukan untuk aplikasi.

Ini adalah cara yang direkomendasikan untuk mengimplementasikan callback.

Mari tambahkan viewAccepted ke aplikasi kita:

public void viewAccepted(View newView) { private View lastView; if (lastView == null) { System.out.println("Received initial view:"); newView.forEach(System.out::println); } else { System.out.println("Received new view."); List newMembers = View.newMembers(lastView, newView); System.out.println("New members: "); newMembers.forEach(System.out::println); List exMembers = View.leftMembers(lastView, newView); System.out.println("Exited members:"); exMembers.forEach(System.out::println); } lastView = newView; } 

Setiap View berisi objek List of Address , yang mewakili setiap anggota cluster. JGroups menawarkan metode praktis untuk membandingkan satu tampilan dengan yang lain, yang kami gunakan untuk mendeteksi anggota baru atau keluar dari cluster.

5. Mengirim Pesan

Penanganan pesan di JGroups sangatlah mudah. Sebuah Pesan berisi byte array dan Alamat benda sesuai dengan pengirim dan penerima.

Untuk tutorial ini kami menggunakan Strings read from the command line, tetapi mudah untuk melihat bagaimana aplikasi dapat bertukar tipe data lainnya.

5.1. Broadcast Messages

A Message is created with a destination and a byte array; JChannel sets the sender for us. If the target is null, the entire cluster will receive the message.

We'll accept text from the command line and send it to the cluster:

System.out.print("Enter a message: "); String line = in.readLine().toLowerCase(); Message message = new Message(null, line.getBytes()); channel.send(message); 

If we run multiple instances of our program and send this message (after we implement the receive() method below), all of them would receive it, including the sender.

5.2. Blocking Our Messages

If we don't want to see our messages, we can set a property for that:

channel.setDiscardOwnMessages(true); 

When we run the previous test, the message sender does not receive its broadcast message.

5.3. Direct Messages

Sending a direct message requires a valid Address. If we're referring to nodes by name, we need a way to look up an Address. Fortunately, we have the View for that.

The current View is always available from the JChannel:

private Optional getAddress(String name) { View view = channel.view(); return view.getMembers().stream() .filter(address -> name.equals(address.toString())) .findAny(); } 

Address names are available via the class toString() method, so we merely search the List of cluster members for the name we want.

So we can accept a name on from the console, find the associated destination, and send a direct message:

Address destination = null; System.out.print("Enter a destination: "); String destinationName = in.readLine().toLowerCase(); destination = getAddress(destinationName) .orElseThrow(() -> new Exception("Destination not found"); Message message = new Message(destination, "Hi there!"); channel.send(message); 

6. Receiving Messages

We can send messages, now let's add try to receive them now.

Let's override ReceiverAdaptor's empty receive method:

public void receive(Message message) { String line = Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); } 

Since we know the message contains a String, we can safely pass getObject() to System.out.

7. State Exchange

When a node enters the network, it may need to retrieve state information about the cluster. JGroups provides a state transfer mechanism for this.

When a node joins the cluster, it simply calls getState(). The cluster usually retrieves the state from the oldest member in the group – the coordinator.

Let's add a broadcast message count to our application. We'll add a new member variable and increment it inside receive():

private Integer messageCount = 0; public void receive(Message message) { String line = "Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); if (message.getDest() == null) { messageCount++; System.out.println("Message count: " + messageCount); } } 

We check for a null destination because if we count direct messages, each node will have a different number.

Next, we override two more methods in ReceiverAdaptor:

public void setState(InputStream input) { try { messageCount = Util.objectFromStream(new DataInputStream(input)); } catch (Exception e) { System.out.println("Error deserialing state!"); } System.out.println(messageCount + " is the current messagecount."); } public void getState(OutputStream output) throws Exception { Util.objectToStream(messageCount, new DataOutputStream(output)); } 

Similar to messages, JGroups transfers state as an array of bytes.

JGroups supplies an InputStream to the coordinator to write the state to, and an OutputStream for the new node to read. The API provides convenience classes for serializing and deserializing the data.

Note that in production code access to state information must be thread-safe.

Finally, we add the call to getState() to our startup, after we connect to the cluster:

channel.connect(clusterName); channel.getState(null, 0); 

getState() accepts a destination from which to request the state and a timeout in milliseconds. A null destination indicates the coordinator and 0 means do not timeout.

Saat kami menjalankan aplikasi ini dengan sepasang node dan bertukar pesan siaran, kami melihat kenaikan jumlah pesan.

Kemudian jika kita menambahkan klien ketiga atau berhenti dan memulai salah satunya, kita akan melihat node yang baru terhubung mencetak jumlah pesan yang benar.

8. Kesimpulan

Dalam tutorial ini, kami menggunakan JGroups untuk membuat aplikasi untuk bertukar pesan. Kami menggunakan API untuk memantau node mana yang terhubung ke dan meninggalkan cluster dan juga untuk mentransfer status cluster ke node baru ketika bergabung.

Contoh kode, seperti biasa, dapat ditemukan di GitHub.