Restarting a FAILED job is not processing the failed chunk data again


Here is a sample application that reproduces the issue: https://github.com/PSHREYASHOLLA/SamplebatchApplication.

It's a Maven project, so you can run mvn install, which creates \target\SamplebatchApplication-0.0.1-SNAPSHOT.jar. You can then start it like any Spring Boot app (Liquibase is enabled): java -jar SamplebatchApplication-0.0.1-SNAPSHOT.jar.

As you can see in the application.properties file, we point it to a PostgreSQL database. All our batch configuration is in https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/FEBPDBConfig.java.

Start the batch process by calling the REST POST API http://localhost:8080/batch/trigger-or-resume-application-batch-job with the JSON body { "appRestartJobExecutionId": "" }.

If we call this with an empty appRestartJobExecutionId, the flow is as follows: com.example.postgresql.controller.BatchController.triggerOrResumeApplicationBatchJobByB2E() ---> com.example.postgresql.model.FebpApplicationJobServiceImpl.triggerApplicationBatchJob() ---> JobLauncher.run(). The job reads 50 records from febp_emp_detail_test in the reader, and the writer writes the updated records to febp_emp_tax_detail_test. This is the happy flow.

When you call the above API, the job processes only part of the data: records emp0001 to emp0030 are committed to febp_emp_tax_detail_test. It then fails while processing record emp0032, as you can see in https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/EmployeeTaxCalculationProcessor.java

private static int counter=1;


if(item.getEmpId().equals("emp0032"))
{
    if(counter==1)
    {
        counter++;
        throw new Exception();
    }
}

The counter is 1, so the processor throws an exception for record emp0032 and the job goes to the FAILED state. Now say I restart the server and call the same POST API with the failed job execution ID. It now calls com.example.postgresql.controller.BatchController.triggerOrResumeApplicationBatchJobByB2E() ---> com.example.postgresql.model.FebpApplicationJobServiceImpl.resumeApplicationBatchJob() ---> jobOperator.restart(failedBatchExecutionId);

The job starts processing records from the next chunk, i.e. emp0036 onwards, and does not process the failed chunk again. The idea of restart here is to re-execute the same job for the unprocessed data. This is a single-threaded job.
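To make the chunk boundaries concrete, here is a small worked calculation in plain Java. It is only a sketch: it assumes a chunk size of 5 (consistent with the commits stopping at emp0030), employee IDs numbered consecutively from emp0001, and items processed in order. The class and method names are hypothetical and not part of Spring Batch.

```java
// Hypothetical helper, not part of Spring Batch: works out which chunk a record falls in.
final class ChunkBounds {

    // First record number (1-based) of the chunk containing recordNumber.
    static int chunkStart(int recordNumber, int chunkSize) {
        return ((recordNumber - 1) / chunkSize) * chunkSize + 1;
    }

    // Last record number (1-based) of the chunk containing recordNumber.
    static int chunkEnd(int recordNumber, int chunkSize) {
        return chunkStart(recordNumber, chunkSize) + chunkSize - 1;
    }

    // Formats a record number as an employee ID such as emp0032.
    static String empId(int recordNumber) {
        return String.format("emp%04d", recordNumber);
    }

    public static void main(String[] args) {
        int chunkSize = 5; // assumed chunk size
        int failing = 32;  // emp0032 throws in the processor
        System.out.println("Failed chunk: " + empId(chunkStart(failing, chunkSize))
                + " to " + empId(chunkEnd(failing, chunkSize)));
        System.out.println("Last committed record: "
                + empId(chunkStart(failing, chunkSize) - 1));
    }
}
```

Under these assumptions the failed chunk is emp0031 to emp0035, so a restart would be expected to begin at emp0031, whereas the observed restart begins at emp0036, the first record after the failed chunk.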

UPDATE: 20/03/2024

This is my step definition:

    @Bean
    public Step step1() {
        return new StepBuilder("step1", jobRepository)
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(febpEmployeeTaxCalculationBatchProperties.getTaxCalculationStep1PartitionGridSize())
                .taskExecutor(actStmntTaskExecutor())
                .build();
    }

    // slave step
    @Bean
    public Step slaveStep() 
    {
        
        try {
            return new StepBuilder("slaveStep", jobRepository)
                    .<EmployeeDetail, EmployeeTaxDetail>chunk(febpEmployeeTaxCalculationBatchProperties.getTaxCalculationStep1ReaderPageSize(),transactionManager)
                    .reader(pagingItemReader(null,null))
                    .processor(processor())
                    .writer(customerItemWriter())
                    .taskExecutor(actStmntTaskExecutor())
                    .build();
        } catch (Exception ex) {
            // Keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Error creating slave step: " + ex.getMessage(), ex);
        }
    }
 /**
   * Act stmnt task executor.
   *
   * @return the simple async task executor
   */
  public SimpleAsyncTaskExecutor actStmntTaskExecutor() {
    SimpleAsyncTaskExecutor acctStmtTaskExecuter = new SimpleAsyncTaskExecutor();
    acctStmtTaskExecuter.setConcurrencyLimit(febpEmployeeTaxCalculationBatchProperties.getTaxCalculationStep1TaskExecuterThreadConcurrencyLimit());
    acctStmtTaskExecuter.setThreadPriority(febpEmployeeTaxCalculationBatchProperties.getTaxCalculationStep1TaskExecuterThreadPriority());
    acctStmtTaskExecuter.setThreadNamePrefix("FEBP_TAX_CALCULATION_GEN");
    return acctStmtTaskExecuter;
  }

So here,

  1. You advised against using a TaskExecutor. Should we therefore remove the TaskExecutor from both step1() and slaveStep() shown above?

  2. What should the chunk size be relative to the gridSize and the reader's page size and fetch size? From what I see, if the chunk size is set to a low value, restart does not rewrite the failed data.

There is 1 answer

Mahmoud Ben Hassine (best answer)

A restart should resume from the last successfully committed chunk (i.e. the failed chunk will be reprocessed). However, the way the reader is declared in the code you shared does not allow the restart functionality to work. Currently you have:

@Component
public class EmployeeTaxCalculationReader {


   public JdbcPagingItemReader<EmployeeDetail> getPagingItemReader() throws Exception {
      // ...
   }

}

And then in the job/step definition the reader is set with .reader(reader.getPagingItemReader()):

@Bean(name="FEBP_EMP_TAX_CALCULATION")
public Job getBatchJob() throws Exception {
    ItemProcessor<EmployeeDetail, EmployeeTaxDetail> itemProcessor = appContext.getBean(EmployeeTaxCalculationProcessor.class);
    ItemWriter<EmployeeTaxDetail> itemWriter = appContext.getBean(EmployeeTaxCalculationWriter.class);
    EmployeeTaxCalculationReader reader = appContext.getBean(EmployeeTaxCalculationReader.class);
    Step step = new StepBuilder("FEBP_EMP_TAX_CALCULATION_STEP", jobRepository)
        .<EmployeeDetail, EmployeeTaxDetail>chunk(5, transactionManager)
        .reader(reader.getPagingItemReader()).processor(itemProcessor).writer(itemWriter).taskExecutor(actStmntTaskExecutor())
        .build();

    Job febpTaxCalculationJob = new JobBuilder("FEBP_EMP_TAX_CALCULATION", jobRepository).incrementer(new RunIdIncrementer()).start(step).build();
    return febpTaxCalculationJob;
  }

So the reader is not a Spring bean, and therefore Spring Batch won't create a proxy around it as an ItemStream to save the progress in the execution context, which is required on restart to resume from the last checkpoint. The JdbcPagingItemReader should be declared as a bean. You can find an example here.
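To illustrate what saving the progress in the execution context means, the following is a simplified plain-Java model of chunk-oriented checkpointing. It is not Spring Batch's implementation: `RestartSketch`, `CountingReader`, `runStep`, and the `read.count` key are hypothetical names, and a `HashMap` stands in for the persisted execution context. The 50 records, the chunk size of 5, and the failure at emp0032 are taken from the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of restartable chunk processing; not Spring Batch's real classes.
final class RestartSketch {

    // Hypothetical reader that can save and restore its position, in the spirit of an ItemStream.
    static final class CountingReader {
        private final List<String> items;
        private int position;

        CountingReader(List<String> items) {
            this.items = items;
        }

        // Restore the position saved by a previous execution, if any.
        void open(Map<String, Integer> context) {
            position = context.getOrDefault("read.count", 0);
        }

        // Record the current position; called only after a chunk commits.
        void update(Map<String, Integer> context) {
            context.put("read.count", position);
        }

        // Return the next item, or null when the input is exhausted.
        String read() {
            return position < items.size() ? items.get(position++) : null;
        }
    }

    // Runs one execution: reads chunks, "writes" them to target, checkpoints after each commit.
    // Returns true when all items were processed, false when the execution failed.
    static boolean runStep(List<String> source, List<String> target,
                           Map<String, Integer> context, int chunkSize, String failOn) {
        CountingReader reader = new CountingReader(source);
        reader.open(context);
        while (true) {
            List<String> chunk = new ArrayList<>();
            String item;
            while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                chunk.add(item);
            }
            if (chunk.isEmpty()) {
                return true;
            }
            if (chunk.contains(failOn)) {
                return false; // chunk rolled back: nothing written, no checkpoint saved
            }
            target.addAll(chunk);   // commit the chunk
            reader.update(context); // checkpoint only after a successful commit
        }
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>();
        for (int i = 1; i <= 50; i++) {
            source.add(String.format("emp%04d", i));
        }
        List<String> target = new ArrayList<>();
        Map<String, Integer> context = new HashMap<>(); // stands in for the job repository

        boolean first = runStep(source, target, context, 5, "emp0032");
        System.out.println("first run completed=" + first + ", written=" + target.size());

        // Restart with the same context; the failure is no longer triggered.
        boolean second = runStep(source, target, context, 5, null);
        System.out.println("restart completed=" + second + ", written=" + target.size()
                + ", first item after restart=" + target.get(30));
    }
}
```

In this model the first run commits 30 items and saves a checkpoint of 30, so the restart reads emp0031 first and the failed chunk is reprocessed. The checkpoint is only useful because `open` and `update` are actually invoked around each chunk, which is the role the answer attributes to the reader being handled as an ItemStream.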

Edit: Add example of how to inject the reader as a bean

@Bean(name="FEBP_EMP_TAX_CALCULATION")
public Job getBatchJob() throws Exception {
    ItemProcessor<EmployeeDetail, EmployeeTaxDetail> itemProcessor = appContext.getBean(EmployeeTaxCalculationProcessor.class);
    ItemWriter<EmployeeTaxDetail> itemWriter = appContext.getBean(EmployeeTaxCalculationWriter.class);
    JdbcPagingItemReader<EmployeeDetail> pagingItemReader = appContext.getBean(JdbcPagingItemReader.class);
    Step step = new StepBuilder("FEBP_EMP_TAX_CALCULATION_STEP", jobRepository)
        .<EmployeeDetail, EmployeeTaxDetail>chunk(5, transactionManager)
        .reader(pagingItemReader).processor(itemProcessor).writer(itemWriter).build();

    Job febpTaxCalculationJob = new JobBuilder("FEBP_EMP_TAX_CALCULATION", jobRepository).incrementer(new RunIdIncrementer()).start(step).build();
    return febpTaxCalculationJob;
  }

Or pass the reader as a parameter to the bean definition method:

@Bean(name="FEBP_EMP_TAX_CALCULATION")
public Job getBatchJob(JdbcPagingItemReader<EmployeeDetail> pagingItemReader) throws Exception {
    ItemProcessor<EmployeeDetail, EmployeeTaxDetail> itemProcessor = appContext.getBean(EmployeeTaxCalculationProcessor.class);
    ItemWriter<EmployeeTaxDetail> itemWriter = appContext.getBean(EmployeeTaxCalculationWriter.class);
    Step step = new StepBuilder("FEBP_EMP_TAX_CALCULATION_STEP", jobRepository)
        .<EmployeeDetail, EmployeeTaxDetail>chunk(5, transactionManager)
        .reader(pagingItemReader).processor(itemProcessor).writer(itemWriter).build();

    Job febpTaxCalculationJob = new JobBuilder("FEBP_EMP_TAX_CALCULATION", jobRepository).incrementer(new RunIdIncrementer()).start(step).build();
    return febpTaxCalculationJob;
  }

You should do the same for the processor and writer.