ETL dengan Spring Cloud Data Flow

1. Ikhtisar

Spring Cloud Data Flow adalah toolkit cloud-native untuk membangun pipeline data dan proses batch secara real-time. Spring Cloud Data Flow siap digunakan untuk berbagai kasus penggunaan pemrosesan data seperti impor / ekspor sederhana, pemrosesan ETL, streaming peristiwa, dan analitik prediktif.

Dalam tutorial ini, kita akan mempelajari contoh Extract Transform and Load (ETL) real-time menggunakan pipeline aliran yang mengekstrak data dari database JDBC, mengubahnya menjadi POJO sederhana dan memuatnya ke dalam MongoDB.

2. ETL dan Pemrosesan Aliran Acara

ETL - mengekstrak, mengubah, dan memuat - biasanya disebut sebagai proses yang memuat data secara batch dari beberapa database dan sistem ke dalam gudang data umum. Di gudang data ini, dimungkinkan untuk melakukan pemrosesan analisis data yang berat tanpa mengorbankan kinerja sistem secara keseluruhan.

Namun, tren baru mengubah cara melakukannya. ETL masih berperan dalam mentransfer data ke gudang data dan danau data.

Saat ini, hal ini dapat dilakukan dengan streaming dalam arsitektur aliran peristiwa dengan bantuan Spring Cloud Data Flow .

3. Aliran Data Spring Cloud

Dengan Spring Cloud Data Flow (SCDF), developer dapat membuat pipeline data dalam dua cara:

  • Aplikasi streaming real-time berumur panjang menggunakan Spring Cloud Stream
  • Aplikasi tugas batch berumur pendek menggunakan Spring Cloud Task

Dalam artikel ini, kami akan membahas yang pertama, aplikasi streaming berumur panjang berdasarkan Spring Cloud Stream.

3.1. Aplikasi Spring Cloud Stream

Pipeline SCDF Stream terdiri dari langkah-langkah, di mana setiap langkah merupakan aplikasi yang dibangun dengan gaya Spring Boot menggunakan kerangka kerja mikro Spring Cloud Stream. Aplikasi ini terintegrasi dengan middleware pengiriman pesan seperti Apache Kafka atau RabbitMQ.

Aplikasi ini diklasifikasikan menjadi sumber, prosesor, dan sink. Dibandingkan dengan proses ETL, kita dapat mengatakan bahwa sumbernya adalah “ekstrak”, prosesor adalah “transformator” dan sink adalah bagian “beban”.

Dalam beberapa kasus, kita dapat menggunakan starter aplikasi dalam satu atau beberapa langkah pipeline. Artinya, kita tidak perlu mengimplementasikan aplikasi baru untuk suatu langkah, melainkan, konfigurasikan starter aplikasi yang sudah ada yang sudah diterapkan.

Daftar permulaan aplikasi dapat ditemukan di sini.

3.2. Server Aliran Data Cloud Musim Semi

Bagian terakhir dari arsitektur adalah Spring Cloud Data Flow Server . Server SCDF melakukan penerapan aplikasi dan aliran pipa menggunakan Spesifikasi Penyebar Cloud Spring. Spesifikasi ini mendukung ragam asli cloud SCDF dengan menerapkan ke berbagai runtime modern, seperti Kubernetes, Apache Mesos, Yarn, dan Cloud Foundry.

Selain itu, kami dapat menjalankan streaming sebagai penerapan lokal.

Informasi lebih lanjut tentang arsitektur SCDF dapat ditemukan di sini.

4. Pengaturan Lingkungan

Sebelum kita mulai, kita perlu memilih bagian dari penerapan kompleks ini . Bagian pertama yang harus didefinisikan adalah SCDF Server.

Untuk pengujian, kami akan menggunakan SCDF Server Local untuk pengembangan lokal . Untuk penerapan produksi, nanti kita dapat memilih runtime cloud-native, seperti SCDF Server Kubernetes. Kami dapat menemukan daftar runtime server di sini.

