Process huge amount of data with grails and gpars

2.2k views Asked by At

I have a Grails application which runs a job on a daily basis at midnight. In my example app I have 10000 Person records and do the following in the quartz job:

package threading

import static grails.async.Promises.task
import static groovyx.gpars.GParsExecutorsPool.withPool

class ComplexJob {
    static triggers = {
        simple repeatInterval: 30 * 1000l
    }

    def execute() {
        if (Person.count == 5000) {
            println "Executing job"                
            withPool 10000, {
                Person.listOrderByAge(order: "asc").each { p ->
                    task {
                        log.info "Started ${p}"
                        Thread.sleep(15000l - (-1 * p.age))
                    }.onComplete {
                        log.info "Completed ${p}"
                    }
                }
            }                
        }
    }
}

ignore the repeatInterval as this is only for testing purposes. When the job gets executed I get the following exception:

2014-11-14 16:11:51,880 quartzScheduler_Worker-3 grails.plugins.quartz.listeners.ExceptionPrinterJobListener - Exception occurred in job: Grails Job
org.quartz.JobExecutionException: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 [See nested exception: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000]
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:111)
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000
    at org.grails.async.factory.gpars.LoggingPoolFactory$3.rejectedExecution(LoggingPoolFactory.groovy:100)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at groovyx.gpars.scheduler.DefaultPool.execute(DefaultPool.java:155)
    at groovyx.gpars.group.PGroup.task(PGroup.java:305)
    at groovyx.gpars.group.PGroup.task(PGroup.java:286)
    at groovyx.gpars.dataflow.Dataflow.task(Dataflow.java:93)
    at org.grails.async.factory.gpars.GparsPromise.<init>(GparsPromise.groovy:41)
    at org.grails.async.factory.gpars.GparsPromiseFactory.createPromise(GparsPromiseFactory.groovy:68)
    at grails.async.Promises.task(Promises.java:123)
    at threading.ComplexJob$_execute_closure1_closure3.doCall(ComplexJob.groovy:20)
    at threading.ComplexJob$_execute_closure1.doCall(ComplexJob.groovy:19)
    at groovyx.gpars.GParsExecutorsPool$_withExistingPool_closure2.doCall(GParsExecutorsPool.groovy:192)
    at groovyx.gpars.GParsExecutorsPool.withExistingPool(GParsExecutorsPool.groovy:191)
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:162)
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:136)
    at threading.ComplexJob.execute(ComplexJob.groovy:18)
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:104)
    ... 2 more
2014-11-14 16:12:06,756 Actor Thread 20 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368)
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2014-11-14 16:12:06,756 Actor Thread 5 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368)
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

It seems as if the thread pool hasn't been set to 10000 while I use withPool(10000) Can I perhaps do this computation (now only prints log statements) in chunks? If so how can I tell what the latest item was that was processed (e.g. where to continue) ?

2

There are 2 answers

0
Denis Borovikov On BEST ANSWER

Trying to wrap processing of each element to task seems not optimal. Standard way to make parallel processing is split whole task to appropriate number of sub-tasks. You are starting with selection of this number. For CPU-bound task you might create N=number-of-processors tasks. Then you split task to N sub-tasks. Like this:

persons = Person.listOrderByAge(order: "asc")
nThreads = Runtime.getRuntime().availableProcessors()
size = persons.size() / nThreads
withPool nThreads, {
    persons.collate(size).each { subList =>
        task {
            subList.each { p =>
                ...     
            }
        }           
    }
}
0
Vaclav Pech On

I suspect the withPool() method has no effect, since the task is most likely using a default thread pool, not the one created in withPool. Try removing the call to withPool() and see if the tasks still run.

The groovyx.gpars.scheduler.DefaultPool pool (the default for tasks) in GPars resizes with tasks and has a limit to 1000 concurrent threads.

I'd suggest creating a fixed size pool instead, e.g.:

def group = new DefaultPGroup(numberOfThreads)
group.task {...}

Note: I'm not familiar with the grails.async task, only the core GPars ones, so things may be slightly different around PGroups in grails.async.