Question

AFAIK, all it knows is that at some point, its SetResult or SetException method is being called to complete the Task<T> exposed through its Task property.

In other words, it acts as the producer for a Task<TResult> and its completion.

I saw here the example:

If I need a way to execute a Func<T> asynchronously and have a Task<T> to represent that operation.

public static Task<T> RunAsync<T>(Func<T> function) 
{ 
    if (function == null) throw new ArgumentNullException(“function”); 
    var tcs = new TaskCompletionSource<T>(); 
    ThreadPool.QueueUserWorkItem(_ => 
    { 
        try 
        {  
            T result = function(); 
            tcs.SetResult(result);  
        } 
        catch(Exception exc) { tcs.SetException(exc); } 
    }); 
    return tcs.Task; 
}

Which could be used if I didn’t have Task.Factory.StartNew - But I do have Task.Factory.StartNew.

Question:

Can someone please explain by example a scenario related directly to TaskCompletionSource and not to a hypothetical situation in which I don't have Task.Factory.StartNew?

Was it helpful?

Solution

I mostly use it when only an event based API is available (for example Windows Phone 8 sockets):

public Task<Args> SomeApiWrapper()
{
    TaskCompletionSource<Args> tcs = new TaskCompletionSource<Args>(); 

    var obj = new SomeApi();

    // will get raised, when the work is done
    obj.Done += (args) => 
    {
        // this will notify the caller 
        // of the SomeApiWrapper that 
        // the task just completed
        tcs.SetResult(args);
    }

    // start the work
    obj.Do();

    return tcs.Task;
}

So it's especially useful when used together with the C#5 async keyword.

OTHER TIPS

In my experiences, TaskCompletionSource is great for wrapping old asynchronous patterns to the modern async/await pattern.

The most beneficial example I can think of is when working with Socket. It has the old APM and EAP patterns, but not the awaitable Task methods that TcpListener and TcpClient have.

I personally have several issues with the NetworkStream class and prefer the raw Socket. Being that I also love the async/await pattern, I made an extension class SocketExtender which creates several extension methods for Socket.

All of these methods make use of TaskCompletionSource<T> to wrap the asynchronous calls like so:

    public static Task<Socket> AcceptAsync(this Socket socket)
    {
        if (socket == null)
            throw new ArgumentNullException("socket");

        var tcs = new TaskCompletionSource<Socket>();

        socket.BeginAccept(asyncResult =>
        {
            try
            {
                var s = asyncResult.AsyncState as Socket;
                var client = s.EndAccept(asyncResult);

                tcs.SetResult(client);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }

        }, socket);

        return tcs.Task;
    }

I pass the socket into the BeginAccept methods so that I get a slight performance boost out of the compiler not having to hoist the local parameter.

Then the beauty of it all:

 var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 listener.Bind(new IPEndPoint(IPAddress.Loopback, 2610));
 listener.Listen(10);

 var client = await listener.AcceptAsync();

To me, a classic scenario for using TaskCompletionSource is when it's possible that my method won't necessarily have to do a time consuming operation. What it allows us to do is to choose the specific cases where we'd like to use a new thread.

A good example for this is when you use a cache. You can have a GetResourceAsync method, which looks in the cache for the requested resource and returns at once (without using a new thread, by using TaskCompletionSource) if the resource was found. Only if the resource wasn't found, we'd like to use a new thread and retrieve it using Task.Run().

A code example can be seen here: How to conditionally run a code asynchonously using tasks

In this blog post, Levi Botelho describes how to use the TaskCompletionSource to write an asynchronous wrapper for a Process such that you can launch it and await its termination.

public static Task RunProcessAsync(string processPath)
{
    var tcs = new TaskCompletionSource<object>();
    var process = new Process
    {
        EnableRaisingEvents = true,
        StartInfo = new ProcessStartInfo(processPath)
        {
            RedirectStandardError = true,
            UseShellExecute = false
        }
    };
    process.Exited += (sender, args) =>
    {
        if (process.ExitCode != 0)
        {
            var errorMessage = process.StandardError.ReadToEnd();
            tcs.SetException(new InvalidOperationException("The process did not exit correctly. " +
                "The corresponding error message was: " + errorMessage));
        }
        else
        {
            tcs.SetResult(null);
        }
        process.Dispose();
    };
    process.Start();
    return tcs.Task;
}