Sekarang, mari kita periksa persyaratan sistem untuk menjalankan server ini.

4.1. Persyaratan sistem

Untuk menjalankan Server SCDF, kita harus menentukan dan menyiapkan dua dependensi:

  • middleware olahpesan, dan
  • RDBMS.

Untuk middleware perpesanan, kami akan bekerja dengan RabbitMQ, dan kami memilih PostgreSQL sebagai RDBMS untuk menyimpan definisi aliran pipeline kami.

Untuk menjalankan RabbitMQ, unduh versi terbaru di sini dan mulai instans RabbitMQ menggunakan konfigurasi default atau jalankan perintah Docker berikut:

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

Sebagai langkah penyiapan terakhir, instal dan jalankan PostgreSQL RDBMS pada port default 5432. Setelah ini, buat database tempat SCDF dapat menyimpan definisi alirannya menggunakan skrip berikut:

CREATE DATABASE dataflow;

4.2. Spring Cloud Data Flow Server Lokal

Untuk menjalankan SCDF Server Local, kita bisa memilih untuk memulai server menggunakan docker-compose , atau kita bisa memulainya sebagai aplikasi Java.

Di sini, kami akan menjalankan SCDF Server Local sebagai aplikasi Java. Untuk mengkonfigurasi aplikasi, kita harus mendefinisikan konfigurasi sebagai parameter aplikasi Java. Kita membutuhkan Java 8 di jalur Sistem.

Untuk meng-host jars dan dependensi, kita perlu membuat folder home untuk SCDF Server kita dan mendownload distribusi SCDF Server Local ke dalam folder ini. Anda dapat mengunduh distribusi SCDF Server Lokal terbaru di sini.

Juga, kita perlu membuat folder lib dan meletakkan driver JDBC di sana. Versi terbaru dari driver PostgreSQL tersedia di sini.

Terakhir, jalankan server lokal SCDF:

$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \ --spring.datasource.username=postgres_username \ --spring.datasource.password=postgres_password \ --spring.datasource.driver-class-name=org.postgresql.Driver \ --spring.rabbitmq.host=127.0.0.1 \ --spring.rabbitmq.port=5672 \ --spring.rabbitmq.username=guest \ --spring.rabbitmq.password=guest

Kami dapat memeriksa apakah itu berjalan dengan melihat URL ini:

// localhost: 9393 / dasbor

4.3. Spring Cloud Data Flow Shell

SCDF Shell adalah alat baris perintah yang memudahkan untuk membuat dan menerapkan aplikasi dan pipeline kami . Perintah Shell ini dijalankan melalui REST API Server Cloud Data Flow Spring.

Unduh versi terbaru dari jar ke folder utama SCDF Anda, tersedia di sini. Setelah selesai, jalankan perintah berikut (perbarui versi sesuai kebutuhan):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___| _ __ _ __(_)_ __ __ _ / ___| | ___ _ _ __| | \___ \| '_ \| '__| | '_ \ / _` | | | | |/ _ \| | | |/ _` | ___) | |_) | | | | | | | (_| | | |___| | (_) | |_| | (_| | |____/| .__/|_| |_|_| |_|\__, | \____|_|\___/ \__,_|\__,_| ____ |_| _ __|___/ __________ | _ \ __ _| |_ __ _ | ___| | _____ __ \ \ \ \ \ \ | | | |/ _` | __/ _` | | |_ | |/ _ \ \ /\ / / \ \ \ \ \ \ | |_| | (_| | || (_| | | _| | | (_) \ V V / / / / / / / |____/ \__,_|\__\__,_| |_| |_|\___/ \_/\_/ /_/_/_/_/_/ Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help". dataflow:>

Jika bukannya " dataflow:>" Anda mendapatkan " server-tidak dikenal:>" di baris terakhir, Anda tidak menjalankan Server SCDF di localhost. Dalam kasus ini, jalankan perintah berikut untuk menyambung ke host lain:

