Klien MQTT di Java

1. Ikhtisar

Dalam tutorial ini, kita akan melihat bagaimana kita bisa menambahkan pesan MQTT dalam proyek Java menggunakan perpustakaan yang disediakan oleh proyek Eclipse Paho.

2. Primer MQTT

MQTT (MQ Telemetry Transport) adalah protokol pengiriman pesan yang dibuat untuk menjawab kebutuhan akan metode sederhana dan ringan untuk mentransfer data ke / dari perangkat berdaya rendah, seperti yang digunakan dalam aplikasi industri.

Dengan meningkatnya popularitas perangkat IoT (Internet of Things), MQTT telah mengalami peningkatan penggunaan, yang mengarah ke standarisasi oleh OASIS dan ISO.

Protokol mendukung pola pesan tunggal, yaitu pola Publikasikan-Langganan: setiap pesan yang dikirim oleh klien berisi "topik" terkait yang digunakan oleh broker untuk merutekannya ke klien yang berlangganan. Nama topik bisa berupa string sederhana seperti " oiltemp " atau string mirip jalur " motor / 1 / rpm ".

Untuk menerima pesan, klien berlangganan ke satu atau beberapa topik menggunakan nama persisnya atau string yang berisi salah satu karakter pengganti yang didukung ("#" untuk topik multi-level dan "+" untuk satu level ").

3. Pengaturan Proyek

Untuk menyertakan pustaka Paho dalam proyek Maven, kita harus menambahkan ketergantungan berikut:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Versi terbaru modul pustaka Java Eclipse Paho dapat diunduh dari Maven Central.

4. Pengaturan Klien

Saat menggunakan perpustakaan Paho, hal pertama yang perlu kita lakukan untuk mengirim dan / atau menerima pesan dari pialang MQTT adalah mendapatkan implementasi antarmuka IMqttClient . Antarmuka ini berisi semua metode yang dibutuhkan oleh aplikasi untuk membuat koneksi ke server, mengirim dan menerima pesan.

Paho keluar dari kotak dengan dua implementasi dari antarmuka ini, yang asinkron ( MqttAsyncClient ) dan yang sinkron ( MqttClient ).Dalam kasus kami, kami akan fokus pada versi sinkron, yang memiliki semantik sederhana.

Penyiapannya sendiri adalah proses dua langkah: pertama kita membuat instance kelas MqttClient dan kemudian kita menghubungkannya ke server kita. Subbagian berikut merinci langkah-langkah tersebut.

4.1. Membuat Instans IMqttClient Baru

Cuplikan kode berikut menunjukkan cara membuat instance sinkron IMqttClient baru :

String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

Dalam kasus ini, kami menggunakan konstruktor paling sederhana yang tersedia, yang mengambil alamat titik akhir dari broker MQTT kami dan pengidentifikasi klien , yang secara unik mengidentifikasi klien kami.

Dalam kasus kami, kami menggunakan UUID acak, sehingga pengidentifikasi klien baru akan dibuat di setiap proses.

Paho juga menyediakan konstruktor tambahan yang dapat kita gunakan untuk menyesuaikan mekanisme persistensi yang digunakan untuk menyimpan pesan yang tidak diakui dan / atau ScheduledExecutorService yang digunakan untuk menjalankan tugas latar belakang yang diperlukan oleh implementasi mesin protokol.

Titik akhir server yang kami gunakan adalah pialang MQTT publik yang dihosting oleh proyek Paho , yang memungkinkan siapa pun yang memiliki koneksi internet untuk menguji klien tanpa memerlukan autentikasi apa pun.

4.2. Menghubungkan ke Server

Instance MqttClient kami yang baru dibuat tidak terhubung ke server. Kami melakukannya dengan memanggil metode connect () , secara opsional meneruskan instance MqttConnectOptions yang memungkinkan kami menyesuaikan beberapa aspek protokol.

Secara khusus, kami dapat menggunakan opsi tersebut untuk menyampaikan informasi tambahan seperti kredensial keamanan, mode pemulihan sesi, mode sambungan ulang, dan sebagainya.

Kelas MqttConnectionOptions mengekspos opsi tersebut sebagai properti sederhana yang dapat kita setel menggunakan metode penyetel normal. Kita hanya perlu mengatur properti yang diperlukan untuk skenario kita - sisanya akan menggunakan nilai default.

Kode yang digunakan untuk membuat koneksi ke server biasanya terlihat seperti ini:

MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setConnectionTimeout(10); publisher.connect(options);

Di sini, kami menentukan opsi koneksi kami sehingga:

  • Perpustakaan akan secara otomatis mencoba menyambung kembali ke server jika terjadi kegagalan jaringan
  • Ini akan membuang pesan yang tidak terkirim dari proses sebelumnya
  • Batas waktu koneksi disetel ke 10 detik

5. Mengirim Pesan

Mengirim pesan menggunakan MqttClient yang sudah terhubung sangat mudah. Kami menggunakan salah satu varian metode publish () untuk mengirim payload, yang selalu berupa larik byte, ke topik tertentu , menggunakan salah satu opsi kualitas layanan berikut:

  • 0 - semantik "paling banyak sekali", juga dikenal sebagai "api-dan-lupakan". Gunakan opsi ini jika kehilangan pesan dapat diterima, karena tidak memerlukan pengakuan atau ketekunan apa pun
  • 1 - semantik "setidaknya sekali". Gunakan opsi ini jika kehilangan pesan tidak dapat diterima dan pelanggan Anda dapat menangani duplikasi
  • 2 - semantik "tepat sekali". Gunakan opsi ini jika kehilangan pesan tidak dapat diterima dan pelanggan Anda tidak dapat menangani duplikat

Dalam proyek sampel kami, kelas EngineTemperatureSensor memainkan peran sebagai sensor tiruan yang menghasilkan pembacaan suhu baru setiap kali kami memanggil metode call () .

Kelas ini mengimplementasikan antarmuka Callable sehingga kita dapat dengan mudah menggunakannya dengan salah satu implementasi ExecutorService yang tersedia dalam paket java.util.concurrent :

public class EngineTemperatureSensor implements Callable { // ... private members omitted public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } MqttMessage msg = readEngineTemp(); msg.setQos(0); msg.setRetained(true); client.publish(TOPIC,msg); return null; } private MqttMessage readEngineTemp() { double temp = 80 + rnd.nextDouble() * 20.0; byte[] payload = String.format("T:%04.2f",temp) .getBytes(); return new MqttMessage(payload); } }

The MqttMessage merangkum muatan itu sendiri, meminta Kualitas-of-Service dan juga yang ditahan bendera untuk pesan. Bendera ini menunjukkan kepada pialang bahwa ia harus menyimpan pesan ini sampai dikonsumsi oleh pelanggan.

We can use this feature to implement a “last known good” behavior, so when a new subscriber connects to the server, it will receive the retained message right away.

6. Receiving Messages

In order to receive messages from the MQTT broker, we need to use one of the subscribe() method variants, which allow us to specify:

  • One or more topic filters for messages we want to receive
  • The associated QoS
  • The callback handler to process received messages

In the following example, we show how to add a message listener to an existing IMqttClient instance to receive messages from a given topic. We use a CountDownLatch as a synchronization mechanism between our callback and the main execution thread, decrementing it every time a new message arrives.

In the sample code, we've used a different IMqttClient instance to receive messages. We did it just to make more clear which client does what, but this is not a Paho limitation – if you want, you can use the same client for publishing and receiving messages:

CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);

The subscribe() variant used above takes an IMqttMessageListener instance as its second argument.

In our case, we use a simple lambda function that processes the payload and decrements a counter. If not enough messages arrive in the specified time window (1 minute), the await() method will throw an exception.

When using Paho, we don't need to explicitly acknowledge message receipt. If the callback returns normally, Paho assumes it a successful consumption and sends an acknowledgment to the server.

If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.

Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.

7. Conclusion

In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.

Pustaka ini menangani semua detail protokol tingkat rendah, memungkinkan kami untuk fokus pada aspek lain dari solusi kami, sambil menyisakan ruang yang cukup untuk menyesuaikan aspek penting dari fitur internalnya, seperti persistensi pesan.

Kode yang ditampilkan dalam artikel ini tersedia di GitHub.