Spring Batch menggunakan Partitioner

1. Ikhtisar

Dalam pengantar kami sebelumnya ke Spring Batch, kami memperkenalkan framework sebagai alat pemrosesan batch. Kami juga mempelajari detail konfigurasi dan implementasi untuk eksekusi tugas proses tunggal berutas tunggal.

Untuk mengimplementasikan pekerjaan dengan beberapa pemrosesan paralel, berbagai opsi disediakan. Di tingkat yang lebih tinggi, ada dua mode pemrosesan paralel:

  1. Proses Tunggal, multi-utas
  2. Multi-Proses

Dalam artikel singkat ini, kita akan membahas pembagian Langkah , yang dapat diterapkan untuk pekerjaan proses tunggal dan multi-proses.

2. Mempartisi Langkah

Spring Batch dengan pemartisian memberi kita fasilitas untuk membagi pelaksanaan Langkah :

Ringkasan Partisi

Gambar di atas menunjukkan implementasi Pekerjaan dengan Langkah yang dipartisi .

Ada Langkah yang disebut "Master", yang pelaksanaannya dibagi menjadi beberapa langkah "Budak". Budak ini bisa menggantikan tuannya, dan hasilnya tetap tidak akan berubah. Baik master dan slave adalah contoh dari Langkah . Budak dapat berupa layanan jarak jauh atau hanya menjalankan utas secara lokal.

Jika diperlukan, kita bisa meneruskan data dari master ke slave. Meta data (yaitu JobRepository ), memastikan bahwa setiap slave hanya dijalankan sekali dalam satu eksekusi Job.

Berikut adalah diagram urutan yang menunjukkan bagaimana semuanya bekerja:

Langkah Partisi

Seperti yang ditunjukkan, PartitionStep mendorong eksekusi. The PartitionHandler bertanggung jawab untuk membelah karya “Master” ke dalam “Budak”. Langkah paling kanan adalah budak.

3. The Maven POM

Dependensi Maven sama dengan yang disebutkan di artikel sebelumnya. Yaitu Spring Core, Spring Batch dan ketergantungan untuk database (dalam kasus kami, SQLite ).

4. Konfigurasi

Dalam artikel pengantar kami, kami melihat contoh konversi beberapa data keuangan dari CSV ke file XML. Mari kita memperluas contoh yang sama.

Di sini, kami akan mengonversi informasi keuangan dari 5 file CSV ke file XML yang sesuai, menggunakan implementasi multi-utas.

Kita dapat mencapai ini menggunakan satu partisi Pekerjaan dan Langkah . Kami akan memiliki lima utas, satu untuk setiap file CSV.

Pertama-tama, mari buat Pekerjaan:

@Bean(name = "partitionerJob") public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { return jobs.get("partitioningJob") .start(partitionStep()) .build(); }

Seperti yang bisa kita lihat, Job ini dimulai dengan PartitioningStep . Ini adalah langkah utama kami yang akan dibagi menjadi berbagai langkah budak:

@Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep") .partitioner("slaveStep", partitioner()) .step(slaveStep()) .taskExecutor(taskExecutor()) .build(); }

Di sini, kita akan membuat PartitioningStep menggunakan StepBuilderFactory . Untuk itu, kami perlu memberikan informasi tentang SlaveSteps dan Partitioner .

The partitioner adalah sebuah antarmuka yang menyediakan fasilitas untuk mendefinisikan seperangkat nilai-nilai masukan untuk setiap budak. Dengan kata lain, logika untuk membagi tugas menjadi utas masing-masing ada di sini.

Mari buat implementasinya, yang disebut CustomMultiResourcePartitioner , di mana kita akan meletakkan nama file input dan output di ExecutionContext untuk diteruskan ke setiap langkah slave:

public class CustomMultiResourcePartitioner implements Partitioner { @Override public Map partition(int gridSize) { Map map = new HashMap(gridSize); int i = 0, k = 1; for (Resource resource : resources) { ExecutionContext context = new ExecutionContext(); Assert.state(resource.exists(), "Resource does not exist: " + resource); context.putString(keyName, resource.getFilename()); context.putString("opFileName", "output"+k+++".xml"); map.put(PARTITION_KEY + i, context); i++; } return map; } }

Kami juga akan membuat kacang untuk kelas ini, di mana kami akan memberikan direktori sumber untuk file input:

@Bean public CustomMultiResourcePartitioner partitioner() { CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner(); Resource[] resources; try { resources = resoursePatternResolver .getResources("file:src/main/resources/input/*.csv"); } catch (IOException e) { throw new RuntimeException("I/O problems when resolving" + " the input file pattern.", e); } partitioner.setResources(resources); return partitioner; }

Kami akan mendefinisikan langkah budak, sama seperti langkah lainnya dengan pembaca dan penulis. Pembaca dan penulis akan sama seperti yang kita lihat di contoh pengantar kita, kecuali mereka akan menerima parameter nama file dari StepExecutionContext.

Perhatikan bahwa kacang ini perlu dilingkupi secara bertahap sehingga mereka akan dapat menerima parameter stepExecutionContext , di setiap langkah. Jika mereka tidak termasuk dalam cakupan langkah, kacang mereka akan dibuat pada awalnya, dan tidak akan menerima nama file pada tingkat langkah:

@StepScope @Bean public FlatFileItemReader itemReader( @Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { FlatFileItemReader reader = new FlatFileItemReader(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); reader.setResource(new ClassPathResource("input/" + filename)); DefaultLineMapper lineMapper = new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper); return reader; } 
@Bean @StepScope public ItemWriter itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException { StaxEventItemWriter itemWriter = new StaxEventItemWriter(); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(new ClassPathResource("xml/" + filename)); return itemWriter; }

Saat menyebutkan pembaca dan penulis di langkah slave, kita dapat meneruskan argumen sebagai null, karena nama file ini tidak akan digunakan, karena mereka akan menerima nama file dari stepExecutionContext :

@Bean public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("slaveStep").chunk(1) .reader(itemReader(null)) .writer(itemWriter(marshaller(), null)) .build(); }

5. Kesimpulan

Dalam tutorial ini, kami membahas cara mengimplementasikan pekerjaan dengan pemrosesan paralel menggunakan Spring Batch.

Seperti biasa, implementasi lengkap untuk contoh ini tersedia di GitHub.