server-unknown:>dataflow config server //{host}

Sekarang, Shell terhubung ke Server SCDF, dan kami dapat menjalankan perintah kami.

Hal pertama yang perlu kita lakukan di Shell adalah mengimpor permulaan aplikasi. Temukan versi terbaru di sini untuk RabbitMQ + Maven di Spring Boot 2.0.x, dan jalankan perintah berikut (sekali lagi, perbarui versinya, di sini " Darwin-SR1 ", sesuai kebutuhan):

$ dataflow:>app import --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

Untuk memeriksa aplikasi yang diinstal, jalankan perintah Shell berikut:

$ dataflow:> app list

Hasilnya, kita akan melihat tabel yang berisi semua aplikasi yang diinstal.

Selain itu, SCDF menawarkan antarmuka grafis, bernama Flo , yang dapat diakses dengan alamat ini: // localhost: 9393 / dashboard . Namun, penggunaannya tidak termasuk dalam cakupan artikel ini.

5. Menyusun Pipa ETL

Sekarang mari kita buat pipa aliran kita. Untuk melakukan ini, kami akan menggunakan starter aplikasi Sumber JDBC untuk mengekstrak informasi dari database relasional kami.

Selain itu, kita akan membuat prosesor kustom untuk mengubah struktur informasi dan sink kustom untuk memuat data kita ke dalam MongoDB.

5.1. Ekstrak - Mempersiapkan Basis Data Relasional untuk Ekstraksi

Mari buat database dengan nama crm dan tabel dengan nama pelanggan :

CREATE DATABASE crm;
CREATE TABLE customer ( id bigint NOT NULL, imported boolean DEFAULT false, customer_name character varying(50), PRIMARY KEY(id) )

Perhatikan bahwa kami menggunakan bendera yang diimpor , yang akan menyimpan rekaman mana yang telah diimpor. Kami juga dapat menyimpan informasi ini di tabel lain, jika perlu.

Sekarang, mari masukkan beberapa data:

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2. Transform - Memetakan Bidang JDBC ke Struktur Bidang MongoDB

Untuk langkah transformasi, kita akan melakukan terjemahan sederhana dari field customer_name dari tabel sumber, ke nama field baru . Transformasi lain dapat dilakukan di sini, tetapi mari kita pertahankan contoh singkatnya.

To do this, we'll create a new project with the name customer-transform. The easiest way to do this is by using the Spring Initializr site to create the project. After reaching the website, choose a Group and an Artifact name. We'll use com.customer and customer-transform, respectively.

Once this is done, click on the button “Generate Project” to download the project. Then, unzip the project and import it into your favorite IDE, and add the following dependency to the pom.xml:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 

Now we're set to start coding the field name conversion. To do this, we'll create the Customer class to act as an adapter. This class will receive the customer_name via the setName() method and will output its value via getName method.

The @JsonProperty annotations will do the transformation while deserializing from JSON to Java:

