TL;DR
Given a file with a million records, where a considerable amount of logic has to be executed on each row, what is the fastest way to read the file and finish applying the logic to each row? I used a multi-threaded step with a file reader whose read method is synchronized to read the file, and I also used an AsyncItemProcessor so that each record is processed in its own thread.

My expectation was that the AsyncItemProcessor would start immediately, as soon as it has a record from the reader to process, and that each record would be processed in its own thread; however, this doesn't seem to be the case in my example below.
I have a step in my Spring Batch job that uses a TaskExecutor with 20 threads and a commit interval of 10000 to read a file. I am also using an AsyncItemProcessor and an AsyncItemWriter, since the data processing can at times take longer than the time required to read a line from the file.
<step id="aggregationStep">
<tasklet throttle-limit="20" task-executor="taskExecutor">
<chunk reader="fileReader"
processor="asyncProcessor" writer="asyncWriter"
commit-interval="10000" />
</tasklet>
</step>
Where:

- fileReader is a class that extends FlatFileItemReader; its read method is synchronized and simply calls super.read within it.
- asyncProcessor is nothing but an AsyncItemProcessor bean that is passed each row from the file, groups it by a key, and stores it in a singleton bean that holds a Map<String, BigDecimal> object. In other words, the processor simply groups the file data by a few columns and stores this data in memory.
- asyncWriter is nothing but an AsyncItemWriter that wraps a no-operation ItemWriter. In other words, the job does not need to do any writing, since the processor itself performs the aggregation and stores the data in memory (the Map).
- Note that the AsyncItemProcessor has its own ThreadPoolTaskExecutor with corePoolSize=10 and maxPoolSize=20, and the Step has its own ThreadPoolTaskExecutor with corePoolSize=20 and maxPoolSize=40.
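The synchronized reader described above can be sketched without any Spring dependencies as a thin wrapper whose read method is synchronized and delegates to an underlying source (class and method names here are hypothetical stand-ins, not the actual FlatFileItemReader API):

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a FlatFileItemReader subclass: read() is
// synchronized so that many step threads can safely share one instance.
class SynchronizedReader<T> {
    private final Iterator<T> source;

    SynchronizedReader(List<T> records) {
        this.source = records.iterator();
    }

    // Mirrors the pattern in the question: synchronize and delegate,
    // returning null when the input is exhausted (Spring Batch's
    // reader contract uses null to signal end of input).
    public synchronized T read() {
        return source.hasNext() ? source.next() : null;
    }
}

public class ReaderDemo {
    public static void main(String[] args) {
        SynchronizedReader<String> reader =
                new SynchronizedReader<>(List.of("row1", "row2"));
        System.out.println(reader.read()); // row1
        System.out.println(reader.read()); // row2
        System.out.println(reader.read()); // null -> end of input
    }
}
```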
With the above setup, my expectation was that the reading and processing would happen in parallel. Something like:

- FileReader reads a record from the file and passes it to the processor.
- AsyncItemProcessor performs the aggregation. Since it is an AsyncItemProcessor, the thread that called the process method should ideally be free to do other work.
- Finally, the AsyncItemWriter would get the Future and extract the data, but do nothing, since the delegate is a no-operation ItemWriter.
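The pipeline hoped for above can be modelled with plain java.util.concurrent (a simplified sketch, not the AsyncItemProcessor implementation): submitting a task returns a Future immediately, so the reading thread moves on while a pool thread does the work, and the "writer" merely unwraps each Future:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Minimal model of the expected read/process overlap (names hypothetical):
// "process" submits work to a pool and returns a Future at once, so the
// calling thread is free to read the next record.
public class AsyncPipelineDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> futures = new ArrayList<>();

        for (String record : List.of("r1", "r2", "r3")) {
            // "process": returns immediately; work runs on a pool thread
            futures.add(pool.submit(() -> record.toUpperCase()));
        }

        // "write": unwrap each Future; a no-op writer would discard these
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }
        pool.shutdown();
    }
}
```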
But when I added some logs, I did not see what I expected:
2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 2500 records
2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 5000 records
2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 7501 records
2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 10000 records
2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 12500 records
2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 15000 records
... more such lines are printed until the entire file is read. Only after the file is read do I start seeing the processor do its work:
2019-09-07 10:06:53 INFO FileProcessor:83 - Finished processing 2500 records
2019-09-07 10:08:28 INFO FileProcessor:83 - Finished processing 5000 records
2019-09-07 10:10:04 INFO FileProcessor:83 - Finished processing 7500 records
2019-09-07 10:11:40 INFO FileProcessor:83 - Finished processing 10000 records
2019-09-07 10:13:16 INFO FileProcessor:83 - Finished processing 12500 records
2019-09-07 10:14:51 INFO FileProcessor:83 - Finished processing 15000 records
Bottom line: why does the processor not kick in until the file has been fully read? No matter what ThreadPoolTaskExecutor parameters are used for the AsyncItemProcessor or for the entire step, the reading always completes first, and only then does the processing start.
This is how chunk-oriented processing works. The step reads X items into a buffer (where X is the commit interval), and only then are processing and writing performed. You can see this in the code of ChunkOrientedTasklet. In a multi-threaded step, each chunk is processed by a separate thread.
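The chunking behaviour described in this answer can be sketched as follows (a deliberate simplification of what ChunkOrientedTasklet does, not its actual code): within each chunk, the read phase fills a buffer of commit-interval items before any processing starts.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

// Simplified sketch of the chunk-oriented loop: each iteration reads a
// full chunk of COMMIT_INTERVAL items BEFORE any of them is processed.
public class ChunkLoopDemo {
    static final int COMMIT_INTERVAL = 3;

    public static void main(String[] args) {
        Iterator<Integer> reader = List.of(1, 2, 3, 4, 5).iterator();

        while (reader.hasNext()) {
            // 1) read phase: fill the chunk first
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < COMMIT_INTERVAL && reader.hasNext()) {
                chunk.add(reader.next());
            }
            // 2) process phase: starts only once the chunk is full
            List<Integer> processed = chunk.stream()
                    .map(i -> i * 10)
                    .collect(Collectors.toList());
            // 3) write phase
            System.out.println("wrote " + processed);
        }
    }
}
```

With a commit interval of 10000, each transaction therefore reads 10000 lines before the first of them reaches the processor, which matches the log pattern in the question.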