Pemrosesan Batch Java EE 7

1. Perkenalan

Bayangkan kita harus menyelesaikan tugas secara manual seperti memproses slip gaji, menghitung bunga, dan menghasilkan tagihan. Itu akan menjadi sangat membosankan, rawan kesalahan dan daftar tugas manual yang tidak pernah berakhir!

Dalam tutorial ini, kita akan melihat Java Batch Processing (JSR 352), bagian dari platform Jakarta EE, dan spesifikasi yang bagus untuk mengotomatiskan tugas-tugas seperti ini. Ini menawarkan pengembang aplikasi model untuk mengembangkan sistem pemrosesan batch yang kuat sehingga mereka dapat fokus pada logika bisnis.

2. Ketergantungan Maven

Karena JSR 352 hanyalah sebuah spesifikasi, kita perlu menyertakan API dan implementasinya, seperti jberet :

 javax.batch javax.batch-api 1.0.1   org.jberet jberet-core 1.0.2.Final   org.jberet jberet-support 1.0.2.Final   org.jberet jberet-se 1.0.2.Final 

Kami juga akan menambahkan database dalam memori sehingga kami dapat melihat beberapa skenario yang lebih realistis.

3. Konsep Utama

JSR 352 memperkenalkan beberapa konsep, yang dapat kita lihat seperti ini:

Pertama-tama mari kita definisikan setiap bagian:

  • Mulai dari kiri, kami memiliki JobOperator . Ini mengelola semua aspek pemrosesan pekerjaan seperti memulai, menghentikan, dan memulai kembali
  • Selanjutnya, kami memiliki Pekerjaan . Pekerjaan adalah kumpulan langkah yang logis; itu merangkum seluruh proses batch
  • Sebuah pekerjaan akan berisi antara 1 dan n Langkah s. Setiap langkah adalah unit kerja yang independen dan berurutan. Sebuah langkah terdiri dari membaca input, memproses input itu, dan menulis output
  • Dan yang terakhir, namun tidak kalah pentingnya, kami memiliki JobRepository yang menyimpan informasi pekerjaan yang sedang berjalan. Ini membantu untuk melacak pekerjaan, status mereka, dan hasil penyelesaiannya

Langkah-langkahnya memiliki sedikit lebih banyak detail daripada ini, jadi mari kita lihat selanjutnya. Pertama, kita akan melihat langkah-langkah Chunk dan kemudian pada Batchlet s.

4. Membuat Chunk

Seperti yang dinyatakan sebelumnya, sepotong adalah semacam langkah . Kami akan sering menggunakan potongan untuk mengekspresikan operasi yang dilakukan berulang kali, katakanlah atas satu set item. Ini seperti operasi perantara dari Java Streams.

Saat mendeskripsikan potongan, kita perlu mengungkapkan dari mana mengambil barang, bagaimana memprosesnya, dan ke mana mengirimnya sesudahnya.

4.1. Membaca Item

Untuk membaca item, kita perlu mengimplementasikan ItemReader.

Dalam hal ini, kami akan membuat pembaca yang hanya akan mengeluarkan angka 1 hingga 10:

@Named public class SimpleChunkItemReader extends AbstractItemReader { private Integer[] tokens; private Integer count; @Inject JobContext jobContext; @Override public Integer readItem() throws Exception { if (count >= tokens.length) { return null; } jobContext.setTransientUserData(count); return tokens[count++]; } @Override public void open(Serializable checkpoint) throws Exception { tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 }; count = 0; } }

Sekarang, kami hanya membaca dari status internal kelas di sini. Tapi, tentu saja, readItem dapat menarik dari database , dari sistem file, atau sumber eksternal lainnya.

Perhatikan bahwa kami menyimpan beberapa status internal ini menggunakan JobContext # setTransientUserData () yang akan berguna nanti.

Perhatikan juga parameter checkpoint . Kami juga akan mengambilnya lagi.

4.2. Memproses Item

Tentu saja, alasan kami melakukan chunking adalah karena kami ingin melakukan semacam operasi pada item kami!

Setiap kali kami mengembalikan null dari prosesor item, kami menjatuhkan item itu dari batch.

Jadi, katakanlah di sini bahwa kita hanya ingin menyimpan bilangan genap. Kita bisa menggunakan ItemProcessor yang menolak yang ganjil dengan mengembalikan null :

@Named public class SimpleChunkItemProcessor implements ItemProcessor { @Override public Integer processItem(Object t) { Integer item = (Integer) t; return item % 2 == 0 ? item : null; } }

processItem akan dipanggil sekali untuk setiap item yang dipancarkan ItemReader kami .

4.3. Item Menulis

Terakhir, pekerjaan akan memanggil ItemWriter sehingga kita dapat menulis item yang telah diubah:

@Named public class SimpleChunkWriter extends AbstractItemWriter { List processed = new ArrayList(); @Override public void writeItems(List items) throws Exception { items.stream().map(Integer.class::cast).forEach(processed::add); } } 

Berapa lama item ? Sebentar lagi, kami akan menentukan ukuran potongan, yang akan menentukan ukuran daftar yang dikirim ke writeItems .

4.4. Mendefinisikan Bagian dalam Pekerjaan

Sekarang kita menggabungkan semua ini dalam file XML menggunakan JSL atau Bahasa Spesifikasi Pekerjaan. Perhatikan bahwa kami akan mencantumkan pembaca, prosesor, chunker, dan juga ukuran chunk kami:

Ukuran chunk adalah seberapa sering kemajuan dalam chunk dilakukan ke repositori pekerjaan , yang penting untuk menjamin penyelesaian, jika bagian dari sistem gagal.

Kami harus menempatkan file ini di META-INF / batch-jobs untuk. file jar dan di WEB-INF / kelas / META-INF / batch-pekerjaan untuk file .war .

Kami memberi pekerjaan kami id "simpleChunk", jadi mari kita coba dalam pengujian unit.

Sekarang, pekerjaan dijalankan secara asinkron, yang membuatnya sulit untuk diuji. Dalam contoh, pastikan untuk memeriksa BatchTestHelper kami yang melakukan polling dan menunggu hingga pekerjaan selesai:

@Test public void givenChunk_thenBatch_completesWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleChunk", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); } 

Jadi itulah yang dimaksud dengan potongan. Sekarang, mari kita lihat batchlets.

5. Membuat Batchlet

Tidak semuanya cocok dengan model iteratif. Misalnya, kita mungkin memiliki tugas yang hanya perlu kita panggil sekali, dijalankan sampai selesai, dan mengembalikan status keluar.

Kontrak untuk batchlet cukup sederhana:

@Named public class SimpleBatchLet extends AbstractBatchlet { @Override public String process() throws Exception { return BatchStatus.COMPLETED.toString(); } }

Seperti JSL:

Dan kita bisa mengujinya menggunakan pendekatan yang sama seperti sebelumnya:

@Test public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

Jadi, kami telah melihat beberapa cara berbeda untuk menerapkan langkah-langkah.

Sekarang mari kita lihat mekanisme untuk menandai dan menjamin kemajuan.

6. Pos Pemeriksaan Kustom

Kegagalan pasti akan terjadi di tengah pekerjaan. Haruskah kita memulai semuanya dari awal, atau dapatkah kita memulai dari bagian terakhir yang kita tinggalkan?

Seperti namanya, pos pemeriksaan membantu kami menetapkan penanda secara berkala jika terjadi kegagalan.

Secara default, akhir pemrosesan potongan adalah pos pemeriksaan alami .

Namun, kami dapat menyesuaikannya dengan CheckpointAlgorithm kami sendiri :

@Named public class CustomCheckPoint extends AbstractCheckpointAlgorithm { @Inject JobContext jobContext; @Override public boolean isReadyToCheckpoint() throws Exception { int counterRead = (Integer) jobContext.getTransientUserData(); return counterRead % 5 == 0; } }

Ingat hitungan yang kita tempatkan di data transien sebelumnya? Di sini, kita bisa menariknya dengan JobContext # getTransientUserDatauntuk menyatakan bahwa kami ingin berkomitmen pada setiap 5 nomor yang diproses.

Tanpa ini, komit akan terjadi di akhir setiap bagian, atau dalam kasus kami, setiap angka ketiga.

And then, we match that up with the checkout-algorithm directive in our XML underneath our chunk:

