My requirements goes as below:
- There are different event types for specific time period and these are captured at every one second granularity.
- The same event types data must be aggregated such that sequence of data shouldn't change which means basically group by can't be applied over event type directly. This kind of data that jpapagingitemreader is already producing.
- Now, I need to collect all the records produced by item reader in processor. Reason is item reader data must be re-aggregated(event types) to form final object list using already existing reference object.
- Initailly reference object must be fetched from db and upon after constructing final object list. this reference object must be updated to final object in the list. 5.This final object list can have one or multiple (list) depending upon the requirements definition as multiple event types are reformed to create an new object.
- Since, processor can produce list w.r.t reader. I am fine to get List in item writer, as it can be re-structured by sorting with time for persisting.
- If I can't get list in the processor then entire logic must be applied in item writer.
- As I don't want to mix up the processor and writer responsibility, this question raised.
I am already aware about chunks in spring batch which drive the item writer management. Its fine for me as data will be normally less than 100 records as partial aggregation is already applied at db level. I have found there is an AggregateItemReader in spring batch but its for files. I would appreciate your help to migitate this problem. Also please try to provide reference code as I am new to spring batch not aware about all the concepts.
ItemReader is in below code:
@Bean("jpaPagingItemReader")
@StepScope
public JpaPagingItemReader<TrailAggregate> jpaPagingItemReader(
@Qualifier("entityManagerFactory") LocalContainerEntityManagerFactoryBean entityManagerFactory,
@Value("#{jobParameters['vehicleId']}") Long vehicleId,
@Value("#{jobParameters['startTime']}") Long startTime, @Value("#{jobParameters['endTime']}") Long endTime,
@Value("${jpa.paging.item.reader.query}") String jpaPagingItemReaderQuery) throws Exception {
JpaPagingItemReader<TrailAggregate> jpaPagingItemReader = new JpaPagingItemReader<>();
jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory.getObject());
JpaNativeQueryProvider<TrailAggregate> queryProvider = new JpaNativeQueryProvider<>();
queryProvider.setEntityClass(TrailAggregate.class);
queryProvider.setSqlQuery(jpaPagingItemReaderQuery);
queryProvider.afterPropertiesSet();
jpaPagingItemReader.setQueryProvider(queryProvider);
Map<String, Object> jpaPagingItemReaderParams = new HashMap<>();
jpaPagingItemReaderParams.put(FieldConstants.VEHICLE_ID, vehicleId);
jpaPagingItemReaderParams.put(FieldConstants.START_TIME, Instant.ofEpochMilli(startTime));
jpaPagingItemReaderParams.put(FieldConstants.END_TIME, Instant.ofEpochMilli(endTime));
jpaPagingItemReader.setParameterValues(jpaPagingItemReaderParams);
jpaPagingItemReader.setPageSize(10);
jpaPagingItemReader.setSaveState(false);
jpaPagingItemReader.afterPropertiesSet();
return jpaPagingItemReader;
}
ItemProcessor:
public class AggregateItemProcessor implements ItemProcessor<**List<TrailAggregate>**, List<AggregatedTrail>> {
@Override
public List<AggregatedTrail> process(List<TrailAggregate> items) throws Exception {
// **Spring throws saying list is not supported**
return items;
}
}
All required list of objects reference correctly given while creatign job with required object reference for chunk