Pengantar Thread Pools di Java

1. Perkenalan

Artikel ini membahas kumpulan thread di Java - dimulai dengan implementasi yang berbeda di perpustakaan Java standar dan kemudian melihat perpustakaan Guava Google.

2. Kolam Benang

Di Java, utas dipetakan ke utas tingkat sistem yang merupakan sumber daya sistem operasi. Jika Anda membuat utas secara tidak terkendali, Anda mungkin kehabisan sumber daya ini dengan cepat.

Peralihan konteks antar utas juga dilakukan oleh sistem operasi - untuk meniru paralelisme. Pandangan sederhananya adalah - semakin banyak utas yang Anda buat, semakin sedikit waktu yang dihabiskan setiap utas untuk melakukan pekerjaan sebenarnya.

Pola Thread Pool membantu menghemat sumber daya dalam aplikasi multithread, dan juga memuat paralelisme dalam batas yang telah ditentukan sebelumnya.

Saat Anda menggunakan kumpulan utas, Anda menulis kode serentak dalam bentuk tugas paralel dan mengirimkannya untuk dieksekusi ke sebuah instance kumpulan utas . Instance ini mengontrol beberapa utas yang digunakan kembali untuk menjalankan tugas-tugas ini.

Pola ini memungkinkan Anda untuk mengontrol jumlah utas yang dibuat aplikasi , siklus hidupnya, serta menjadwalkan eksekusi tugas dan menyimpan tugas yang masuk dalam antrian.

3. Thread Pools di Jawa

3.1. Executors , Executor dan ExecutorService

Kelas helper Executors berisi beberapa metode untuk pembuatan instance kumpulan thread yang telah dikonfigurasi sebelumnya untuk Anda. Kelas-kelas itu adalah tempat yang baik untuk memulai - gunakan jika Anda tidak perlu menerapkan penyesuaian khusus apa pun.

The Pelaksana dan ExecutorService interface yang digunakan untuk bekerja dengan berbagai implementasi kolam thread di Jawa. Biasanya, Anda harus menjaga kode Anda dipisahkan dari implementasi sebenarnya dari kumpulan utas dan menggunakan antarmuka ini di seluruh aplikasi Anda.

The Pelaksana antarmuka memiliki satu mengeksekusi metode untuk mengirimkan Runnable contoh untuk eksekusi.

Berikut adalah contoh singkat tentang bagaimana Anda dapat menggunakan Executors API untuk memperoleh instance Executor yang didukung oleh satu kumpulan thread dan antrian tak terbatas untuk menjalankan tugas secara berurutan. Di sini, kami menjalankan satu tugas yang hanya mencetak " Hello World " di layar. Tugas tersebut diajukan sebagai lambda (fitur Java 8) yang disimpulkan menjadi Runnable .

Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));

The ExecutorService antarmuka mengandung sejumlah besar metode untuk mengontrol kemajuan tugas dan mengelola penghentian layanan . Dengan menggunakan antarmuka ini, Anda dapat mengirimkan tugas untuk dieksekusi dan juga mengontrol eksekusinya menggunakan instance Future yang dikembalikan .

Pada contoh berikut , kita membuat sebuah ExecutorService , menyerahkan tugas dan kemudian menggunakan kembali Future 's get metode untuk menunggu sampai tugas yang diajukan adalah selesai dan nilai dikembalikan:

ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();

Tentu saja, dalam skenario kehidupan nyata, Anda biasanya tidak ingin langsung memanggil future.get () tetapi tunda panggilan sampai Anda benar-benar membutuhkan nilai komputasi.

The mengirimkan metode kelebihan beban untuk mengambil baik Runnable atau Callable keduanya merupakan interface fungsional dan dapat disahkan sebagai lambdas (dimulai dengan Java 8).

Metode tunggal Runnable tidak memunculkan pengecualian dan tidak mengembalikan nilai. The Callable antarmuka mungkin lebih nyaman, karena memungkinkan kita untuk membuang pengecualian dan mengembalikan nilai.

Terakhir - untuk membiarkan compiler menyimpulkan tipe Callable , cukup kembalikan nilai dari lambda.

Untuk lebih banyak contoh penggunaan antarmuka ExecutorService dan masa depan, lihat "Panduan untuk Java ExecutorService".

3.2. ThreadPoolExecutor

The ThreadPoolExecutor adalah thread implementasi renang extensible dengan banyak parameter dan kait untuk fine-tuning.

Parameter konfigurasi utama yang akan kita diskusikan di sini adalah: corePoolSize , maximumPoolSize , dan keepAliveTime .

Pool terdiri dari sejumlah thread inti tetap yang disimpan di dalam sepanjang waktu, dan beberapa thread berlebih yang mungkin muncul dan kemudian dihentikan saat tidak diperlukan lagi. The corePoolSize parameter jumlah benang inti yang akan dipakai dan disimpan di kolam renang. Ketika tugas baru masuk, jika semua utas inti sibuk dan antrian internal penuh, maka kumpulan diizinkan untuk berkembang hingga maximumPoolSize .

The KeepAliveTime parameter interval waktu yang benang yang berlebihan (dipakai lebih dari corePoolSize ) diperbolehkan untuk eksis di negara menganggur. Secara default, ThreadPoolExecutor hanya mempertimbangkan utas non-inti untuk dihapus. Untuk menerapkan kebijakan penghapusan yang sama ke utas inti, kita dapat menggunakan metode allowCoreThreadTimeOut (true) .

Parameter ini mencakup berbagai kasus penggunaan, tetapi konfigurasi yang paling umum ditentukan sebelumnya dalam metode statis Pelaksana .

Misalnya , metode newFixedThreadPool membuat ThreadPoolExecutor dengan nilai parameter corePoolSize dan maximumPoolSize yang sama serta keepAliveTime nol . Ini berarti bahwa jumlah utas di kumpulan utas ini selalu sama:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());

Dalam contoh di atas, kami membuat instance ThreadPoolExecutor dengan jumlah thread tetap 2. Ini berarti bahwa jika jumlah tugas yang berjalan secara bersamaan kurang atau sama dengan dua di sepanjang waktu, maka tugas tersebut akan segera dijalankan. Jika tidak, beberapa tugas ini mungkin akan dimasukkan ke dalam antrian untuk menunggu giliran .

Kami membuat tiga tugas yang Dapat Dipanggil yang meniru pekerjaan berat dengan tidur selama 1000 milidetik. Dua tugas pertama akan dijalankan sekaligus, dan yang ketiga harus menunggu dalam antrian. Kita bisa memverifikasinya dengan memanggil metode getPoolSize () dan getQueue (). Size () segera setelah mengirimkan tugas.

ThreadPoolExecutor lain yang telah dikonfigurasi sebelumnya dapat dibuat dengan metode Executors.newCachedThreadPool () . Metode ini tidak menerima sejumlah untaian sama sekali. The corePoolSize sebenarnya diatur ke 0, dan maximumPoolSize diatur untuk Integer.MAX_VALUE misalnya ini. The KeepAliveTime adalah 60 detik untuk yang satu ini.

Nilai parameter ini berarti bahwa kumpulan utas yang di -cache dapat tumbuh tanpa batas untuk mengakomodasi sejumlah tugas yang dikirimkan . Namun jika tidak diperlukan lagi, utas akan dibuang setelah tidak aktif selama 60 detik. Kasus penggunaan yang khas adalah ketika Anda memiliki banyak tugas berumur pendek dalam aplikasi Anda.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(3, executor.getPoolSize()); assertEquals(0, executor.getQueue().size());

Ukuran antrian dalam contoh di atas akan selalu nol karena secara internal instance SynchronousQueue digunakan. Dalam SynchronousQueue , pasangan operasi penyisipan dan penghapusan selalu terjadi secara bersamaan, sehingga antrean tidak pernah benar-benar berisi apa pun.

The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is zero.

Tasks in the above example will be executed sequentially, so the flag value will be 2 after the task's completion:

