Spring batch : ClassifierCompositeItemWriter footer not getting called

1.5k views Asked by At

I am using Spring Batch to write multiple reports. The requirement is that I will receive records containing a branchId and a name. I need to create one file per branchId and write the matching records into that file, along with a header and a footer.

Example:

// Sample input: each Student appears to be built as (name, branchId), so A and C share branch 1.
Student A = new Student("A",1);
        Student B = new Student("B",2);
        Student C = new Student("C",1);
        Student D = new Student("D",4);

In this case it should create a total of 3 files:

 file1-->1.txt(with A,C)
    file2-->2.txt(with B)
    file3-->4.txt(with D)

.

I am using ClassifierCompositeItemWriter to create or reuse a FlatFileItemWriter based on the data (the id in this case), and I am able to create the files successfully. For the header and footer I am using callbacks at the writer level. The generated files contain only the HEADER and the DATA; the FOOTER callback is never executed.

It looks like either the file is being closed before the footer is written, or there is an issue with how I am using step scope.

Can someone help me get the FOOTER callback invoked?

Here is the code:

   

     import java.util.Arrays;
        import java.util.HashMap;
        import java.util.Map;
        
        import org.springframework.batch.core.Job;
        import org.springframework.batch.core.JobParameters;
        import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
        import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
        import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
        import org.springframework.batch.core.configuration.annotation.StepScope;
        import org.springframework.batch.core.launch.JobLauncher;
        import org.springframework.batch.item.ExecutionContext;
        import org.springframework.batch.item.ItemReader;
        import org.springframework.batch.item.ItemWriter;
        import org.springframework.batch.item.file.FlatFileItemWriter;
        import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
        import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
        import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
        import org.springframework.batch.item.support.ListItemReader;
        import org.springframework.classify.Classifier;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.annotation.AnnotationConfigApplicationContext;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.core.io.FileSystemResource;
        
        /**
         * Spring Batch configuration that reads a fixed, in-memory list of students and
         * writes each one to a flat file chosen by the student's branch id, using a
         * {@link ClassifierCompositeItemWriter} whose delegates are created on demand.
         *
         * <p>NOTE(review): the per-branch {@link FlatFileItemWriter} delegates are opened
         * by hand inside the classifier and cached in {@code map}; nothing in this class
         * ever calls {@code close()} on them, and they are not registered as streams on
         * the step. Presumably this is why the footer callback never runs - confirm
         * against the FlatFileItemWriter documentation.
         */
        @Configuration
        @EnableBatchProcessing
        public class MyJob3 {
        
            /**
             * Boots an annotation-based application context from this class, looks up the
             * {@link JobLauncher} and the {@link Job} bean, and runs the job once with
             * empty job parameters.
             *
             * @param args command-line arguments (not used)
             * @throws Exception if launching the job fails
             */
            public static void main(String[] args) throws Exception {
                ApplicationContext context = new AnnotationConfigApplicationContext(MyJob3.class);
                JobLauncher jobLauncher = context.getBean(JobLauncher.class);
                Job job = context.getBean(Job.class);
                jobLauncher.run(job, new JobParameters());
            }
        
            /**
             * Defines the job "job" consisting of a single chunk-oriented step "step"
             * (chunk size 5) that reads students and writes them through the
             * classifier-based composite writer.
             *
             * @param jobs  factory used to build the job
             * @param steps factory used to build the step
             * @return the configured job
             */
            @Bean
            public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
                // Only the reader and the composite writer are handed to the step here;
                // no additional streams are registered on the step builder.
                return jobs.get("job").start(steps.get("step").<Student, Student>chunk(5)
                        .reader(itemReader())
                        .writer(getStudentItemWriter(itemWriterClassifier()))
                        .build())
                        .build();
            }
        
            /**
             * Supplies the test data: five students spread over branch ids 1, 2 and 4.
             *
             * @return a reader over the in-memory student list
             */
            @Bean
            @StepScope
            public ItemReader<Student> itemReader() {
                Student A = new Student("A", 1);
                Student B = new Student("B", 2);
                Student C = new Student("C", 1);
                Student D = new Student("D", 4);
                Student E = new Student("E", 4); 
        
                return new ListItemReader<>(Arrays.asList(A,B,C,D,E));
            }
        
            // Cache of the writers created so far, keyed by branch id, so that every student
            // of the same branch is routed to the same FlatFileItemWriter instance.
            Map<Integer, FlatFileItemWriter<Student>> map = new HashMap<>();
        
            /**
             * Creates the composite writer that delegates each item to the writer chosen
             * by the given classifier.
             *
             * @param classifier maps a student to the writer that should receive it
             * @return the composite writer configured with that classifier
             */
            @Bean
            @StepScope
            public ClassifierCompositeItemWriter<Student> getStudentItemWriter(Classifier<Student, ItemWriter<? super Student>> classifier) {
                ClassifierCompositeItemWriter<Student> compositeItemWriter = new ClassifierCompositeItemWriter<>();
                compositeItemWriter.setClassifier(classifier);
                return compositeItemWriter;
            }
        
            /**
             * Classifier that returns the cached writer for a student's branch id, or
             * builds, opens and caches a new one writing to
             * {@code Branch_Info_<branchId>.txt} when the branch is seen for the first time.
             *
             * @return the classifier used by the composite writer
             */
            @Bean
            @StepScope
            public Classifier<Student, ItemWriter<? super Student>> itemWriterClassifier() {
                return student -> {
        
                    System.out.println("Branch Id ::" + student.getBranchId() + " and Student Name" + student.getName());
                    // Reuse the writer already created for this branch, if any.
                    if (map.containsKey(student.getBranchId())) {
                        FlatFileItemWriter<Student> result = map.get(student.getBranchId());
                        System.out.println("Exising Writer object ::" + result);
                        return result;
                    }
                    
                    // First student of this branch: build a delimited writer emitting
                    // the "branchId" and "name" properties of each student.
                    String fileName ="Branch_Info_" + student.getBranchId() + ".txt";
                    BeanWrapperFieldExtractor<Student> fieldExtractor = new BeanWrapperFieldExtractor<>();
                    fieldExtractor.setNames(new String[] { "branchId", "name" });
                    DelimitedLineAggregator<Student> lineAggregator = new DelimitedLineAggregator<>();
                    lineAggregator.setFieldExtractor(fieldExtractor);
        
                    FlatFileItemWriter<Student> flatFileItemWriter = new FlatFileItemWriter<>();
                    flatFileItemWriter.setResource(new FileSystemResource(fileName));
                    flatFileItemWriter.setAppendAllowed(true);
                    flatFileItemWriter.setLineAggregator(lineAggregator);
                    // The messages below are printed while the callbacks are being registered,
                    // not when the header/footer text is actually written to the file.
                    System.out.println("Writing header...");
                    flatFileItemWriter.setHeaderCallback(writer -> writer.write("Header"));
                    System.out.println("Writing Footer...");
                    flatFileItemWriter.setFooterCallback(writer -> writer.write("Footer"));
                    System.out.println("Writing done...");
                    // Opened manually with a fresh execution context. NOTE(review): there is no
                    // matching close() call anywhere in this class.
                    flatFileItemWriter.open(new ExecutionContext());
        
                    map.put(student.getBranchId(), flatFileItemWriter);
        
                    System.out.println("New Writer object ::" + flatFileItemWriter);
                    return flatFileItemWriter;
                };
            }
        }

