We were moving our spring batch implementations to use partitions. After making changes now the @BeforeStep is not getting called which is inside our processor and writter.
Sample code here,
@Bean
@StepScope
public ItemWriter<EmployeeTaxDetail> customerItemWriter()
{
ItemWriter<EmployeeTaxDetail> itemWriter = appContext.getBean(EmployeeTaxCalculationWriter.class);
return itemWriter;
}
@Bean
@StepScope
public ItemProcessor<EmployeeDetail, EmployeeTaxDetail> processor() {
ItemProcessor<EmployeeDetail, EmployeeTaxDetail> itemProcessor = appContext.getBean(EmployeeTaxCalculationProcessor.class);
return itemProcessor;
}
// Master
@Bean
public Step step1()
{
return new StepBuilder("step1", jobRepository)
.partitioner(slaveStep().getName(), partitioner())
.listener(processor())
.listener(customerItemWriter())
.step(slaveStep())
.gridSize(5)
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}
// slave step
@Bean
public Step slaveStep()
{
return new StepBuilder("slaveStep", jobRepository)
.<EmployeeDetail, EmployeeTaxDetail>chunk(10,transactionManager)
.reader(pagingItemReader(null,null))
.processor(processor())
.writer(customerItemWriter())
.build();
}
EmployeeTaxCalculationWriter and EmployeeTaxCalculationProcessor are like,
@Component
public class EmployeeTaxCalculationWriter extends AbstractItemWriter<EmployeeTaxDetail> {
/** The febp data source provider. */
@Autowired
private FEBPDataSourceProvider febpDataSourceProvider;
/** The Constant LOGGER. */
private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeTaxCalculationWriter.class.getName());
/** The Constant INSERT_REPORT_HISTORY_QUERY. */
private static final String INSERT_EMP_TAX_QUERY = "INSERT INTO FEBP_EMP_TAX_DETAIL_TEST " + "(ENTITY_ID, EMP_ID, EMP_TAX) VALUES (?, ?, ?)";
/**
* Execute write.
*
* @param chunk the chunk
* @throws Exception the exception
*/
@Override
protected void executeWrite(Chunk<? extends EmployeeTaxDetail> chunk) throws Exception {
saveToTaxDetail(chunk);
}
/**
* Save to tax detail.
*
* @param chunk the chunk
*/
private void saveToTaxDetail(Chunk<? extends EmployeeTaxDetail> chunk) {
List<EmployeeTaxDetail> employeeTaxDetailList = (List<EmployeeTaxDetail>) chunk.getItems();
DataSource febpDBSrc = getFebpDataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(febpDBSrc);
jdbcTemplate.batchUpdate(INSERT_EMP_TAX_QUERY, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
EmployeeTaxDetail empTaxDetail = employeeTaxDetailList.get(i);
ps.setString(1, UUID.randomUUID().toString());
ps.setString(2, empTaxDetail.getEmpId());
ps.setLong(3, empTaxDetail.getTaxAmount());
}
@Override
public int getBatchSize() {
return employeeTaxDetailList.size();
}
});
}
/**
* Gets the febp data source.
*
* @return the febp data source
*/
private DataSource getFebpDataSource() {
return febpDataSourceProvider.getDataSource(FengThreadLocal.getTenantId(), "FEBP");
}
}
and Component
public class EmployeeTaxCalculationProcessor extends AbstractItemProcessor<EmployeeDetail, EmployeeTaxDetail> {
/**
* Execute process.
*
* @param item the item
* @return the employee tax detail
* @throws Exception the exception
*/
@Override
protected EmployeeTaxDetail executeProcess(EmployeeDetail item) throws Exception {
EmployeeTaxDetail employeeTaxDetail = new EmployeeTaxDetail();
employeeTaxDetail.setEmpId(item.getEmpId());
employeeTaxDetail.setTaxAmount((long) (item.getEmpSalary() * .1));
if(item.getEmpId().endsWith("2"))
{
TimeUnit.SECONDS.sleep(5);
}
return employeeTaxDetail;
}
}
And inside AbstractItemWriter we have
public abstract class AbstractItemWriter<T> implements ItemWriter<T> {
/** The parameters. */
private JobParameters parameters;
/**
* Write.
*
* @param chunk the chunk
* @throws Exception the exception
*/
@Override
public void write(@NonNull Chunk<? extends T> chunk) throws Exception {
JobParameter jobParameter = null;
if( parameters!=null) {
jobParameter = parameters.getParameter(FebpAPIConstants.FEBP_THREAD_LOCAL);
}
if (jobParameter != null) {
ApplicationThreadLocal.populateThreadLocalData(
(ApplicationThreadLocalState) JsonUtil.readJsonValue(jobParameter.getValue().toString(), ApplicationThreadLocalState.class, false));
}
executeWrite(chunk);
}
/**
* Execute write.
*
* @param chunk the chunk
* @throws Exception the exception
*/
protected abstract void executeWrite(@NonNull Chunk<? extends T> chunk) throws Exception;
/**
* Before step.
*
* @param stepExecution the step execution
*/
**@BeforeStep**
public void beforeStep(final StepExecution stepExecution) {
parameters = stepExecution.getJobExecution().getJobParameters();
}
/**
* Gets the parameters.
*
* @return the parameters
*/
public JobParameters getParameters() {
return parameters;
}
} and inside AbstractItemProcessor we have
public abstract class AbstractItemProcessor<I, O> implements ItemProcessor<I, O> {
/** The parameters. */
private JobParameters parameters;
/**
* Process.
*
* @param item the item
* @return the o
* @throws Exception the exception
*/
@Override
public O process(I item) throws Exception {
JobParameter jobParameter = null;
if( parameters!=null) {
jobParameter = parameters.getParameter(FebpAPIConstants.FEBP_THREAD_LOCAL);
}
if (jobParameter != null) {
ApplicationThreadLocal.populateThreadLocalData(
(ApplicationThreadLocalState) JsonUtil.readJsonValue(jobParameter.getValue().toString(), ApplicationThreadLocalState.class, false));
}
return executeProcess(item);
}
/**
* Execute write.
*
* @param item the item
* @return the o
* @throws Exception the exception
*/
protected abstract O executeProcess(I item) throws Exception;
/**
* Before step.
*
* @param stepExecution the step execution
*/
@BeforeStep
public void beforeStep(final StepExecution stepExecution) {
parameters = stepExecution.getJobExecution().getJobParameters();
}
/**
* Gets the parameters.
*
* @return the parameters
*/
public JobParameters getParameters() {
return parameters;
}
}
Now the process method is getting called for each processor and writter call but @BeforeStep is stopped getting called after introducing partitioning.