Tepat Sekali Diproses di Kafka dengan Java

1. Ikhtisar

Dalam tutorial ini, kita akan melihat bagaimana Kafka memastikan pengiriman yang tepat satu kali antara aplikasi produsen dan konsumen melalui Transactional API yang baru diperkenalkan.

Selain itu, kami akan menggunakan API ini untuk menerapkan produsen dan konsumen transaksional untuk mencapai pengiriman ujung ke ujung tepat satu kali dalam contoh WordCount.

2. Pengiriman Pesan di Kafka

Karena berbagai kegagalan, sistem perpesanan tidak dapat menjamin pengiriman pesan antara aplikasi produsen dan konsumen. Bergantung pada bagaimana aplikasi klien berinteraksi dengan sistem tersebut, semantik pesan berikut dimungkinkan:

  • Jika sistem perpesanan tidak akan pernah menduplikasi pesan tetapi mungkin melewatkan pesan sesekali, kami menyebutnya paling banyak sekali
  • Atau, jika pesan tidak akan pernah terlewatkan tetapi mungkin menduplikasi pesan sesekali, kami menyebutnya setidaknya sekali
  • Tapi, jika itu selalu mengirimkan semua pesan tanpa duplikasi, itu persis sekali

Awalnya, Kafka hanya mendukung pengiriman pesan paling banyak sekali dan paling tidak satu kali.

Namun, pengenalan Transaksi antara broker Kafka dan aplikasi klien memastikan pengiriman tepat satu kali di Kafka . Untuk memahaminya dengan lebih baik, mari kita tinjau API klien transaksional dengan cepat.

3. Ketergantungan Maven

Untuk bekerja dengan API transaksi, kami membutuhkan klien Java Kafka di pom kami:

 org.apache.kafka kafka-clients 2.0.0 

4. Sebuah loop konsumsi-transformasi-produksi Transaksional

Sebagai contoh, kami akan menggunakan pesan dari topik masukan, kalimat .

Kemudian untuk setiap kalimat, kita akan menghitung setiap kata dan mengirimkan jumlah kata individu ke topik keluaran, hitungan .

Dalam contoh, kami akan berasumsi bahwa sudah ada data transaksional yang tersedia di topik kalimat .

4.1. Produsen Sadar Transaksi

Jadi mari kita tambahkan dulu produser Kafka yang khas.

Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:9092");

Selain itu, kita perlu menentukan transactional.id dan mengaktifkan idempotence :

producerProps.put("enable.idempotence", "true"); producerProps.put("transactional.id", "prod-1"); KafkaProducer producer = new KafkaProducer(producerProps);

Karena kami telah mengaktifkan idempotensi, Kafka akan menggunakan id transaksi ini sebagai bagian dari algoritmanya untuk menghapus duplikat pesan apa pun yang dikirim produser ini , memastikan idempotensi.

Sederhananya, jika produser secara tidak sengaja mengirim pesan yang sama ke Kafka lebih dari sekali, pengaturan ini memungkinkannya untuk pemberitahuan.

Yang perlu kita lakukan adalah memastikan ID transaksi berbeda untuk setiap produsen , meskipun konsisten saat dimulai ulang.

4.2. Mengaktifkan Produser untuk Transaksi

Setelah kami siap, maka kami juga perlu memanggil initTransaction untuk mempersiapkan produsen menggunakan transaksi:

producer.initTransactions();

Ini mendaftarkan produsen dengan broker sebagai salah satu yang dapat menggunakan transaksi, mengidentifikasinya dengan transactional.id dan nomor urut, atau epoch . Pada gilirannya, broker akan menggunakan ini untuk menulis tindakan apa pun ke log transaksi.

Dan akibatnya, broker akan menghapus tindakan apa pun dari log yang dimiliki produsen dengan id transaksi yang sama dan periode sebelumnya , dengan anggapan tindakan tersebut berasal dari transaksi yang tidak berfungsi.

4.3. Konsumen yang Sadar Transaksi

Saat kita menggunakan, kita dapat membaca semua pesan pada partisi topik secara berurutan. Padahal, kami dapat menunjukkan dengan isolation.level bahwa kami harus menunggu untuk membaca pesan transaksional sampai transaksi terkait telah dilakukan :

Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put("group.id", "my-group-id"); consumerProps.put("enable.auto.commit", "false"); consumerProps.put("isolation.level", "read_committed"); KafkaConsumer consumer = new KafkaConsumer(consumerProps); consumer.subscribe(singleton(“sentences”));

