Question

I'm making a port of the AKKA framework for .NET (don't take this too seriously; right now it is a weekend hack of just the Actor part).

I'm having some problems with the "Future" support in it. In Java/Scala Akka, futures are awaited synchronously with an Await call, much like Task.Wait() in .NET.

My goal is to support true async await for this. It works right now, but the continuation is executed on the wrong thread in my current solution.

This is the result when passing a message to one of my actors that contains an await block for a future. As you can see, the actor always executes on the same thread, while the await block executes on a random thread pool thread.

actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...

The actor gets its messages from a Dataflow BufferBlock<Message>. Or rather, I use Rx over the buffer block to subscribe to messages. It is configured like this:

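// BoundedCapacity and TaskScheduler are set through DataflowBlockOptions,
// which is passed to the BufferBlock constructor.
var messages = new BufferBlock<Message>(new DataflowBlockOptions
{
    BoundedCapacity = 100,
    TaskScheduler = TaskScheduler.Default,
});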
messages.AsObservable().Subscribe(this);

So far so good.

However, when I await a future result, like so:

// "async" is required for the await below to compile.
protected override async void OnReceive(IMessage message)
{
    ....

    var result = await Ask(logger, m);
    // This is not executed on the same thread as the above code
    result.Match()
        .With<SomeMessage>(t =>
        {
            Console.WriteLine("await thread {0}",
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        })
        .Default(_ => Console.WriteLine("Unknown message"));
    ...

I know this is the normal behavior of async/await, but I really must ensure that only one thread has access to my actor.

I don't want the future to run synchronously. I want it to run asynchronously just like normal, but I want the continuation to run on the same thread as the message processor/actor does.

My code for the future support looks like this:

public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
    TaskCompletionSource<IMessage> result = 
        new TaskCompletionSource<IMessage>();
    var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());

    // once this object gets a response, 
    // we set the result for the task completion source
    var futureActorRef = new FutureActorRef(result);            
    future.Tell(new SetRespondTo(), futureActorRef); 
    actor.Tell(message, future); 
    return result.Task;
}

Any ideas what I can do to force the continuation to run on the same thread that started the above code?


Solution

"I'm making a port of the AKKA framework for .NET"

Sweet. I went to an Akka talk at CodeMash '13 despite having never touched Java/Scala/Akka. I saw a lot of potential there for a .NET library/framework. Microsoft is working on something similar, which I hope will eventually be made generally available (it's currently in a limited preview).

I suspect that staying in the Dataflow/Rx world as much as possible is the easier approach; async is best when you have asynchronous operations (with a single start and single result for each operation), while Dataflow and Rx work better with streams and subscriptions (with a single start and multiple results). So my first gut reaction is to either link the buffer block to an ActionBlock with a specific scheduler, or use ObserveOn to move the Rx notifications to a specific scheduler, instead of trying to do it on the async side. Of course I'm not really familiar with the Akka API design, so take that with a grain of salt.
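A minimal sketch of the first suggestion, assuming the System.Threading.Tasks.Dataflow package is referenced and no SynchronizationContext is installed on the worker threads. The class name ExclusiveMailboxSketch and the int message type are placeholders, and Task.Delay stands in for awaiting Ask(...). Note that the exclusive scheduler from ConcurrentExclusiveSchedulerPair guarantees that only one piece of work runs at a time; it does not pin the work to one particular thread.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExclusiveMailboxSketch
{
    // Returns true if every await continuation resumed on the exclusive scheduler.
    public static async Task<bool> RunAsync(int messageCount)
    {
        // Runs at most one task at a time (serialized, but not thread-affine).
        TaskScheduler exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        bool stayedOnScheduler = true;

        var mailbox = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 100 });

        // The handler starts on the exclusive scheduler, so TaskScheduler.Current
        // is captured by the await and the continuation is queued back onto it.
        var actor = new ActionBlock<int>(async message =>
        {
            await Task.Delay(10); // placeholder for awaiting a future
            if (TaskScheduler.Current != exclusive)
                stayedOnScheduler = false;
        },
        new ExecutionDataflowBlockOptions { TaskScheduler = exclusive });

        mailbox.LinkTo(actor, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < messageCount; i++)
            await mailbox.SendAsync(i);

        mailbox.Complete();
        await actor.Completion;
        return stayedOnScheduler;
    }
}
```

If the actor only needs "one thread at a time" rather than "always the same thread", this may already be enough.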

Anyway, my async intro describes the only two reliable options for scheduling await continuations: SynchronizationContext.Current and TaskScheduler.Current. If your Akka port is more of a framework (where your code does the hosting, and end-user code is always executed by your code), then a SynchronizationContext may make sense. If your port is more of a library (where end-user code does the hosting and calls your code as necessary), then a TaskScheduler would make more sense.
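As an illustration of the SynchronizationContext option, here is a minimal sketch of a single-threaded context that pumps continuations on the thread that calls it. The names SingleThreadSynchronizationContext and ActorPump are hypothetical and not part of any library; the approach follows the common "message pump" pattern and is not necessarily how a real Akka port would host its actors.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type: queues every posted callback for one pumping thread.
public sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _queue =
        new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    // await continuations arrive here when this context was current at the await.
    public override void Post(SendOrPostCallback d, object state) =>
        _queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));

    public override void Send(SendOrPostCallback d, object state) =>
        throw new NotSupportedException("Send is not supported in this sketch.");

    // Executes queued callbacks on the calling thread until Complete() is called.
    public void RunOnCurrentThread()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
            item.Key(item.Value);
    }

    public void Complete() => _queue.CompleteAdding();
}

public static class ActorPump
{
    // Runs handler on the calling thread, including the code after each await.
    public static void Run(Func<Task> handler)
    {
        var previous = SynchronizationContext.Current;
        var context = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            Task task = handler();
            // Stop pumping once the handler has finished.
            task.ContinueWith(_ => context.Complete(), TaskScheduler.Default);
            context.RunOnCurrentThread();
            task.GetAwaiter().GetResult(); // rethrow any handler exception
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    // Demonstration: the thread ID is the same before and after the await.
    public static bool ContinuationStaysOnThread()
    {
        bool same = false;
        Run(async () =>
        {
            int before = Thread.CurrentThread.ManagedThreadId;
            await Task.Delay(10);
            same = before == Thread.CurrentThread.ManagedThreadId;
        });
        return same;
    }
}
```

In a framework-style host, each actor thread would call something like ActorPump.Run around its message loop, so end-user code never has to know the context exists.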

There aren't many examples of a custom SynchronizationContext, because that's pretty rare. I do have an AsyncContextThread type in my AsyncEx library which defines both a SynchronizationContext and a TaskScheduler for that thread. There are several examples of custom TaskSchedulers, such as the Parallel Extensions Extras which has an STA scheduler and a "current thread" scheduler.
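For the TaskScheduler route, a custom scheduler that owns a single dedicated thread might look roughly like the sketch below. SingleThreadTaskScheduler and SchedulerDemo are hypothetical names written for illustration; they are not the AsyncEx or Parallel Extensions Extras types, although the structure is similar in spirit to the STA scheduler mentioned above. It assumes no SynchronizationContext is set on the scheduler's thread, so await captures TaskScheduler.Current.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type: executes every queued task on one dedicated thread.
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly Thread _thread;

    public SingleThreadTaskScheduler()
    {
        _thread = new Thread(() =>
        {
            foreach (var task in _tasks.GetConsumingEnumerable())
                TryExecuteTask(task);
        })
        { IsBackground = true };
        _thread.Start();
    }

    public int ThreadId => _thread.ManagedThreadId;

    public override int MaximumConcurrencyLevel => 1;

    protected override void QueueTask(Task task) => _tasks.Add(task);

    // Only inline when already on the dedicated thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) =>
        Thread.CurrentThread == _thread && TryExecuteTask(task);

    protected override IEnumerable<Task> GetScheduledTasks() => _tasks.ToArray();

    // Lets the dedicated thread exit once the queue drains.
    public void Dispose() => _tasks.CompleteAdding();
}

public static class SchedulerDemo
{
    // Returns true if the code before and after the await both ran on the
    // scheduler's dedicated thread.
    public static async Task<bool> ContinuationStaysOnThreadAsync()
    {
        using (var scheduler = new SingleThreadTaskScheduler())
        {
            return await Task.Factory.StartNew(async () =>
            {
                int before = Thread.CurrentThread.ManagedThreadId;
                await Task.Delay(10); // placeholder for awaiting a future
                int after = Thread.CurrentThread.ManagedThreadId;
                return before == scheduler.ThreadId && after == scheduler.ThreadId;
            },
            CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
        }
    }
}
```

The key point is that the actor's message handler has to be started on the scheduler (here via Task.Factory.StartNew); continuations then follow it there without any change to Ask.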

Other tips

The task scheduler decides whether to run a task on a new thread or on the current thread. There is an option to force a task onto a new thread, but none to force it onto the current thread. There is, however, the method Task.RunSynchronously(), which runs the task synchronously on the current TaskScheduler. Also, if you are using async/await, there is already a similar question about that.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow