Akka: combining multiple lists of futures without blocking
Problem
This question is probably relatively basic to anyone familiar with Akka Futures, so please bear with me.
Assume I have a hierarchy of Akka actors with the following structure:
BigBoss (one)
|___ExecutiveActor (one)
    |___ManagerActor (many)
        |___WorkerActor (many)
Assume also that each non-BigBoss actor keeps its internal state in a Status wrapper.
If the BigBoss wants a List<Status> from all descendants, I can see this being the method:
// inside BigBoss getAllStatuses
Future<Object> futureStatuses = executive.ask("status", 5000);
List<Status> statuses = (List<Status>) Await.result(futureStatuses, timeout);
After the initial message has propagated all the way down to the Workers, I can see each Worker replying to its Manager with:
getSender().tell(myStatus);
The Manager would receive many of these replies and would want to collect them into a list for the Executive. It holds them as a list of Futures, because the reply above is a response to this:
// inside Manager getAllStatuses
List<Future<Object>> statuses =...
for(Worker w : workers) {
    Future<Object> status = w.ask("status", 5000);
    statuses.add(status);
}
Future<List<Object>> futureStatuses = sequence(statuses, ...);
getSender().tell(futureStatuses);
The sequence call above converts the list of Futures into a single Future<List<Object>> (whose elements are really Status objects).
So this is where it starts to get a bit hairy to me.
The ExecutiveActor had done this:
for(Manager m : managers) {
    Future<Object> status = m.ask("status", 5000);
    // the above Future would be a Future<Future<List<Object>>>
}
So, finally - my questions -
How do I reduce the above Future<Future<List<Object>>> to a Future<List<Object>> without blocking until BigBoss finally calls Await?
Is there a way to retain the Status type throughout this rather than using Object? I know an UntypedActor cannot send an explicitly typed reply, but perhaps there is a cleaner way to express this?
Is there a more elegant approach to this entirely that I'm not thinking of? Did I use sequence too early, or should I not have used it at all?
I am (obviously) using Java - and would prefer not to use TypedActors, but would love all feedback!
Thank you very much for your time-
Solution
Something like this?
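// Capture the sender now; getSender() may refer to a different actor
// by the time the callback runs.
final ActorRef cacheSender = getSender();
Future<List<Object>> futureStatuses = sequence(statuses, ...);
// Reply only once the combined future completes, so the Manager sends
// the finished list instead of a Future and never blocks.
futureStatuses.onComplete( new Procedure2<Throwable, List<Object>>() {
    public void apply(Throwable t, List<Object> r) {
        if(t != null) cacheSender.tell("ohnoes");
        else cacheSender.tell(r);
    }
});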
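The same non-blocking idea can be sketched in a self-contained form. Note the assumptions: this uses the JDK's CompletableFuture as a stand-in for Akka's Future so that it runs without Akka on the classpath, plain Strings stand in for Status, and the names FutureCombining, sequence, flatten and mergeAll are hypothetical helpers, not Akka API. In Akka itself, flatMap on the Future should play the role that thenCompose plays here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: CompletableFuture stands in for Akka's Future (assumption).
final class FutureCombining {

    // Turns a list of futures into one future of a list, keeping the order
    // and the element type T. Nothing blocks; the result completes once
    // every input future has completed.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> acc =
                CompletableFuture.<List<T>>completedFuture(new ArrayList<T>());
        for (CompletableFuture<T> f : futures) {
            acc = acc.thenCombine(f, (list, item) -> {
                List<T> next = new ArrayList<>(list);
                next.add(item);
                return next;
            });
        }
        return acc;
    }

    // Collapses Future<Future<T>> into Future<T> without waiting on either.
    static <T> CompletableFuture<T> flatten(CompletableFuture<CompletableFuture<T>> nested) {
        return nested.thenCompose(inner -> inner);
    }

    // Executive level: one future list per manager, merged into one flat list.
    static <T> CompletableFuture<List<T>> mergeAll(List<CompletableFuture<List<T>>> perManager) {
        return sequence(perManager).thenApply(lists -> {
            List<T> all = new ArrayList<>();
            for (List<T> l : lists) {
                all.addAll(l);
            }
            return all;
        });
    }

    public static void main(String[] args) {
        // Two "workers"; the first has not answered yet.
        CompletableFuture<String> w1 = new CompletableFuture<>();
        CompletableFuture<String> w2 = CompletableFuture.completedFuture("worker-2: ok");

        // The "manager" combines its workers, and the reply arrives nested.
        CompletableFuture<List<String>> manager = sequence(Arrays.asList(w1, w2));
        CompletableFuture<CompletableFuture<List<String>>> nested =
                CompletableFuture.completedFuture(manager);

        CompletableFuture<List<String>> flat = flatten(nested);
        w1.complete("worker-1: ok");

        // join() is used only here, at the very top, like BigBoss calling Await.
        System.out.println(flat.join());
    }
}
```

Because the helpers are generic, the element type survives every step, which is the part an UntypedActor reply loses. With actors, replying with the completed value from onComplete (as in the answer above) avoids the nested future in the first place, so flatten is only needed if a Future is ever sent as a message.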