Panduan untuk java.util.concurrent.BlockingQueue

1. Ikhtisar

Pada artikel ini, kita akan melihat salah satu konstruksi java.util.concurrent yang paling berguna untuk menyelesaikan masalah produsen-konsumen secara bersamaan. Kita akan melihat API dari antarmuka BlockingQueue dan bagaimana metode dari antarmuka itu membuat penulisan program bersamaan lebih mudah.

Nanti di artikel, kami akan menunjukkan contoh program sederhana yang memiliki beberapa utas produsen dan beberapa utas konsumen.

2. Jenis BlockingQueue

Kita dapat membedakan dua jenis BlockingQueue :

  • antrian tak terbatas - dapat tumbuh hampir tanpa batas
  • antrian terbatas - dengan kapasitas maksimal yang ditentukan

2.1. Antrean Tidak Terbatas

Membuat antrian tak terbatas itu sederhana:

BlockingQueue blockingQueue = new LinkedBlockingDeque();

Kapasitas pemblokiranQueue akan disetel ke Integer.MAX_VALUE. Semua operasi yang menambahkan elemen ke antrian tak terbatas tidak akan pernah diblokir, sehingga dapat berkembang menjadi ukuran yang sangat besar.

Hal terpenting saat mendesain program produsen-konsumen menggunakan unbounded BlockingQueue adalah konsumen harus dapat menggunakan pesan secepat produsen menambahkan pesan ke antrian. Jika tidak, memori dapat terisi dan kami akan mendapatkan pengecualian OutOfMemory .

2.2. Antrian Terikat

Jenis antrian kedua adalah antrian berbatas. Kita dapat membuat antrian seperti itu dengan meneruskan kapasitas sebagai argumen ke konstruktor:

BlockingQueue blockingQueue = new LinkedBlockingDeque(10);

Di sini kita memiliki blockingQueue yang memiliki kapasitas sama dengan 10. Ini berarti bahwa ketika seorang produser mencoba menambahkan elemen ke antrian yang sudah penuh, bergantung pada metode yang digunakan untuk menambahkannya ( offer () , add () atau put () ), ini akan memblokir hingga ruang untuk memasukkan objek tersedia. Jika tidak, operasi akan gagal.

Menggunakan antrian terbatas adalah cara yang baik untuk mendesain program bersamaan karena ketika kita memasukkan elemen ke antrian yang sudah penuh, operasi itu harus menunggu sampai konsumen menyusul dan menyediakan beberapa ruang di antrian. Ini memberi kita pembatasan tanpa usaha dari pihak kita.

3. BlockingQueue API

Ada dua jenis metode dalam antarmuka BlockingQueue - metode yang bertanggung jawab untuk menambahkan elemen ke antrean dan metode yang mengambil elemen tersebut. Setiap metode dari kedua grup tersebut berperilaku berbeda jika antriannya penuh / kosong.

3.1. Menambahkan Elemen

  • add () - mengembalikan nilai true jika penyisipan berhasil, jika tidak, akan menampilkan IllegalStateException
  • put () - memasukkan elemen yang ditentukan ke dalam antrian, menunggu slot kosong jika perlu
  • offer () - mengembalikan nilai true jika penyisipan berhasil, jika tidak salah
  • offer (E e, long timeout, TimeUnit unit) - mencoba memasukkan elemen ke dalam antrian dan menunggu slot yang tersedia dalam batas waktu yang ditentukan

3.2. Mengambil Elemen

  • take () - menunggu elemen head dari antrian dan menghapusnya. Jika antrian kosong, ia memblokir dan menunggu elemen tersedia
  • poll (waktu tunggu lama, unit TimeUnit) - mengambil dan menghapus kepala antrian, menunggu hingga waktu tunggu yang ditentukan jika perlu agar elemen tersedia. Mengembalikan nol setelah batas waktu

Metode ini adalah blok bangunan terpenting dari antarmuka BlockingQueue saat membuat program produsen-konsumen.

4. Contoh Produsen-Konsumen Multithreaded

Mari buat program yang terdiri dari dua bagian - Produsen dan Konsumen.

