Membangun Pipeline Data dengan Kafka, Spark Streaming, dan Cassandra

1. Ikhtisar

Apache Kafka adalah platform latensi rendah yang dapat diskalakan, berkinerja tinggi, dan memungkinkan membaca dan menulis aliran data seperti sistem perpesanan . Kita bisa mulai dengan Kafka di Jawa dengan cukup mudah.

Spark Streaming adalah bagian dari platform Apache Spark yang memungkinkan pemrosesan aliran data yang dapat diskalakan, throughput tinggi, dan toleran terhadap kesalahan . Meskipun ditulis dalam Scala, Spark menawarkan Java API untuk digunakan.

Apache Cassandra adalah penyimpanan data NoSQL kolom luas dan terdistribusi . Detail lebih lanjut tentang Cassandra tersedia di artikel kami sebelumnya.

Dalam tutorial ini, kami akan menggabungkan ini untuk membuat pipeline data yang sangat skalabel dan toleran terhadap kesalahan untuk aliran data real-time .

2. Instalasi

Untuk memulai, kita perlu Kafka, Spark dan Cassandra diinstal secara lokal di mesin kita untuk menjalankan aplikasi. Kita akan melihat cara mengembangkan pipeline data menggunakan platform ini seiring berjalannya waktu.

Namun, kami akan membiarkan semua konfigurasi default termasuk port untuk semua instalasi yang akan membantu menjalankan tutorial dengan lancar.

2.1. Kafka

Menginstal Kafka di komputer lokal kami cukup mudah dan dapat ditemukan sebagai bagian dari dokumentasi resmi. Kami akan menggunakan rilis 2.1.0 Kafka.

Selain itu, Kafka membutuhkan Apache Zookeeper untuk dijalankan, tetapi untuk tujuan tutorial ini, kami akan memanfaatkan instans Zookeeper node tunggal yang dikemas dengan Kafka.

Setelah kami berhasil memulai Zookeeper dan Kafka secara lokal dengan mengikuti panduan resmi, kami dapat melanjutkan untuk membuat topik kami, bernama "pesan":

 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Perhatikan bahwa skrip di atas adalah untuk platform Windows, tetapi ada juga skrip serupa yang tersedia untuk platform mirip Unix.

2.2. Percikan

Spark menggunakan pustaka klien Hadoop untuk HDFS dan YARN. Akibatnya, akan sangat sulit untuk menyusun versi yang kompatibel dari semua ini . Namun, unduhan resmi Spark sudah dikemas sebelumnya dengan versi populer Hadoop. Untuk tutorial ini, kami akan menggunakan paket versi 2.3.0 "yang dibuat sebelumnya untuk Apache Hadoop 2.7 dan yang lebih baru".

Setelah paket Spark yang tepat dibuka, skrip yang tersedia dapat digunakan untuk mengirimkan aplikasi. Kita akan melihatnya nanti saat kita mengembangkan aplikasi kita di Spring Boot.

2.3. Cassandra

DataStax menyediakan edisi komunitas Cassandra untuk berbagai platform termasuk Windows. Kami dapat mengunduh dan menginstal ini di mesin lokal kami dengan sangat mudah mengikuti dokumentasi resmi. Kami akan menggunakan versi 3.9.0.

Setelah kami berhasil menginstal dan memulai Cassandra di mesin lokal kami, kami dapat melanjutkan untuk membuat ruang kunci dan tabel kami. Ini dapat dilakukan menggunakan CQL Shell yang disertakan dengan instalasi kami:

CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int);

Perhatikan bahwa kami telah membuat namespace yang disebut kosakata dan tabel di dalamnya yang disebut kata dengan dua kolom, kata , dan hitungan .

3. Ketergantungan

Kita dapat mengintegrasikan dependensi Kafka dan Spark ke dalam aplikasi kita melalui Maven. Kami akan menarik dependensi ini dari Maven Central:

  • Core Spark
  • SQL Spark
  • Streaming Spark
  • Streaming Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

Dan kami dapat menambahkannya ke pom kami sesuai:

 org.apache.spark spark-core_2.11 2.3.0 provided   org.apache.spark spark-sql_2.11 2.3.0 provided   org.apache.spark spark-streaming_2.11 2.3.0 provided   org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0   com.datastax.spark spark-cassandra-connector_2.11 2.3.0   com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Perhatikan bahwa beberapa dependensi ini ditandai sebagai disediakan dalam cakupan. Ini karena ini akan disediakan oleh instalasi Spark di mana kami akan mengirimkan aplikasi untuk dieksekusi menggunakan spark-submit.

4. Spark Streaming - Strategi Integrasi Kafka

Pada poin ini, ada baiknya berbicara secara singkat tentang strategi integrasi untuk Spark dan Kafka.

Kafka introduced new consumer API between versions 0.8 and 0.10. Hence, the corresponding Spark Streaming packages are available for both the broker versions. It's important to choose the right package depending upon the broker available and features desired.

4.1. Spark Streaming Kafka 0.8