1

There is 1 answer

0
Mahmoud Ben Hassine On BEST ANSWER

In my case, I don't have a fixed number of writers (foo & boo in your case); they are dynamic and need to be created at run time. Any suggestions on how to do that and register them with the step?

In that case, you need to:

  1. pre-calculate the possible distinct values (1, 2 and 4 in your case), for example with a query like select distinct(id) from table, or a similar mechanism depending on your input data
  2. dynamically create ItemWriter beans and register them as streams in your step.

The following is an example based on your use case: given a list of students in different groups, the idea is to write them in different files based on their group. Here is a tasklet that pre-calculates the distinct groups and creates/registers item writers dynamically in the application context:

import java.io.IOException;
import java.io.Writer;
import java.util.List;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * Tasklet that queries the distinct student group ids and registers one
 * {@link FlatFileItemWriter} bean definition per group in the application context.
 *
 * <p>Each definition is registered under the bean name {@code "group" + groupId + "Writer"}
 * and targets the file {@code groupId + ".txt"}, with a fixed header and footer callback.
 */
class DynamicWritersConfigurationTasklet implements Tasklet {

    private final JdbcTemplate jdbcTemplate;
    private final ConfigurableApplicationContext applicationContext;

    /**
     * @param jdbcTemplate       used to query the distinct group ids
     * @param applicationContext context whose bean factory receives the writer definitions
     */
    public DynamicWritersConfigurationTasklet(JdbcTemplate jdbcTemplate, ConfigurableApplicationContext applicationContext) {
        this.jdbcTemplate = jdbcTemplate;
        this.applicationContext = applicationContext;
    }

