Tinjauan java.util.concurrent

1. Ikhtisar

The java.util.concurrent paket menyediakan alat untuk membuat aplikasi bersamaan.

Pada artikel ini, kami akan memberikan gambaran umum tentang keseluruhan paket.

2. Komponen Utama

The java.util.concurrent mengandung terlalu banyak fitur untuk membahas dalam satu write-up. Pada artikel ini, kami terutama akan fokus pada beberapa utilitas paling berguna dari paket ini seperti:

  • Pelaksana
  • ExecutorService
  • ScheduledExecutorService
  • Masa depan
  • CountDownLatch
  • CyclicBarrier
  • Tiang sinyal
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Kunci
  • Phaser

Anda juga dapat menemukan banyak artikel khusus untuk kelas individu di sini.

2.1. Pelaksana

Pelaksana adalah antarmuka yang mewakili objek yang menjalankan tugas yang disediakan.

Itu tergantung pada implementasi tertentu (dari mana pemanggilan dimulai) jika tugas harus dijalankan pada utas baru atau saat ini. Oleh karena itu, dengan menggunakan antarmuka ini, kita dapat memisahkan aliran eksekusi tugas dari mekanisme eksekusi tugas yang sebenarnya.

Satu hal yang perlu diperhatikan di sini adalah bahwa Pelaksana tidak secara ketat membutuhkan eksekusi tugas menjadi asinkron. Dalam kasus yang paling sederhana, eksekutor dapat menjalankan tugas yang dikirimkan secara instan di thread pemanggilan.

Kita perlu membuat invoker untuk membuat contoh eksekutor:

public class Invoker implements Executor { @Override public void execute(Runnable r) { r.run(); } }

Sekarang, kita bisa menggunakan invoker ini untuk menjalankan tugas.