Let's test the code, again noting that some of the boilerplate steps are hidden away in BatchTestHelper:

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId) .stream() .map(BatchTestHelper::getCommitCount) .forEach(count -> assertEquals(3L, count.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

So, we might be expecting a commit count of 2 since we have ten items and configured the commits to be every 5th item. But, the framework does one more final read commit at the end to ensure everything has been processed, which is what brings us up to 3.

Next, let's look at how to handle errors.

7. Exception Handling

By default, the job operator will mark our job as FAILED in case of an exception.

Let's change our item reader to make sure that it fails:

@Override public Integer readItem() throws Exception { if (tokens.hasMoreTokens()) { String tempTokenize = tokens.nextToken(); throw new RuntimeException(); } return null; }

And then test:

@Test public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception { // ... start job and wait for completion assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED); }

But, we can override this default behavior in a number of ways:

  • skip-limit specifies the number of exceptions this step will ignore before failing
  • retry-limit specifies the number of times the job operator should retry the step before failing
  • skippable-exception-class specifies a set of exceptions that chunk processing will ignore

So, we can edit our job so that it ignores RuntimeException, as well as a few others, just for illustration:

And now our code will pass:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId).stream() .map(BatchTestHelper::getProcessSkipCount) .forEach(skipCount -> assertEquals(1L, skipCount.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8. Executing Multiple Steps

We mentioned earlier that a job can have any number of steps, so let's see that now.

8.1. Firing the Next Step

By default, each step is the last step in the job.

In order to execute the next step within a batch job, we'll have to explicitly specify by using the next attribute within the step definition:

If we forget this attribute, then the next step in sequence will not get executed.

And we can see what this looks like in the API:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(2 , jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.2. Flows

A sequence of steps can also be encapsulated into a flow. When the flow is finished, it is the entire flow that transitions to the execution element. Also, elements inside the flow can't transition to elements outside the flow.

We can, say, execute two steps inside a flow, and then have that flow transition to an isolated step:

And we can still see each step execution independently:

@Test public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(3, jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.3. Decisions

We also have if/else support in the form of decisions. Decisions provide a customized way of determining a sequence among steps, flows, and splits.

Like steps, it works on transition elements such as next which can direct or terminate job execution.

Let's see how the job can be configured:

Any decision element needs to be configured with a class that implements Decider. Its job is to return a decision as a String.

Each next inside decision is like a case in a switch statement.

8.4. Splits

Splits are handy since they allow us to execute flows concurrently:

Of course, this means that the order isn't guaranteed.

Let's confirm that they still all get run. The flow steps will be performed in an arbitrary order, but the isolated step will always be last:

@Test public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception { // ... start job and wait for completion List stepExecutions = jobOperator.getStepExecutions(executionId); assertEquals(3, stepExecutions.size()); assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

9. Partitioning a Job

We can also consume the batch properties within our Java code which have been defined in our job.

They can be scoped at three levels – the job, the step, and the batch-artifact.

Let's see some examples of how they consumed.

When we want to consume the properties at job level:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties(); ...

This can be consumed at a step level as well:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties(); ...

When we want to consume the properties at batch-artifact level:

@Inject @BatchProperty(name = "name") private String nameString;

This comes in handy with partitions.

See, with splits, we can run flows concurrently. But we can also partition a step into n sets of items or set separate inputs, allowing us another way to split up the work across multiple threads.

To comprehend the segment of work each partition should do, we can combine properties with partitions:

10. Stop and Restart

Now, that's it for defining jobs. Now let's talk for a minute about managing them.

We've already seen in our unit tests that we can get an instance of JobOperator from BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator();

And then, we can start the job:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

However, we can also stop the job:

jobOperator.stop(executionId);

And lastly, we can restart the job:

executionId = jobOperator.restart(executionId, new Properties());

Let's see how we can stop a running job:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobOperator.stop(executionId); jobExecution = BatchTestHelper.keepTestStopped(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); }

And if a batch is STOPPED, then we can restart it:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() { // ... start and stop the job assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties()); jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId)); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

11. Fetching Jobs

When a batch job is submitted then the batch runtime creates an instance of JobExecution to track it.

To obtain the JobExecution for an execution id, we can use the JobOperator#getJobExecution(executionId) method.

And, StepExecution provides helpful information for tracking a step's execution.

To obtain the StepExecution for an execution id, we can use the JobOperator#getStepExecutions(executionId) method.

And from that, we can get several metrics about the step via StepExecution#getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception { // ... start job and wait for completion assertTrue(jobOperator.getJobNames().contains("simpleChunk")); assertTrue(jobOperator.getParameters(executionId).isEmpty()); StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0); Map metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics()); assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue()); assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue()); // ... and many more! }

12. Disadvantages

JSR 352 is powerful, though it is lacking in a number of areas:

  • Tampaknya ada kekurangan pembaca dan penulis yang dapat memproses format lain seperti JSON
  • Tidak ada dukungan obat generik
  • Partisi hanya mendukung satu langkah
  • API tidak menawarkan apa pun untuk mendukung penjadwalan (meskipun J2EE memiliki modul penjadwalan terpisah)
  • Karena sifatnya yang tidak sinkron, pengujian bisa menjadi tantangan
  • API-nya cukup bertele-tele

13. Kesimpulan

Dalam artikel ini, kami melihat JSR 352 dan mempelajari tentang chunks, batchlets, split, flow, dan banyak lagi. Namun, kami baru saja menyentuh permukaannya.

Seperti biasa, kode demo dapat ditemukan di GitHub.