we use JDK 7 watchservice to watch directory which can have xml or csv files. These files are put in threadpool and later on processed and pushed into database.This application runs for ever watching the directory and keeps processing files as and when available. XML file are small and does not take time, however each csv file can contain more than 80 thousand records so processing takes time to put in database. Java application give us outofmemory error when there are 15 csv files getting processed from threadpool. Is there any way where when csv files comes into threadpool, it can be serially processed i.e only one at a time.
Sequentially processing file in threadpool executor
1.2k views Asked by user2751620 At
3
There are 3 answers
0
On
You can try:
- Increase the memory of JVM using the
-Xmx
JVM option Use a different executor to reduce the number of processed files at a time. A drastical solution is to use a
SingleThreadExecutor
:public class FileProcessor implements Runnable { public FileProcessor(String name) { } public void run() { // process file } } // ... ExecutorService executor = Executors.newSingleThreadExecutor(); // ... public void onNewFile(String fileName) { executor.submit(new FileProcessor(fileName)); }
0
On
Java application give us outofmemory error when there are 15 csv files getting processed from threadpool. Is there any way where when csv files comes into threadpool, it can be serially processed i.e only one at a time.
If I'm understanding, you want to stop adding to the pool if you are over some threshold. There is an easy way to do that which is by using a blocking-queue and the rejected execution handler.
See the following answer:
To summarize it, you do something like the following:
// only allow 100 jobs to queue
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100);
ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue);
// we need our RejectedExecutionHandler to block if the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// this will block the producer until there's room in the queue
executor.getQueue().put(r);
} catch (InterruptedException e) {
throw new RejectedExecutionException(
"Unexpected InterruptedException", e);
}
}
});
This will mean that it will block adding to the queue and should not exhaust memory.
I would take a different route to solve your problem, I guess you have everything right except when you start reading too much data into memory.
Not sure how are you reading csv files, would suggest to use a LineReader and read e.g. 500 lines process them and then read next 500 lines, all large files should be handled this way only, because no matter how much you increase your memory arguments, you will hit out of memory as soon as you will have a bigger file to process, so use an implementation that can handle records in batches. This would require some extra coding effort but will never fail no matter how big file you have to process.
Cheers !!