Can I use FlatFileItemWriter to write multi-format file in Spring Batch?

1.5k views Asked by At

I'm reading a multi-format file using FlatFileItemReader and mapping every line to its corresponding bean type in ItemProcessor and performing data enrichment. But when I try to write these records into a file using FlatFileItemWriter, I'm not able to assign separate BeanWrapperFieldExtractor for the different record types. How can I sort this out?

Input File format

1#9999999#00001#2#RecordType1
2#00002#June#Statement#2020#9#RecordType2
3#7777777#RecordType3

Expected output file format

1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3

ItemProcessor

public class RecordTypeItemProcessor implements ItemProcessor<RecordType, RecordType> {

    @Override
    public RecordType process(RecordType recordType) throws Exception {
        if (recordType instanceof RecordType1) {
            RecordType1 recordType1 = (RecordType1) recordType;
            //enrichment logic
            return recordType1;
        } else if (recordType instanceof RecordType2) {
            RecordType2 recordType2 = (RecordType2) recordType;
            //enrichment logic
            return recordType2;
        }
        else
            return null;
    }
}```

2

There are 2 answers

3
Mahmoud Ben Hassine On BEST ANSWER

You can use a ClassifierCompositeItemProcessor with a ClassifierCompositeItemWriter for that. The idea is to classify items according to their type using a Classifier and call the corresponding item processor/writer. This approach is cleaner IMO than using multiple instance of checks in the processor/writer.

There are some built-in Classifier implementations (I think SubclassClassifier is a good option for you), but you can create your own if needed.

You can find examples in ClassifierCompositeItemProcessorTests and ClassifierCompositeItemWriterTests.

2
pulikuttie On

The solution suggested by Mahmoud finally worked :) However, there are a few caveats. ClassifierCompositeItemWriter is designed to write into different files. i.e., if there are 3 different Record Types, there will be 3 different output files. But in my use case, I'm expecting the output in a single file in the same order. So, I mentioned the same file name in each Writer bean and added `writer.setAppendAllowed(true); But after this, different Record Types were clubbed together and the sorting order changed. So, I reduced the chunk size from 50 to 3. 3 is the total number of Record Types. This will take a hit on the performance, however. With some tweaks like this, finally I'm getting the desired output. Here is my implementation (Just for reference, needs more cleanup)

Configuration.java (StepBuilderFactory)

...chunk(3).reader(reader).processor(processor()).writer(writer).stream(recordType1FlatFileItemWriter()).stream(recordType2FlatFileItemWriter()).build();

Processor Step

@Bean
@StepScope
public ItemProcessor processor() {
            ClassifierCompositeItemProcessor<? extends RecordType, ? extends RecordType> processor = new ClassifierCompositeItemProcessor<>();
            ItemProcessor<RecordType1, RecordType1> recordType1Processor = new ItemProcessor<RecordType1, RecordType1>() {
                @Nullable
                @Override
                public RecordType1 process(RecordType1 recordType1) throws Exception {
                    //Processing logic
                    return recordType1;
                }
            };

            ItemProcessor<RecordType2, RecordType2> recordType2Processor = new ItemProcessor<RecordType2, RecordType2>() {
                @Nullable
                @Override
                public RecordType2 process(RecordType2 recordType2) throws Exception {
                    //Processing logic
                    return recordType2;
                }
            };

        SubclassClassifier classifier = new SubclassClassifier();
        Map typeMap = new HashMap();
        typeMap.put(RecordType1.class, recordType1Processor);
        typeMap.put(RecordType2.class, recordType2Processor);
        classifier.setTypeMap(typeMap);
        processor.setClassifier(classifier);
        return processor;
    }

Writer Step

    @Bean
    @StepScope
    public ClassifierCompositeItemWriter writer(@Value("#{stepExecutionContext[fileName]}") Resource file) throws Exception {
        ClassifierCompositeItemWriter<String> writer = new ClassifierCompositeItemWriter<>();
        SubclassClassifier classifier = new SubclassClassifier<>();
        Map typeMap = new HashMap<>();
        typeMap.put(RecordType1.class, recordType1FlatFileItemWriter());
        typeMap.put(RecordType2.class, recordType2FlatFileItemWriter());
        typeMap.put(RecordType3.class, recordType3FlatFileItemWriter());
        classifier.setTypeMap(typeMap);
        writer.setClassifier(classifier);
        return writer;
    }

Writer

    @Bean
    public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
        FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
        writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
            setDelimiter("#");
            setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
                {
                    setNames(new String[] { "RecordType", "ID1", "ID2", "ID3");
                }
            });
        }});
        return  writer;
    }

    @Bean
    public FlatFileItemWriter<RecordType2> recordType2FlatFileItemWriter() throws Exception{
        FlatFileItemWriter<RecordType2> writer = new FlatFileItemWriter<>();
        writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<RecordType2>() {{
            setDelimiter("#");
            setFieldExtractor(new BeanWrapperFieldExtractor<RecordType2>() {
                {
                    setNames(new String[] { "RecordType", "ID9", "ID8",);
                }
            });
        }});
        return  writer;
    }