Panduan untuk CountDownLatch di Java

1. Perkenalan

Dalam artikel ini, kami akan memberikan panduan ke class CountDownLatch dan mendemonstrasikan bagaimana class tersebut dapat digunakan dalam beberapa contoh praktis.

Pada dasarnya, dengan menggunakan CountDownLatch kita dapat menyebabkan utas diblokir hingga utas lain menyelesaikan tugas yang diberikan.

2. Penggunaan dalam Pemrograman Bersamaan

Sederhananya, CountDownLatch memiliki bidang penghitung , yang dapat Anda kurangi sesuai kebutuhan. Kami kemudian dapat menggunakannya untuk memblokir utas panggilan sampai itu dihitung mundur ke nol.

Jika kami melakukan beberapa pemrosesan paralel, kami dapat membuat instance CountDownLatch dengan nilai yang sama untuk penghitung sebagai sejumlah utas yang ingin kami tangani. Kemudian, kita bisa memanggil countdown () setelah setiap utas selesai, menjamin bahwa utas dependen yang memanggil await () akan diblokir hingga utas pekerja selesai.

3. Menunggu Kumpulan Untaian Selesai

Mari kita coba pola ini dengan membuat Worker dan menggunakan bidang CountDownLatch untuk memberi sinyal ketika sudah selesai:

public class Worker implements Runnable { private List outputScraper; private CountDownLatch countDownLatch; public Worker(List outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } }

Kemudian, mari buat pengujian untuk membuktikan bahwa kita bisa mendapatkan CountDownLatch untuk menunggu instance Worker selesai:

@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); }

Biasanya "Latch dirilis" akan selalu menjadi keluaran terakhir - karena bergantung pada rilis CountDownLatch .

Perhatikan bahwa jika kita tidak memanggil await () , kita tidak akan dapat menjamin urutan eksekusi thread, sehingga pengujian akan gagal secara acak.

4. Kumpulan Untaian Menunggu untuk Dimulai

Jika kita mengambil contoh sebelumnya, tetapi kali ini memulai ribuan utas, bukan lima, kemungkinan banyak utas sebelumnya telah selesai diproses bahkan sebelum kami memanggil start () pada utas selanjutnya. Hal ini dapat mempersulit untuk mencoba dan mereproduksi masalah konkurensi, karena kami tidak dapat menjalankan semua utas secara paralel.

Untuk menyiasatinya, mari kita membuat CountdownLatch bekerja secara berbeda dari pada contoh sebelumnya. Daripada memblokir thread induk sampai beberapa thread anak selesai, kita dapat memblokir setiap thread anak sampai thread lainnya dimulai.

Mari kita ubah metode run () kita sehingga memblok sebelum diproses:

public class WaitingWorker implements Runnable { private List outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } }

Sekarang, mari kita ubah pengujian kita sehingga memblokir sampai semua Pekerja telah memulai, membuka blokir Pekerja, dan kemudian memblokir sampai Pekerja selesai:

@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); }

Pola ini sangat berguna untuk mencoba mereproduksi bug konkurensi, karena dapat digunakan untuk memaksa ribuan utas mencoba dan melakukan beberapa logika secara paralel.

5. Menghentikan CountdownLatch Lebih Awal

Terkadang, kami mungkin mengalami situasi di mana Pekerja berhenti karena kesalahan sebelum menghitung mundur CountDownLatch. Ini bisa mengakibatkannya tidak pernah mencapai nol dan await () tidak pernah berhenti:

@Override public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); }

Mari modifikasi pengujian sebelumnya untuk menggunakan BrokenWorker, untuk menunjukkan bagaimana await () akan memblokir selamanya:

@Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); }

Jelas, ini bukan perilaku yang kita inginkan - akan jauh lebih baik jika aplikasi terus berlanjut daripada memblokir tanpa batas.

Untuk menyiasati ini, mari tambahkan argumen batas waktu ke panggilan kita ke await ().

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse();

Seperti yang bisa kita lihat, pengujian pada akhirnya akan habis dan await () akan mengembalikan nilai salah .

6. Kesimpulan

Dalam panduan singkat ini, kami telah mendemonstrasikan bagaimana kami dapat menggunakan CountDownLatch untuk memblokir utas hingga utas lain menyelesaikan beberapa pemrosesan.

Kami juga telah menunjukkan bagaimana hal itu dapat digunakan untuk membantu men-debug masalah konkurensi dengan memastikan thread berjalan secara paralel.

Implementasi dari contoh-contoh ini dapat ditemukan di GitHub; ini adalah proyek berbasis Maven, jadi harus mudah dijalankan apa adanya.