Return multiple items from spring batch ItemProcessor

30.9k views Asked by At

I'm writing a spring batch job and in one of my step I have the following code for the processor:

@Component
public class SubscriberProcessor implements ItemProcessor<NewsletterSubscriber, Account>, InitializingBean {

    @Autowired
    private AccountService service;

    @Override public Account process(NewsletterSubscriber item) throws Exception {
        if (!Strings.isNullOrEmpty(item.getId())) {
            return service.getAccount(item.getId());
        }
        // search with email address
        List<Account> accounts = service.findByEmail(item.getEmail());
        checkState(accounts.size() <= 1, "Found more than one account with email %s", item.getEmail());
        return accounts.isEmpty() ? null : accounts.get(0);
    }

    @Override public void afterPropertiesSet() throws Exception {
        Assert.notNull(service, "account service must be set");
    }
}

The above code works but I've found out that there are some edge cases where having more than one Account per NewsletterSubscriber is allowed. So I need to remove the state check and to pass more than one Account to the item writer.

One solution I found is to change both ItemProcessor and ItemWriter to deal with List<Account> type instead of Account but this have two drawbacks:

  • Code and tests are uglier and harder to write and maintain because of nested lists in writer
  • Most important more than one Account object may be written in the same transaction because a list given to writer may contain multiple accounts and I'd like to avoid this.

Is there any way, maybe using a listener, or replacing some internal component used by spring batch to avoid lists in processor?

Update

I've opened an issue on spring Jira for this problem.

I'm looking into isComplete and getAdjustedOutputs methods in FaultTolerantChunkProcessor which are marked as extension points in SimpleChunkProcessor to see if I can use them in some way to achieve my goal.

Any hint is welcome.

4

There are 4 answers

1
Matt Broekhuis On

Item Processor takes one thing in, and returns a list

MyItemProcessor implements ItemProcessor<SingleThing,List<ExtractedThingFromSingleThing>> {
    public List<ExtractedThingFromSingleThing> process(SingleThing thing) {
    //parse and convert to list
    }
}

Wrap the downstream writer to iron things out. This way stuff downstream from this writer doesn't have to work with lists.

@StepScope
public class ItemListWriter<T> implements ItemWriter<List<T>> {
    private ItemWriter<T> wrapped;

    public ItemListWriter(ItemWriter<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void write(List<? extends List<T>> items) throws Exception {
        for (List<T> subList : items) {
            wrapped.write(subList);
        }
    }
}
3
Michael Minella On

There isn't a way to return more than one item per call to an ItemProcessor in Spring Batch without getting pretty far into the weeds. If you really want to know where the relationship between an ItemProcessor and ItemWriter exits (not recommended), take a look at the implementations of the ChunkProcessor interface. While the simple case (SimpleChunkProcessor) isn't that bad, if you use any of the fault tolerant logic (skip/retry via FaultTolerantChunkProcessor), it get's very unwieldily quick.

A much simpler option would be to move this logic to an ItemReader that does this enrichment before returning the item. Wrap whatever ItemReader you're using in a custom ItemReader implementation that does the service lookup before returning the item. In this case, instead of returning a NewsletterSubscriber from the reader, you'd be returning an Account based on the previous information.

0
Salah Atwa On

You can made transformer to transform your Pojo( Pojo object from file) to your Entity By making the following code :

public class Intializer {

public static LGInfo initializeEntity() throws Exception {
    Constructor<LGInfo> constr1 = LGInfo.class.getConstructor();
    LGInfo info = constr1.newInstance();
    return info;
}
}

And in your item Processor

public class LgItemProcessor<LgBulkLine, LGInfo> implements ItemProcessor<LgBulkLine, LGInfo> {

private static final Log log = LogFactory.getLog(LgItemProcessor.class);

@SuppressWarnings("unchecked")
@Override
public LGInfo process(LgBulkLine item) throws Exception {
    log.info(item);
    return (LGInfo) Intializer.initializeEntity();
}

}
0
Esben On

Instead of returning an Account you return create an AccountWrapper or Collection. The Writer obviously must take this into account :)