Menggunakan nilai read_committed memastikan bahwa kita tidak membaca pesan transaksional apa pun sebelum transaksi selesai.

Nilai default isolation.level adalah read_uncommitted.

4.4. Konsumsi dan Transformasi melalui Transaksi

Sekarang kita memiliki produsen dan konsumen yang dikonfigurasi untuk menulis dan membaca secara transaksional, kita dapat menggunakan catatan dari topik masukan kita dan menghitung setiap kata di setiap catatan:

ConsumerRecords records = consumer.poll(ofSeconds(60)); Map wordCountMap = records.records(new TopicPartition("input", 0)) .stream() .flatMap(record -> Stream.of(record.value().split(" "))) .map(word -> Tuple.of(word, 1)) .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));

Perhatikan, bahwa tidak ada yang transaksional tentang kode di atas. Tetapi, karena kami menggunakan read_committed, itu berarti bahwa tidak ada pesan yang ditulis ke topik input dalam transaksi yang sama akan dibaca oleh konsumen ini sampai semuanya ditulis.

Sekarang, kita dapat mengirim jumlah kata yang dihitung ke topik keluaran.

Mari kita lihat bagaimana kita bisa menghasilkan hasil kita, juga secara transaksional.

4.5. Kirim API

Untuk mengirim hitungan kami sebagai pesan baru, tetapi dalam transaksi yang sama, kami memanggil beginTransaction :

producer.beginTransaction();

Kemudian, kita dapat menulis masing-masing ke topik "hitungan" kita dengan kuncinya adalah kata dan hitungan adalah nilainya:

wordCountMap.forEach((key,value) -> producer.send(new ProducerRecord("counts",key,value.toString())));

Note that because the producer can partition the data by the key, this means that transactional messages can span multiple partitions, each being read by separate consumers. Therefore, Kafka broker will store a list of all updated partitions for a transaction.

Note also that, within a transaction, a producer can use multiple threads to send records in parallel.

4.6. Committing Offsets

And finally, we need to commit our offsets that we just finished consuming. With transactions, we commit the offsets back to the input topic we read them from, like normal. Also though, we send them to the producer's transaction.

We can do all of this in a single call, but we first need to calculate the offsets for each topic partition:

Map offsetsToCommit = new HashMap(); for (TopicPartition partition : records.partitions()) { List
    
      partitionedRecords = records.records(partition); long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); }
    

Note that what we commit to the transaction is the upcoming offset, meaning we need to add 1.

Then we can send our calculated offsets to the transaction:

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");

4.7. Committing or Aborting the Transaction

And, finally, we can commit the transaction, which will atomically write the offsets to the consumer_offsets topic as well as to the transaction itself:

producer.commitTransaction();

This flushes any buffered message to the respective partitions. In addition, the Kafka broker makes all messages in that transaction available to the consumers.

Of course, if anything goes wrong while we are processing, for example, if we catch an exception, we can call abortTransaction:

try { // ... read from input topic // ... transform // ... write to output topic producer.commitTransaction(); } catch ( Exception e ) { producer.abortTransaction(); }

And drop any buffered messages and remove the transaction from the broker.

If we neither commit nor abort before the broker-configured max.transaction.timeout.ms, the Kafka broker will abort the transaction itself. The default value for this property is 900,000 milliseconds or 15 minutes.

5. Other consume-transform-produce Loops

What we've just seen is a basic consume-transform-produce loop which reads and writes to the same Kafka cluster.

Conversely, applications that must read and write to different Kafka clusters must use the older commitSync and commitAsync API. Typically, applications will store consumer offsets into their external state storage to maintain transactionality.

6. Conclusion

Untuk aplikasi data-kritis, pemrosesan ujung-ke-ujung tepat sekali seringkali sangat penting.

Dalam tutorial ini, kami melihat bagaimana kami menggunakan Kafka untuk melakukan hal ini, menggunakan transaksi , dan kami menerapkan contoh penghitungan kata berbasis transaksi untuk menggambarkan prinsip tersebut.

Jangan ragu untuk memeriksa semua contoh kode di GitHub.