Memulai Pemrosesan Streaming dengan Spring Cloud Data Flow

1. Perkenalan

Spring Cloud Data Flow adalah pemrograman dan model operasi cloud-native untuk layanan mikro data yang dapat disusun.

Dengan Spring Cloud Data Flow , developer dapat membuat dan mengatur pipeline data untuk kasus penggunaan umum seperti penyerapan data, analitik real-time, dan impor / ekspor data.

Pipeline data ini memiliki dua bentuk, pipeline data streaming dan batch.

Dalam kasus pertama, jumlah data yang tidak terbatas digunakan atau dihasilkan melalui middleware perpesanan. Sementara dalam kasus kedua, tugas berumur pendek memproses sekumpulan data yang terbatas dan kemudian dihentikan.

Artikel ini akan fokus pada pemrosesan streaming.

2. Gambaran Arsitektur

Komponen utama jenis arsitektur ini adalah Aplikasi , Server Aliran Data , dan waktu proses target.

Selain itu, selain komponen utama ini, kami juga biasanya memiliki Data Flow Shell dan perantara pesan di dalam arsitektur.

Mari kita lihat semua komponen ini lebih detail.

2.1. Aplikasi

Biasanya, pipeline data streaming menyertakan peristiwa konsumsi dari sistem eksternal, pemrosesan data, dan persistensi poliglot. Fase ini biasanya disebut sebagai Sumber , Prosesor , dan Sink dalam terminologi Spring Cloud :

  • Sumber: adalah aplikasi yang menggunakan peristiwa
  • Prosesor: menggunakan data dari Sumber , melakukan beberapa pemrosesan padanya, dan memancarkan data yang diproses ke aplikasi berikutnya dalam pipeline
  • Sink: menggunakan Sumber atau Prosesor dan menulis data ke lapisan persistensi yang diinginkan

Aplikasi ini dapat dikemas dalam dua cara:

  • Spring Boot uber-jar yang di-host di repositori maven, file, http atau implementasi sumber daya Spring lainnya (metode ini akan digunakan dalam artikel ini)
  • Buruh pelabuhan

Banyak sumber, prosesor, dan aplikasi sink untuk kasus penggunaan umum (misalnya jdbc, hdfs, http, router) telah disediakan dan siap digunakan oleh tim Spring Cloud Data Flow .

2.2. Runtime

Selain itu, waktu proses diperlukan agar aplikasi ini dapat dijalankan. Runtime yang didukung adalah:

  • Cloud Foundry
  • Apache BENANG
  • Kubernetes
  • Apache Mesos
  • Server Lokal untuk pengembangan (yang akan digunakan dalam artikel ini)

2.3. Server Arus Data

Komponen yang bertanggung jawab untuk menyebarkan aplikasi ke runtime adalah Data Flow Server . Ada jar yang dapat dijalankan Server Arus Data yang disediakan untuk setiap runtime target.

The Data Flow Server bertanggung jawab untuk menafsirkan:

  • Aliran DSL yang menggambarkan aliran logis data melalui beberapa aplikasi.
  • Manifes penerapan yang menjelaskan pemetaan aplikasi ke runtime.

2.4. Shell Arus Data

Data Flow Shell adalah klien untuk Data Flow Server. Shell memungkinkan kita untuk melakukan perintah DSL yang diperlukan untuk berinteraksi dengan server.

Sebagai contoh, DSL untuk menggambarkan aliran data dari sumber http ke sink jdbc akan ditulis sebagai “http | jdbc ”. Nama-nama ini di DSL terdaftar di Data Flow Server dan dipetakan ke artefak aplikasi yang dapat dihosting di repositori Maven atau Docker.

Spring juga menawarkan antarmuka grafis, bernama Flo , untuk membuat dan memantau pipeline data streaming. Namun, penggunaannya di luar pembahasan artikel ini.

2.5. Perantara Pesan

Seperti yang telah kita lihat pada contoh di bagian sebelumnya, kita telah menggunakan simbol pipa ke dalam definisi aliran data. Simbol pipa mewakili komunikasi antara dua aplikasi melalui middleware pengiriman pesan.

Ini berarti bahwa kita membutuhkan perantara pesan yang aktif dan berjalan di lingkungan target.

Dua broker middleware pengiriman pesan yang didukung adalah:

  • Apache Kafka
  • RabbitMQ

Jadi, sekarang kita memiliki gambaran umum tentang komponen arsitektur - saatnya membangun pipeline pemrosesan aliran pertama kita.

3. Pasang Broker Pesan

Seperti yang telah kita lihat, aplikasi di dalam pipeline membutuhkan middleware perpesanan untuk berkomunikasi. Untuk tujuan artikel ini, kita akan menggunakan RabbitMQ .

Untuk detail lengkap penginstalan, Anda dapat mengikuti instruksi di situs resmi.

4. Server Aliran Data Lokal

Untuk mempercepat proses pembuatan aplikasi kami, kami akan menggunakan Spring Initializr; dengan bantuannya, kami dapat memperoleh aplikasi Spring Boot dalam beberapa menit.

Setelah mencapai situs web, cukup pilih Grup dan nama Artefak .

Setelah ini selesai, klik tombol Hasilkan Proyek untuk mulai mengunduh artefak Maven.

Setelah pengunduhan selesai, ekstrak proyek dan impor sebagai proyek Maven dalam IDE pilihan Anda.

Mari tambahkan ketergantungan Maven ke proyek. Karena kita akan membutuhkan pustaka Server Lokal Dataflow , mari tambahkan dependensi spring-cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Sekarang kita perlu membuat anotasi kelas utama Spring Boot dengan anotasi @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } } 

Itu saja. Server Aliran Data Lokal kami siap dijalankan:

mvn spring-boot:run

Aplikasi akan boot pada port 9393.

5. Shell Arus Data

Sekali lagi, buka Spring Initializr dan pilih nama Grup dan Artefak .

Setelah kami mengunduh dan mengimpor proyek, mari tambahkan dependensi spring-cloud-dataflow-shell:

 org.springframework.cloud spring-cloud-dataflow-shell 

Sekarang kita perlu menambahkan anotasi @EnableDataFlowShell ke kelas utama Spring Boot :

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } } 

Sekarang kita dapat menjalankan shell:

mvn spring-boot:run

After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.

6. The Source Application

Similarly, on Initializr, we'll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

We'll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).

In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.

The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:

@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } 

Our data source is ready.

7. The Processor Application

Next- we'll create an application and add a Stream Rabbit dependency.

We'll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }

Next, we need to define a method to process the data that coming from the source application.

To define a transformer, we need to annotate this method with @Transformer annotation:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }

It converts a timestamp from the ‘input' channel to a formatted date which will be sent to the ‘output' channel.

8. The Sink Application

The last application to create is the Sink application.

Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let's add a Stream Rabbit dependency.

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }

Now we need a method to intercept the messages coming from the processor application.

To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:

@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }

The method simply prints the timestamp transformed in a formatted date to a log file.

9. Register a Stream App

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

When providing a URI with the maven scheme, the format should conform to the following:

maven://:[:[:]]:

To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT 

10. Create and Deploy the Stream

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.

Then to deploy the stream execute the following shell command:

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

Selain itu, kami melihat peran aplikasi Sumber , Prosesor , dan Tenggelam di dalam aliran dan cara memasang dan mengikat modul ini di dalam Server Aliran Data melalui penggunaan Data Flow Shell .

Kode contoh dapat ditemukan di proyek GitHub.