Spring Batch - How to dynamically invoke a Step Configuration to reinitialise partitioner instance


I have a Spring Batch job consisting of an ItemReader, an ItemProcessor and a NoOpItemWriter.
A REST controller starts the job with some job parameters.

  • The item reader reads from a list that is stored in the servlet context

  • The item processor takes each item and makes a DB call

  • The item writer does nothing

  • The partitioner is a ListPartitioner implementation that partitions the list by index ranges
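To make the index-range partitioning concrete, here is a minimal sketch in plain Kotlin. The ListPartitioner source is not shown in the question, so the function name indexRanges and the ceiling-division strategy are assumptions for illustration, not the actual implementation.

```kotlin
// Hypothetical helper (not from the question): splits the indices
// 0 until listSize into at most gridSize contiguous ranges.
fun indexRanges(listSize: Int, gridSize: Int): List<IntRange> {
    require(listSize >= 0) { "listSize must not be negative" }
    require(gridSize > 0) { "gridSize must be positive" }
    if (listSize == 0) return emptyList()

    // Ceiling division, so every index lands in exactly one range.
    val chunk = (listSize + gridSize - 1) / gridSize
    return (0 until listSize step chunk).map { start ->
        start until minOf(start + chunk, listSize)
    }
}

fun main() {
    // A list holding only the dummy item yields a single partition.
    println(indexRanges(1, 6))   // [0..0]
    // A list of ten items with a grid size of 6 yields five ranges of two.
    println(indexRanges(10, 6))  // [0..1, 2..3, 4..5, 6..7, 8..9]
}
```

The first call shows why the list size at construction time matters: a partitioner built from the one-item dummy list only ever produces one range.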

Initially the servlet context holds only a placeholder list with one dummy item, so on bean initialization the list partitioner receives that single dummy item. The list is populated later, depending on what is passed in the POST request.

The problem is that the partitioner is already initialized with the dummy list contents when the Spring Boot application starts. I would like the partitioner to work on the data passed in through my REST controller.

Below are my Step, Job and Servlet bean configurations:

@Bean(name = ["controllerStep"])
  protected fun controllerStep(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager
  ): Step {

    servletConfiguration.setServletContext()
    return StepBuilder("controllerStep", jobRepository)
      .partitioner(
        "workerStep",
        ListPartitioner(servletConfiguration.getItemProcessingList()!!.size)
      )
      .step(workerStep(jobRepository, transactionManager))
      .gridSize(6)
      .build()
  }


  @Bean
  fun job(
    jobRepository: JobRepository,
    @Qualifier("controllerStep") controllerStep: Step
  ): Job {
    // Inject the step bean instead of referencing the bean method by name
    return JobBuilder("job", jobRepository)
      .start(controllerStep)
      .build()
  }

Below is the Servlet Configuration Class

@Configuration
class ServletConfiguration(private var servletContext: ServletContext) {

  fun setServletContext() {
    servletContext.setAttribute("DatatoBeProcessed", listOf(""))
  }
  @Suppress("UNCHECKED_CAST")
  fun getItemProcessingList(): List<String>? {
    val contextList = servletContext.getAttribute(ATTRIBUTE_NAME_PROCESSING_INPUT_DATA)
    return if (contextList != null) contextList as? List<String> else listOf("")
  }
}

Below is my REST controller. I would like the partitioner to pick up the list that is stored in the servlet context when the job is launched:

  @PostMapping("/trigger")
  fun startJob(@RequestBody jobDTO: JobDTO): ResponseEntity<String> {

    // fetch list of items and save in servlet context
    dataService.setListsIntoServletContext(jobDTO)

    // create job parameters
    val jobParameters =
      JobParametersBuilder()
        .addString("JobUniquekey", Random.nextInt().toString()) // to-be refactored
        .addString(
          "status",
          jobDTO.status.toString(),
        )
        .addString("jobType", jobDTO.jobType.toString())
        .toJobParameters()

    // trigger job
    jobLauncher.run(job, jobParameters)
    return ResponseEntity("JOB trigger:SUCCESS", HttpStatus.ACCEPTED)
  }

There are 2 answers

Answer by Mahmoud Ben Hassine

"I have a spring batch job consisting of ItemReader, ItemProcessor and NoOpItemWriter."

"Item Writer does nothing"

This is a sign that something is wrong with the design. If the job does not write anything, how is its result supposed to be consumed?

"The problem is, when the spring boot application starts, the partitioner is already initialized with the dummy list contents. I would like the partitioner to work on what gets passed as part of my rest controller."

You need to declare the partitioner as a step-scoped bean and configure it with the parameters coming from the request. A step-scoped bean is not created at application startup but when the step runs, so it can read the job parameters of that run. You can find an example here: https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#step-scope
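A rough sketch of what a step-scoped partitioner could look like in Kotlin. It makes several assumptions: the controller passes the list size as a Long job parameter named itemListSize, a Step bean named workerStep exists elsewhere, and the context keys fromIndex and toIndex are hypothetical names that the worker's reader would have to read. The partitioning logic is a stand-in, since the real ListPartitioner is not shown.

```kotlin
import org.springframework.batch.core.Step
import org.springframework.batch.core.configuration.annotation.StepScope
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.item.ExecutionContext
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// proxyBeanMethods = false: beans are wired through method parameters,
// so the configuration class itself does not need to be proxied.
@Configuration(proxyBeanMethods = false)
class PartitionedStepConfiguration {

  // Step scope delays creation until the manager step executes, so the
  // job parameter of the current run is available here.
  // "itemListSize" is an assumed parameter name.
  @Bean
  @StepScope
  fun listPartitioner(
    @Value("#{jobParameters['itemListSize']}") listSize: Long
  ): Partitioner = Partitioner { gridSize ->
    val size = listSize.toInt()
    val chunk = maxOf(1, (size + gridSize - 1) / gridSize)
    (0 until size step chunk).associate { start ->
      val context = ExecutionContext()
      context.putInt("fromIndex", start)                     // hypothetical key
      context.putInt("toIndex", minOf(start + chunk, size))  // hypothetical key, exclusive
      "partition$start" to context
    }
  }

  // The step itself stays a singleton; only the partitioner is scoped.
  @Bean
  fun controllerStep(
    jobRepository: JobRepository,
    listPartitioner: Partitioner,
    @Qualifier("workerStep") workerStep: Step  // assumed to be defined elsewhere
  ): Step =
    StepBuilder("controllerStep", jobRepository)
      .partitioner("workerStep", listPartitioner)
      .step(workerStep)
      .gridSize(6)
      .build()
}
```

With this layout the step bean is built once at startup, but the object injected into it is a scoped proxy, so the partitioner is only created when the step actually runs. If the job is launched without the itemListSize parameter, resolving the bean will likely fail at that point.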

Answer by Jazz
@Bean(name = ["controllerStep"])
@JobScope
protected fun controllerStep(
  jobRepository: JobRepository,
  transactionManager: PlatformTransactionManager,
  @Value("#{jobParameters['itemListSize']}") listSize: Long
): Step {
  return StepBuilder("controllerStep", jobRepository)
    .partitioner("workerStep", ListPartitioner(listSize.toInt()))
    .step(workerStep(jobRepository, transactionManager))
    .gridSize(6)
    .build()
}

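The above made the partitioner reinitialize on every launch. Because the step is job-scoped, it is created when a job execution starts rather than at application startup, so the ListPartitioner is built with the itemListSize job parameter of that run. For this to work, the controller has to add itemListSize as a Long job parameter before launching the job.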
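For completeness, a minimal sketch of how the controller could supply the itemListSize parameter that the job-scoped step reads. The helper name buildJobParameters and its arguments are hypothetical. The UUID is a swapped-in replacement for the Random.nextInt() key that the question marks as to-be-refactored.

```kotlin
import java.util.UUID
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.JobParametersBuilder

// Hypothetical helper: `items` stands for the list that the controller
// stores in the servlet context before launching the job.
fun buildJobParameters(items: List<String>, status: String, jobType: String): JobParameters =
  JobParametersBuilder()
    // Unique value so every launch creates a new job instance
    .addString("JobUniquekey", UUID.randomUUID().toString())
    // Read in the step via #{jobParameters['itemListSize']}
    .addLong("itemListSize", items.size.toLong())
    .addString("status", status)
    .addString("jobType", jobType)
    .toJobParameters()
```

The result would then be passed to jobLauncher.run(job, ...) in place of the parameters built inline in the controller.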