AtomicInteger counter = new AtomicInteger(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1); }); executor.submit(() -> { counter.compareAndSet(1, 2); });

Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it cannot be reconfigured after creation. Note that also this is the reason we cannot cast it to a ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:

  • schedule method allows to execute a task once after a specified delay;
  • scheduleAtFixedRate method allows to execute a task after a specified initial delay and then execute it repeatedly with a certain period; the period argument is the time measured between the starting times of the tasks, so the execution rate is fixed;
  • scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly executes the given task, but the specified delay is measured between the end of the previous task and the start of the next; the execution rate may vary depending on the time it takes to execute any given task.

The Executors.newScheduledThreadPool() method is typically used to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime. Here's how to schedule a task for execution in 500 milliseconds:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);

The following code shows how to execute a task after 500 milliseconds delay and then repeat it every 100 milliseconds. After scheduling the task, we wait until it fires three times using the CountDownLatch lock, then cancel it using the Future.cancel() method.

CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. Using a simple ThreadPoolExecutor, you will run out of threads quickly, as every task or subtask requires its own thread to run.

In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask, implementing the Work Stealing algorithm instead. This framework is thoroughly described in the article “Guide to the Fork/Join Framework in Java”

Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all leaf values. Here’s a simple implementation of a tree consisting of a node, an int value and a set of child nodes:

static class TreeNode { int value; Set children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); } }

Now if we want to sum all values in a tree in parallel, we need to implement a RecursiveTask interface. Each task receives its own node and adds its value to the sum of values of its children. To calculate the sum of children values, task implementation does the following:

  • streams the children set,
  • maps over this stream, creating a new CountingTask for each element,
  • executes each subtask by forking it,
  • collects the results by calling the join method on each forked task,
  • sums the results using the Collectors.summingInt collector.
public static class CountingTask extends RecursiveTask { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); } }

The code to run the calculation on an actual tree is very simple:

TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Thread Pool's Implementation in Guava

Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.

4.1. Adding Guava as a Maven Dependency

Add the following dependency to your Maven pom file to include the Guava library to your project. You can find the latest version of Guava library in the Maven Central repository:

 com.google.guava guava 19.0 

4.2. Direct Executor and Direct Executor Service

Sometimes you want to execute the task either in the current thread or in a thread pool, depending on some conditions. You would prefer to use a single Executor interface and just switch the implementation. Although it is not so hard to come up with an implementation of Executor or ExecutorService that executes the tasks in the current thread, it still requires writing some boilerplate code.

Gladly, Guava provides predefined instances for us.

Here's an example that demonstrates the execution of a task in the same thread. Although the provided task sleeps for 500 milliseconds, it blocks the current thread, and the result is available immediately after the execute call is finished:

Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true); }); assertTrue(executed.get());

The instance returned by the directExecutor() method is actually a static singleton, so using this method does not provide any overhead on object creation at all.

You should prefer this method to the MoreExecutors.newDirectExecutorService() because that API creates a full-fledged executor service implementation on every call.

4.3. Exiting Executor Services

Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause JVM to hang indefinitely while the tasks keep doing their work.

To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.

These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method and prevent the VM from terminating for a configured amount of time before giving up on hung tasks.

In the following example, we're submitting the task that contains an infinite loop, but we use an exiting executor service with a configured time of 100 milliseconds to wait for the tasks upon VM termination. Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true) { } });

4.4. Listening Decorators

Listening decorators allow you to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method addListener. This method allows adding a listener that is called upon future completion.

Anda jarang ingin menggunakan metode ListenableFuture.addListener () secara langsung, tetapi metode ini penting untuk sebagian besar metode helper di kelas utilitas Futures . Misalnya, dengan metode Futures.allAsList () Anda dapat menggabungkan beberapa instance ListenableFuture dalam satu ListenableFuture yang diselesaikan setelah berhasil menyelesaikan semua futures yang digabungkan:

ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit(() -> "World"); String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(" ")); assertEquals("Hello World", greeting);

5. Kesimpulan

Pada artikel ini, kita telah membahas pola Thread Pool dan implementasinya di perpustakaan Java standar dan di perpustakaan Guava Google.

Kode sumber artikel tersedia di GitHub.