Panduan untuk Fork / Join Framework di Java

1. Ikhtisar

Kerangka kerja fork / join disajikan di Java 7. Ini menyediakan alat untuk membantu mempercepat pemrosesan paralel dengan mencoba menggunakan semua inti prosesor yang tersedia - yang dicapai melalui pendekatan divide and conquer .

Dalam praktiknya, ini berarti bahwa framework pertama "garpu" , secara rekursif memecah tugas menjadi subtugas independen yang lebih kecil hingga cukup sederhana untuk dieksekusi secara asinkron.

Setelah itu, bagian "gabung" dimulai , di mana hasil dari semua subtugas secara rekursif digabungkan menjadi satu hasil, atau dalam kasus tugas yang mengembalikan void, program hanya menunggu sampai setiap subtugas dijalankan.

Untuk menyediakan eksekusi paralel yang efektif, kerangka kerja fork / join menggunakan kumpulan utas yang disebut ForkJoinPool , yang mengelola utas pekerja berjenis ForkJoinWorkerThread .

2. ForkJoinPool

The ForkJoinPool adalah jantung dari kerangka. Ini adalah implementasi ExecutorService yang mengelola utas pekerja dan memberi kami alat untuk mendapatkan informasi tentang status dan kinerja kumpulan utas.

Utas pekerja hanya dapat menjalankan satu tugas dalam satu waktu, tetapi ForkJoinPool tidak membuat utas terpisah untuk setiap subtugas. Sebagai gantinya, setiap utas di kumpulan memiliki antrean berujung ganda (atau deque, dek yang diucapkan ) yang menyimpan tugas.

Arsitektur ini sangat penting untuk menyeimbangkan beban kerja thread dengan bantuan algoritme pencurian kerja.

2.1. Algoritma Pencurian Pekerjaan

Sederhananya - utas bebas mencoba "mencuri" pekerjaan dari deretan utas sibuk.

Secara default, thread pekerja mendapatkan tugas dari head deque-nya sendiri. Jika kosong, utas mengambil tugas dari ekor deque utas sibuk lain atau dari antrian entri global, karena di sinilah bagian terbesar dari pekerjaan kemungkinan besar ditempatkan.

Pendekatan ini meminimalkan kemungkinan utas bersaing untuk tugas. Ini juga mengurangi berapa kali utas harus mencari pekerjaan, karena ini bekerja pada bagian terbesar dari pekerjaan yang tersedia terlebih dahulu.

2.2. Instansiasi ForkJoinPool

Di Java 8, cara paling mudah untuk mendapatkan akses ke instance ForkJoinPool adalah dengan menggunakan metode statis commonPool (). Seperti namanya, ini akan memberikan referensi ke kumpulan umum, yang merupakan kumpulan utas default untuk setiap ForkJoinTask .

Menurut dokumentasi Oracle, menggunakan kumpulan umum yang telah ditentukan sebelumnya mengurangi konsumsi sumber daya, karena ini mencegah pembuatan kumpulan utas terpisah per tugas.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Perilaku yang sama dapat dicapai di Java 7 dengan membuat ForkJoinPool dan menetapkannya ke bidang statis publik dari kelas utilitas:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Sekarang dapat dengan mudah diakses:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Dengan konstruktor ForkJoinPool , dimungkinkan untuk membuat kumpulan utas khusus dengan tingkat paralelisme tertentu, pabrik utas, dan penangan pengecualian. Pada contoh di atas, pool memiliki tingkat paralelisme 2. Ini berarti bahwa pool akan menggunakan 2 inti prosesor.

3. ForkJoinTask

ForkJoinTask adalah tipe dasar untuk tugas yang dijalankan di dalam ForkJoinPool. Dalam praktiknya, salah satu dari dua subclassnya harus diperpanjang: RecursiveAction untuk tugas void dan RecursiveTask untuk tugas yang mengembalikan nilai.Keduanya memiliki metode abstrak compute () di mana logika tugas ditentukan.

3.1. RecursiveAction - Contoh

Pada contoh di bawah ini, unit kerja yang akan diproses diwakili oleh String yang disebut beban kerja . Untuk tujuan demonstrasi, tugas ini adalah tugas yang tidak masuk akal: ia hanya memasukkan masukan huruf besar dan mencatatnya.

Untuk mendemonstrasikan perilaku forking dari kerangka kerja, contoh membagi tugas jika beban kerja .length () lebih besar dari ambang batas yang ditentukanmenggunakan metode createSubtask () .

String dibagi secara rekursif menjadi beberapa substring, membuat instance CustomRecursiveTask yang didasarkan pada substring ini.

Akibatnya, metode ini mengembalikan Daftar.

Daftar ini dikirimkan ke ForkJoinPool menggunakan metode invokeAll () :

public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List createSubtasks() { List subtasks = new ArrayList(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }

Pola ini dapat digunakan untuk mengembangkan kelas RecursiveAction Anda sendiri . Untuk melakukan ini, buat objek yang mewakili jumlah total pekerjaan, pilih ambang batas yang sesuai, tentukan metode untuk membagi pekerjaan, dan tentukan metode untuk melakukan pekerjaan.

3.2. RecursiveTask

Untuk tugas yang mengembalikan nilai, logikanya di sini serupa, kecuali bahwa hasil untuk setiap subtugas digabungkan dalam satu hasil:

public class CustomRecursiveTask extends RecursiveTask { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection createSubtasks() { List dividedTasks = new ArrayList(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a  a * 10) .sum(); } }

Dalam contoh ini, pekerjaan diwakili oleh larik yang disimpan dalam bidang arr dari kelas CustomRecursiveTask . Metode createSubtasks () secara rekursif membagi tugas menjadi bagian-bagian yang lebih kecil hingga setiap bagian lebih kecil dari ambang batas . Kemudian, metode invokeAll () mengirimkan subtugas ke kumpulan umum dan mengembalikan daftar Future .

Untuk memicu eksekusi, metode join () dipanggil untuk setiap subtugas.

Dalam contoh ini, ini dilakukan menggunakan API Stream Java 8 ; metode sum () digunakan sebagai representasi dari penggabungan sub hasil menjadi hasil akhir.

4. Mengirimkan Tugas ke ForkJoinPool

Untuk mengirimkan tugas ke kumpulan utas, beberapa pendekatan dapat digunakan.

Metode submit () atau execute () (kasus penggunaannya sama):

forkJoinPool.execute(customRecursiveTask); int result = customRecursiveTask.join();

Metode invoke () membagi tugas dan menunggu hasilnya, dan tidak memerlukan penggabungan manual:

int result = forkJoinPool.invoke(customRecursiveTask);

Metode invokeAll () adalah cara paling mudah untuk mengirimkan urutan ForkJoinTasks ke ForkJoinPool. Dibutuhkan tugas sebagai parameter (dua tugas, var args, atau koleksi), garpu kemudian mengembalikan kumpulan objek Future sesuai urutan pembuatannya.

Alternatifnya, Anda bisa menggunakan metode fork () dan join () terpisah . Metode fork () mengirimkan tugas ke sebuah kumpulan, tetapi tidak memicu eksekusinya. Metode join () harus digunakan untuk tujuan ini. Dalam kasus RecursiveAction , join () mengembalikan apa pun kecuali null ; untuk RecursiveTask, ini mengembalikan hasil eksekusi tugas:

customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();

Dalam contoh RecursiveTask kami menggunakan metode invokeAll () untuk mengirimkan urutan subtugas ke kumpulan. Pekerjaan yang sama dapat dilakukan dengan fork () dan join () , meskipun hal ini memiliki konsekuensi untuk pengurutan hasil.

Untuk menghindari kebingungan, sebaiknya gunakan metode invokeAll () untuk mengirimkan lebih dari satu tugas ke ForkJoinPool.

5. Kesimpulan

Menggunakan kerangka kerja fork / join dapat mempercepat pemrosesan tugas besar, tetapi untuk mencapai hasil ini, beberapa pedoman harus diikuti:

  • Gunakan kumpulan thread sesedikit mungkin - dalam banyak kasus, keputusan terbaik adalah menggunakan satu kumpulan thread per aplikasi atau sistem
  • Gunakan kumpulan benang umum default, jika tidak diperlukan penyetelan khusus
  • Gunakan ambang batas yang wajar untuk memisahkan ForkJoinTask menjadi subtugas
  • Hindari pemblokiran apa pun di ForkJoinTasks Anda

Contoh yang digunakan dalam artikel ini tersedia di repositori GitHub tertaut.