Produser akan menghasilkan nomor acak dari 0 hingga 100 dan akan memasukkan nomor tersebut ke dalam BlockingQueue . Kami akan memiliki 4 utas produser dan menggunakan metode put () untuk memblokir sampai ada ruang yang tersedia di antrian.

Hal penting yang harus diingat adalah kita harus menghentikan utas konsumen menunggu elemen muncul dalam antrian tanpa batas.

Teknik yang baik untuk memberi sinyal dari produsen kepada konsumen bahwa tidak ada lagi pesan yang harus diproses adalah dengan mengirimkan pesan khusus yang disebut pil racun. Kita perlu mengirim pil racun sebanyak konsumen kita. Kemudian ketika konsumen akan mengambil pesan pil racun khusus dari antrian, itu akan menyelesaikan eksekusi dengan anggun.

Mari kita lihat kelas produser:

public class NumbersProducer implements Runnable { private BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); } } }

Konstruktor produsen kami menganggap BlockingQueue yang digunakan untuk mengoordinasikan pemrosesan antara produsen dan konsumen sebagai argumen . Kami melihat bahwa metode generateNumbers () akan menempatkan 100 elemen dalam antrian. Dibutuhkan juga pesan poison pill, untuk mengetahui jenis pesan apa yang harus dimasukkan ke dalam antrian ketika eksekusi akan selesai. Pesan itu perlu menempatkan waktu poisonPillPerProducer ke dalam antrian.

Setiap konsumen akan mengambil elemen dari BlockingQueue menggunakan metode take () sehingga akan memblokir hingga ada elemen dalam antrian. Setelah mengambil Integer dari antrian ia memeriksa apakah pesannya adalah pil racun, jika ya maka eksekusi utas selesai. Jika tidak, itu akan mencetak hasil pada keluaran standar bersama dengan nama utas saat ini.

Ini akan memberi kami wawasan tentang cara kerja konsumen kami:

public class NumbersConsumer implements Runnable { private BlockingQueue queue; private final int poisonPill; public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { return; } System.out.println(Thread.currentThread().getName() + " result: " + number); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

Hal penting yang harus diperhatikan adalah penggunaan antrian. Sama seperti di produsen konstruktor, antrian diteruskan sebagai argumen. Kami dapat melakukannya karena BlockingQueue dapat dibagikan antar utas tanpa sinkronisasi eksplisit.

Sekarang setelah produsen dan konsumen kita, kita bisa memulai program kita. Kita perlu menentukan kapasitas antrian, dan kita mengaturnya menjadi 100 elemen.

Kami ingin memiliki 4 utas produsen dan jumlah utas konsumen akan sama dengan jumlah prosesor yang tersedia:

int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue(BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start(); 

BlockingQueue dibuat menggunakan konstruksi dengan kapasitas. Kami menciptakan 4 produsen dan konsumen N. Kami menetapkan pesan pil racun kami menjadi Integer.MAX_VALUE karena nilai tersebut tidak akan pernah dikirim oleh produsen kami dalam kondisi kerja normal. Hal terpenting untuk diperhatikan di sini adalah BlockingQueue digunakan untuk mengoordinasikan pekerjaan di antara mereka.

Saat kami menjalankan program, 4 utas produsen akan menempatkan Integer acak dalam BlockingQueue dan konsumen akan mengambil elemen tersebut dari antrian. Setiap utas akan mencetak ke output standar nama utas bersama-sama dengan hasil.

5. Kesimpulan

Artikel ini menunjukkan penggunaan praktis BlockingQueue dan menjelaskan metode yang digunakan untuk menambah dan mengambil elemen darinya. Selain itu, kami telah menunjukkan cara membuat program konsumen-produsen multithread menggunakan BlockingQueue untuk mengoordinasikan pekerjaan antara produsen dan konsumen.

Penerapan semua contoh dan cuplikan kode ini dapat ditemukan di proyek GitHub - ini adalah proyek berbasis Maven, jadi semestinya mudah untuk mengimpor dan menjalankannya apa adanya.