@BeforeStep defined in ItemProcessor and ItemWitter is not getting called with spring batch partitioning

50 views Asked by At

We were moving our spring batch implementations to use partitions. After making changes now the @BeforeStep is not getting called which is inside our processor and writter.

Sample code here,

@Bean
    @StepScope
    public ItemWriter<EmployeeTaxDetail> customerItemWriter()
    {
        ItemWriter<EmployeeTaxDetail> itemWriter = appContext.getBean(EmployeeTaxCalculationWriter.class);
        return itemWriter;
    }
    @Bean
    @StepScope
    public ItemProcessor<EmployeeDetail, EmployeeTaxDetail> processor() {
        ItemProcessor<EmployeeDetail, EmployeeTaxDetail> itemProcessor = appContext.getBean(EmployeeTaxCalculationProcessor.class);
        return itemProcessor;
    }
    
    // Master
    @Bean
    public Step step1() 
    {
        return new StepBuilder("step1", jobRepository)
                .partitioner(slaveStep().getName(), partitioner())
                .listener(processor())
                .listener(customerItemWriter())
                .step(slaveStep())
                .gridSize(5)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }
    
    // slave step
    @Bean
    public Step slaveStep() 
    {
        
        return new StepBuilder("slaveStep", jobRepository)
                .<EmployeeDetail, EmployeeTaxDetail>chunk(10,transactionManager)
                .reader(pagingItemReader(null,null))
                .processor(processor())
                .writer(customerItemWriter())
                .build();
    }

EmployeeTaxCalculationWriter and EmployeeTaxCalculationProcessor are like,

@Component
public class EmployeeTaxCalculationWriter extends AbstractItemWriter<EmployeeTaxDetail> {

  /** The febp data source provider. */
  @Autowired
  private FEBPDataSourceProvider febpDataSourceProvider;

  /** The Constant LOGGER. */
  private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeTaxCalculationWriter.class.getName());

  /** The Constant INSERT_REPORT_HISTORY_QUERY. */
  private static final String INSERT_EMP_TAX_QUERY = "INSERT INTO FEBP_EMP_TAX_DETAIL_TEST " + "(ENTITY_ID, EMP_ID, EMP_TAX) VALUES (?, ?, ?)";

  /**
   * Execute write.
   *
   * @param chunk the chunk
   * @throws Exception the exception
   */
  @Override
  protected void executeWrite(Chunk<? extends EmployeeTaxDetail> chunk) throws Exception {
    saveToTaxDetail(chunk);

  }

  /**
   * Save to tax detail.
   *
   * @param chunk the chunk
   */
  private void saveToTaxDetail(Chunk<? extends EmployeeTaxDetail> chunk) {

    List<EmployeeTaxDetail> employeeTaxDetailList = (List<EmployeeTaxDetail>) chunk.getItems();
    DataSource febpDBSrc = getFebpDataSource();
    JdbcTemplate jdbcTemplate = new JdbcTemplate(febpDBSrc);
    jdbcTemplate.batchUpdate(INSERT_EMP_TAX_QUERY, new BatchPreparedStatementSetter() {
      @Override
      public void setValues(PreparedStatement ps, int i) throws SQLException {
        EmployeeTaxDetail empTaxDetail = employeeTaxDetailList.get(i);
        ps.setString(1, UUID.randomUUID().toString());
        ps.setString(2, empTaxDetail.getEmpId());
        ps.setLong(3, empTaxDetail.getTaxAmount());
      }

      @Override
      public int getBatchSize() {
        return employeeTaxDetailList.size();
      }
    });
  }

  /**
   * Gets the febp data source.
   *
   * @return the febp data source
   */
  private DataSource getFebpDataSource() {
    return febpDataSourceProvider.getDataSource(FengThreadLocal.getTenantId(), "FEBP");
  }

}

and Component

public class EmployeeTaxCalculationProcessor extends AbstractItemProcessor<EmployeeDetail, EmployeeTaxDetail> {

  /**
   * Execute process.
   *
   * @param item the item
   * @return the employee tax detail
   * @throws Exception the exception
   */
  @Override
  protected EmployeeTaxDetail executeProcess(EmployeeDetail item) throws Exception {
    EmployeeTaxDetail employeeTaxDetail = new EmployeeTaxDetail();
    employeeTaxDetail.setEmpId(item.getEmpId());
    employeeTaxDetail.setTaxAmount((long) (item.getEmpSalary() * .1));
    if(item.getEmpId().endsWith("2"))
    {
        TimeUnit.SECONDS.sleep(5);
    }
    return employeeTaxDetail;
  }

}
And inside AbstractItemWriter we have 
public abstract class AbstractItemWriter<T> implements ItemWriter<T> {

  /** The parameters. */
  private JobParameters parameters;

  /**
   * Write.
   *
   * @param chunk the chunk
   * @throws Exception the exception
   */
  @Override
  public void write(@NonNull Chunk<? extends T> chunk) throws Exception {
      JobParameter jobParameter = null;
      if( parameters!=null) {
     jobParameter = parameters.getParameter(FebpAPIConstants.FEBP_THREAD_LOCAL);
    }
    if (jobParameter != null) {
      ApplicationThreadLocal.populateThreadLocalData(
          (ApplicationThreadLocalState) JsonUtil.readJsonValue(jobParameter.getValue().toString(), ApplicationThreadLocalState.class, false));
    }
    executeWrite(chunk);
  }

  /**
   * Execute write.
   *
   * @param chunk the chunk
   * @throws Exception the exception
   */
  protected abstract void executeWrite(@NonNull Chunk<? extends T> chunk) throws Exception;

  /**
   * Before step.
   *
   * @param stepExecution the step execution
   */
  **@BeforeStep**
  public void beforeStep(final StepExecution stepExecution) {
    parameters = stepExecution.getJobExecution().getJobParameters();
  }

  /**
   * Gets the parameters.
   *
   * @return the parameters
   */
  public JobParameters getParameters() {
    return parameters;
  }
} and inside AbstractItemProcessor we have
public abstract class AbstractItemProcessor<I, O> implements ItemProcessor<I, O> {

  /** The parameters. */
  private JobParameters parameters;

  /**
   * Process.
   *
   * @param item the item
   * @return the o
   * @throws Exception the exception
   */
  @Override
  public O process(I item) throws Exception {
      JobParameter jobParameter = null;
      if( parameters!=null) {
     jobParameter = parameters.getParameter(FebpAPIConstants.FEBP_THREAD_LOCAL);
    }
    if (jobParameter != null) {
      ApplicationThreadLocal.populateThreadLocalData(
          (ApplicationThreadLocalState) JsonUtil.readJsonValue(jobParameter.getValue().toString(), ApplicationThreadLocalState.class, false));
    }
    return executeProcess(item);
  }

  /**
   * Execute write.
   *
   * @param item the item
   * @return the o
   * @throws Exception the exception
   */
  protected abstract O executeProcess(I item) throws Exception;

  /**
   * Before step.
   *
   * @param stepExecution the step execution
   */
  @BeforeStep
  public void beforeStep(final StepExecution stepExecution) {
    parameters = stepExecution.getJobExecution().getJobParameters();
  }

  /**
   * Gets the parameters.
   *
   * @return the parameters
   */
  public JobParameters getParameters() {
    return parameters;
  }

}

Now the process method is getting called for each processor and writter call but @BeforeStep is stopped getting called after introducing partitioning.

0

There are 0 answers