How do I parallelize GPars Actors?


My understanding of GPars Actors may be off, so please correct me if I'm wrong. I have a Groovy app that polls a web service for jobs. When one or more jobs are found, it sends each job to a DynamicDispatchActor I've created, and the job is handled. The jobs are completely self-contained and don't need to return anything to the main thread. When multiple jobs come in at once I'd like them to be processed in parallel, but no matter what configuration I try, the actor processes them one at a time, first in, first out.

To give a code example:

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def actor = poolGroup.messageHandler {
    when {Integer msg -> 
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

integers.each {
    actor << it
}

This prints out:

I'm number 1 on thread Actor Thread 31
I'm number 2 on thread Actor Thread 31
I'm number 3 on thread Actor Thread 31
I'm number 4 on thread Actor Thread 31
I'm number 5 on thread Actor Thread 31
I'm number 6 on thread Actor Thread 31
I'm number 7 on thread Actor Thread 31
I'm number 8 on thread Actor Thread 31
I'm number 9 on thread Actor Thread 31
I'm number 10 on thread Actor Thread 31

There is a slight pause between each line, and every line is printed from the same actor thread.

What I'd like to see here is the first 5 numbers are printed out instantly because the thread pool is set to 5, and then the next 5 numbers as those threads free up. Am I completely off base here?

1 answer

Szymon Stepniak (best answer)

To make it run as you expect, there are a few changes to make:

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def closure = {
    when {Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
        stop()
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def actors = integers.collect { poolGroup.messageHandler(closure) << it }
actors*.join()

Full gist file: https://gist.github.com/wololock/7f1348e04f68710e42d2

The output will then look like this (the order varies between runs):

I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1

Now let's look at what changed. In your original example you worked with a single actor: you defined poolGroup correctly, but then created one actor and sent every message to that one instance, so the work was serialized. To run the computations in parallel, create a message handler per input through poolGroup and send the input to it; the pool group supplies the threads and manages the actors' lifecycle. This is what the following line does:

def actors = integers.collect { poolGroup.messageHandler(closure) << it }

It creates a collection of actors, each started with one input. The pool group makes sure the specified pool size is not exceeded. You then have to join each actor, which Groovy's spread operator makes concise: actors*.join(). This makes the application wait until all actors have finished their computation before it terminates. That is also why stop() has to be called in the when closure of the message handler: without it the actor never terminates, because the pool group cannot know the actor has done its job; it may still be waiting for another message, for example.
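One way to check that the pool size really bounds the parallelism is to time the run. The sketch below reuses the same calls as the snippet above and only adds a stopwatch; the names handler, start and elapsed are illustrative, and it assumes GPars is on the classpath. With ten one-second jobs and a pool of five threads, the elapsed time should be closer to two seconds than to ten, though the exact figure will vary.

```groovy
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

// Same pool as in the answer: daemon threads, pool size 5
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

// One short-lived actor per job; stop() lets join() return
def handler = {
    when { Integer msg ->
        Thread.sleep(1000)   // stands in for the real job
        stop()
    }
}

long start = System.currentTimeMillis()
def actors = (1..10).collect { poolGroup.messageHandler(handler) << it }
actors*.join()
long elapsed = System.currentTimeMillis() - start

println "10 jobs finished in roughly ${elapsed} ms"
```

If the elapsed time stays near ten seconds, the jobs are still being funnelled through a single actor.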

Alternative solution

We can also consider an alternative solution that uses GPars parallel iteration:

import groovyx.gpars.GParsPool

// This is a toy example, but let's assume the processor is a
// stateless component shared between threads.
class Processor {
    void process(int number) {
        println "${Thread.currentThread().name} starting with number ${number}"
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Processor processor = new Processor()

GParsPool.withPool 5, {
    integers.eachParallel { processor.process(it) }
}

In this example you have a stateless component, Processor, and the computations run in parallel, using one Processor instance for multiple input values.

I've tried to figure out the case you mentioned in the comment, but I'm not sure whether a single actor can process multiple messages at a time. Statelessness of an actor only means that it does not change its internal state while processing a message and must not store any other information in the actor's scope. It would be great if someone could correct me if my reasoning is wrong :)
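A small experiment can test this. The sketch below is only an illustration: inFlight, observed and handled are hypothetical helper names, and it assumes GPars is on the classpath. It sends five messages to one actor backed by a five-thread pool and records how many handlers were running at the same moment. The GPars documentation states that an actor's body is run by at most one thread at a time, so I would expect the maximum to be 1, but it is worth running to confirm.

```groovy
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
import java.util.concurrent.atomic.AtomicInteger

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def inFlight = new AtomicInteger()                  // handlers running right now
def observed = Collections.synchronizedList([])     // concurrency level seen by each message
def handled  = new AtomicInteger()                  // messages fully processed

// A single actor receiving all messages, as in the question
def actor = poolGroup.messageHandler {
    when { Integer msg ->
        observed.add(inFlight.incrementAndGet())
        Thread.sleep(200)
        inFlight.decrementAndGet()
        // Stop after the last message so that join() returns
        if (handled.incrementAndGet() == 5) stop()
    }
}

(1..5).each { actor << it }
actor.join()

println "Highest number of concurrent handlers: ${observed.max()}"
```

A value above 1 would mean the actor handled messages concurrently; a value of 1 would explain the serialized output in the question.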

I hope this will help you. Best!