I'm using DeployerPartitionHandler (the local variant) to partition my Spring Batch job. When I run the job, I get a NullPointerException in the worker launch step, as shown below:

at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313)
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302)
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy248.run(Unknown Source)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:207)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:181)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:168)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:163)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:781)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:765)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:319)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204)

I see the error message below when the job starts:

ERROR [o.s.cloud.task.listener.TaskLifecycleListener ] An event to end a task has been received for a task that has not yet started. [main] []

But it then creates an entry in the TaskRepository as below and proceeds to create the partitions:

DEBUG [o.s.c.task.repository.support.SimpleTaskRepository] Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='MYTASK', startTime=Tue Nov 24 17:14:48 IST 2020, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[--spring.profiles.active=local, --spring.output.ansi.enabled=always]} [main] []

DEBUG [o.s.c.t.batch.partition.DeployerPartitionHandler ] 3 partitions were returned [main] []

Further, I see another entry being made in the TaskRepository, but this time with null values in most columns:

DEBUG [o.s.c.task.repository.support.SimpleTaskRepository] Creating: TaskExecution{executionId=65, parentExecutionId=null, exitCode=null, taskName='null', startTime=null, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]} [main] []

Since the Spring Batch job is a single task, I would expect only one entry in the TaskRepository, but I can't figure out why the second entry is being made. I'm using Postgres and following the same steps as in the sample code, but I'm unable to find the issue: https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/partitioned-batch-job

Here is the code for reference:

@Bean
@Profile("!worker")
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
        TaskRepository taskRepository) throws Exception {

    Resource resource = this.resourceLoader.getResource("maven://XXX:YYYY:0.0.2-SNAPSHOT");

    DeployerPartitionHandler partitionHandler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository);

    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");
    partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("MYTASK");

    return partitionHandler;
}


@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
    
    Random random = new Random();
    return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
        .start(step1(partitionHandler))
        .build();
}

@Bean
@Profile("!worker")
public Step step1(PartitionHandler partitionHandler) throws Exception {

    return this.stepBuilderFactory.get("step1")
        .partitioner(workerStep().getName(), partitioner())         
        .step(workerStep())
        .partitionHandler(partitionHandler)
        .build();
}


@Bean
@Profile("!worker")
public Partitioner partitioner() {
    return new Partitioner() {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            System.out.println("In partitioner");
            Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

            for (int i = 0; i < 3; i++) {
                ExecutionContext context1 = new ExecutionContext();
                context1.put("partitionNumber", i);

                partitions.put("partition" + i, context1);
            }
            
            return partitions;
        }
    };
}

@Bean
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
    
    return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
}

@Bean
public Step workerStep() {

    return this.stepBuilderFactory.get("workerStep")
        .tasklet(workerTasklet(null))
        .build();
}

@Bean
@StepScope
public Tasklet workerTasklet(
    final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {

    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

            return RepeatStatus.FINISHED;
        }
    };
}
There is 1 answer

Answer by Danil Ko:
Because the code seems slightly incomplete, I'm not 100% sure.

But I was able to resolve a similar issue after:

  1. Adding @EnableTask on the @Configuration class where this job profile is defined.

  2. Ensuring the partitioner returns a number of partitions equal to or less than maxWorkers (the partitioner seems to create 3 partitions in its for loop, but maxWorkers is only 2).
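For the first point, a minimal sketch of what the configuration class might look like (the class name and the exact set of beans are hypothetical; only the @EnableTask annotation itself is the fix being suggested):

```java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Configuration;

// Hypothetical outline: @EnableTask registers the task lifecycle machinery
// that records the TaskExecution rows shown in the question's logs.
@Configuration
@EnableTask
@EnableBatchProcessing
public class JobConfiguration {
    // the partitionHandler, partitionedJob, step1, partitioner,
    // stepExecutionHandler and workerStep beans from the question go here
}
```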

Regarding the task execution numbers, the following is what I found after I also mapped the TaskRepository to a DB (instead of in-memory). For code/info on how to map the TaskRepository to an actual DB instead of the default in-memory store, see https://www.baeldung.com/spring-cloud-task

If you start fresh, assume the 2-worker setup in the code above (the partitioner adds 3 partitions in its for loop, but maxWorkers is 2, so from my understanding concurrency is capped at that maximum):

TASK 1 -> linked to JOB 1
TASK 2 -> worker #1
TASK 3 -> worker #2

So it seems Spring Cloud Task automatically creates a task execution for each launched worker, too, to take a partition (or grid).

Task 2 and task 3 will have a mapping in another table indicating that they are associated with task #1.
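As an aside, the way maxWorkers caps how many partitions run at once, while all partitions still complete, can be sketched with a plain fixed-size thread pool. This is only an analogy, not DeployerPartitionHandler's actual implementation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MaxWorkersDemo {

    // Runs 'partitions' dummy tasks on a pool capped at 'maxWorkers' threads.
    // At most 'maxWorkers' run concurrently; the rest queue, but all complete.
    static int runPartitions(int maxWorkers, int partitions) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxWorkers);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < partitions; i++) {
            // each submitted task stands in for one launched worker/partition
            pool.submit(() -> { completed.incrementAndGet(); });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 3 partitions with maxWorkers = 2: all 3 still finish, just not all at once
        System.out.println("completed=" + runPartitions(2, 3));
    }
}
```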