Pengantar Exchanger di Jawa

1. Ikhtisar

Dalam tutorial ini, kita akan melihat java.util.concurrent.Exchanger. Ini berfungsi sebagai titik umum untuk dua utas di Java untuk bertukar objek di antara mereka.

2. Pengantar Exchanger

The Exchanger kelas di Jawa dapat digunakan untuk benda saham antara dua benang tipe T . Kelas hanya menyediakan satu pertukaran metode kelebihan beban (T t) .

Saat dipanggil pertukaran menunggu utas lain dalam pasangan untuk memanggilnya juga. Pada titik ini, utas kedua menemukan utas pertama sedang menunggu dengan objeknya. Benang menukar objek yang mereka pegang dan memberi sinyal pertukaran, dan sekarang mereka dapat kembali.

Mari kita lihat contoh untuk memahami pertukaran pesan antara dua utas dengan Exchanger :

@Test public void givenThreads_whenMessageExchanged_thenCorrect() { Exchanger exchanger = new Exchanger(); Runnable taskA = () -> { try { String message = exchanger.exchange("from A"); assertEquals("from B", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; Runnable taskB = () -> { try { String message = exchanger.exchange("from B"); assertEquals("from A", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture.allOf( runAsync(taskA), runAsync(taskB)).join(); }

Di sini, kami memiliki dua utas yang bertukar pesan antara satu sama lain menggunakan penukar umum. Mari kita lihat contoh di mana kita menukar objek dari utas utama dengan utas baru:

@Test public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException { Exchanger exchanger = new Exchanger(); Runnable runner = () -> { try { String message = exchanger.exchange("from runner"); assertEquals("to runner", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture result = CompletableFuture.runAsync(runner); String msg = exchanger.exchange("to runner"); assertEquals("from runner", msg); result.join(); }

Perhatikan bahwa, kita perlu memulai thread runner terlebih dahulu dan kemudian memanggil exchange () di thread utama.

Juga, perhatikan bahwa panggilan utas pertama mungkin timeout jika utas kedua tidak mencapai titik pertukaran waktu. Berapa lama thread pertama harus menunggu dapat dikontrol menggunakan pertukaran yang kelebihan beban (T t, long timeout, TimeUnit timeUnit).

3. Tidak Ada Pertukaran Data GC

Exchanger dapat digunakan untuk membuat jenis pola pipeline dengan meneruskan data dari satu thread ke thread lainnya. Di bagian ini, kita akan membuat tumpukan untaian sederhana yang terus menerus meneruskan data antara satu sama lain sebagai pipeline.

@Test public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException { Exchanger
    
      readerExchanger = new Exchanger(); Exchanger
     
       writerExchanger = new Exchanger(); Runnable reader = () -> { Queue readerBuffer = new ConcurrentLinkedQueue(); while (true) { readerBuffer.add(UUID.randomUUID().toString()); if (readerBuffer.size() >= BUFFER_SIZE) { readerBuffer = readerExchanger.exchange(readerBuffer); } } }; Runnable processor = () -> { Queue processorBuffer = new ConcurrentLinkedQueue(); Queue writerBuffer = new ConcurrentLinkedQueue(); processorBuffer = readerExchanger.exchange(processorBuffer); while (true) { writerBuffer.add(processorBuffer.poll()); if (processorBuffer.isEmpty()) { processorBuffer = readerExchanger.exchange(processorBuffer); writerBuffer = writerExchanger.exchange(writerBuffer); } } }; Runnable writer = () -> { Queue writerBuffer = new ConcurrentLinkedQueue(); writerBuffer = writerExchanger.exchange(writerBuffer); while (true) { System.out.println(writerBuffer.poll()); if (writerBuffer.isEmpty()) { writerBuffer = writerExchanger.exchange(writerBuffer); } } }; CompletableFuture.allOf( runAsync(reader), runAsync(processor), runAsync(writer)).join(); }
     
    

Di sini, kami memiliki tiga utas: pembaca , prosesor , dan penulis . Bersama-sama, mereka bekerja sebagai pipa tunggal yang bertukar data di antara mereka.

The readerExchanger dibagi antara pembaca dan prosesor benang, sedangkan writerExchanger dibagi antara prosesor dan penulis benang.

Perhatikan bahwa contoh di sini hanya untuk demonstrasi. Kita harus berhati-hati saat membuat loop tak terbatas dengan while (true) . Juga agar kode tetap terbaca, kami telah menghilangkan beberapa penanganan pengecualian.

Pola pertukaran data saat menggunakan kembali buffer memungkinkan pengumpulan sampah lebih sedikit. Metode pertukaran mengembalikan instance antrian yang sama dan dengan demikian tidak akan ada GC untuk objek ini. Tidak seperti antrian pemblokiran, exchanger tidak membuat node atau objek untuk menyimpan dan berbagi data.

Membuat pipeline serupa dengan pola Disrupter, dengan perbedaan utama, pola Disrupter mendukung banyak produsen dan konsumen, sedangkan exchanger dapat digunakan antara sepasang konsumen dan produsen.

4. Kesimpulan

Jadi, kami telah mempelajari apa itu Exchanger di Java, cara kerjanya, dan kami telah melihat cara menggunakan kelas Exchanger . Selain itu, kami membuat pipeline dan mendemonstrasikan pertukaran data tanpa GC antar thread.

Seperti biasa, kode tersedia di GitHub.