The 0.8 version is the stable integration API with options of using the Receiver-based or the Direct Approach. We'll not go into the details of these approaches which we can find in the official documentation. An important point to note here is that this package is compatible with Kafka Broker versions 0.8.2.1 or higher.

4.2. Spark Streaming Kafka 0.10

This is currently in an experimental state and is compatible with Kafka Broker versions 0.10.0 or higher only. This package offers the Direct Approach only, now making use of the new Kafka consumer API. We can find more details about this in the official documentation. Importantly, it is not backward compatible with older Kafka Broker versions.

Please note that for this tutorial, we'll make use of the 0.10 package. The dependency mentioned in the previous section refers to this only.

5. Developing a Data Pipeline

We'll create a simple application in Java using Spark which will integrate with the Kafka topic we created earlier. The application will read the messages as posted and count the frequency of words in every message. This will then be updated in the Cassandra table we created earlier.

Let's quickly visualize how the data will flow:

5.1. Getting JavaStreamingContext

Firstly, we'll begin by initializing the JavaStreamingContext which is the entry point for all Spark Streaming applications:

SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1));

5.2. Getting DStream from Kafka

Now, we can connect to the Kafka topic from the JavaStreamingContext:

Map kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("messages"); JavaInputDStream
    
      messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
    

Please note that we've to provide deserializers for key and value here. For common data types like String, the deserializer is available by default. However, if we wish to retrieve custom data types, we'll have to provide custom deserializers.

Here, we've obtained JavaInputDStream which is an implementation of Discretized Streams or DStreams, the basic abstraction provided by Spark Streaming. Internally DStreams is nothing but a continuous series of RDDs.

5.3. Processing Obtained DStream

We'll now perform a series of operations on the JavaInputDStream to obtain word frequencies in the messages:

JavaPairDStream results = messages .mapToPair( record -> new Tuple2(record.key(), record.value()) ); JavaDStream lines = results .map( tuple2 -> tuple2._2() ); JavaDStream words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream wordCounts = words .mapToPair( s -> new Tuple2(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 );

5.4. Persisting Processed DStream into Cassandra

Finally, we can iterate over the processed JavaPairDStream to insert them into our Cassandra table:

wordCounts.foreachRDD( javaRdd -> { Map wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } );

5.5. Running the Application

As this is a stream processing application, we would want to keep this running:

streamingContext.start(); streamingContext.awaitTermination();

6. Leveraging Checkpoints

In a stream processing application, it's often useful to retain state between batches of data being processed.

For example, in our previous attempt, we are only able to store the current frequency of the words. What if we want to store the cumulative frequency instead? Spark Streaming makes it possible through a concept called checkpoints.

We'll now modify the pipeline we created earlier to leverage checkpoints:

Please note that we'll be using checkpoints only for the session of data processing. This does not provide fault-tolerance. However, checkpointing can be used for fault tolerance as well.

There are a few changes we'll have to make in our application to leverage checkpoints. This includes providing the JavaStreamingContext with a checkpoint location:

streamingContext.checkpoint("./.checkpoint");

Here, we are using the local filesystem to store checkpoints. However, for robustness, this should be stored in a location like HDFS, S3 or Kafka. More on this is available in the official documentation.

Next, we'll have to fetch the checkpoint and create a cumulative count of words while processing every partition using a mapping function:

JavaMapWithStateDStream
    
      cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2(word, sum); state.update(sum); return output; } ) );
    

Once we get the cumulative word counts, we can proceed to iterate and save them in Cassandra as before.

Please note that while data checkpointing is useful for stateful processing, it comes with a latency cost. Hence, it's necessary to use this wisely along with an optimal checkpointing interval.

7. Understanding Offsets

If we recall some of the Kafka parameters we set earlier:

kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false);

These basically mean that we don't want to auto-commit for the offset and would like to pick the latest offset every time a consumer group is initialized. Consequently, our application will only be able to consume messages posted during the period it is running.

If we want to consume all messages posted irrespective of whether the application was running or not and also want to keep track of the messages already posted, we'll have to configure the offset appropriately along with saving the offset state, though this is a bit out of scope for this tutorial.

This is also a way in which Spark Streaming offers a particular level of guarantee like “exactly once”. This basically means that each message posted on Kafka topic will only be processed exactly once by Spark Streaming.

8. Deploying Application

We can deploy our application using the Spark-submit script which comes pre-packed with the Spark installation:

$SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Harap dicatat bahwa toples yang kami buat menggunakan Maven harus berisi dependensi yang tidak ditandai sebagai disediakan dalam cakupan.

Setelah kami mengirimkan aplikasi ini dan memposting beberapa pesan di topik Kafka yang kami buat sebelumnya, kami akan melihat jumlah kata kumulatif yang diposting di tabel Cassandra yang kami buat sebelumnya.

9. Kesimpulan

Singkatnya, dalam tutorial ini, kita belajar cara membuat pipeline data sederhana menggunakan Kafka, Spark Streaming, dan Cassandra. Kami juga mempelajari cara memanfaatkan checkpoint di Spark Streaming untuk mempertahankan status antar batch.

Seperti biasa, kode untuk contoh tersedia di GitHub.