Pengantar Apache Kafka dengan Spring

Ketekunan teratas

Saya baru saja mengumumkan kursus Learn Spring baru , yang berfokus pada dasar-dasar Spring 5 dan Spring Boot 2:

>> LIHAT KURSUSnya

1. Ikhtisar

Apache Kafka adalah sistem pemrosesan aliran terdistribusi dan toleran terhadap kesalahan.

Dalam artikel ini, kami akan membahas dukungan Spring untuk Kafka dan level abstraksi yang disediakannya di atas API klien Kafka Java asli.

Spring Kafka menghadirkan model pemrograman template Spring yang sederhana dan khas dengan KafkaTemplate dan POJO berbasis pesan melalui anotasi @KafkaListener .

2. Instalasi dan Setup

Untuk mengunduh dan menginstal Kafka, silakan merujuk ke panduan resmi di sini.

Kita juga perlu menambahkan dependensi spring-kafka ke pom.xml kita :

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

Versi terbaru artefak ini dapat ditemukan di sini.

Aplikasi contoh kami adalah aplikasi Spring Boot.

Artikel ini mengasumsikan bahwa server mulai menggunakan konfigurasi default dan tidak ada port server yang diubah.

3. Konfigurasi Topik

Sebelumnya kami biasa menjalankan alat baris perintah untuk membuat topik di Kafka seperti:

$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic

Tetapi dengan diperkenalkannya AdminClient di Kafka, sekarang kita dapat membuat topik secara terprogram.

Kita perlu menambahkan kacang Musim Semi KafkaAdmin , yang secara otomatis akan menambahkan topik untuk semua kacang jenis NewTopic:

@Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("baeldung", 1, (short) 1); } }

4. Memproduksi Pesan

Untuk membuat pesan, pertama-tama, kita perlu mengonfigurasi ProducerFactory yang menetapkan strategi untuk membuat instance Kafka Producer .

Kemudian kita memerlukan KafkaTemplate yang membungkus instance Producer dan menyediakan metode praktis untuk mengirim pesan ke topik Kafka.

Instans produsen aman untuk thread dan karenanya menggunakan satu instance di seluruh konteks aplikasi akan memberikan kinerja yang lebih tinggi. Akibatnya, instans KakfaTemplate juga aman untuk thread dan penggunaan satu instans direkomendasikan.

4.1. Konfigurasi Produsen

@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }

4.2. Menerbitkan Pesan

Kami dapat mengirim pesan menggunakan kelas KafkaTemplate :

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg); }

The kirim API mengembalikan ListenableFuture objek. Jika kita ingin memblokir thread pengirim dan mendapatkan hasil tentang pesan terkirim, kita bisa memanggil get API dari objek ListenableFuture . Utas akan menunggu hasilnya, tetapi akan memperlambat produsen.

Kafka adalah platform pemrosesan aliran cepat. Jadi, sebaiknya tangani hasil secara asinkron sehingga pesan berikutnya tidak menunggu hasil dari pesan sebelumnya. Kami dapat melakukan ini melalui panggilan balik:

public void sendMessage(String message) { ListenableFuture
    
      future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback
     
      () { @Override public void onSuccess(SendResult result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); }
     
    

5. Mengkonsumsi Pesan

5.1. Konfigurasi Konsumen

Untuk menggunakan pesan, kita perlu mengkonfigurasi ConsumerFactory dan KafkaListenerContainerFactory . Setelah kacang ini tersedia di pabrik kacang Spring, konsumen berbasis POJO dapat dikonfigurasi menggunakan penjelasan @KafkaListener .

Anotasi @EnableKafka diperlukan pada kelas konfigurasi untuk mengaktifkan deteksi anotasi @KafkaListener pada kacang yang dikelola pegas:

@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory(props); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); return factory; } }

5.2. Mengkonsumsi Pesan

@KafkaListener(topics = "topicName", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message); }

Beberapa pendengar dapat diterapkan untuk suatu topik , masing-masing dengan ID grup yang berbeda. Selanjutnya, satu konsumen dapat mendengarkan pesan dari berbagai topik:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring juga mendukung pengambilan satu atau beberapa header pesan menggunakan anotasi @Header di pemroses :

@KafkaListener(topics = "topicName") public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

5.3. Consuming Messages from a Specific Partition

As you may have noticed, we had created the topic baeldung with only one partition. However, for a topic with multiple partitions, a @KafkaListener can explicitly subscribe to a particular partition of a topic with an initial offset:

@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

Since the initialOffset has been sent to 0 in this listener, all the previously consumed messages from partitions 0 and three will be re-consumed every time this listener is initialized. If setting the offset is not required, we can use the partitions property of @TopicPartition annotation to set only the partitions without the offset:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Adding Message Filter for Listeners

Listeners can be configured to consume specific types of messages by adding a custom filter. This can be done by setting a RecordFilterStrategy to the KafkaListenerContainerFactory:

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory; }

A listener can then be configured to use this container factory:

@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message); }

In this listener, all the messages matching the filter will be discarded.

6. Custom Message Converters

So far we have only covered sending and receiving Strings as messages. However, we can also send and receive custom Java objects. This requires configuring appropriate serializer in ProducerFactory and deserializer in ConsumerFactory.

Let's look at a simple bean class, which we will send as messages:

public class Greeting { private String msg; private String name; // standard getters, setters and constructor }

6.1. Producing Custom Messages

In this example, we will use JsonSerializer. Let's look at the code for ProducerFactory and KafkaTemplate:

@Bean public ProducerFactory greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate greetingKafkaTemplate() { return new KafkaTemplate(greetingProducerFactory()); }

This new KafkaTemplate can be used to send the Greeting message:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Consuming Custom Messages

Similarly, let's modify the ConsumerFactory and KafkaListenerContainerFactory to deserialize the Greeting message correctly:

@Bean public ConsumerFactory greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory( props, new StringDeserializer(), new JsonDeserializer(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; }

The spring-kafka JSON serializer and deserializer uses the Jackson library which is also an optional maven dependency for the spring-kafka project. So let's add it to our pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

Alih-alih menggunakan versi terbaru Jackson, disarankan untuk menggunakan versi yang ditambahkan ke pom.xml spring-kafka.

Terakhir, kita perlu menulis pendengar untuk menggunakan pesan Salam :

@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting) { // process greeting message }

7. Kesimpulan

Pada artikel ini, kami membahas dasar-dasar dukungan Spring untuk Apache Kafka. Kami telah melihat sekilas kelas-kelas yang digunakan untuk mengirim dan menerima pesan.

Kode sumber lengkap untuk artikel ini dapat ditemukan di GitHub. Sebelum menjalankan kode, pastikan server Kafka berjalan dan topik dibuat secara manual.

Ketekunan bawah

Saya baru saja mengumumkan kursus Learn Spring baru , yang berfokus pada dasar-dasar Spring 5 dan Spring Boot 2:

>> LIHAT KURSUSnya