To use Tasklet or Chunk in this scenario


I have a job/task to read the sub-folders/directories of a given folder/path. The path is dynamic; we get it from a Controller. Currently I have used Tasklets: there are 3 tasklets, one to read the sub-directories, another to process them and prepare the objects to save to the DB, and a last one to write the processed data objects to the database. The folders can have any number of sub-folders. Currently I use this code:

Path start = Paths.get("x:\\data\\");
Stream<Path> stream = Files.walk(start, 1);   // maxDepth = 1: the folder itself and its direct sub-folders
List<String> collect = stream
        .map(String::valueOf)
        .sorted()
        .collect(Collectors.toList());

This reads all the sub-folders at once. I followed this https://www.baeldung.com/spring-batch-tasklet-chunk example of a Tasklet implementation for the purpose. Is this the right approach? I also need to run the Job asynchronously with multi-threading, as there can be a huge number of sub-folders and therefore a huge number of rows/a large list of data to process and write to the database.
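
For context, here is a rough sketch of how I picture handing the dynamic path from the controller to the job as a parameter and then launching it. The class, endpoint, and parameter names (FolderJobController, /process, inputPath) are only placeholders, not my actual code:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Placeholder controller: passes the folder path to the job as a JobParameter.
@RestController
public class FolderJobController {

    private final JobLauncher jobLauncher;
    private final Job job;

    public FolderJobController(JobLauncher jobLauncher, Job job) {
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    @PostMapping("/process")
    public ResponseEntity<String> launch(@RequestParam String path) throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addString("inputPath", path)                      // the dynamic folder path
                .addLong("startedAt", System.currentTimeMillis())  // makes each launch a new job instance
                .toJobParameters();
        jobLauncher.run(job, params);
        return ResponseEntity.accepted().body("Job started for " + path);
    }
}

For the launch itself to be asynchronous (i.e. for jobLauncher.run(...) to return immediately), the JobLauncher bean would also need its own TaskExecutor, for example a SimpleJobLauncher configured with a SimpleAsyncTaskExecutor.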

Please suggest an appropriate approach. I am learning Spring Batch, have done a few examples on file read/process/write, and used the chunk approach for those. But this job has to read the sub-directories of a folder/path, so I cannot decide which approach to follow.


1 Answer

Answered by solujan:

I had a similar scenario: I needed to read all the files from a folder, process them, and write them to the DB (Doc):

// Note: Transaction, TransactionItemProcessor and ItemReaderListener are the answer author's own classes (not shown).
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

// Single-step job; RunIdIncrementer gives each run a new run.id so the same job can be launched repeatedly.
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,
               Step mainStep) {
    return jobBuilderFactory.get("MainJob")
            .incrementer(new RunIdIncrementer())
            .flow(mainStep)
            .end()
            .build();
}

// Chunk-oriented step: reads folder paths, turns them into Transaction objects, writes them to the DB.
@Bean
public Step mainStep(StepBuilderFactory stepBuilderFactory,
                     JdbcBatchItemWriter<Transaction> writer,
                     ItemReader<String> reader,
                     TransactionItemProcessor processor) {
    return stepBuilderFactory.get("Main")
            .<String, Transaction>chunk(2)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            // multi-threaded step: chunks are handled concurrently by the pool below
            .taskExecutor(jobTaskExecutor())
            .listener(new ItemReaderListener())
            .build();
}

// Thread pool used by the multi-threaded step.
@Bean
public TaskExecutor jobTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(2);
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

// Step-scoped reader: walks the folder once (maxDepth = 1) and hands the paths out one by one.
// The injected StepExecution is unused here; the folder is hard-coded for the example.
@Bean
@StepScope
public ItemReader<String> reader(@Value("#{stepExecution}") StepExecution stepExecution) throws IOException {
    Path start = Paths.get("D:\\test");
    List<String> inputFile = Files.walk(start, 1)
            .map(String::valueOf)
            .sorted()
            .collect(Collectors.toList());
    return new IteratorItemReader<>(inputFile);
}

@Bean
@StepScope
public TransactionItemProcessor processor(@Value("#{stepExecution}") StepExecution stepExecution) {

    return new TransactionItemProcessor();
}

// Batched JDBC insert; Transaction bean properties are bound to the named SQL parameters.
@Bean
@StepScope
public JdbcBatchItemWriter<Transaction> writer(DataSource dataSource) {

    return new JdbcBatchItemWriterBuilder<Transaction>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO transaction (id, date, type) VALUES (:id, :date, :type)")
            .dataSource(dataSource)
            .build();
}

}
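
Since the folder path in the question is dynamic, the hard-coded "D:\\test" in the reader above could be replaced by a late-bound job parameter. A minimal sketch, assuming the parameter is named "inputPath" (my own name, not from the answer above):

@Bean
@StepScope
public ItemReader<String> reader(
        @Value("#{jobParameters['inputPath']}") String inputPath) throws IOException {
    // Walk the given folder and its direct children only (maxDepth = 1).
    List<String> subFolders = Files.walk(Paths.get(inputPath), 1)
            .filter(Files::isDirectory)   // keep directories, skip plain files
            .map(String::valueOf)
            .sorted()
            .collect(Collectors.toList());
    return new IteratorItemReader<>(subFolders);
}

One caveat with the multi-threaded step: IteratorItemReader is not thread-safe, so when the step runs with a TaskExecutor, access to the reader may need to be synchronized.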