My understanding of GPars Actors may be off so please correct me if I'm wrong. I have a Groovy app that polls a web service for jobs. When one or more jobs are found it sends each job to a DynamicDispatchActor
I've created, and the job is handled. The jobs are completely self-contained and don't need to return anything to the main thread. When multiple jobs come in at once I'd like them to be processed in parallel, but no matter what configuration I try the actor processes them first in first out.
To give a code example:
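import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

// Daemon thread pool with 5 threads
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))
// A single DynamicDispatchActor that receives every message
def actor = poolGroup.messageHandler {
    when { Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
    }
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
integers.each {
    actor << it
}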
This prints out:
I'm number 1 on thread Actor Thread 31
I'm number 2 on thread Actor Thread 31
I'm number 3 on thread Actor Thread 31
I'm number 4 on thread Actor Thread 31
I'm number 5 on thread Actor Thread 31
I'm number 6 on thread Actor Thread 31
I'm number 7 on thread Actor Thread 31
I'm number 8 on thread Actor Thread 31
I'm number 9 on thread Actor Thread 31
I'm number 10 on thread Actor Thread 31
With a slight pause in between each print out. Also notice that each printout happens from the same Actor/thread.
What I'd like to see here is the first 5 numbers are printed out instantly because the thread pool is set to 5, and then the next 5 numbers as those threads free up. Am I completely off base here?
To make it run as you expect, a few changes are needed:
Full gist file: https://gist.github.com/wololock/7f1348e04f68710e42d2
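A minimal sketch of such a change, assuming the same pool of 5 threads and the same println/sleep handler as in the question. The variable names handler and actors are illustrative, and the gist above remains the authoritative version:

```groovy
// Assumes GPars is on the classpath.
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

// Daemon thread pool with 5 threads, as in the question
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

// Handler body shared by every actor; stop() ends the actor after its one message
def handler = {
    when { Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
        stop()
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

// One short-lived actor per input; << sends the message and returns the actor
def actors = integers.collect { poolGroup.messageHandler(handler) << it }

// Block until every actor has stopped
actors*.join()
```

With 5 pool threads and a one-second sleep per message, the ten messages should be handled in roughly two batches, although the exact thread names and ordering will vary between runs.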
Then the output will be:
Now let's look at what changed. First of all, in your original example you worked with a single actor only. You defined poolGroup correctly, but then you created one actor and shifted all the computation to that single instance. To run those computations in parallel you have to rely on poolGroup and only send each input to a message handler created from it; the pool group handles actor creation and lifecycle management. This is what the changed code does: it creates a collection of actors, each started with one of the inputs. The pool group takes care that the specified pool size is not exceeded.

Then you have to join each actor, which can be done with Groovy's spread operator: actors*.join(). Thanks to that, the application waits until all actors have finished their computation before it terminates. That is also why we have to add a stop() call to the when closure in the message handler's body. Without it the application won't terminate, because the pool group does not know that the actors have done their job; they may still be waiting, e.g. for another message.

Alternative solution
We can also consider an alternative solution that uses GPars parallelized iterations:
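A minimal sketch of that approach, assuming a pool of 5 threads to match the question. The body of Processor.process is illustrative and simply mirrors the question's println and sleep:

```groovy
// Assumes GPars is on the classpath.
import groovyx.gpars.GParsPool

// Stateless component: it keeps no fields, so one instance can be shared across threads
class Processor {
    void process(int number) {
        println "I'm number ${number} on thread ${Thread.currentThread().name}"
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def processor = new Processor()

// Inside withPool, collections gain the *Parallel methods; 5 is the pool size
GParsPool.withPool(5) {
    integers.eachParallel { processor.process(it) }
}
```

No actors are involved here, so there is nothing to join or stop: withPool returns once eachParallel has processed every element.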
In this example you have a stateless component, Processor, and parallel computations that use one instance of the stateless Processor with multiple input values.

I've tried to figure out the case you mentioned in the comment, but I'm not sure whether a single actor can process multiple messages at a time. Statelessness of an actor means only that it does not change its internal state while processing a message and must not store any other information in the actor's scope. It would be great if someone could correct me if my reasoning is wrong :)
I hope this will help you. Best!