CyclicBarrier di Java

1. Perkenalan

CyclicBarriers adalah konstruksi sinkronisasi yang diperkenalkan dengan Java 5 sebagai bagian dari paket java.util.concurrent .

Dalam artikel ini, kita akan mempelajari implementasi ini dalam skenario konkurensi.

2. Java Concurrency - Synchronizer

The java.util.concurrent paket berisi beberapa kelas yang membantu mengelola satu set benang yang berkolaborasi satu sama lain. Beberapa di antaranya adalah:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Penukar
  • Tiang sinyal
  • SynchronousQueue

Kelas-kelas ini menawarkan fungsionalitas di luar kotak untuk pola interaksi umum antar utas.

Jika kita memiliki sekumpulan utas yang berkomunikasi satu sama lain dan menyerupai salah satu pola umum, kita dapat menggunakan kembali kelas perpustakaan yang sesuai (juga disebut Synchronizers ) daripada mencoba membuat skema khusus menggunakan satu set kunci dan kondisi objek dan kata kunci tersinkronisasi .

Mari fokus pada CyclicBarrier ke depan.

3. CyclicBarrier

Sebuah CyclicBarrier adalah sinkronisasi yang memungkinkan satu set benang untuk menunggu satu sama lain untuk mencapai titik eksekusi yang umum, juga disebut penghalang .

CyclicBarriers digunakan dalam program di mana kami memiliki sejumlah utas tetap yang harus menunggu satu sama lain untuk mencapai titik yang sama sebelum melanjutkan eksekusi.

Penghalang ini disebut siklik karena dapat digunakan kembali setelah utas menunggu dilepaskan.

4. Penggunaan

Konstruktor untuk CyclicBarrier sederhana. Dibutuhkan satu bilangan bulat yang menunjukkan jumlah utas yang perlu memanggil metode await () pada instance penghalang untuk menandakan mencapai titik eksekusi bersama:

public CyclicBarrier(int parties)

Utas yang perlu menyinkronkan eksekusinya juga disebut partai dan memanggil metode await () adalah bagaimana kita dapat mendaftarkan bahwa utas tertentu telah mencapai titik penghalang.

Panggilan ini sinkron dan thread yang memanggil metode ini menangguhkan eksekusi sampai sejumlah thread tertentu memanggil metode yang sama pada barrier. Situasi di mana jumlah utas yang diperlukan disebut await () , disebut tripping the barrier .

Secara opsional, kita bisa meneruskan argumen kedua ke konstruktor, yang merupakan instance Runnable . Ini memiliki logika yang akan dijalankan oleh utas terakhir yang melewati penghalang:

public CyclicBarrier(int parties, Runnable barrierAction)

5. Implementasi

Untuk melihat CyclicBarrier beraksi, mari pertimbangkan skenario berikut:

Ada operasi yang dilakukan sejumlah utas dan menyimpan hasil yang sesuai dalam daftar. Saat semua utas selesai melakukan tindakannya, salah satunya (biasanya utas terakhir yang melewati penghalang) mulai memproses data yang diambil oleh masing-masing utas ini.

Mari terapkan kelas utama tempat semua aksi terjadi:

public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List
    
      partialResults = Collections.synchronizedList(new ArrayList()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
    

Kelas ini cukup mudah - NUM_WORKERS adalah jumlah utas yang akan dieksekusi dan NUM_PARTIAL_RESULTS adalah jumlah hasil yang akan dihasilkan oleh masing-masing utas pekerja.

Terakhir, kami memiliki parsialResults yang merupakan daftar yang akan menyimpan hasil dari setiap utas pekerja ini. Perhatikan bahwa daftar ini adalah SynchronizedList karena beberapa utas akan menulis padanya pada saat yang sama, dan metode add () tidak aman untuk utas di ArrayList biasa .

Sekarang mari kita menerapkan logika dari setiap utas pekerja:

public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List partialResult = new ArrayList(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }

Kami sekarang akan menerapkan logika yang berjalan saat penghalang telah tersandung.

Untuk menyederhanakan, mari tambahkan semua angka di daftar hasil parsial:

public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (List threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }

Langkah terakhir adalah membuat CyclicBarrier dan memulai dengan metode main () :

public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } } 

Dalam kode di atas, kami menginisialisasi penghalang siklik dengan 5 utas yang masing-masing menghasilkan 3 bilangan bulat sebagai bagian dari penghitungannya dan menyimpan yang sama dalam daftar yang dihasilkan.

Setelah penghalang tersandung, utas terakhir yang tersandung penghalang menjalankan logika yang ditentukan dalam AggregatorThread, yaitu - tambahkan semua angka yang dihasilkan oleh utas.

6. Hasil

Berikut adalah keluaran dari satu eksekusi program di atas - setiap eksekusi dapat membuat hasil yang berbeda karena utas dapat muncul dalam urutan yang berbeda:

Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46 

Seperti yang ditunjukkan oleh output di atas, Thread 4 adalah thread yang melewati penghalang dan juga menjalankan logika agregasi akhir. Juga tidak perlu bahwa utas benar-benar dijalankan dalam urutan seperti yang diperlihatkan contoh di atas.

7. Kesimpulan

Dalam artikel ini, kami melihat apa itu CyclicBarrier , dan situasi seperti apa yang dapat membantu.

Kami juga menerapkan skenario di mana kami membutuhkan sejumlah utas tetap untuk mencapai titik eksekusi tetap, sebelum melanjutkan dengan logika program lainnya.

Seperti biasa, kode untuk tutorial dapat ditemukan di GitHub.