and its usage

await RunProcessAsync("myexecutable.exe");

It looks like no one mentioned, but I guess unit tests too can be considered real life enough.

I find TaskCompletionSource to be useful when mocking a dependency with an async method.

In actual program under test:

public interface IEntityFacade
{
  Task<Entity> GetByIdAsync(string id);
}

In unit tests:

// set up mock dependency (here with NSubstitute)

TaskCompletionSource<Entity> queryTaskDriver = new TaskCompletionSource<Entity>();

IEntityFacade entityFacade = Substitute.For<IEntityFacade>();

entityFacade.GetByIdAsync(Arg.Any<string>()).Returns(queryTaskDriver.Task);

// later on, in the "Act" phase

private void When_Task_Completes_Successfully()
{
  queryTaskDriver.SetResult(someExpectedEntity);
  // ...
}

private void When_Task_Gives_Error()
{
  queryTaskDriver.SetException(someExpectedException);
  // ...
}

After all, this usage of TaskCompletionSource seems another case of "a Task object that does not execute code".

TaskCompletionSource is used to create Task objects that don't execute code. In real world scenarios, TaskCompletionSource is ideal for I/O bound operations. This way, you get all the benefits of tasks (e.g. return values, continuations, etc) without blocking a thread for the duration of the operation. If your "function" is an I/O bound operation, it isn't recommended to block a thread using a new Task. Instead, using TaskCompletionSource, you can create a slave task to just indicate when your I/O bound operation finishes or faults.

There's a real world example with a decent explanation in this post from the "Parallel Programming with .NET" blog. You really should read it, but here's a summary anyway.

The blog post shows two implementations for:

"a factory method for creating “delayed” tasks, ones that won’t actually be scheduled until some user-supplied timeout has occurred."

The first implementation shown is based on Task<> and has two major flaws. The second implementation post goes on to mitigate these by using TaskCompletionSource<>.

Here's that second implementation:

public static Task StartNewDelayed(int millisecondsDelay, Action action)
{
    // Validate arguments
    if (millisecondsDelay < 0)
        throw new ArgumentOutOfRangeException("millisecondsDelay");
    if (action == null) throw new ArgumentNullException("action");

    // Create a trigger used to start the task
    var tcs = new TaskCompletionSource<object>();

    // Start a timer that will trigger it
    var timer = new Timer(
        _ => tcs.SetResult(null), null, millisecondsDelay, Timeout.Infinite);

    // Create and return a task that will be scheduled when the trigger fires.
    return tcs.Task.ContinueWith(_ =>
    {
        timer.Dispose();
        action();
    });
}

This may be oversimplifying things but the TaskCompletion source allows one to await on an event. Since the tcs.SetResult is only set once the event occurs, the caller can await on the task.

Watch this video for more insights:

http://channel9.msdn.com/Series/Three-Essential-Tips-for-Async/Lucian03-TipsForAsyncThreadsAndDatabinding

I real world scenario where I have used TaskCompletionSource is when implementing a download queue. In my case if the user starts 100 downloads I don't want to fire them all off at once and so instead of returning a strated task I return a task attached to TaskCompletionSource. Once the download gets completed the thread that is working the queue completes the task.

The key concept here is that I am decoupling when a client asks for a task to be started from when it actually gets started. In this case because I don't want the client to have to deal with resource management.

note that you can use async/await in .net 4 as long as you are using a C# 5 compiler (VS 2012+) see here for more details.

I've used TaskCompletionSource to run a Task until it is cancelled. In this case it's a ServiceBus subscriber that I normally want to run for as long as the application runs.

public async Task RunUntilCancellation(
    CancellationToken cancellationToken,
    Func<Task> onCancel)
{
    var doneReceiving = new TaskCompletionSource<bool>();

    cancellationToken.Register(
        async () =>
        {
            await onCancel();
            doneReceiving.SetResult(true); // Signal to quit message listener
        });

    await doneReceiving.Task.ConfigureAwait(false); // Listen until quit signal is received.
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top