I'm currently using JBeret as my implementation of the JSR 352 batch specification.
FIRST ISSUE
I'm trying to reproduce a chunk job in which the processor fails and the restart position is stored, so that I can restart from the last successful index.
This is the reader class:
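import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.batch.api.chunk.AbstractItemReader;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger; // assumes a CDI producer for an SLF4J Logger

@Named
public class MyReaderFails extends AbstractItemReader {

    @Inject
    Logger logger;

    List<Integer> output;
    int index;

    @Inject
    StepContext stepContext;

    @Override
    public Object readItem() throws Exception {
        logger.info("reading item: {}", index);
        // returning null tells the batch runtime that the input is exhausted
        return index < output.size() ? output.get(index++) : null;
    }

    @Override
    public void open(Serializable checkpoint) throws Exception {
        logger.info("open: {}", checkpoint);
        // a null checkpoint means a fresh start
        int startIndex = Optional.ofNullable(checkpoint).map(Integer.class::cast).orElse(0);
        output = IntStream.range(startIndex, 30).boxed().collect(Collectors.toList());
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        logger.info("current checkpoint: {}", index);
        return index;
    }
}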
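For comparison, here is a plain-Java sketch of the same reader logic with no batch runtime involved, so the checkpoint arithmetic can be checked in isolation. It assumes the checkpoint should be an absolute position in the 0..29 range: it keeps the full list and moves `index`, instead of rebuilding the list from `startIndex`. `ResumableReader` is a hypothetical name, not a JBeret class.

```java
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java model of the reader's checkpoint logic (no JSR 352 runtime).
// ResumableReader is a hypothetical name used only for this illustration.
public class ResumableReader {
    private final List<Integer> items =
            IntStream.range(0, 30).boxed().collect(Collectors.toList());
    private int index;

    // Mirrors ItemReader.open(Serializable): null means "fresh start".
    public void open(Serializable checkpoint) {
        index = checkpoint == null ? 0 : (Integer) checkpoint;
    }

    // Mirrors ItemReader.readItem(): returning null signals end of input.
    public Integer readItem() {
        return index < items.size() ? items.get(index++) : null;
    }

    // Mirrors ItemReader.checkpointInfo(): absolute position in the data set.
    public Serializable checkpointInfo() {
        return index;
    }

    public static void main(String[] args) {
        ResumableReader first = new ResumableReader();
        first.open(null);
        for (int i = 0; i < 10; i++) {
            first.readItem();
        }
        Serializable saved = first.checkpointInfo();

        ResumableReader restarted = new ResumableReader();
        restarted.open(saved);
        System.out.println(restarted.readItem()); // prints 10
    }
}
```

Keeping the position absolute means a checkpoint taken after a restart still refers to the full data set, rather than to an offset from the previous checkpoint.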
This is the processor:
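import java.util.Objects;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemProcessor;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger; // assumes a CDI producer for an SLF4J Logger

@Named
public class MyProcessorFails implements ItemProcessor {

    @Inject
    Logger logger;

    @Inject
    @BatchProperty(name = "itemnumerror")
    Integer itemnumerror;

    @Inject
    @BatchProperty(name = "error")
    Boolean error;

    @Override
    public Object processItem(Object o) throws Exception {
        logger.info("input: {}", o);
        // compare values, not references; Boolean.TRUE.equals avoids an NPE if "error" is unset
        if (Boolean.TRUE.equals(error) && Objects.equals(itemnumerror, o)) {
            throw new RuntimeException(); // the first time the batch is started, throw an error
        }
        Integer output = (Integer) o + 30;
        return output;
    }
}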
And this is the writer:
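import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger; // assumes a CDI producer for an SLF4J Logger

@Named
public class MyWriterFails extends AbstractItemWriter {

    @Inject
    Logger logger;

    @Inject
    StepContext stepContext;

    @SuppressWarnings("squid:S2629")
    @Override
    public void writeItems(List<Object> list) throws Exception {
        logger.info("output: {}", list.stream().map(String::valueOf).collect(Collectors.joining(" , ", "{", "}")));
        // accumulate every written item in the step's persistent user data
        ArrayList<Integer> processed = Optional.ofNullable(stepContext.getPersistentUserData()).map(ArrayList.class::cast).orElse(new ArrayList<Integer>());
        processed.addAll(list.stream().map(Integer.class::cast).collect(Collectors.toList()));
        stepContext.setPersistentUserData(processed);
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        // ofNullable guards against persistent user data that has not been set yet
        return Optional.ofNullable(stepContext.getPersistentUserData()).map(List.class::cast).map(List::size).orElse(0);
    }
}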
Now I'd expect that, when the job fails, JBeret sets the JOB_EXECUTION.RESTART_POSITION column to the index returned from checkpointInfo(), but this doesn't happen.
As a result, when I try to restart the job from the last ID, the checkpoint passed to open() is always null.
How can I store the checkpoint so that I can retrieve it in the open method?
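To reason about when a checkpoint exists at all, here is a toy plain-Java model of the chunk cycle as the JSR 352 spec describes it: items are read and processed one at a time, written once `item-count` items have accumulated, and only then is `checkpointInfo()` collected and committed. It assumes the spec's default item-count of 10 and that a failing `processItem()` rolls back the current chunk. Carrying the incoming checkpoint over when nothing new commits is a modeling assumption, and `ChunkLoopModel` is a hypothetical name, not part of JBeret.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Toy model of a JSR 352 chunk loop, not the real runtime.
// ChunkLoopModel is a hypothetical name used only for illustration.
public class ChunkLoopModel {
    static final int ITEM_COUNT = 10; // JSR 352 default item-count
    static final int TOTAL = 30;      // same 0..29 range as MyReaderFails

    Serializable committed;           // last checkpoint that survived a commit

    // Runs the step from `checkpoint` (null = fresh start); processing throws at
    // item `failAt`. Returns true if the step completed, false if it failed.
    boolean run(Serializable checkpoint, int failAt) {
        committed = checkpoint;       // modeling assumption: carried over on restart
        int index = checkpoint == null ? 0 : (Integer) checkpoint;
        List<Integer> chunk = new ArrayList<>();
        while (index < TOTAL) {
            int item = index++;                   // readItem()
            if (item == failAt) {                 // processItem() throws:
                return false;                     // chunk rolled back, no new checkpoint
            }
            chunk.add(item + 30);                 // processed item joins the chunk
            if (chunk.size() == ITEM_COUNT || index == TOTAL) {
                chunk.clear();                    // writeItems(chunk)
                committed = index;                // checkpointInfo() saved at commit
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ChunkLoopModel step = new ChunkLoopModel();
        System.out.println(step.run(null, 5) + " " + step.committed);            // false null
        System.out.println(step.run(null, 15) + " " + step.committed);           // false 10
        System.out.println(step.run(step.committed, -1) + " " + step.committed); // true 30
    }
}
```

In this model, a failure at item 5 happens before the first commit, so there is no checkpoint to hand back to `open()`; a failure at item 15 leaves the checkpoint from the first committed chunk (10).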
SECOND ISSUE
In the app tests, I often see persistent user data being set on the stepContext object (see also my writer implementation). What is this actually used for? What can I really do with the saved data?
I also suspect that, when working with thousands of records, this practice can lead to significant memory overhead.
Any hints?
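One way to address the memory concern: if the writer only needs progress information, it could keep a small fixed-size summary instead of the full list. Persistent user data must be Serializable and, as I understand the spec, is saved with the step's checkpoint data at each commit, so a list that grows with every chunk is held in memory and re-serialized each time. The sketch below assumes only a count and the last written item are needed; `WriterProgress` is a hypothetical class.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Sketch of a fixed-size progress summary to keep in persistent user data
// instead of an ever-growing list of every written item.
// WriterProgress is a hypothetical class, not part of JBeret.
public class WriterProgress implements Serializable {
    private static final long serialVersionUID = 1L;

    private long writtenCount;    // total number of items written so far
    private Integer lastWritten;  // last item of the most recent non-empty chunk

    // Update the summary from one chunk, as writeItems(List<Object>) would.
    void record(List<?> chunk) {
        writtenCount += chunk.size();
        if (!chunk.isEmpty()) {
            lastWritten = (Integer) chunk.get(chunk.size() - 1);
        }
    }

    long getWrittenCount() { return writtenCount; }

    Integer getLastWritten() { return lastWritten; }

    public static void main(String[] args) {
        WriterProgress progress = new WriterProgress();
        progress.record(Arrays.asList(30, 31, 32));
        progress.record(Arrays.asList(33, 34));
        System.out.println(progress.getWrittenCount() + " " + progress.getLastWritten()); // 5 34
    }
}
```

In writeItems(), the writer would read this object back with getPersistentUserData(), call record(list), and set it again with setPersistentUserData(); its serialized size stays constant no matter how many records the step processes.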
This is the job XML I used in my testing:
This is the test method:
The output: