Pengantar Apache Flink dengan Java

1. Ikhtisar

Apache Flink adalah kerangka kerja pemrosesan Big Data yang memungkinkan pemrogram memproses data dalam jumlah besar dengan cara yang sangat efisien dan skalabel.

Pada artikel ini, kami akan memperkenalkan beberapa konsep API inti dan transformasi data standar yang tersedia di API Java Apache Flink . Gaya fasih dari API ini membuatnya mudah untuk bekerja dengan konstruksi sentral Flink - koleksi terdistribusi.

Pertama, kita akan melihat transformasi API DataSet Flink dan menggunakannya untuk mengimplementasikan program penghitungan kata. Kemudian kita akan melihat sekilas DataStream API Flink , yang memungkinkan Anda memproses aliran peristiwa secara real-time.

2. Ketergantungan Maven

Untuk memulai, kita perlu menambahkan dependensi Maven ke pustaka flink-java dan flink-test-utils :

 org.apache.flink flink-java 1.2.0   org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Konsep Inti API

Saat bekerja dengan Flink, kita perlu mengetahui beberapa hal yang berhubungan dengan API-nya:

  • Setiap program Flink melakukan transformasi pada kumpulan data terdistribusi. Berbagai fungsi untuk mengubah data disediakan, termasuk pemfilteran, pemetaan, penggabungan, pengelompokan, dan agregasi
  • Sebuah wastafel operasi di Flink memicu eksekusi sungai untuk menghasilkan hasil yang diinginkan dari program ini , seperti menyimpan hasilnya ke sistem file atau mencetaknya ke standard output
  • Transformasi Flink bersifat malas, artinya tidak akan dieksekusi hingga operasi sink dijalankan
  • Apache Flink API mendukung dua mode operasi - batch dan real-time. Jika Anda berurusan dengan sumber data terbatas yang dapat diproses dalam mode batch, Anda akan menggunakan API DataSet . Jika Anda ingin memproses aliran data tanpa batas secara real-time, Anda perlu menggunakan DataStream API

4. Transformasi API DataSet

Titik masuk ke program Flink adalah turunan dari kelas ExecutionEnvironment - ini menentukan konteks di mana program dijalankan.

Mari buat ExecutionEnvironment untuk memulai pemrosesan kita:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Perhatikan bahwa ketika Anda meluncurkan aplikasi di komputer lokal, itu akan melakukan pemrosesan di JVM lokal. Jika Anda ingin mulai memproses pada sekelompok mesin, Anda perlu menginstal Apache Flink pada mesin tersebut dan mengkonfigurasi ExecutionEnvironment yang sesuai.

4.1. Membuat DataSet

Untuk mulai melakukan transformasi data, kita perlu menyediakan program kita dengan data.

Mari buat instance kelas DataSet menggunakan ExecutionEnvironement kami :

DataSet amounts = env.fromElements(1, 29, 40, 50);

Anda dapat membuat Kumpulan Data dari berbagai sumber, seperti Apache Kafka, CSV, file, atau hampir semua sumber data lainnya.

4.2. Filter dan Kurangi

Setelah Anda membuat instance dari kelas DataSet , Anda dapat menerapkan transformasi padanya.

Misalkan Anda ingin memfilter angka yang berada di atas ambang tertentu dan selanjutnya menjumlahkan semuanya . Anda bisa menggunakan transformasi filter () dan reduce () untuk mencapai ini:

int threshold = 30; List collect = amounts .filter(a -> a > threshold) .reduce((integer, t1) -> integer + t1) .collect(); assertThat(collect.get(0)).isEqualTo(90); 

Perhatikan bahwa metode collect () adalah operasi sink yang memicu transformasi data sebenarnya.

4.3. Peta

Misalkan Anda memiliki objek DataSet of Person :

private static class Person { private int age; private String name; // standard constructors/getters/setters }

Selanjutnya, mari buat Kumpulan Data dari objek-objek ini:

DataSet personDataSource = env.fromCollection( Arrays.asList( new Person(23, "Tom"), new Person(75, "Michael")));

Misalkan Anda ingin mengekstrak hanya bidang usia dari setiap objek koleksi. Anda bisa menggunakan transformasi map () untuk mendapatkan hanya bidang tertentu dari kelas Person :

List ages = personDataSource .map(p -> p.age) .collect(); assertThat(ages).hasSize(2); assertThat(ages).contains(23, 75);

4.4. Ikuti

Jika Anda memiliki dua dataset, Anda mungkin ingin menggabungkannya di beberapa field id . Untuk ini, Anda dapat menggunakan transformasi join () .

Mari buat kumpulan transaksi dan alamat pengguna:

Tuple3 address = new Tuple3(1, "5th Avenue", "London"); DataSet
    
      addresses = env.fromElements(address); Tuple2 firstTransaction = new Tuple2(1, "Transaction_1"); DataSet
     
       transactions = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2")); 
     
    

Bidang pertama di kedua tupel adalah tipe Integer , dan ini adalah bidang id tempat kami ingin menggabungkan kedua kumpulan data.

Untuk melakukan logika penggabungan sebenarnya, kita perlu mengimplementasikan antarmuka KeySelector untuk alamat dan transaksi:

private static class IdKeySelectorTransaction implements KeySelector
    
      { @Override public Integer getKey(Tuple2 value) { return value.f0; } } private static class IdKeySelectorAddress implements KeySelector
     
       { @Override public Integer getKey(Tuple3 value) { return value.f0; } }
     
    

Setiap selektor hanya mengembalikan bidang tempat gabungan harus dilakukan.

Sayangnya, ekspresi lambda tidak dapat digunakan di sini karena Flink memerlukan info tipe umum.

Selanjutnya, mari kita terapkan logika penggabungan menggunakan pemilih tersebut:

List
    
     > joined = transactions.join(addresses) .where(new IdKeySelectorTransaction()) .equalTo(new IdKeySelectorAddress()) .collect(); assertThat(joined).hasSize(1); assertThat(joined).contains(new Tuple2(firstTransaction, address)); 
    

4.5. Menyortir

Katakanlah Anda memiliki koleksi Tuple2 berikut :

Tuple2 secondPerson = new Tuple2(4, "Tom"); Tuple2 thirdPerson = new Tuple2(5, "Scott"); Tuple2 fourthPerson = new Tuple2(200, "Michael"); Tuple2 firstPerson = new Tuple2(1, "Jack"); DataSet
    
      transactions = env.fromElements( fourthPerson, secondPerson, thirdPerson, firstPerson); 
    

Jika Anda ingin mengurutkan koleksi ini berdasarkan bidang pertama tupel, Anda dapat menggunakan transformasi sortPartitions () :

List
    
      sorted = transactions .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING) .collect(); assertThat(sorted) .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    

5. Jumlah Kata

Masalah jumlah kata adalah masalah yang umum digunakan untuk menunjukkan kemampuan kerangka kerja pemrosesan Big Data. Solusi dasar melibatkan penghitungan kemunculan kata dalam input teks. Mari gunakan Flink untuk mengimplementasikan solusi untuk masalah ini.

Sebagai langkah pertama dalam solusi kami, kami membuat kelas LineSplitter yang membagi masukan kami menjadi token (kata), mengumpulkan Tuple2 pasangan nilai kunci untuk setiap token . Di masing-masing tupel ini, kuncinya adalah kata yang ditemukan dalam teks, dan nilainya adalah bilangan bulat satu (1).

This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2:

public class LineSplitter implements FlatMapFunction
    
      { @Override public void flatMap(String value, Collector
     
       out) { Stream.of(value.toLowerCase().split("\\W+")) .filter(t -> t.length() > 0) .forEach(token -> out.collect(new Tuple2(token, 1))); } }
     
    

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second elements to produce a count of the word occurrences:

public static DataSet
    
      startWordCount( ExecutionEnvironment env, List lines) throws Exception { DataSet text = env.fromCollection(lines); return text.flatMap(new LineSplitter()) .groupBy(0) .aggregate(Aggregations.SUM, 1); }
    

We are using three types of the Flink transformations: flatMap(), groupBy(), and aggregate().

Let's write a test to assert that the word count implementation is working as expected:

List lines = Arrays.asList( "This is a first sentence", "This is a second sentence with a one word"); DataSet
    
      result = WordCount.startWordCount(env, lines); List
     
       collect = result.collect(); assertThat(collect).containsExactlyInAnyOrder( new Tuple2("a", 3), new Tuple2("sentence", 2), new Tuple2("word", 1), new Tuple2("is", 2), new Tuple2("this", 2), new Tuple2("second", 1), new Tuple2("first", 1), new Tuple2("with", 1), new Tuple2("one", 1));
     
    

6. DataStream API

6.1. Creating a DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream dataStream = executionEnvironment.fromElements( "This is a first sentence", "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print() that will just print the result of transformations to the standard output, following with the execute() method on the StreamExecutionEnvironment class:

upperCase.print(); env.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE 2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.

For this example, let's first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator
    
      windowed = env.fromElements( new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()), new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor 
     
      (Time.seconds(20)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 * 1000; } });
     
    

Next, let's define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator
    
      reduced = windowed .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .maxBy(0, true); reduced.print();
    

It will get the last element of every five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink's fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

Penerapan semua contoh dan cuplikan kode ini dapat ditemukan di GitHub - ini adalah proyek Maven, jadi semestinya mudah untuk mengimpor dan menjalankannya apa adanya.