JBeret and JSR352, some clarifications on restart and persistentdata


I'm currently using JBeret as the batch implementation of JSR 352 spec.

FIRST ISSUE

I'm trying to reproduce a chunk job where the processor fails and the restart position is stored, so that I can restart from the last successful index.

This is the Reader class

@Named
public class MyReaderFails extends AbstractItemReader {

    @Inject
    Logger logger;

    List<Integer> output;
    int index;

    @Inject
    StepContext stepContext;

    @Override
    public Object readItem() throws Exception {
        logger.info("reading item: {}", index);
        return output.get(index++);
    }

    @Override
    public void open(Serializable checkpoint) throws Exception {
        logger.info("open: {}", checkpoint);
        int startIndex = Optional.ofNullable(checkpoint).map(Integer.class::cast).orElse(0);
        output = IntStream.range(startIndex, 30).boxed().collect(Collectors.toList());
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        logger.info("current checkpoint: {}", index);
        return index;
    }
}

This is the Processor

@Named
public class MyProcessorFails implements ItemProcessor {

    @Inject
    Logger logger;

    @Inject
    @BatchProperty(name = "itemnumerror")
    Integer itemnumerror;

    @Inject
    @BatchProperty(name = "error")
    Boolean error;

    @Override
    public Object processItem(Object o) throws Exception {
        logger.info("input: {}", o);
        // equals() rather than ==, because == compares boxed Integer references
        if (itemnumerror.equals(o) && error) {
            throw new RuntimeException(); //first time batch is started, throws an error
        }
        Integer output = (Integer)o + 30;

        return output;
    }
}

the Writer

@Named
public class MyWriterFails extends AbstractItemWriter {

    @Inject
    Logger logger;

    @Inject
    StepContext stepContext;

    @SuppressWarnings("squid:S2629")
    @Override
    public void writeItems(List<Object> list) throws Exception {
        logger.info("output: {}", list.stream().map(String::valueOf).collect(Collectors.joining(" , ", "{", "}")));
        ArrayList<Integer> processed = Optional.ofNullable(stepContext.getPersistentUserData()).map(ArrayList.class::cast).orElse(new ArrayList<Integer>());
        processed.addAll(list.stream().map(Integer.class::cast).collect(Collectors.toList()));
        stepContext.setPersistentUserData(processed);
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        // ofNullable: the persistent user data is null until the first chunk has been written
        return Optional.ofNullable(stepContext.getPersistentUserData()).map(List.class::cast).map(List::size).orElse(0);
    }
}

Now I'd expect that when the job fails, the JOB_EXECUTION.RESTART_POSITION column is set to the index returned from checkpointInfo, but this doesn't happen.

As a result, when I try to restart the job from the last id, the checkpoint passed to open is always null.

How can I store the checkpoint so that I can retrieve it in the open method?

SECOND ISSUE

In the test applications I often see persistent user data being set on the stepContext object (see also my writer implementation). What is this actually used for? What can I do with the saved data?

I also suspect that, when working with thousands of records, this practice can lead to significant memory overhead.

Any hints?
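On the second issue, the StepContext javadoc describes persistent user data as a Serializable object belonging to the step that is saved as part of the step's checkpoint and is available again on restart. Since it is serialized at each checkpoint, a possible way around the memory concern is to keep only a small, fixed-size summary there instead of every written item. The sketch below is plain Java with no batch API; WriteSummary and PersistentDataDemo are hypothetical names introduced for illustration, and it only compares serialized sizes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical summary type: small, fixed-size state worth keeping across restarts.
final class WriteSummary implements Serializable {
    private static final long serialVersionUID = 1L;
    long itemsWritten; // running count of written items
    int lastItem;      // last item seen by the writer

    void record(List<Integer> chunk) {
        if (chunk.isEmpty()) {
            return;
        }
        itemsWritten += chunk.size();
        lastItem = chunk.get(chunk.size() - 1);
    }
}

final class PersistentDataDemo {
    // Size in bytes of the value after standard Java serialization.
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        WriteSummary summary = new WriteSummary();
        ArrayList<Integer> everything = new ArrayList<>();
        for (int chunkStart = 0; chunkStart < 10_000; chunkStart += 10) {
            List<Integer> chunk = new ArrayList<>();
            for (int i = chunkStart; i < chunkStart + 10; i++) {
                chunk.add(i);
            }
            summary.record(chunk);    // stays the same size however many chunks are written
            everything.addAll(chunk); // grows with every chunk, like the list in the writer above
        }
        System.out.println("summary bytes: " + serializedSize(summary));
        System.out.println("full list bytes: " + serializedSize(everything));
    }
}
```

In a real writer the summary would be passed to stepContext.setPersistentUserData(...) in place of the growing list; whether a summary is enough depends on what the restart logic actually needs.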

Answer by cheng:

This is the job xml I used in my testing:

<?xml version="1.0" encoding="UTF-8"?>

<job id="fail-restart" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
     version="1.0">
    <step id="fail-restart.step1">
        <chunk item-count="10">
            <reader ref="myReaderFails">
                <properties>
                    <property name="logger" value="java.util.logging.Logger"/>
                </properties>
            </reader>
            <processor ref="myProcessorFails">
                <properties>
                    <property name="logger" value="java.util.logging.Logger"/>
                    <property name="error" value="#{jobParameters['error']}"/>
                    <property name="itemnumerror" value="15"/>
                </properties>
            </processor>
            <writer ref="myWriterFails">
                <properties>
                    <property name="logger" value="java.util.logging.Logger"/>
                </properties>
            </writer>
        </chunk>
    </step>
</job>

This is the test method:


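import java.util.Properties;
import java.util.concurrent.TimeUnit;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;

import org.jberet.runtime.JobExecutionImpl;
import org.junit.Assert;
import org.junit.Test;

public class SimpleIT {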
    private static final JobOperator jobOperator = BatchRuntime.getJobOperator();

    @Test
    public void failRestart() throws Exception {
        final Properties params = new Properties();
        params.setProperty("error", String.valueOf(Boolean.TRUE));

        final long jobExecutionId = jobOperator.start("fail-restart", params);
        JobExecutionImpl jobExecution = (JobExecutionImpl) jobOperator.getJobExecution(jobExecutionId);
        jobExecution.awaitTermination(5, TimeUnit.MINUTES);
        Assert.assertEquals(BatchStatus.FAILED, jobExecution.getBatchStatus());

        params.setProperty("error", String.valueOf(Boolean.FALSE));
        final long restartId = jobOperator.restart(jobExecutionId, params);
        jobExecution = (JobExecutionImpl) jobOperator.getJobExecution(restartId);
        jobExecution.awaitTermination(5, TimeUnit.MINUTES);
        Assert.assertEquals(BatchStatus.COMPLETED, jobExecution.getBatchStatus());
    }
}

The output:

Dec 09, 2019 11:36:18 AM sample.MyReaderFails open
INFO: open: null
output list: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 0
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 0
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 1
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 1
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 2
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 2
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 3
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 3
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 4
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 4
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 5
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 5
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 6
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 6
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 7
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 7
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 8
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 8
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 9
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 9
Dec 09, 2019 11:36:18 AM sample.MyWriterFails writeItems
INFO: output: {30 , 31 , 32 , 33 , 34 , 35 , 36 , 37 , 38 , 39}
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 10
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 11
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 11
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 12
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 12
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 13
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 13
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 14
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 14
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 15
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 15
Dec 09, 2019 11:36:18 AM org.jberet.runtime.runner.ChunkRunner readProcessWriteItems
ERROR: ProcessingInfo{count=6, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=9, readPosition=15, failurePoint=null}
Dec 09, 2019 11:36:18 AM org.jberet.runtime.runner.ChunkRunner run
ERROR: item-count=10, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0
Dec 09, 2019 11:36:18 AM org.jberet.runtime.runner.ChunkRunner run
ERROR: JBERET000007: Failed to run job fail-restart, fail-restart.step1, org.jberet.job.model.Chunk@7f938562
java.lang.RuntimeException
    at sample.MyProcessorFails.processItem(MyProcessorFails.java:38)
    at org.jberet.runtime.runner.ChunkRunner.processItem(ChunkRunner.java:422)
    at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:335)
    at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:208)
    at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:225)
    at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:144)
    at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
    at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
    at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
    at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

Dec 09, 2019 11:36:18 AM sample.MyReaderFails open
INFO: open: 10
output list: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 0
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 1
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 11
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 2
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 12
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 3
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 13
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 4
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 14
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 5
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 15
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 6
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 16
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 7
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 17
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 8
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 18
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 9
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 19
Dec 09, 2019 11:36:18 AM sample.MyWriterFails writeItems
INFO: output: {40 , 41 , 42 , 43 , 44 , 45 , 46 , 47 , 48 , 49}
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 10
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 20
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 11
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 21
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 12
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 22
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 13
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 23
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 14
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 24
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 15
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 25
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 16
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 26
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 17
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 27
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 18
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 28
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 19
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 29
Dec 09, 2019 11:36:18 AM sample.MyWriterFails writeItems
INFO: output: {50 , 51 , 52 , 53 , 54 , 55 , 56 , 57 , 58 , 59}
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 20
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 20
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 21
Weld SE container STATIC_INSTANCE shut down by shutdown hook

Process finished with exit code 0
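
One detail in the output above is worth a note. On restart the reader is opened with checkpoint 10, so the checkpoint is being stored and restored. After the restart, however, it logs "current checkpoint: 10" while it is actually at item 20. This is because index counts from the start of the shortened list rather than from the start of the data, so a second failure would restart from the wrong place. A possible fix is to track the absolute position and to return null from readItem once the data is exhausted, which is how a reader signals end of input. The sketch below simulates this in plain Java without the batch API. AbsolutePositionReader and RestartDemo are hypothetical names, and the failure is simulated at read time instead of inside a processor.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Mirrors the open/readItem/checkpointInfo contract of an item reader,
// but is a stand-alone class with no dependency on the batch API.
final class AbsolutePositionReader {
    private final int total; // number of items available (30 in the question)
    private int position;    // absolute index of the next item to read

    AbsolutePositionReader(int total) {
        this.total = total;
    }

    void open(Serializable checkpoint) {
        // A null checkpoint means a fresh start; otherwise resume at the saved position.
        position = (checkpoint == null) ? 0 : (Integer) checkpoint;
    }

    Object readItem() {
        if (position >= total) {
            return null; // end of data
        }
        return position++;
    }

    Serializable checkpointInfo() {
        return position; // absolute, so it stays valid across several restarts
    }
}

final class RestartDemo {
    // Runs chunks of itemCount items and returns the last committed checkpoint.
    // Reading the item equal to failAt simulates a failure (pass -1 to disable).
    static Serializable run(Serializable start, int total, int itemCount,
                            int failAt, List<Integer> written) {
        AbsolutePositionReader reader = new AbsolutePositionReader(total);
        reader.open(start);
        Serializable committed = start;
        List<Integer> chunk = new ArrayList<>();
        while (true) {
            Object item = reader.readItem();
            if (item != null && (Integer) item == failAt) {
                return committed; // the open chunk is discarded, as after a rollback
            }
            if (item != null) {
                chunk.add((Integer) item + 30); // same +30 as the processor above
            }
            if (chunk.size() == itemCount || item == null) {
                written.addAll(chunk);
                chunk.clear();
                committed = reader.checkpointInfo();
                if (item == null) {
                    return committed;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> written = new ArrayList<>();
        Serializable afterFailure = run(null, 30, 10, 15, written);
        Serializable afterRestart = run(afterFailure, 30, 10, -1, written);
        System.out.println("checkpoint after failure: " + afterFailure); // 10
        System.out.println("checkpoint after restart: " + afterRestart); // 30
        System.out.println("items written: " + written.size());         // 30
    }
}
```

With item-count="10" and a failure at item 15, the last committed checkpoint is 10, so items 10 to 14 are read and processed again on restart, which matches the log above.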