I successfully deployed a remote partitioned job using Spring Cloud Data Flow and Spring Cloud Task; the installation is based on Kubernetes, so I added the Kubernetes implementation of Spring Cloud Deployer to the project.
However, it seems impossible to propagate values from a worker's step execution context to the job execution context.
The worker tasklet writes some data into the step execution context, which is successfully saved in the "BATCH_STEP_EXECUTION_CONTEXT" table:
@Bean
public Tasklet workerTasklet() {
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            ExecutionContext executionContext = chunkContext.getStepContext()
                    .getStepExecution()
                    .getExecutionContext();
            Integer partitionNumber = executionContext.getInt("partitionNumber");
            System.out.println("This tasklet ran partition UPD1: " + partitionNumber);
            // RESULT_PREFIX is a class constant ("result", matching the listener keys below)
            executionContext.put(RESULT_PREFIX + partitionNumber, "myResult " + partitionNumber);
            return RepeatStatus.FINISHED;
        }
    };
}
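For reference, each worker reads the "partitionNumber" key that the partitioner seeds into its partition's execution context on the manager side. A simplified sketch of that partitioner (the bean name and partition naming are illustrative, not my exact code):

import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

@Bean
public Partitioner partitioner() {
    return new Partitioner() {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
            for (int i = 0; i < gridSize; i++) {
                ExecutionContext context = new ExecutionContext();
                context.putInt("partitionNumber", i); // read by workerTasklet()
                partitions.put("partition" + i, context);
            }
            return partitions;
        }
    };
}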
There's an ExecutionContextPromotionListener, which is registered while building the step:
@Bean
public StepExecutionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"result0", "result1", "result2", "result3"});
    return listener;
}

@Bean
public Step workerStep() {
    return this.stepBuilderFactory.get("workerStep")
            .tasklet(workerTasklet())
            .listener(promotionListener())
            .build();
}
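For completeness, the manager step delegates to a DeployerPartitionHandler, wired roughly like the Spring Cloud Task partitioned-batch-job sample. This is a sketch: the docker image, application name, and the autowired resourceLoader are placeholders for my actual setup:

import org.springframework.batch.core.Step;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.batch.partition.DeployerPartitionHandler;
import org.springframework.core.io.Resource;

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer) {
    // resourceLoader is an autowired ResourceLoader; on Kubernetes the worker app is a docker resource
    Resource workerResource = this.resourceLoader.getResource("docker://my-registry/partitioned-job:latest");
    DeployerPartitionHandler handler = new DeployerPartitionHandler(taskLauncher, jobExplorer, workerResource, "workerStep");
    handler.setMaxWorkers(4);                      // the 4 pods mentioned below
    handler.setApplicationName("partitioned-job"); // placeholder
    return handler;
}

@Bean
public Step managerStep(PartitionHandler partitionHandler) {
    return this.stepBuilderFactory.get("managerStep")
            .partitioner("workerStep", partitioner())
            .partitionHandler(partitionHandler)
            .build();
}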
The job completes successfully, and the work is properly partitioned and executed by 4 Kubernetes pods. But the expected values are not present in the "BATCH_JOB_EXECUTION_CONTEXT" table.
Conversely, the step execution context promotion works with a partitioned job in a non-cloud environment, for example when using a TaskExecutorPartitionHandler, as sketched below.
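In that case the only difference is the partition handler, something like this simplified sketch, where the partitions run as threads in the same JVM:

import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Bean
public PartitionHandler localPartitionHandler() throws Exception {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setTaskExecutor(new SimpleAsyncTaskExecutor()); // partitions run as local threads
    handler.setStep(workerStep());                          // same worker step as above
    handler.setGridSize(4);                                 // same number of partitions
    handler.afterPropertiesSet();
    return handler;
}

With this handler the promoted keys do show up in BATCH_JOB_EXECUTION_CONTEXT, so the promotion listener itself seems to be configured correctly.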