    /**
     * Registers a writer bean definition for every distinct {@code groupId} found in
     * the {@code student} table.
     *
     * @return {@link RepeatStatus#FINISHED} once all definitions are registered
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // The context's bean factory is used through its registry interface.
        BeanDefinitionRegistry writerRegistry = (BeanDefinitionRegistry) applicationContext.getBeanFactory();

        List<Integer> groupIds = jdbcTemplate.queryForList("select distinct(groupId) from student", Integer.class);
        groupIds.forEach(groupId -> {
            String writerName = "group" + groupId + "Writer";
            writerRegistry.registerBeanDefinition(writerName, writerDefinition(writerName, groupId));
        });
        return RepeatStatus.FINISHED;
    }

    /** Builds the FlatFileItemWriter bean definition for one group. */
    private GenericBeanDefinition writerDefinition(String writerName, Integer groupId) {
        MutablePropertyValues writerProperties = new MutablePropertyValues();
        writerProperties.addPropertyValue("name", writerName);
        writerProperties.addPropertyValue("lineAggregator", new PassThroughLineAggregator<>());
        writerProperties.addPropertyValue("resource", new FileSystemResource(groupId + ".txt"));
        // The casts give the lambdas a target type, since addPropertyValue accepts a plain Object.
        writerProperties.addPropertyValue("headerCallback", (FlatFileHeaderCallback) out -> out.write("header"));
        writerProperties.addPropertyValue("footerCallback", (FlatFileFooterCallback) out -> out.write("footer"));

        GenericBeanDefinition definition = new GenericBeanDefinition();
        definition.setBeanClassName(FlatFileItemWriter.class.getName());
        definition.setPropertyValues(writerProperties);
        return definition;
    }

}

Once that is in place, a second step loads those item writers from the application context at runtime and registers them as delegates in a ClassifierCompositeItemWriter:

/**
 * Builds the step-scoped composite writer that routes each student to the
 * {@code FlatFileItemWriter} bean belonging to the student's group.
 *
 * <p>Delegates are looked up in the application context by the bean name
 * {@code "group" + groupId + "Writer"}.
 *
 * @param applicationContext context from which the {@code FlatFileItemWriter} beans are loaded
 * @return a composite writer that classifies students by group id
 * @throws IllegalStateException (from the classifier, at write time) if no writer bean
 *         exists for a student's group
 */
@Bean
@StepScope
public ClassifierCompositeItemWriter<Student> itemWriter(ConfigurableApplicationContext applicationContext) {
    // dynamically get writers from the application context; a Class literal cannot carry
    // type arguments, so the map values are necessarily the raw writer type
    Map<String, FlatFileItemWriter> writersByBeanName = applicationContext.getBeansOfType(FlatFileItemWriter.class);
    // Classify students by group; the typed builder avoids an unchecked raw-type result
    return new ClassifierCompositeItemWriterBuilder<Student>()
            .classifier(student -> {
                String beanName = "group" + student.getGroupId() + "Writer";
                @SuppressWarnings("unchecked") // these writer beans are only ever used for Student items
                FlatFileItemWriter<Student> delegate = writersByBeanName.get(beanName);
                if (delegate == null) {
                    // Fail with a descriptive message instead of handing a null delegate to the composite
                    throw new IllegalStateException("No FlatFileItemWriter bean named '" + beanName
                            + "' is registered for group " + student.getGroupId());
                }
                return delegate;
            })
            .build();
}

/**
 * Defines the job-scoped step "readWriteStudents": a chunk-oriented step (chunk size 2)
 * that reads students and writes them through the classifier-based composite writer.
 *
 * <p>Every {@code FlatFileItemWriter} bean found in the application context is also
 * registered as a stream on the step.
 *
 * @param stepBuilderFactory factory used to build the step
 * @param applicationContext context providing the {@code FlatFileItemWriter} beans
 * @param dataSource         data source handed to the item reader
 * @return the fully built step
 */
@Bean
@JobScope
public Step step2(StepBuilderFactory stepBuilderFactory, ConfigurableApplicationContext applicationContext, DataSource dataSource) {
    SimpleStepBuilder<Student, Student> builder = stepBuilderFactory.get("readWriteStudents")
            .<Student, Student>chunk(2)
            .reader(itemReader(dataSource))
            .writer(itemWriter(applicationContext));

    // register writers as streams in the step so that open/update/close are called correctly
    applicationContext.getBeansOfType(FlatFileItemWriter.class)
            .values()
            .forEach(builder::stream);

    return builder.build();
}

I have a complete example here: sample app for SO67604628. Please refer to this guide to see how to checkout a single folder (if you don't want to clone the entire repo). The sample generates 3 files with students grouped by groupId. Note how headers/footers are correctly generated since delegate writers are registered as streams in the step.