Question

Let's consider the method:

Task Foo(IEnumerable<int> items, CancellationToken token)
{
    return Task.Run(() =>
    {
        foreach (var i in items)
            token.ThrowIfCancellationRequested();
    }, token);
}

Then I have a consumer:

var cts = new CancellationTokenSource();
var task = Foo(Items, cts.Token);
task.Wait();

And the example of Items:

IEnumerable<int> Items
{
    get
    {
        yield return 0;
        Task.Delay(Timeout.InfiniteTimeSpan).Wait(); // never completes
        yield return 1;
    }
}

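What happens with task.Wait()? It never returns, even if cts.Cancel() is called, because the enumerator blocks inside MoveNext before the next token check. I cannot pass my cancellation token into the collection of items.

How can I kill the unresponsive task, or otherwise get around this?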


Solution 7

My previous solution was based on the optimistic assumption that the enumerable is unlikely to hang and is quite fast, so occasionally sacrificing one thread of the system's thread pool seemed acceptable. As Dax Fohl pointed out, however, the inner task stays active even after its parent task has been cancelled by the OperationCanceledException. That can choke the underlying ThreadPool, which the default task scheduler uses, if several collections freeze indefinitely.

Consequently, I have refactored the ToCancellable method:

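using System;
using System.Collections.Generic;
using System.Threading;

public static class EnumerableExtensions
{
    public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token)
    {
        var enumerator = @this.GetEnumerator();
        var state = new State();

        for (; ; )
        {
            token.ThrowIfCancellationRequested();

            // Run MoveNext on a dedicated low-priority background thread,
            // so a hung enumerator cannot block the consumer.
            var thread = new Thread(s => { ((State)s).Result = enumerator.MoveNext(); }) { IsBackground = true, Priority = ThreadPriority.Lowest };
            thread.Start(state);

            try
            {
                // Poll every 10 ms until MoveNext returns or cancellation is requested.
                while (!thread.Join(10))
                    token.ThrowIfCancellationRequested();
            }
            catch (OperationCanceledException)
            {
                // Kill the hung worker thread, then rethrow the cancellation.
                thread.Abort();
                throw;
            }

            if (!state.Result)
                yield break;

            yield return enumerator.Current;
        }
    }
}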

And a helper class to hold the result:

class State
{
    public bool Result { get; set; }
}

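It is reasonably safe to abort this detached thread, because no other work runs on it; the enumerator itself should not be used afterwards. Note that Thread.Abort is supported only on .NET Framework: on .NET Core and .NET 5+ it throws PlatformNotSupportedException.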

The drawback I see here is that creating a thread for every item is expensive. That could be solved with a custom thread pool and a producer-consumer pattern that handles abort exceptions by removing broken threads from the pool.

Another problem is the Join line: what is the best polling interval? Perhaps that should be up to the caller and passed in as a method argument.
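One way to avoid choosing a Join interval at all (a swapped-in technique, not the author's) is to have the worker thread signal a ManualResetEventSlim and to wait with ManualResetEventSlim.Wait(CancellationToken), which returns when MoveNext finishes and throws OperationCanceledException as soon as the token is cancelled. This minimal sketch abandons a hung background thread instead of calling Thread.Abort, so it also runs on .NET Core; the class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;

public static class NoPollingEnumerableExtensions
{
    // Hypothetical variant of ToCancellable without a polling interval.
    public static IEnumerable<T> ToCancellableNoPolling<T>(this IEnumerable<T> source, CancellationToken token)
    {
        var enumerator = source.GetEnumerator();

        for (; ; )
        {
            token.ThrowIfCancellationRequested();

            var hasNext = false;
            Exception error = null;
            // Not disposed on purpose: an abandoned worker may still call Set() later.
            var done = new ManualResetEventSlim(false);

            var worker = new Thread(() =>
            {
                try { hasNext = enumerator.MoveNext(); }
                catch (Exception ex) { error = ex; }
                finally { done.Set(); }
            }) { IsBackground = true };
            worker.Start();

            // Returns once MoveNext has finished, or throws OperationCanceledException
            // when the token is cancelled. A hung worker is abandoned, not aborted.
            done.Wait(token);

            if (error != null)
                ExceptionDispatchInfo.Capture(error).Throw(); // rethrow with the original stack trace

            if (!hasNext)
            {
                enumerator.Dispose();
                yield break;
            }

            yield return enumerator.Current;
        }
    }
}
```

The trade-off is the same as with any "stop waiting" approach: a hung MoveNext keeps its background thread alive until the process exits, but the consumer is released immediately and no interval has to be tuned.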

OTHER TIPS

I found one solution that lets me apply a cancellation token to Items originating from third parties:

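using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class EnumerableExtensions
{
    public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token)
    {
        var enumerator = @this.GetEnumerator();

        for (; ; )
        {
            // Run MoveNext on the thread pool so a hung enumerator cannot block the caller.
            var task = Task.Run(() => enumerator.MoveNext(), token);
            // Throws OperationCanceledException on cancellation even if MoveNext never
            // returns; the hung task keeps its thread-pool thread busy.
            task.Wait(token);

            if (!task.Result)
                yield break;

            yield return enumerator.Current;
        }
    }
}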

Now I need to use:

Items.ToCancellable(cts.Token)

That no longer hangs after a cancellation request.
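A small self-contained check of that claim, assuming the question's hanging Items and the Task.Run-based ToCancellable: the consumer runs Foo's loop over the wrapped items and cancels after a delay. The class name ToCancellableDemo and the Run helper are hypothetical, added only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ToCancellableDemo
{
    // Same shape as the ToCancellable answer: each MoveNext runs on a pool task.
    public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> source, CancellationToken token)
    {
        var enumerator = source.GetEnumerator();
        for (; ; )
        {
            var task = Task.Run(() => enumerator.MoveNext(), token);
            task.Wait(token); // throws OperationCanceledException on cancellation
            if (!task.Result)
                yield break;
            yield return enumerator.Current;
        }
    }

    // The hanging iterator from the question.
    public static IEnumerable<int> Items
    {
        get
        {
            yield return 0;
            Task.Delay(Timeout.InfiniteTimeSpan).Wait();
            yield return 1;
        }
    }

    // Runs Foo's loop over the wrapped items, cancels after cancelAfterMs,
    // and returns the final status of Foo's task.
    public static TaskStatus Run(int cancelAfterMs)
    {
        var cts = new CancellationTokenSource();
        var task = Task.Run(() =>
        {
            foreach (var i in Items.ToCancellable(cts.Token))
                cts.Token.ThrowIfCancellationRequested();
        }, cts.Token);

        cts.CancelAfter(cancelAfterMs);
        try { task.Wait(); } catch (AggregateException) { /* expected on cancellation */ }
        return task.Status;
    }
}
```

Because the OperationCanceledException carries the same token that was passed to Task.Run, Foo's task ends in the Canceled state instead of hanging; the pool thread stuck in the second MoveNext is leaked, which is exactly the concern raised about this approach.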

You can't really cancel a non-cancellable operation. Stephen Toub goes into detail in "How do I cancel non-cancelable async operations?" on the Parallel FX Team's blog, but the essence is that you need to understand what you actually want to do:

  1. Stop the asynchronous/long-running operation itself? Not doable in a cooperative way if you don't have a way to signal the operation.
  2. Stop waiting for the operation to finish, ignoring any results? That's doable, but it can lead to unreliability, because the abandoned operation keeps running and holding its resources. You can start a Task with the long operation, passing a cancellation token, or use a TaskCompletionSource as Stephen Toub describes.

You need to decide which behavior you want before you can find the proper solution.
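A minimal sketch of option 2, along the lines of the TaskCompletionSource pattern Stephen Toub describes: WithCancellation (a hypothetical extension name) completes as soon as either the task finishes or the token is cancelled. On .NET 6 and later, the built-in Task.WaitAsync(CancellationToken) covers the same need. Only the wait is cancelled; the underlying operation keeps running.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskCancellationExtensions
{
    // Stops *waiting* for the task when the token fires; does not stop the task.
    public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken token)
    {
        var cancelled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // The registration completes the helper task when the token is cancelled.
        using (token.Register(() => cancelled.TrySetResult(true)))
        {
            if (await Task.WhenAny(task, cancelled.Task).ConfigureAwait(false) != task)
                throw new OperationCanceledException(token);
        }

        // The original task finished first: propagate its result or exception.
        return await task.ConfigureAwait(false);
    }
}
```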

Why can't you pass the CancellationToken to Items()?

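IEnumerable<int> Items(CancellationToken ct)
{
    yield return 0;
    // Cancelling ct ends the delay; Wait() then throws an AggregateException
    // wrapping a TaskCanceledException, which ends the enumeration.
    Task.Delay(Timeout.InfiniteTimeSpan, ct).Wait();
    yield return 1;
}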

You would have to pass the same token to Items() as you pass to Foo(), of course.

Try using a TaskCompletionSource and returning its Task. You can then set the TaskCompletionSource to the result (or the error) of the inner task if it runs to completion (or faults), but set it to canceled immediately if the CancellationToken is triggered.

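Task<int> Foo(IEnumerable<int> items, CancellationToken token)
{
    var tcs = new TaskCompletionSource<int>();
    // Cancel the returned task as soon as the token fires, without waiting for innerTask.
    token.Register(() => tcs.TrySetCanceled());
    var innerTask = Task.Factory.StartNew(() =>
    {
        foreach (var i in items)
            token.ThrowIfCancellationRequested();
        return 7;
    }, token);
    // Forward the inner outcome if it arrives first; TrySet* ignores late results.
    innerTask.ContinueWith(task => tcs.TrySetResult(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
    // Pass InnerExceptions so the error is not wrapped in a second AggregateException.
    innerTask.ContinueWith(task => tcs.TrySetException(task.Exception.InnerExceptions), TaskContinuationOptions.OnlyOnFaulted);
    return tcs.Task;
}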

This won't actually kill the inner task, but it gives you a task that you can continue from immediately on cancellation. To kill the inner task, since it's stuck in an infinite wait, I believe the only thing you can do is capture Thread.CurrentThread inside the task's delegate and then call Abort() on that thread from within Foo, which of course is bad practice. But in this case your question really comes down to "how can I make a long-running function terminate without having access to its code?", which is only doable via Thread.Abort.

Can you have Items be IEnumerable<Task<int>> instead of IEnumerable<int>? Then you could do

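Task Foo(IEnumerable<Task<int>> items, CancellationToken token)
{
    return Task.Run(() =>
    {
        foreach (var task in items)
        {
            // Waiting with the token lets cancellation interrupt a hung item.
            task.Wait(token);
            token.ThrowIfCancellationRequested();
            var i = task.Result;
        }
    }, token);
}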

Something like this may be more straightforward with the Reactive Extensions (Rx), using items.ToObservable. That would look like this:

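using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;  // ThreadPoolScheduler
using System.Reactive.Linq;         // ToObservable
using System.Threading;
using System.Threading.Tasks;

static Task<int> Foo(IEnumerable<int> items, CancellationToken token)
{
    var sum = 0;
    var tcs = new TaskCompletionSource<int>();
    var obs = items.ToObservable(ThreadPoolScheduler.Instance);
    // Cancel the returned task immediately, even if the enumeration is stuck.
    token.Register(() => tcs.TrySetCanceled());
    // The token overload of Subscribe disposes the subscription on cancellation.
    obs.Subscribe(i => sum += i, ex => tcs.TrySetException(ex), () => tcs.TrySetResult(sum), token);
    return tcs.Task;
}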

How about creating a wrapper around the enumerable that is itself cancellable between items?

IEnumerable<T> CancellableEnum<T>(IEnumerable<T> items, CancellationToken ct) {
    foreach (var item in items) {
        ct.ThrowIfCancellationRequested();
        yield return item;
    }
}

...though that seems to be roughly what Foo() already does. If you have some place where this enumerable blocks literally forever (and it's not just very slow), then add a timeout and/or a cancellation token to the task.Wait() call on the consumer side.
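A minimal sketch of that consumer-side wait, assuming a Foo-style task: Task.Wait(int, CancellationToken) returns false on timeout and throws OperationCanceledException when the token is cancelled. The helper name TryWait is hypothetical. In both cases only the wait is abandoned; a hung task keeps running.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumerSideWait
{
    // Returns true if the task completed within timeoutMs and before the token
    // was cancelled. A faulted task still throws AggregateException from Wait.
    public static bool TryWait(Task task, int timeoutMs, CancellationToken token)
    {
        try
        {
            return task.Wait(timeoutMs, token); // false on timeout
        }
        catch (OperationCanceledException)
        {
            return false; // the token was cancelled before the task finished
        }
    }
}
```

Usage with the question's consumer would be ConsumerSideWait.TryWait(Foo(Items, cts.Token), 5000, cts.Token) in place of task.Wait().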

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow