Panduan untuk Akka Streams

1. Ikhtisar

Pada artikel ini, kita akan melihat perpustakaan aliran akka yang dibangun di atas kerangka aktor Akka, yang mengikuti manifesto aliran reaktif. Akka Streams API memungkinkan kita dengan mudah menyusun aliran transformasi data dari langkah-langkah independen.

Selain itu, semua pemrosesan dilakukan dengan cara reaktif, non-pemblokiran, dan asinkron.

2. Ketergantungan Maven

Untuk memulai, kita perlu menambahkan pustaka akka-stream dan akka-stream-testkit ke pom.xml kita :

 com.typesafe.akka akka-stream_2.11 2.5.2   com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. API Aliran Akka

Untuk bekerja dengan Akka Streams, kita perlu mengetahui konsep inti API:

  • Sumber - titik masuk ke pemrosesan di pustaka akka-stream - kita dapat membuat instance kelas ini dari berbagai sumber; misalnya, kita dapat menggunakan metode single () jika kita ingin membuat Sumber dari satu String , atau kita dapat membuat Sumber dari Iterable elemen
  • Flow - blok penyusun pemrosesan utama - setiapinstance Flow memiliki satu input dan satu nilai output
  • Materializer - kita dapat menggunakannya jika kita ingin Flow kita memiliki beberapa efek samping seperti logging atau menyimpan hasil ; paling umum, kami akan meneruskanalias NotUsed sebagai Materializer untuk menunjukkan bahwa Flow kamiseharusnya tidak memiliki efek samping
  • Operasi sink - ketika kita membangun Flow, itu tidak dijalankan sampai kita akan mendaftarkan operasi Sink padanya - ini adalah operasi terminal yang memicu semua komputasi di seluruh Flow

4. Membuat Arus di Aliran Akka

Mari kita mulai dengan membangun contoh sederhana, di mana kami akan menunjukkan cara untuk membuat dan menggabungkan beberapa Arus s - untuk memproses aliran bilangan bulat dan menghitung jendela bergerak rata-rata pasangan bilangan bulat dari sungai.

Kami akan mengurai String bilangan bulat yang dipisahkan titik koma sebagai input untuk membuat Sumber akka-stream kami sebagai contoh.

4.1. Menggunakan Arus untuk Mengurai Input

Pertama, mari buat kelas DataImporter yang akan mengambil instance ActorSystem yang akan kita gunakan nanti untuk membuat Flow :

public class DataImporter { private ActorSystem actorSystem; // standard constructors, getters... }

Berikutnya, mari kita membuat parseLine metode yang akan menghasilkan Daftar dari Integer dari input delimited kami String. Perlu diingat bahwa kami menggunakan Java Stream API di sini hanya untuk parsing:

private List parseLine(String line) { String[] fields = line.split(";"); return Arrays.stream(fields) .map(Integer::parseInt) .collect(Collectors.toList()); }

Flow awal kami akan menerapkan parseLine ke input kami untuk membuat Flow dengan tipe input String dan tipe output Integer :

private Flow parseContent() { return Flow.of(String.class) .mapConcat(this::parseLine); }

Ketika kita memanggil metode parseLine () , kompilator tahu bahwa argumen ke fungsi lambda itu akan menjadi String - sama dengan tipe masukan ke Flow kita .

Perhatikan bahwa kita menggunakan mapConcat () metode - setara dengan Java 8 flatMap () metode - karena kita ingin meratakan Daftar dari Integer dikembalikan oleh parseLine () menjadi Arus dari Integer sehingga langkah-langkah selanjutnya dalam pengolahan kami tidak perlu untuk menangani Daftar .

4.2. Menggunakan Arus untuk Melakukan Perhitungan

Pada titik ini, kami memiliki Arus bilangan bulat yang diurai. Sekarang, kita perlu mengimplementasikan logika yang akan mengelompokkan semua elemen input menjadi pasangan dan menghitung rata-rata pasangan tersebut .

Sekarang, kita akan membuat Arus dari Integer s dan kelompok mereka menggunakan dikelompokkan () metode .

Selanjutnya, kami ingin menghitung rata-rata.

Karena kita tidak tertarik dalam urutan di mana mereka rata-rata akan diproses, kita dapat memiliki rata-rata dihitung secara paralel menggunakan beberapa thread dengan menggunakan mapAsyncUnordered () metode , melewati jumlah benang sebagai argumen untuk metode ini.

Tindakan yang akan diteruskan sebagai lambda ke Flow perlu mengembalikan CompletableFuture karena tindakan tersebut akan dihitung secara asinkron di utas terpisah:

private Flow computeAverage() { return Flow.of(Integer.class) .grouped(2) .mapAsyncUnordered(8, integers -> CompletableFuture.supplyAsync(() -> integers.stream() .mapToDouble(v -> v) .average() .orElse(-1.0))); }

Kami menghitung rata-rata dalam delapan utas paralel. Perhatikan bahwa kami menggunakan Java 8 Stream API untuk menghitung rata-rata.

4.3. Menyusun Beberapa Aliran menjadi Aliran Tunggal

The Aliran API adalah abstraksi fasih yang memungkinkan kita untuk menulis beberapa Arus contoh untuk mencapai tujuan pemrosesan akhir kami . Kita dapat memiliki aliran granular di mana satu, misalnya, sedang mengurai JSON, yang lain melakukan beberapa transformasi, dan yang lainnya sedang mengumpulkan beberapa statistik.

Perincian seperti itu akan membantu kami membuat kode yang lebih dapat diuji karena kami dapat menguji setiap langkah pemrosesan secara independen.

Kami membuat dua aliran di atas yang dapat bekerja secara independen satu sama lain. Sekarang, kami ingin menyusunnya bersama.

Pertama, kami ingin mengurai String input kami , dan selanjutnya, kami ingin menghitung rata-rata pada aliran elemen.

Kita bisa menyusun arus kita menggunakan metode via () :

Flow calculateAverage() { return Flow.of(String.class) .via(parseContent()) .via(computeAverage()); }

Kami membuat Arus yang memiliki tipe input String dan dua aliran lainnya setelahnya. The parseContent () Arus mengambil String input dan mengembalikan sebuah Integer sebagai output. The computeAverage () Arus mengambil bahwa Integer dan menghitung kembali rata-rata ganda sebagai jenis output.

5. Menambahkan Sink ke Arus

Seperti yang kami sebutkan, hingga titik ini seluruh Arus belum dijalankan karena malas. Untuk memulai eksekusi Flow kita perlu mendefinisikan Sink . The Sink operasi dapat, misalnya, menyimpan data ke dalam database, atau hasil kirim ke beberapa layanan web eksternal.

Misalkan kita memiliki kelas AverageRepository dengan metode save () berikut yang menulis hasil ke database kita:

CompletionStage save(Double average) { return CompletableFuture.supplyAsync(() -> { // write to database return average; }); }

Sekarang, kami ingin membuat operasi Sink yang menggunakan metode ini untuk menyimpan hasil pemrosesan Flow kami . Untuk membuat Tenggelam kita , pertama-tama kita perlu membuat Arus yang mengambil hasil dari pemrosesan kita sebagai tipe masukan . Selanjutnya, kami ingin menyimpan semua hasil kami ke database.

Sekali lagi, kita tidak peduli tentang pengurutan elemen, jadi kita bisa melakukan operasi save () secara paralel menggunakan metode mapAsyncUnordered () .

Untuk membuat Sink dari Flow kita perlu memanggil toMat () dengan Sink.ignore () sebagai argumen pertama dan Keep.right () sebagai argumen kedua karena kita ingin mengembalikan status pemrosesan:

private Sink
    
      storeAverages() { return Flow.of(Double.class) .mapAsyncUnordered(4, averageRepository::save) .toMat(Sink.ignore(), Keep.right()); }
    

6. Mendefinisikan Sumber untuk Arus

Hal terakhir yang perlu kita lakukan adalah membuat Source dari input String . Kita bisa menerapkan Arus countAverage () ke sumber ini menggunakan metode via () .

Kemudian, untuk menambahkan Sink ke pemrosesan, kita perlu memanggil metode runWith () dan meneruskan storeAverages () Sink yang baru saja kita buat:

CompletionStage calculateAverageForContent(String content) { return Source.single(content) .via(calculateAverage()) .runWith(storeAverages(), ActorMaterializer.create(actorSystem)) .whenComplete((d, e) -> { if (d != null) { System.out.println("Import finished "); } else { e.printStackTrace(); } }); }

Note that when the processing is finished we are adding the whenComplete() callback, in which we can perform some action depending on the outcome of the processing.

7. Testing Akka Streams

We can test our processing using the akka-stream-testkit.

The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() { // given Flow tested = new DataImporter(actorSystem).calculateAverage(); String input = "1;9;11;0"; // when Source flow = Source.single(input).via(tested); // then flow .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem)) .request(4) .expectNextUnordered(5d, 5.5); }

We are checking that we are expecting four input arguments, and two results that are averages can arrive in any order because our processing is done in the asynchronous and parallel way.

8. Conclusion

In this article, we were looking at the akka-stream library.

We defined a process that combines multiple Flows to calculate moving average of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.

Finally, we wrote a test for our processing using the akka-stream-testkit.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.