public void execute() { Executor executor = new Invoker(); executor.execute( () -> { // task to be performed }); }

Hal yang perlu diperhatikan di sini adalah jika pelaksana tidak dapat menerima tugas untuk dieksekusi, itu akan memunculkan RejectedExecutionException .

2.2. ExecutorService

ExecutorService adalah solusi lengkap untuk pemrosesan asinkron. Ini mengelola antrian dalam memori dan menjadwalkan tugas yang dikirimkan berdasarkan ketersediaan utas.

Untuk menggunakan ExecutorService, kita perlu membuat satu kelas Runnable .

public class Task implements Runnable { @Override public void run() { // task details } }

Sekarang kita dapat membuat instance ExecutorService dan menetapkan tugas ini. Pada saat pembuatan, kita perlu menentukan ukuran kumpulan benang.

ExecutorService executor = Executors.newFixedThreadPool(10);

Jika kita ingin membuat instance ExecutorService single-threaded , kita bisa menggunakan newSingleThreadExecutor (ThreadFactory threadFactory) untuk membuat instance.

Setelah pelaksana dibuat, kita dapat menggunakannya untuk mengirimkan tugas.

public void execute() { executor.submit(new Task()); }

Kami juga dapat membuat instance Runnable saat mengirimkan tugas.

executor.submit(() -> { new Task(); });

Itu juga dilengkapi dengan dua metode penghentian eksekusi out-of-the-box. Yang pertama adalah shutdown () ; itu menunggu sampai semua tugas yang dikirimkan selesai dijalankan. Metode lainnya adalah shutdownNow () whic h segera berakhir semua pending / tugas mengeksekusi.

Ada juga metode lain awaitTermination (long timeout, TimeUnit unit) yang secara paksa memblokir hingga semua tugas telah menyelesaikan eksekusi setelah peristiwa shutdown yang dipicu atau eksekusi-timeout terjadi, atau thread eksekusi itu sendiri terganggu,

try { executor.awaitTermination( 20l, TimeUnit.NANOSECONDS ); } catch (InterruptedException e) { e.printStackTrace(); }

2.3. ScheduledExecutorService

ScheduledExecutorService adalah antarmuka yang mirip dengan ExecutorService, tetapi dapat melakukan tugas secara berkala.

Executor and ExecutorService‘s methods are scheduled on the spot without introducing any artificial delay. Zero or any negative value signifies that the request needs to be executed instantly.

We can use both Runnable and Callable interface to define the task.

public void execute() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); Future future = executorService.schedule(() -> { // ... return "Hello world"; }, 1, TimeUnit.SECONDS); ScheduledFuture scheduledFuture = executorService.schedule(() -> { // ... }, 1, TimeUnit.SECONDS); executorService.shutdown(); }

ScheduledExecutorService can also schedule the task after some given fixed delay:

executorService.scheduleAtFixedRate(() -> { // ... }, 1, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay(() -> { // ... }, 1, 10, TimeUnit.SECONDS);

Here, the scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) method creates and executes a periodic action that is invoked firstly after the provided initial delay, and subsequently with the given period until the service instance shutdowns.

The scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) method creates and executes a periodic action that is invoked firstly after the provided initial delay, and repeatedly with the given delay between the termination of the executing one and the invocation of the next one.

2.4. Future

Future is used to represent the result of an asynchronous operation. It comes with methods for checking if the asynchronous operation is completed or not, getting the computed result, etc.

What's more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread. If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly.

Otherwise, in-progress tasks will be allowed to complete.

We can use below code snippet to create a future instance:

public void invoke() { ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> { // ... Thread.sleep(10000l); return "Hello world"; }); }

We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:

if (future.isDone() && !future.isCancelled()) { try { str = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }

We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:

try { future.get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }

2.5. CountDownLatch

CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operation completes.

A CountDownLatch is initialized with a counter(Integer type); this counter decrements as the dependent threads complete execution. But once the counter reaches zero, other threads get released.

You can learn more about CountDownLatch here.

2.6. CyclicBarrier

CyclicBarrier works almost the same as CountDownLatch except that we can reuse it. Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method(known as barrier condition) before invoking the final task.

We need to create a Runnable task instance to initiate the barrier condition:

public class Task implements Runnable { private CyclicBarrier barrier; public Task(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { LOG.info(Thread.currentThread().getName() + " is waiting"); barrier.await(); LOG.info(Thread.currentThread().getName() + " is released"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }

Now we can invoke some threads to race for the barrier condition:

public void start() { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { // ... LOG.info("All previous tasks are completed"); }); Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); if (!cyclicBarrier.isBroken()) { t1.start(); t2.start(); t3.start(); } }

Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.

2.7. Semaphore

The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.

If a permit is not available (via tryAcquire()), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.

Once the executing thread releases the critical section, again the permit counter increases (done by release() method).

We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.

We can also check the number of available permits or the number of threads waiting to acquire the semaphore.

Following code snippet can be used to implement a semaphore:

static Semaphore semaphore = new Semaphore(10); public void execute() throws InterruptedException { LOG.info("Available permit : " + semaphore.availablePermits()); LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength()); if (semaphore.tryAcquire()) { try { // ... } finally { semaphore.release(); } } }

We can implement a Mutex like data-structure using Semaphore. More details on this can be found here.

2.8. ThreadFactory

As the name suggests, ThreadFactory acts as a thread (non-existing) pool which creates a new thread on demand. It eliminates the need of a lot of boilerplate coding for implementing efficient thread creation mechanisms.

We can define a ThreadFactory:

public class BaeldungThreadFactory implements ThreadFactory { private int threadId; private String name; public BaeldungThreadFactory(String name) { threadId = 1; this.name = name; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, name + "-Thread_" + threadId); LOG.info("created new thread with id : " + threadId + " and name : " + t.getName()); threadId++; return t; } }

We can use this newThread(Runnable r) method to create a new thread at runtime:

BaeldungThreadFactory factory = new BaeldungThreadFactory( "BaeldungThreadFactory"); for (int i = 0; i < 10; i++) { Thread t = factory.newThread(new Task()); t.start(); }

2.9. BlockingQueue

In asynchronous programming, one of the most common integration patterns is the producer-consumer pattern. The java.util.concurrent package comes with a data-structure know as BlockingQueue – which can be very useful in these async scenarios.

More information and a working example on this is available here.

2.10. DelayQueue

DelayQueue is an infinite-size blocking queue of elements where an element can only be pulled if it's expiration time (known as user defined delay) is completed. Hence, the topmost element (head) will have the most amount delay and it will be polled last.

More information and a working example on this is available here.

2.11. Locks

Not surprisingly, Lock is a utility for blocking other threads from accessing a certain segment of code, apart from the thread that's executing it currently.

The main difference between a Lock and a Synchronized block is that synchronized block is fully contained in a method; however, we can have Lock API’s lock() and unlock() operation in separate methods.

More information and a working example on this is available here.

2.12. Phaser

Phaser adalah solusi yang lebih fleksibel daripada CyclicBarrier dan CountDownLatch - digunakan untuk bertindak sebagai penghalang dapat digunakan kembali yang nomor dinamis benang perlu menunggu sebelum melanjutkan eksekusi. Kami dapat mengoordinasikan beberapa fase eksekusi, menggunakan kembali instance Phaser untuk setiap fase program.

Informasi lebih lanjut dan contoh kerja tentang ini tersedia di sini.

3. Kesimpulan

Dalam artikel ikhtisar tingkat tinggi ini, kita telah berfokus pada berbagai utilitas yang tersedia dari paket java.util.concurrent .

Seperti biasa, kode sumber lengkap tersedia di GitHub.