For situations like this, I wrote something called a `ResponseAggregator`. It is an `Actor` instantiated as needed (rather than as a persistent single instance). It takes four arguments: a destination `ActorRef`, an arbitrary key (to distinguish the aggregator if a single destination is fed by more than one aggregator), a completion predicate, and a timeout value. The predicate takes a `Seq[Any]` holding the responses received by the aggregator so far and returns `true` if those responses represent completion of the aggregation process. The aggregator accepts and collects incoming messages until the predicate returns `true` or the timeout expires. Once aggregation is complete (including due to timeout), all the messages that have been received are sent to the destination, along with a flag indicating whether or not aggregation timed out.
The code is a bit too big to include here and is not open source.
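A stripped-down sketch of the idea, not the actual implementation, could look like the following. It assumes Akka classic actors; the `Aggregated` message, the `props` factory, the internal `Expired` message, and all parameter names are made up for illustration.

```scala
import akka.actor.{Actor, ActorRef, Props}
import scala.concurrent.duration.FiniteDuration

// Hypothetical result message: everything collected so far, plus the timeout flag.
final case class Aggregated(key: Any, responses: Seq[Any], timedOut: Boolean)

object ResponseAggregator {
  // Internal timer message (hypothetical name).
  private case object Expired

  def props(destination: ActorRef, key: Any,
            isComplete: Seq[Any] => Boolean, timeout: FiniteDuration): Props =
    Props(new ResponseAggregator(destination, key, isComplete, timeout))
}

class ResponseAggregator(destination: ActorRef, key: Any,
                         isComplete: Seq[Any] => Boolean,
                         timeout: FiniteDuration) extends Actor {
  import ResponseAggregator.Expired
  import context.dispatcher // ExecutionContext required by the scheduler

  private var responses = Vector.empty[Any]

  // One-shot timer covering the whole aggregation, not per-message idle time.
  private val timer =
    context.system.scheduler.scheduleOnce(timeout, self, Expired)

  def receive: Receive = {
    case Expired =>
      finish(timedOut = true)
    case response =>
      responses :+= response
      if (isComplete(responses)) finish(timedOut = false)
  }

  // Report whatever has been collected, then stop: one aggregator per aggregation.
  private def finish(timedOut: Boolean): Unit = {
    timer.cancel()
    destination ! Aggregated(key, responses, timedOut)
    context.stop(self)
  }
}
```

Because each aggregator stops itself after reporting, a fresh one is created per aggregation, for example `context.actorOf(ResponseAggregator.props(self, requestId, _.size == 3, 5.seconds))`, where `requestId` is a hypothetical key and `5.seconds` needs `scala.concurrent.duration._` in scope.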
For this to work, the messages propagating through the system must bear `ActorRef`s indicating to whom a response message is to be sent (I rarely design actors that reply only to `sender`).
I often define the `replyTo` field of a message value as `ActorRef*` and then use my `MulticastActor` class, which enables the `!*` "send to multiple recipients" operator. This has the advantage of syntactic cleanliness in message construction (compared to using `Option[ActorRef]` or `Seq[ActorRef]`) and has the same overhead (either way, something must be constructed to capture the reply-to actor ref or refs).
Anyway, with these things, you can set up pretty flexible routing topologies.
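To make the `replyTo` convention concrete, here is a minimal sketch assuming Akka classic actors. The `Multicast` trait is only a hypothetical stand-in for `MulticastActor`, whose real code is not shown here, and `Lookup`, `LookupResult`, and `Store` are invented example types.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical messages: replyTo is a varargs parameter, so callers can pass
// zero, one, or several recipients without wrapping them in a collection.
case class Lookup(key: String, replyTo: ActorRef*)
case class LookupResult(key: String, value: Option[String])

// Hypothetical stand-in for MulticastActor: adds a !* operator to any
// sequence of ActorRefs, sending the same message to each of them.
trait Multicast extends Actor {
  implicit class MulticastOps(targets: Seq[ActorRef]) {
    def !*(message: Any): Unit = targets.foreach(_ ! message)
  }
}

// Example actor that answers every ref listed in replyTo instead of sender.
class Store(data: Map[String, String]) extends Multicast {
  def receive: Receive = {
    case Lookup(key, replyTo @ _*) =>
      replyTo !* LookupResult(key, data.get(key))
  }
}
```

With this shape, `store ! Lookup("a", worker, auditor)` answers two hypothetical actors, `store ! Lookup("a", aggregator)` feeds a single aggregator, and `store ! Lookup("a")` sends no reply at all.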