I am trying to implement a Java Aggregator for Akka, since it doesn't look like the Java API supports them (why not!?)
Here's my best attempt thus far:
// Groovy pseudo-code
abstract class Aggregator<T> extends UntypedActor {
ActorRef recipient
Set<T> aggregation
// TODO: Timer timer (?)
abstract boolean isAggregated()
@Override
void onReceive(Object message) {
aggregation << message as T
if(isAggregated()) {
recipient.tell(new Aggregation(aggregation)) // again, pseudo-code
aggregation.clear()
// TODO: timer.reset()
}
}
}
What is missing is some kind of a Timer
construct that will time the Aggregator
out after, say, 60 seconds if it has not aggregated yet. On timeout, it should throw some exception of some sort. On aggregation, the timer should be reset. Any ideas how to do this?
What you are looking for is
ReceiveTimeout
. Akka provides a feature to have a timeout when a particular actor has not received anything in a predefined amount of time.In Java you would do something like this inside your actor:
When this trigger it sends a message of type
ReceiveTimeout
to the actor and then you can decide what you want to do (exceptions, logging, resetting...).You can find more information here under the section 'Receive timeout': http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html
On the other hand, there are open-source libraries to do these kind of things available in github. Take a look to https://github.com/sksamuel/akka-patterns for more examples.