Spring Batch - Tasklets vs Chunks

1. Introduction

Spring Batch provides two different ways to implement a job: using tasklets and chunks.

In this article, we'll learn how to configure and implement both methods using a simple real-life example.

2. Dependencies

Let's get started by adding the required dependencies:

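    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>4.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <version>4.2.0.RELEASE</version>
        <scope>test</scope>
    </dependency>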

To get the latest versions of spring-batch-core and spring-batch-test, please refer to Maven Central.

3. Our Use Case

Let's consider a CSV file with the following content:

    Mae Hodges,10/22/1972
    Gary Potter,02/22/1953
    Betty Wise,02/17/1968
    Wayne Rose,04/06/1977
    Adam Caldwell,09/27/1995
    Lucille Phillips,05/14/1992

The first position of each line represents a person's name and the second position represents their date of birth.

Our use case is to generate another CSV file that contains each person's name and age:

    Mae Hodges,45
    Gary Potter,64
    Betty Wise,49
    Wayne Rose,40
    Adam Caldwell,22
    Lucille Phillips,25
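The transformation from an input line to an output line can be sketched in plain Java, without any Spring Batch classes. The class name AgeSketch, the method toOutputLine and the reference date 2018-01-01 are assumptions made for illustration only. The reference date was picked because it reproduces the ages in the sample output; the job built in this article uses the current date instead.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of the article's project.
class AgeSketch {

    // Matches the date format used in the input file, e.g. 10/22/1972.
    private static final DateTimeFormatter DOB_FORMAT =
        DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Turns one input line ("name,MM/dd/yyyy") into one output line ("name,age").
    // The reference date is a parameter so that the result is reproducible.
    static String toOutputLine(String inputLine, LocalDate today) {
        String[] fields = inputLine.split(",");
        String name = fields[0];
        LocalDate dob = LocalDate.parse(fields[1], DOB_FORMAT);
        long age = ChronoUnit.YEARS.between(dob, today);
        return name + "," + age;
    }

    public static void main(String[] args) {
        // Assumed reference date; it reproduces the ages listed above.
        LocalDate today = LocalDate.of(2018, 1, 1);
        System.out.println(toOutputLine("Mae Hodges,10/22/1972", today));  // Mae Hodges,45
        System.out.println(toOutputLine("Gary Potter,02/22/1953", today)); // Gary Potter,64
    }
}
```

Passing the reference date as a parameter keeps the sketch deterministic, which is also a convenient property when testing age calculations.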

Now that our domain is clear, let's go ahead and build a solution using both approaches. We'll start with tasklets.

4. Tasklets Approach

4.1. Introduction and Design

Tasklets are meant to perform a single task within a step. Our job will consist of several steps that execute one after the other. Each step should perform only one defined task.

Our job will consist of three steps:

  1. Read lines from the input CSV file.
  2. Calculate the age of every person in the input CSV file.
  3. Write the name and age of each person to a new output CSV file.

Now that the big picture is ready, let's create one class per step.

LinesReader will be in charge of reading data from the input file:

    public class LinesReader implements Tasklet {
        // ...
    }

LinesProcessor will calculate the age of every person in the file:

    public class LinesProcessor implements Tasklet {
        // ...
    }

Finally, LinesWriter will have the responsibility of writing names and ages to an output file:

    public class LinesWriter implements Tasklet {
        // ...
    }

At this point, all our steps implement the Tasklet interface. That will force us to implement its execute method:

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
      ChunkContext chunkContext) throws Exception {
        // ...
    }

This method is where we'll add the logic for each step. Before starting with that code, let's configure our job.

4.2. Configuration

We need to add some configuration to Spring's application context. After adding the standard bean declarations for the classes created in the previous section, we're ready to create our job definition:

    @Configuration
    @EnableBatchProcessing
    public class TaskletsConfig {

        @Autowired
        private JobBuilderFactory jobs;

        @Autowired
        private StepBuilderFactory steps;

        @Bean
        protected Step readLines() {
            return steps
              .get("readLines")
              .tasklet(linesReader())
              .build();
        }

        @Bean
        protected Step processLines() {
            return steps
              .get("processLines")
              .tasklet(linesProcessor())
              .build();
        }

        @Bean
        protected Step writeLines() {
            return steps
              .get("writeLines")
              .tasklet(linesWriter())
              .build();
        }

        @Bean
        public Job job() {
            return jobs
              .get("taskletsJob")
              .start(readLines())
              .next(processLines())
              .next(writeLines())
              .build();
        }

        // ...
    }

This means that our "taskletsJob" will consist of three steps. The first one (readLines) will execute the tasklet defined in the bean linesReader and move to the next step: processLines. processLines will perform the tasklet defined in the bean linesProcessor and go to the final step: writeLines.

Our job flow is defined, and we're ready to add some logic!

4.3. Model and Utils

As we'll be manipulating lines in a CSV file, we're going to create a class Line:

    public class Line implements Serializable {

        private String name;
        private LocalDate dob;
        private Long age;

        // standard constructor, getters, setters and toString implementation
    }

Please note that Line implements Serializable. That is because Line will act as a DTO to transfer data between steps. According to Spring Batch, objects transferred between steps must be serializable.
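To make the serializability requirement concrete, here is a minimal plain-Java sketch that writes an object to bytes and reads it back, which is roughly what any store that persists such objects has to do. SerializableLine and SerializationSketch are hypothetical names, and the class is a reduced stand-in for Line rather than the article's actual class. It illustrates Java serialization in general and makes no claim about how Spring Batch performs it internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDate;

// Hypothetical stand-in for the article's Line class, reduced to two fields.
class SerializableLine implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    final LocalDate dob;

    SerializableLine(String name, LocalDate dob) {
        this.name = name;
        this.dob = dob;
    }
}

class SerializationSketch {

    // Serializes the object to a byte array and deserializes a copy from it.
    static SerializableLine roundTrip(SerializableLine line) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(line);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableLine) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SerializableLine original =
            new SerializableLine("Mae Hodges", LocalDate.of(1972, 10, 22));
        SerializableLine copy = roundTrip(original);
        System.out.println(copy.name + " " + copy.dob);
    }
}
```

If SerializableLine did not implement Serializable, writeObject would fail with a NotSerializableException, which is the kind of error the Serializable marker on Line is there to avoid.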

On the other hand, we can start thinking about reading and writing lines.

For that, we'll make use of OpenCSV:

    <dependency>
        <groupId>com.opencsv</groupId>
        <artifactId>opencsv</artifactId>
        <version>4.1</version>
    </dependency>

Look for the latest OpenCSV version on Maven Central.

Once OpenCSV is included, we're also going to create a FileUtils class. It will provide methods for reading and writing CSV lines:

    public class FileUtils {

        public Line readLine() throws Exception {
            if (CSVReader == null) {
                initReader();
            }
            String[] line = CSVReader.readNext();
            if (line == null) {
                return null;
            }
            return new Line(
              line[0],
              LocalDate.parse(
                line[1],
                DateTimeFormatter.ofPattern("MM/dd/yyyy")));
        }

        public void writeLine(Line line) throws Exception {
            if (CSVWriter == null) {
                initWriter();
            }
            String[] lineStr = new String[2];
            lineStr[0] = line.getName();
            lineStr[1] = line
              .getAge()
              .toString();
            CSVWriter.writeNext(lineStr);
        }

        // ...
    }

Notice that readLine acts as a wrapper over OpenCSV's readNext method and returns a Line object.

In the same way, writeLine wraps OpenCSV's writeNext, receiving a Line object. The full implementation of this class can be found in the GitHub project.

At this point, we're all set to start with each step's implementation.

4.4. LinesReader

Let's go ahead and complete our LinesReader class:

    public class LinesReader implements Tasklet, StepExecutionListener {

        private final Logger logger = LoggerFactory
          .getLogger(LinesReader.class);

        private List<Line> lines;
        private FileUtils fu;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            lines = new ArrayList<>();
            fu = new FileUtils(
              "taskletsvschunks/input/tasklets-vs-chunks.csv");
            logger.debug("Lines Reader initialized.");
        }

        @Override
        public RepeatStatus execute(StepContribution stepContribution,
          ChunkContext chunkContext) throws Exception {
            Line line = fu.readLine();
            while (line != null) {
                lines.add(line);
                logger.debug("Read line: " + line.toString());
                line = fu.readLine();
            }
            return RepeatStatus.FINISHED;
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            fu.closeReader();
            stepExecution
              .getJobExecution()
              .getExecutionContext()
              .put("lines", this.lines);
            logger.debug("Lines Reader ended.");
            return ExitStatus.COMPLETED;
        }
    }

LinesReader's beforeStep method creates a FileUtils instance over the input file path. Its execute method then adds lines to a list until there are no more lines to read.

Our class also implements StepExecutionListener, which provides two extra methods: beforeStep and afterStep. We'll use those methods to initialize and close things before and after execute runs.

If we take a look at the afterStep code, we'll notice the line where the result list (lines) is put in the job's context to make it available for the next step:

    stepExecution
      .getJobExecution()
      .getExecutionContext()
      .put("lines", this.lines);

At this point, our first step has already fulfilled its responsibility: load the CSV lines into a List in memory. Let's move to the second step and process them.

4.5. LinesProcessor

LinesProcessor will also implement StepExecutionListener and, of course, Tasklet. That means it will implement the beforeStep, execute and afterStep methods as well:

    public class LinesProcessor implements Tasklet, StepExecutionListener {

        private Logger logger = LoggerFactory.getLogger(
          LinesProcessor.class);

        private List<Line> lines;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            ExecutionContext executionContext = stepExecution
              .getJobExecution()
              .getExecutionContext();
            this.lines = (List<Line>) executionContext.get("lines");
            logger.debug("Lines Processor initialized.");
        }

        @Override
        public RepeatStatus execute(StepContribution stepContribution,
          ChunkContext chunkContext) throws Exception {
            for (Line line : lines) {
                long age = ChronoUnit.YEARS.between(
                  line.getDob(),
                  LocalDate.now());
                logger.debug("Calculated age " + age + " for line " + line.toString());
                line.setAge(age);
            }
            return RepeatStatus.FINISHED;
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            logger.debug("Lines Processor ended.");
            return ExitStatus.COMPLETED;
        }
    }

It's easy to see that it loads the lines list from the job's context and calculates the age of each person.

There's no need to put another result list in the context, as the modifications happen on the same objects that come from the previous step.
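The reason nothing has to be put back into the context can be shown with a small plain-Java sketch. Here a HashMap stands in for the job's ExecutionContext, and Person, readStep, processStep and writeStep are hypothetical names invented for this illustration. The sketch only mirrors the in-memory behavior, where every step receives a reference to the same list, and the reference date is an assumption chosen to match the sample ages.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration; a HashMap stands in for the job's ExecutionContext.
class SharedContextSketch {

    // Hypothetical mutable item, playing the role of Line.
    static class Person {
        final String name;
        final LocalDate dob;
        Long age; // stays null until the "processor" step sets it

        Person(String name, LocalDate dob) {
            this.name = name;
            this.dob = dob;
        }
    }

    // "Reader" step: stores the list in the context under the key "lines".
    static void readStep(Map<String, Object> context) {
        List<Person> lines = new ArrayList<>();
        lines.add(new Person("Mae Hodges", LocalDate.of(1972, 10, 22)));
        lines.add(new Person("Gary Potter", LocalDate.of(1953, 2, 22)));
        context.put("lines", lines);
    }

    // "Processor" step: mutates the objects in place and puts nothing back.
    @SuppressWarnings("unchecked")
    static void processStep(Map<String, Object> context, LocalDate today) {
        List<Person> lines = (List<Person>) context.get("lines");
        for (Person person : lines) {
            person.age = ChronoUnit.YEARS.between(person.dob, today);
        }
    }

    // "Writer" step: fetches the same list and sees the ages set by the processor.
    @SuppressWarnings("unchecked")
    static List<String> writeStep(Map<String, Object> context) {
        List<Person> lines = (List<Person>) context.get("lines");
        List<String> output = new ArrayList<>();
        for (Person person : lines) {
            output.add(person.name + "," + person.age);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        readStep(context);
        processStep(context, LocalDate.of(2018, 1, 1)); // assumed reference date
        System.out.println(writeStep(context)); // [Mae Hodges,45, Gary Potter,64]
    }
}
```

Because the map holds a reference to the list rather than a copy, the ages set in processStep are visible in writeStep without a second put call.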

And we're ready for our last step.

4.6. LinesWriter

LinesWriter's task is to go over the lines list and write the name and age to the output file:

    public class LinesWriter implements Tasklet, StepExecutionListener {

        private final Logger logger = LoggerFactory
          .getLogger(LinesWriter.class);

        private List<Line> lines;
        private FileUtils fu;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            ExecutionContext executionContext = stepExecution
              .getJobExecution()
              .getExecutionContext();
            this.lines = (List<Line>) executionContext.get("lines");
            fu = new FileUtils("output.csv");
            logger.debug("Lines Writer initialized.");
        }

        @Override
        public RepeatStatus execute(StepContribution stepContribution,
          ChunkContext chunkContext) throws Exception {
            for (Line line : lines) {
                fu.writeLine(line);
                logger.debug("Wrote line " + line.toString());
            }
            return RepeatStatus.FINISHED;
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            fu.closeWriter();
            logger.debug("Lines Writer ended.");
            return ExitStatus.COMPLETED;
        }
    }

We're done with our job's implementation! Let's create a test to run it and see the results.

4.7. Running the Job

To run the job, we'll create a test:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = TaskletsConfig.class)
    public class TaskletsTest {

        @Autowired
        private JobLauncherTestUtils jobLauncherTestUtils;

        @Test
        public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
          throws Exception {
            JobExecution jobExecution = jobLauncherTestUtils.launchJob();
            assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
        }
    }

The ContextConfiguration annotation points to the Spring context configuration class that holds our job definition.

We'll need to add a couple of extra beans before running the test:

    @Bean
    public JobLauncherTestUtils jobLauncherTestUtils() {
        return new JobLauncherTestUtils();
    }

    @Bean
    public JobRepository jobRepository() throws Exception {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
        factory.setTransactionManager(transactionManager());
        return (JobRepository) factory.getObject();
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        return new ResourcelessTransactionManager();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        return jobLauncher;
    }

Everything is ready! Go ahead and run the test!

After the job has finished, output.csv has the expected content and logs show the execution flow:

    [main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
    [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
    [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
    [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
    [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
    [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
    [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
    [main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
    [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
    [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
    [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
    [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
    [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
    [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
    [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
    [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
    [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

That's it for Tasklets. Now we can move on to the Chunks approach.

5. Chunks Approach

5.1. Introduction and Design

As the name suggests, this approach performs actions over chunks of data. That is, instead of reading, processing and writing all the lines at once, it'll read, process and write a fixed number of records (a chunk) at a time.

Then, it'll repeat the cycle until there's no more data in the file.

As a result, the flow will be slightly different:

  1. While there are lines:
    • Do for X lines:
      • Read one line
      • Process one line
    • Write X lines.
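The flow above can be sketched as a plain-Java loop with no Spring Batch classes involved. ChunkLoopSketch and run are hypothetical names, an Iterator stands in for the reader, and upper-casing is only a placeholder for real processing. This sketch collects the reads of a chunk before processing them, which is the order the log output in section 5.6 shows; it is an illustration of the idea, not the framework's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

// Plain-Java illustration of a chunk-oriented loop.
class ChunkLoopSketch {

    // Reads up to chunkSize items, processes them, and "writes" them as one chunk,
    // repeating until the input is exhausted. Returns the chunks that were written.
    static List<List<String>> run(Iterator<String> reader, int chunkSize) {
        List<List<String>> written = new ArrayList<>();
        while (reader.hasNext()) {
            // Read phase: collect at most chunkSize items.
            List<String> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: transform each item (upper-casing is a placeholder).
            List<String> processed = new ArrayList<>();
            for (String item : items) {
                processed.add(item.toUpperCase(Locale.ROOT));
            }
            // Write phase: hand the whole chunk over at once.
            written.add(processed);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(run(input.iterator(), 2)); // [[A, B], [C, D], [E]]
    }
}
```

With five items and a chunk size of two, the last chunk holds a single item, so at no point are more than two items buffered.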

So, we also need to create three beans for the chunk-oriented approach:

    public class LineReader {
        // ...
    }

    public class LineProcessor {
        // ...
    }

    public class LinesWriter {
        // ...
    }

Before moving to implementation, let's configure our job.

5.2. Configuration

The job definition will also look different:

    @Configuration
    @EnableBatchProcessing
    public class ChunksConfig {

        @Autowired
        private JobBuilderFactory jobs;

        @Autowired
        private StepBuilderFactory steps;

        @Bean
        public ItemReader<Line> itemReader() {
            return new LineReader();
        }

        @Bean
        public ItemProcessor<Line, Line> itemProcessor() {
            return new LineProcessor();
        }

        @Bean
        public ItemWriter<Line> itemWriter() {
            return new LinesWriter();
        }

        @Bean
        protected Step processLines(ItemReader<Line> reader,
          ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
            return steps.get("processLines").<Line, Line> chunk(2)
              .reader(reader)
              .processor(processor)
              .writer(writer)
              .build();
        }

        @Bean
        public Job job() {
            return jobs
              .get("chunksJob")
              .start(processLines(itemReader(), itemProcessor(), itemWriter()))
              .build();
        }
    }

In this case, there's only one step performing only one tasklet.

However, that tasklet defines a reader, a writer and a processor that will act over chunks of data.

Note that the commit interval, the value passed to chunk() in the step definition, indicates the number of items to be processed in one chunk. Our job will read, process and write two lines at a time.

Now we're ready to add our chunk logic!

5.3. LineReader

LineReader will be in charge of reading one record and returning a Line instance with its content.

To become a reader, our class has to implement the ItemReader interface:

    public class LineReader implements ItemReader<Line> {

        @Override
        public Line read() throws Exception {
            Line line = fu.readLine();
            if (line != null) {
                logger.debug("Read line: " + line.toString());
            }
            return line;
        }
    }

The code is straightforward: it just reads one line and returns it. We'll also implement StepExecutionListener for the final version of this class:

    public class LineReader implements ItemReader<Line>, StepExecutionListener {

        private final Logger logger = LoggerFactory
          .getLogger(LineReader.class);

        private FileUtils fu;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
            logger.debug("Line Reader initialized.");
        }

        @Override
        public Line read() throws Exception {
            Line line = fu.readLine();
            if (line != null) {
                logger.debug("Read line: " + line.toString());
            }
            return line;
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            fu.closeReader();
            logger.debug("Line Reader ended.");
            return ExitStatus.COMPLETED;
        }
    }

Note that beforeStep and afterStep execute before and after the whole step, respectively.

5.4. LineProcessor

LineProcessor follows pretty much the same logic as LineReader.

However, in this case, we'll implement ItemProcessor and its method process():

    public class LineProcessor implements ItemProcessor<Line, Line> {

        private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

        @Override
        public Line process(Line line) throws Exception {
            long age = ChronoUnit.YEARS
              .between(line.getDob(), LocalDate.now());
            logger.debug("Calculated age " + age + " for line " + line.toString());
            line.setAge(age);
            return line;
        }
    }

The process() method takes an input line, processes it and returns an output line. Again, we'll also implement StepExecutionListener:

    public class LineProcessor implements ItemProcessor<Line, Line>, StepExecutionListener {

        private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

        @Override
        public void beforeStep(StepExecution stepExecution) {
            logger.debug("Line Processor initialized.");
        }

        @Override
        public Line process(Line line) throws Exception {
            long age = ChronoUnit.YEARS
              .between(line.getDob(), LocalDate.now());
            logger.debug(
              "Calculated age " + age + " for line " + line.toString());
            line.setAge(age);
            return line;
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            logger.debug("Line Processor ended.");
            return ExitStatus.COMPLETED;
        }
    }

5.5. LinesWriter

Unlike the reader and the processor, LinesWriter writes an entire chunk of lines, so it receives a List of Lines:

    public class LinesWriter implements ItemWriter<Line>, StepExecutionListener {

        private final Logger logger = LoggerFactory
          .getLogger(LinesWriter.class);

        private FileUtils fu;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            fu = new FileUtils("output.csv");
            logger.debug("Line Writer initialized.");
        }

        @Override
        public void write(List<? extends Line> lines) throws Exception {
            for (Line line : lines) {
                fu.writeLine(line);
                logger.debug("Wrote line " + line.toString());
            }
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            fu.closeWriter();
            logger.debug("Line Writer ended.");
            return ExitStatus.COMPLETED;
        }
    }

LinesWriter code speaks for itself. And again, we're ready to test our job.

5.6. Running the Job

We'll create a new test, same as the one we created for the tasklets approach:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = ChunksConfig.class)
    public class ChunksTest {

        @Autowired
        private JobLauncherTestUtils jobLauncherTestUtils;

        @Test
        public void givenChunksJob_whenJobEnds_thenStatusCompleted()
          throws Exception {
            JobExecution jobExecution = jobLauncherTestUtils.launchJob();
            assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
        }
    }

After configuring ChunksConfig as explained above for TaskletsConfig, we're all set to run the test!

Once the job is done, we can see that output.csv contains the expected result again, and the logs describe the flow:

    [main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
    [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
    [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
    [main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
    [main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
    [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
    [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
    [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
    [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
    [main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
    [main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
    [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
    [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
    [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
    [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
    [main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
    [main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
    [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
    [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
    [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
    [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
    [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
    [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
    [main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

We have the same result and a different flow. Logs make evident how the job executes following this approach.

6. Conclusion

Different contexts will call for one approach or the other. While tasklets feel more natural for "one task after the other" scenarios, chunks provide a simple solution for paginated reads or situations where we don't want to keep a significant amount of data in memory.

The complete implementation of this example can be found in the GitHub project.