Semaphore di Jawa

1. Ikhtisar

Dalam tutorial singkat ini, kita akan menjelajahi dasar-dasar semaphore dan mutex di Java.

2. Semaphore

Kami akan mulai dengan java.util.concurrent.Semaphore. Kita dapat menggunakan semaphore untuk membatasi jumlah thread bersamaan yang mengakses resource tertentu.

Dalam contoh berikut, kami akan menerapkan antrian login sederhana untuk membatasi jumlah pengguna di sistem:

class LoginQueueUsingSemaphore { private Semaphore semaphore; public LoginQueueUsingSemaphore(int slotLimit) { semaphore = new Semaphore(slotLimit); } boolean tryLogin() { return semaphore.tryAcquire(); } void logout() { semaphore.release(); } int availableSlots() { return semaphore.availablePermits(); } }

Perhatikan bagaimana kami menggunakan metode berikut:

  • tryAcquire () - mengembalikan nilai true jika izin tersedia segera dan memperolehnya jika tidak mengembalikan salah, tetapi memperoleh () memperoleh izin dan memblokir sampai tersedia
  • release () - merilis izin
  • availablePermits () - mengembalikan jumlah izin saat ini yang tersedia

Untuk menguji antrian login kami, pertama-tama kami akan mencoba mencapai batas tersebut dan memeriksa apakah upaya login berikutnya akan diblokir:

@Test public void givenLoginQueue_whenReachLimit_thenBlocked() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); assertFalse(loginQueue.tryLogin()); }

Selanjutnya, kita akan melihat apakah ada slot yang tersedia setelah logout:

@Test public void givenLoginQueue_whenLogout_thenSlotsAvailable() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); loginQueue.logout(); assertTrue(loginQueue.availableSlots() > 0); assertTrue(loginQueue.tryLogin()); }

3. Semaphore Jangka Waktu

Selanjutnya kita akan membahas Apache Commons TimedSemaphore. TimedSemaphore memungkinkan sejumlah izin sebagai Semaphore sederhana tetapi dalam jangka waktu tertentu, setelah periode ini waktu reset dan semua izin dilepaskan.

Kita dapat menggunakan TimedSemaphore untuk membuat antrean penundaan sederhana sebagai berikut:

class DelayQueueUsingTimedSemaphore { private TimedSemaphore semaphore; DelayQueueUsingTimedSemaphore(long period, int slotLimit) { semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit); } boolean tryAdd() { return semaphore.tryAcquire(); } int availableSlots() { return semaphore.getAvailablePermits(); } }

Saat kami menggunakan antrian penundaan dengan satu detik sebagai periode waktu dan setelah menggunakan semua slot dalam satu detik, tidak ada yang tersedia:

public void givenDelayQueue_whenReachLimit_thenBlocked() { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); assertFalse(delayQueue.tryAdd()); }

Tetapi setelah tidur untuk beberapa waktu, semaphore harus mengatur ulang dan melepaskan izin :

@Test public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); Thread.sleep(1000); assertTrue(delayQueue.availableSlots() > 0); assertTrue(delayQueue.tryAdd()); }

4. Semaphore vs. Mutex

Mutex bertindak mirip dengan semaphore biner, kita dapat menggunakannya untuk mengimplementasikan pengecualian bersama.

Dalam contoh berikut, kita akan menggunakan semafor biner sederhana untuk membuat penghitung:

class CounterUsingMutex { private Semaphore mutex; private int count; CounterUsingMutex() { mutex = new Semaphore(1); count = 0; } void increase() throws InterruptedException { mutex.acquire(); this.count = this.count + 1; Thread.sleep(1000); mutex.release(); } int getCount() { return this.count; } boolean hasQueuedThreads() { return mutex.hasQueuedThreads(); } }

Ketika banyak utas mencoba mengakses penghitung sekaligus, mereka akan diblokir dalam antrian :

@Test public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); }

Saat kami menunggu, semua utas akan mengakses penghitung dan tidak ada utas yang tersisa di antrian:

@Test public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); Thread.sleep(5000); assertFalse(counter.hasQueuedThreads()); assertEquals(count, counter.getCount()); }

5. Kesimpulan

Pada artikel ini, kami membahas dasar-dasar semaphore di Java.

Seperti biasa, kode sumber lengkap tersedia di GitHub.