public class Customer { private Long id; private String name; @JsonProperty("customer_name") public void setName(String name) { this.name = name; } @JsonProperty("name") public String getName() { return name; } // Getters and Setters }

The processor needs to receive data from an input, do the transformation and bind the outcome to an output channel. Let's create a class to do this:

import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.integration.annotation.Transformer; @EnableBinding(Processor.class) public class CustomerProcessorConfiguration { @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Customer convertToPojo(Customer payload) { return payload; } }

In the above code, we can observe that the transformation occurs automatically. The input receives the data as JSON and Jackson deserialize it into a Customer object using the set methods.

The opposite is for the output, the data is serialized to JSON using the get methods.

5.3. Load – Sink in MongoDB

Similarly to the transform step, we'll create another maven project, now with the name customer-mongodb-sink. Again, access the Spring Initializr, for the Group choose com.customer, and for the Artifact choose customer-mongodb-sink. Then, type MongoDB in the dependencies search box and download the project.

Next, unzip and import it to your favorite IDE.

Then, add the same extra dependency as in the customer-transform project.

Now we'll create another Customer class, for receiving input in this step:

import org.springframework.data.mongodb.core.mapping.Document; @Document(collection="customer") public class Customer { private Long id; private String name; // Getters and Setters }

For sinking the Customer, we'll create a Listener class that will save the customer entity using the CustomerRepository:

@EnableBinding(Sink.class) public class CustomerListener { @Autowired private CustomerRepository repository; @StreamListener(Sink.INPUT) public void save(Customer customer) { repository.save(customer); } }

And the CustomerRepository, in this case, is a MongoRepository from Spring Data:

import org.springframework.data.mongodb.repository.MongoRepository; import org.springframework.stereotype.Repository; @Repository public interface CustomerRepository extends MongoRepository { } 

5.4. Stream Definition

Now, both custom applications are ready to be registered on SCDF Server. To accomplish this, compile both projects using the Maven command mvn install.

We then register them using the Spring Cloud Data Flow Shell:

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

Finally, let's check if the applications are stored at SCDF, run the application list command in the shell:

app list

As a result, we should see both applications in the resulting table.

5.4.1. Stream Pipeline Domain-Specific Language – DSL

A DSL defines the configuration and data flow between the applications. The SCDF DSL is simple. In the first word, we define the name of the application, followed by the configurations.

Also, the syntax is a Unix-inspired Pipeline syntax, that uses vertical bars, also known as “pipes”, to connect multiple applications:

http --port=8181 | log

This creates an HTTP application served in port 8181 which sends any received body payload to a log.

Now, let's see how to create the DSL stream definition of the JDBC Source.

5.4.2. JDBC Source Stream Definition

The key configurations for the JDBC Source are query and update.query will select unread records while update will change a flag to prevent the current records from being reread.

Also, we'll define the JDBC Source to poll in a fixed delay of 30 seconds and polling maximum 1000 rows. Finally, we'll define the configurations of connection, like driver, username, password and connection URL:

jdbc  --query='SELECT id, customer_name FROM public.customer WHERE imported = false' --update='UPDATE public.customer SET imported = true WHERE id in (:id)' --max-rows-per-poll=1000 --fixed-delay=30 --time-unit=SECONDS --driver-class-name=org.postgresql.Driver --url=jdbc:postgresql://localhost:5432/crm --username=postgres --password=postgres

More JDBC Source configuration properties can be found here.

5.4.3. Customer MongoDB Sink Stream Definition

As we didn't define the connection configurations in application.properties of customer-mongodb-sink, we'll configure through DSL parameters.

Our application is fully based on the MongoDataAutoConfiguration. You can check out the other possible configurations here. Basically, we'll define the spring.data.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4. Create and Deploy the Stream

First, to create the final stream definition, go back to the Shell and execute the following command (without line breaks, they have just been inserted for readability):

stream create --name jdbc-to-mongodb --definition "jdbc --query='SELECT id, customer_name FROM public.customer WHERE imported=false' --fixed-delay=30 --max-rows-per-poll=1000 --update='UPDATE customer SET imported=true WHERE id in (:id)' --time-unit=SECONDS --password=postgres --driver-class-name=org.postgresql.Driver --username=postgres --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main" 

This stream DSL defines a stream named jdbc-to-mongodb. Next, we'll deploy the stream by its name:

stream deploy --name jdbc-to-mongodb 

Finally, we should see the locations of all available logs in the log output:

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Conclusion

In this article, we've seen a full example of an ETL data pipeline using Spring Cloud Data Flow.

Yang paling penting, kami melihat konfigurasi starter aplikasi, membuat pipeline streaming ETL menggunakan Spring Cloud Data Flow Shell, dan mengimplementasikan aplikasi kustom untuk membaca, mengubah, dan menulis data.

Seperti biasa, kode contoh dapat ditemukan di proyek GitHub.