Pengantar KafkaStreams di Java

1. Ikhtisar

Pada artikel ini, kita akan melihat perpustakaan KafkaStreams .

KafkaStreams direkayasa oleh pencipta Apache Kafka . Tujuan utama dari perangkat lunak ini adalah untuk memungkinkan pemrogram membuat aplikasi streaming yang efisien, real-time, yang dapat berfungsi sebagai Microservices.

KafkaStreams memungkinkan kita untuk menggunakan dari topik Kafka, menganalisis atau mengubah data, dan berpotensi, mengirimkannya ke topik Kafka lainnya.

Untuk mendemonstrasikan KafkaStreams, kami akan membuat aplikasi sederhana yang membaca kalimat dari suatu topik, menghitung kemunculan kata dan mencetak hitungan per kata.

Penting untuk diperhatikan bahwa pustaka KafkaStreams tidak reaktif dan tidak mendukung operasi asinkron dan penanganan tekanan balik.

2. Ketergantungan Maven

Untuk mulai menulis logika pemrosesan Stream menggunakan KafkaStreams, kita perlu menambahkan dependensi ke kafka-streams dan kafka-clients :

 org.apache.kafka kafka-streams 1.0.0   org.apache.kafka kafka-clients 1.0.0  

Kita juga perlu menginstal Apache Kafka dan memulai karena kita akan menggunakan topik Kafka. Topik ini akan menjadi sumber data untuk pekerjaan streaming kami.

Kami dapat mengunduh Kafka dan dependensi lain yang diperlukan dari situs resmi.

3. Mengonfigurasi Input KafkaStreams

Hal pertama yang akan kita lakukan adalah mendefinisikan topik masukan Kafka.

Kita dapat menggunakan alat Confluent yang telah kita unduh - alat ini berisi Server Kafka. Ini juga berisi produser konsol-kafka yang dapat kita gunakan untuk mempublikasikan pesan ke Kafka.

Untuk memulai, mari jalankan cluster Kafka kami:

./confluent start

Setelah Kafka dimulai, kita dapat menentukan sumber data dan nama aplikasi kita menggunakan APPLICATION_ID_CONFIG :

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties(); streamsConfiguration.put( StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Parameter konfigurasi penting adalah BOOTSTRAP_SERVER_CONFIG. Ini adalah URL ke instance Kafka lokal kami yang baru saja kami mulai:

private String bootstrapServers = "localhost:9092"; streamsConfiguration.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Selanjutnya, kita perlu meneruskan jenis kunci dan nilai pesan yang akan dikonsumsi dari inputTopic:

streamsConfiguration.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Pemrosesan aliran sering kali stateful. Saat kita ingin menyimpan hasil antara, kita perlu menentukan parameter STATE_DIR_CONFIG .

Dalam pengujian kami, kami menggunakan sistem file lokal:

streamsConfiguration.put( StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

4. Membangun Topologi Streaming

Setelah kami menentukan topik masukan kami, kami dapat membuat Topologi Streaming - itu adalah definisi tentang bagaimana acara harus ditangani dan diubah.

Dalam contoh kami, kami ingin menerapkan penghitung kata. Untuk setiap kalimat yang dikirim ke inputTopic, kami ingin membaginya menjadi kata-kata dan menghitung kemunculan setiap kata.

Kita dapat menggunakan sebuah instance dari kelas KStreamsBuilder untuk mulai membangun topologi kita:

KStreamBuilder builder = new KStreamBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) .groupBy((key, word) -> word) .count();

Untuk menerapkan jumlah kata, pertama-tama, kita perlu membagi nilai menggunakan ekspresi reguler.

Metode split mengembalikan array. Kami menggunakan flatMapValues ​​() untuk meratakannya. Jika tidak, kami akan berakhir dengan daftar array, dan akan merepotkan untuk menulis kode menggunakan struktur seperti itu.

Akhirnya, kami menggabungkan nilai untuk setiap kata dan memanggil count () yang akan menghitung kemunculan kata tertentu.

5. Penanganan Hasil

Kami sudah menghitung jumlah kata dari pesan masukan kami. Sekarang mari kita mencetak hasil pada output standar menggunakan metode foreach () :

wordCounts .foreach((w, c) -> System.out.println("word: " + w + " -> " + c));

Pada produksi, seringkali pekerjaan streaming seperti itu dapat mempublikasikan output ke topik Kafka lainnya.

Kita bisa melakukan ini menggunakan metode to ():

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String(); Serde longSerde = Serdes.Long(); wordCounts.to(stringSerde, longSerde, outputTopic);

Kelas Serde memberi kita serializers yang telah dikonfigurasi sebelumnya untuk jenis Java yang akan digunakan untuk membuat serial objek menjadi array byte. Array byte kemudian akan dikirim ke topik Kafka.

Kami menggunakan String sebagai kunci topik kami dan Long sebagai nilai untuk hitungan aktual. Metode to () akan menyimpan data yang dihasilkan ke outputTopic .

6. Memulai Pekerjaan KafkaStream

Sampai saat ini, kami membangun topologi yang dapat dieksekusi. Namun, pekerjaan itu belum dimulai.

Kita perlu memulai pekerjaan kita secara eksplisit dengan memanggil metode start () pada instance KafkaStreams :

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); Thread.sleep(30000); streams.close();

Perhatikan bahwa kami menunggu 30 detik hingga pekerjaan selesai. Dalam skenario dunia nyata, pekerjaan itu akan berjalan sepanjang waktu, memproses peristiwa dari Kafka saat mereka tiba.

Kami dapat menguji pekerjaan kami dengan menerbitkan beberapa acara ke topik Kafka kami.

Mari kita mulai produser konsol-kafka dan secara manual mengirim beberapa acara ke inputTopic kami :

./kafka-console-producer --topic inputTopic --broker-list localhost:9092 >"this is a pony" >"this is a horse and pony" 

Dengan cara ini, kami menerbitkan dua acara ke Kafka. Aplikasi kita akan menggunakan event tersebut dan akan mencetak output berikut:

word: -> 1 word: this -> 1 word: is -> 1 word: a -> 1 word: pony -> 1 word: -> 2 word: this -> 2 word: is -> 2 word: a -> 2 word: horse -> 1 word: and -> 1 word: pony -> 2

Kita dapat melihat bahwa ketika pesan pertama sampai, kata pony hanya muncul sekali. Tetapi ketika kami mengirim pesan kedua, kata pony terjadi untuk pencetakan kedua kalinya: “ word: pony -> 2 ″ .

6. Kesimpulan

Artikel ini membahas cara membuat aplikasi pemrosesan aliran utama menggunakan Apache Kafka sebagai sumber data dan pustaka KafkaStreams sebagai pustaka pemrosesan aliran.

Semua contoh dan cuplikan kode ini dapat ditemukan di proyek GitHub - ini adalah proyek Maven, jadi semestinya mudah untuk mengimpor dan menjalankannya apa adanya.