@BeforeStep defined in ItemProcessor and ItemWriter is not called with Spring Batch partitioning


We are moving our Spring Batch implementations to use partitioning. After making the change, the @BeforeStep methods inside our processor and writer are no longer called.

Sample code:

    public ItemWriter<EmployeeTaxDetail> customerItemWriter() {
        ItemWriter<EmployeeTaxDetail> itemWriter = appContext.getBean(EmployeeTaxCalculationWriter.class);
        return itemWriter;
    }

    public ItemProcessor<EmployeeDetail, EmployeeTaxDetail> processor() {
        ItemProcessor<EmployeeDetail, EmployeeTaxDetail> itemProcessor = appContext.getBean(EmployeeTaxCalculationProcessor.class);
        return itemProcessor;
    }

    // Master
    public Step step1() {
        return new StepBuilder("step1", jobRepository)
                .partitioner(slaveStep().getName(), partitioner())
                .taskExecutor(new SimpleAsyncTaskExecutor())
                // ...
                .build();
    }

    // slave step
    public Step slaveStep() {
        return new StepBuilder("slaveStep", jobRepository)
                .<EmployeeDetail, EmployeeTaxDetail>chunk(10, transactionManager)
                // ...
                .build();
    }

EmployeeTaxCalculationWriter and EmployeeTaxCalculationProcessor look like this:

public class EmployeeTaxCalculationWriter extends AbstractItemWriter<EmployeeTaxDetail> {

  /** The febp data source provider. */
  private FEBPDataSourceProvider febpDataSourceProvider;

  /** The Constant LOGGER. */
  private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeTaxCalculationWriter.class.getName());


  /**
   * Execute write.
   *
   * @param chunk the chunk
   * @throws Exception the exception
   */
  @Override
  protected void executeWrite(Chunk<? extends EmployeeTaxDetail> chunk) throws Exception {
    saveToTaxDetail(chunk);
  }

  /**
   * Save to tax detail.
   *
   * @param chunk the chunk
   */
  private void saveToTaxDetail(Chunk<? extends EmployeeTaxDetail> chunk) {

    List<EmployeeTaxDetail> employeeTaxDetailList = (List<EmployeeTaxDetail>) chunk.getItems();
    DataSource febpDBSrc = getFebpDataSource();
    JdbcTemplate jdbcTemplate = new JdbcTemplate(febpDBSrc);
    jdbcTemplate.batchUpdate(INSERT_EMP_TAX_QUERY, new BatchPreparedStatementSetter() {
      @Override
      public void setValues(PreparedStatement ps, int i) throws SQLException {
        EmployeeTaxDetail empTaxDetail = employeeTaxDetailList.get(i);
        ps.setString(1, UUID.randomUUID().toString());
        ps.setString(2, empTaxDetail.getEmpId());
        ps.setLong(3, empTaxDetail.getTaxAmount());
      }

      @Override
      public int getBatchSize() {
        return employeeTaxDetailList.size();
      }
    });
  }

  /**
   * Gets the febp data source.
   *
   * @return the febp data source
   */
  private DataSource getFebpDataSource() {
    return febpDataSourceProvider.getDataSource(FengThreadLocal.getTenantId(), "FEBP");
  }
}


and the processor component:

public class EmployeeTaxCalculationProcessor extends AbstractItemProcessor<EmployeeDetail, EmployeeTaxDetail> {

  /**
   * Execute process.
   *
   * @param item the item
   * @return the employee tax detail
   * @throws Exception the exception
   */
  @Override
  protected EmployeeTaxDetail executeProcess(EmployeeDetail item) throws Exception {
    EmployeeTaxDetail employeeTaxDetail = new EmployeeTaxDetail();
    employeeTaxDetail.setTaxAmount((long) (item.getEmpSalary() * .1));
    return employeeTaxDetail;
  }
}

Inside AbstractItemWriter we have:

public abstract class AbstractItemWriter<T> implements ItemWriter<T> {

  /** The parameters. */
  private JobParameters parameters;

  /**
   * Write.
   *
   * @param chunk the chunk
   * @throws Exception the exception
   */
  @Override
  public void write(@NonNull Chunk<? extends T> chunk) throws Exception {
    JobParameter jobParameter = null;
    if (parameters != null) {
      jobParameter = parameters.getParameter(FebpAPIConstants.FEBP_THREAD_LOCAL);
    }
    if (jobParameter != null) {
      // Deserialize the thread-local state passed in as a job parameter.
      // (The call that applies it to the current thread is not shown here.)
      ApplicationThreadLocalState state = (ApplicationThreadLocalState) JsonUtil.readJsonValue(
          jobParameter.getValue().toString(), ApplicationThreadLocalState.class, false);
    }
    executeWrite(chunk);
  }

  /**
   * Execute write.
   *
   * @param chunk the chunk
   * @throws Exception the exception
   */
  protected abstract void executeWrite(@NonNull Chunk<? extends T> chunk) throws Exception;

  /**
   * Before step.
   *
   * @param stepExecution the step execution
   */
  @BeforeStep
  public void beforeStep(final StepExecution stepExecution) {
    parameters = stepExecution.getJobExecution().getJobParameters();
  }

  /**
   * Gets the parameters.
   *
   * @return the parameters
   */
  public JobParameters getParameters() {
    return parameters;
  }
}

and inside AbstractItemProcessor we have:

public abstract class AbstractItemProcessor<I, O> implements ItemProcessor<I, O> {

  /** The parameters. */
  private JobParameters parameters;

  /**
   * Process.
   *
   * @param item the item
   * @return the o
   * @throws Exception the exception
   */
  @Override
  public O process(I item) throws Exception {
    JobParameter jobParameter = null;
    if (parameters != null) {
      jobParameter = parameters.getParameter(FebpAPIConstants.FEBP_THREAD_LOCAL);
    }
    if (jobParameter != null) {
      // Deserialize the thread-local state passed in as a job parameter.
      // (The call that applies it to the current thread is not shown here.)
      ApplicationThreadLocalState state = (ApplicationThreadLocalState) JsonUtil.readJsonValue(
          jobParameter.getValue().toString(), ApplicationThreadLocalState.class, false);
    }
    return executeProcess(item);
  }

  /**
   * Execute process.
   *
   * @param item the item
   * @return the o
   * @throws Exception the exception
   */
  protected abstract O executeProcess(I item) throws Exception;

  /**
   * Before step.
   *
   * @param stepExecution the step execution
   */
  @BeforeStep
  public void beforeStep(final StepExecution stepExecution) {
    parameters = stepExecution.getJobExecution().getJobParameters();
  }

  /**
   * Gets the parameters.
   *
   * @return the parameters
   */
  public JobParameters getParameters() {
    return parameters;
  }
}
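
For context on why the missing callback matters: SimpleAsyncTaskExecutor starts a new thread for each task, and a plain ThreadLocal set on the launching thread is not visible on those threads. That is presumably why the base classes restore the state from `parameters`, which only beforeStep populates. The following is a minimal plain-Java sketch of that thread-local behavior; `ThreadLocalDemo`, `TENANT` and `readInNewThread` are hypothetical names for illustration and are not part of the application code.

```java
public class ThreadLocalDemo {

    // Hypothetical stand-in for the application's thread-local tenant state.
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    // Reads TENANT from a freshly started thread and returns what that thread saw.
    static String readInNewThread() {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = TENANT.get());
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        TENANT.set("tenant-a");
        System.out.println(TENANT.get());      // tenant-a (same thread)
        System.out.println(readInNewThread()); // null (new thread has no value)
    }
}
```

If beforeStep never runs, `parameters` stays null, the restore branch is skipped, and code on the worker thread sees no tenant state.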


Now the process and write methods are called for every processor and writer invocation, but the @BeforeStep methods stopped being called after we introduced partitioning.
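
One possible explanation, stated as an assumption and not a diagnosis: the bean methods above declare the interface types ItemWriter and ItemProcessor as their return types. If the beans end up behind an interface-based proxy (for example through step scoping, which the posted configuration does not show), a scan for annotated methods on the proxy would not find @BeforeStep, because the annotation lives on the concrete class. The plain-Java sketch below illustrates only that reflection effect. It is not Spring Batch's actual lookup code, and `BeforeStepLike`, `Processor`, `TaxProcessor`, `hasAnnotatedMethod` and `interfaceProxy` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyAnnotationDemo {

    // Hypothetical stand-in for @BeforeStep.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface BeforeStepLike {}

    // Hypothetical stand-in for ItemProcessor.
    interface Processor {
        String process(String item);
    }

    static class TaxProcessor implements Processor {
        @BeforeStepLike
        public void beforeStep() {}

        @Override
        public String process(String item) {
            return item.toUpperCase();
        }
    }

    // Returns true if any public method on the object's runtime class carries the annotation.
    static boolean hasAnnotatedMethod(Object candidate) {
        for (Method m : candidate.getClass().getMethods()) {
            if (m.isAnnotationPresent(BeforeStepLike.class)) {
                return true;
            }
        }
        return false;
    }

    // Wraps the target in a JDK dynamic proxy that exposes only the Processor interface.
    static Processor interfaceProxy(Processor target) {
        return (Processor) Proxy.newProxyInstance(
                Processor.class.getClassLoader(),
                new Class<?>[] {Processor.class},
                (proxy, method, args) -> method.invoke(target, args));
    }

    public static void main(String[] args) {
        Processor direct = new TaxProcessor();
        Processor proxied = interfaceProxy(direct);
        System.out.println(hasAnnotatedMethod(direct));  // true
        System.out.println(hasAnnotatedMethod(proxied)); // false
        System.out.println(proxied.process("abc"));      // ABC
    }
}
```

Under that assumption, two things worth trying are declaring the concrete classes as the bean method return types, or registering the processor and writer explicitly as listeners on the worker step.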

