I am using Spring Batch in a Spring Boot application. The Spring Boot version is 2.3.3.RELEASE.
What I intend to achieve
I have to read an XML file containing thousands of transactions and a header tag (fileInformation), apply some business logic to each transaction, and then write the file back with the updated transaction values. I am using StaxEventItemReader to read the file and StaxEventItemWriter to write it, plus a couple of ItemProcessors for the business logic. The XML file looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<reportFile>
    <fileInformation>
        <sender>200GH7XZ60</sender>
        <timestamp>2020-12-23T09:05:34Z</timestamp>
        <environment>PRO</environment>
        <version>001.60</version>
    </fileInformation>
    <record>
        <transaction>
            <buyer></buyer>
        </transaction>
        <transaction>
            <buyer></buyer>
        </transaction>
        <transaction>
            <buyer></buyer>
        </transaction>
    </record>
</reportFile>
The problem I am facing is with the values of the header tags.
I have configured OmegaXmlHeaderCallBack, which generates the desired header tags, but the values in those tags should be copied from the input file. As far as I understand, the StaxWriterCallback is initialized before the reader, processor and writer run, so I cannot inject the values using late binding.
This looked like a basic requirement, but I couldn't find any solution on Stack Overflow.
Here are the code snippets that configure the Spring Batch job.
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    PIExtractorItemProcessor pIExtractorItemProcessor;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Value("${eugateway.batch.chunk.size}")
    private int chunkSize;

    @Bean
    public Step jobStep(ItemStreamReader<CustomHeaderTransactionXmlElement> reader,
                        CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> processor,
                        CompositeItemWriter<ProcessorWriterDto> writer,
                        EdsClientItemWriteListener<ProcessorWriterDto> writeListener,
                        StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("extractAndReplacePersonalDataStep")
                .<CustomHeaderTransactionXmlElement, ProcessorWriterDto>chunk(chunkSize)
                .reader(reader)
                .processor(processor)
                .listener(writeListener)
                .writer(writer)
                .build();
    }

    @Bean
    public Job extractPersonalDataJob(Step jobStep, JobResultListener jobListener,
                                      JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("extractAndReplacePersonalDataJob")
                .incrementer(new RunIdIncrementer())
                .start(jobStep)
                .listener(jobListener)
                .build();
    }

    @Bean
    @StepScope
    public ItemStreamReader<CustomHeaderTransactionXmlElement> itemReader(
            // the key is quoted because it contains a dot
            @Value("#{jobParameters['file.path']}") String path) {
        Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
        transactionMarshaller.setClassesToBeBound(FileInformation.class, TransactionPositionReport.class);
        log.info("Generating StaxEventItemReader");
        return new StaxEventItemReaderBuilder<CustomHeaderTransactionXmlElement>()
                .name("headerTransaction")
                // FileSystemResource has no constructor taking another Resource
                .resource(new FileSystemResource(path))
                .addFragmentRootElements("fileInformation", "transaction")
                .unmarshaller(transactionMarshaller)
                .build();
    }

    @Bean
    @StepScope
    OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        return new OmegaXmlHeaderCallBack(sender, timestamp, environment, version);
    }

    @Bean
    @StepScope
    OmegaXmlFooterCallBack getOmegaXmlFooterCallBack() {
        return new OmegaXmlFooterCallBack();
    }

    @StepScope
    @Bean(name = "staxTransactionWriter")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        String exportFilePath = "C:\\Users\\sasharma\\Documents\\TO_BE_DELETED\\eugateway\\outputfile.xml";
        Resource exportFileResource = new FileSystemResource(exportFilePath);
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setSupportDtd(true);
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setClassesToBeBound(TransactionPositionReport.class);
        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .version("1.0")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("reportFile")
                .headerCallback(getOmegaXmlHeaderCallBack(sender, timestamp, environment, version))
                .footerCallback(getOmegaXmlFooterCallBack())
                .shouldDeleteIfEmpty(true)
                .build();
    }

    @Bean
    @StepScope
    public PIExtractorItemProcessor extractItemProcessor() {
        log.info("Generating PIExtractorItemProcessor");
        return new PIExtractorItemProcessor();
    }

    @Bean
    public PIRemoverItemProcessor removeItemProcessor() {
        log.info("Generating PIRemoverItemProcessor");
        return new PIRemoverItemProcessor();
    }

    @Bean
    @StepScope
    CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> extractAndRemoveItemProcessor() {
        log.info("Generating CompositeItemProcessor");
        CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
        itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
        return itemProcessor;
    }

    @Bean
    @StepScope
    public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
        log.info("Generating EdsClientItemWriter");
        return new EdsClientItemWriter<>();
    }

    @Bean
    @StepScope
    public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating OmegaXmlFileWriter");
        return new OmegaXmlFileWriter<>(staxTransactionItemWriter(sender, timestamp, environment, version));
    }

    @Bean
    @StepScope
    public CompositeItemWriter<ProcessorWriterDto> compositeItemWriter(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating CompositeItemWriter");
        CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(sender, timestamp, environment, version)));
        return compositeItemWriter;
    }
}
Below is the OmegaXmlHeaderCallBack class. Because late binding does not work here, I always end up with empty values in the header tags.
import javax.xml.stream.XMLEventFactory;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamException;

import org.springframework.batch.item.xml.StaxWriterCallback;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class OmegaXmlHeaderCallBack implements StaxWriterCallback {

    private String sender;
    private String timestamp;
    private String environment;
    private String version;

    public OmegaXmlHeaderCallBack(String sender, String timestamp, String environment, String version) {
        super();
        this.sender = sender;
        this.timestamp = timestamp;
        this.environment = environment;
        this.version = version;
    }

    @Override
    public void write(XMLEventWriter writer) {
        XMLEventFactory factory = XMLEventFactory.newInstance();
        try {
            writer.add(factory.createStartElement("", "", "fileInformation"));
            writer.add(factory.createStartElement("", "", "sender"));
            writer.add(factory.createCharacters(sender));
            writer.add(factory.createEndElement("", "", "sender"));
            writer.add(factory.createStartElement("", "", "timestamp"));
            writer.add(factory.createCharacters(timestamp));
            writer.add(factory.createEndElement("", "", "timestamp"));
            writer.add(factory.createStartElement("", "", "environment"));
            writer.add(factory.createCharacters(environment));
            writer.add(factory.createEndElement("", "", "environment"));
            writer.add(factory.createStartElement("", "", "version"));
            writer.add(factory.createCharacters(version));
            writer.add(factory.createEndElement("", "", "version"));
            writer.add(factory.createEndElement("", "", "fileInformation"));
            // opens <record>; it is expected to be closed by the footer callback
            writer.add(factory.createStartElement("", "", "record"));
        } catch (XMLStreamException e) {
            log.error("Error writing OMEGA XML Header: {}", e.getMessage());
            throw new OmegaXmlHeaderWriterException(e.getMessage());
        }
    }
}
The code for the ItemProcessor is below. I am putting the header data into the ExecutionContext, where it was intended to be read by the headerCallback (sadly, that is not going to happen).
@Slf4j
public class PIExtractorItemProcessor implements ItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> {

    @Autowired
    PersonalDataExtractor personalDataExtractor;

    @Value("#{jobParameters['submission.account']}")
    private String subAccntId;

    @Value("#{stepExecution}")
    private StepExecution stepExecution;

    @Override
    public ProcessorWriterDto process(CustomHeaderTransactionXmlElement headerTransactionElement) throws Exception {
        FileInformation header = null;
        TransactionPositionReport transaction = null;
        if (headerTransactionElement instanceof FileInformation) {
            header = (FileInformation) headerTransactionElement;
            stepExecution.getExecutionContext().put("header.sender", header.getSender());
            stepExecution.getExecutionContext().put("header.timestamp", header.getTimestamp());
            stepExecution.getExecutionContext().put("header.environment", header.getEnvironment());
            stepExecution.getExecutionContext().put("header.version", header.getVersion());
            log.debug("Header {} found.", header.toString());
            return null;
        } else {
            transaction = (TransactionPositionReport) headerTransactionElement;
            log.debug("NO header info found for transaction {}", transaction.getProcessingDetails().getCustomerTransactionId());
            log.info("Extracting personal data for transaction customer id {} and create EDS requestDto.", transaction.getProcessingDetails().getCustomerTransactionId());
            ProcessorWriterDto transferObject = new ProcessorWriterDto();
            transferObject.setEdsRequestDtoList(personalDataExtractor.extract(transaction, subAccntId));
            transferObject.setTransaction(transaction);
            return transferObject;
        }
    }
}
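Your step is doing too much. I would break things down into two steps:
- Step 1: extract the file information header from the input file and put it in the job execution context
- Step 2: read the header from the job execution context and use it in whichever step-scoped beans need it (for example, the StAX writer callback in your case)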
Here is a quick example:
This example shows the idea. You only need to write the snippet that extracts an XML tag from the file (only the header, see TODO). The StaxWriterCallback in that example is a step-scoped bean and can use the header from the execution context. Other step-scoped components from step 2 can also be configured in the same way (processor, listener, etc.).
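The header-extraction part (the TODO mentioned above) can be done with plain StAX from the JDK. Below is a minimal sketch under a few assumptions: `HeaderExtractor` is a hypothetical class name, the header is the `fileInformation` element with text-only children as in the sample file, and keys are prefixed with `header.` only to match the `jobExecutionContext` keys used in the question. It stops parsing at `</fileInformation>`, so the transactions are never read.

```java
import java.io.Reader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper: reads only the fileInformation header of the input.
public final class HeaderExtractor {

    private HeaderExtractor() {
    }

    // Returns the text of each direct child of fileInformation, keyed as
    // "header.<childName>". The caller is responsible for closing the Reader.
    public static Map<String, String> extract(Reader input) throws XMLStreamException {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // the header needs no DTD processing; disabling it avoids XXE issues
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
        XMLStreamReader reader = factory.createXMLStreamReader(input);
        Map<String, String> header = new LinkedHashMap<>();
        try {
            boolean inHeader = false;
            while (reader.hasNext()) {
                int event = reader.next();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    String name = reader.getLocalName();
                    if ("fileInformation".equals(name)) {
                        inHeader = true;
                    } else if (inHeader) {
                        // getElementText() consumes the text up to this child's end tag
                        header.put("header." + name, reader.getElementText().trim());
                    }
                } else if (event == XMLStreamConstants.END_ELEMENT
                        && "fileInformation".equals(reader.getLocalName())) {
                    break; // header complete: skip the rest of the file
                }
            }
        } finally {
            reader.close();
        }
        return header;
    }
}
```

In step 1, a `Tasklet` could open the input file, call `HeaderExtractor.extract(...)`, and copy each entry into `chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext()`. Because step 2 starts only after step 1 completes, its step-scoped beans (the header callback, the `StaxEventItemWriter`, and so on) are created after the values are already in the job execution context, so expressions like `#{jobExecutionContext['header.sender']}` can resolve to the real values.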