How do I execute multiple Tasks using Task Parallel Library and return after the first one to actually return data

StackOverflow https://stackoverflow.com/questions/9756241

Question

I am working on an application where getting data back to a serial process as fast as possible is important, and there can be multiple sources for that data. One source is sometimes faster than the others, but you don't know in advance which one it will be. I am using ContinueWhenAny(...).Wait() to wait for the first Task to finish so that the calling method can continue and return. However, I need to check the validity of the data first and return only once a Task has produced valid data, or once all Tasks have finished and none of them produced valid data. Right now my code returns even invalid data if it comes from the Task that finishes first.

Is there a way to do something like "ContinueWhenAny", but only when Task.Result meets a certain condition, and otherwise wait for the next Task, and so on, until the last Task finishes?

I also need to make sure that once one result is valid, the other threads are canceled. This part is already working fine.

Currently, my code looks like this (stripped of exception handling, just the nuts and bolts):

        ResultObject result = null;
        var tokenSource = new CancellationTokenSource();
        var tasks = listOfSources
                .Select(i => Task.Factory.StartNew(
                    () =>
                        {
                            i.CancellationToken = tokenSource.Token;
                            //Database Call
                            return i.getData(inputparameters);
                        }, tokenSource.Token));

        Task.Factory.ContinueWhenAny(
                tasks.ToArray(),
                firstCompleted =>
                    {
                        //This is the "result" I need to validate before setting and canceling the other threads
                        result = firstCompleted.Result;
                        tokenSource.Cancel();
                    }).Wait();
        return result;

Any ideas? I don't want to use ContinueWhenAll: if the first call takes 2 seconds and the second takes 10 seconds, I want to get back to the serial process in 2 seconds when the first call returns valid data. Otherwise I want to wait the 10 seconds, hope that the second result is valid, and return invalid data only if all Tasks have completed with invalid results.

--------- UPDATE ---- Thanks zmbq for the great idea. The updated (working) code is below and fulfills all my requirements. One caveat: unlike the previous code, which returns the invalid result itself, this code returns null if none of the Tasks produces a valid result. It wouldn't be difficult to change this version to do the same, but returning null in that case is fine for my purposes.

        ResultObject result = null;
        var tokenSource = new CancellationTokenSource();
        var tasks = listOfSources
                .Select(i => Task.Factory.StartNew(
                    () =>
                        {
                            i.CancellationToken = tokenSource.Token;
                            //Database Call
                            return i.getData(inputparameters);
                        }, tokenSource.Token)).ToArray();

        result = GetFirstValidResult(tokenSource,tasks);

        return result;


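    // Recursively waits for the next task to finish until one yields a valid result
    // or no tasks remain; returns null when no task produced a valid result.
    private ResultObject GetFirstValidResult(CancellationTokenSource tokenSource, Task<ResultObject>[] tasks)
    {
        ResultObject result = null;
        Task.Factory.ContinueWhenAny(
            tasks,
            firstCompleted =>
                {
                    var testResult = firstCompleted.Result;
                    if (testResult != null && testResult.IsValid())
                    {
                        // Valid data: keep it and cancel the remaining sources.
                        result = testResult;
                        tokenSource.Cancel();
                    }
                    else
                    {
                        // Invalid data: wait on the tasks that have not been examined yet.
                        var remainingTasks = tasks.Except(new[] { firstCompleted }).ToArray();
                        if (remainingTasks.Any())
                        {
                            result = GetFirstValidResult(tokenSource, remainingTasks);
                        }
                    }
                }).Wait();
        return result;
    }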

Solution

Well, if your firstCompleted callback checked the result and, when the result is invalid, called ContinueWhenAny on the remaining tasks, you'd be done.
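
The same idea can also be written without recursion by calling Task.WaitAny in a loop. What follows is only a sketch under assumptions: the class name FirstValid, the generic signature, and the isValid delegate are hypothetical and do not appear in the question's code, and faulted or canceled tasks are simply treated as if they had returned invalid data.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the question's code.
public static class FirstValid
{
    // Returns the first result accepted by isValid, or default(T) if no task
    // produced an acceptable result.
    public static T GetFirstValidResult<T>(
        IEnumerable<Task<T>> tasks,
        Func<T, bool> isValid,
        CancellationTokenSource tokenSource)
    {
        var remaining = tasks.ToList();
        while (remaining.Count > 0)
        {
            // Blocks until any remaining task finishes (success, fault, or cancel).
            int index = Task.WaitAny(remaining.ToArray());
            Task<T> finished = remaining[index];
            remaining.RemoveAt(index);

            if (finished.IsFaulted)
            {
                // Read the exception so it counts as observed; assumption: a
                // failing source is treated like one that returned invalid data.
                var ignored = finished.Exception;
                continue;
            }

            // Result is only safe to read on tasks that ran to completion.
            if (finished.Status == TaskStatus.RanToCompletion && isValid(finished.Result))
            {
                tokenSource.Cancel(); // ask the slower sources to stop
                return finished.Result;
            }
        }
        return default(T);
    }
}
```

With the question's types, the call would look roughly like `FirstValid.GetFirstValidResult(tasks, r => r != null && r.IsValid(), tokenSource)`. The loop blocks only the calling thread, whereas the recursive ContinueWhenAny(...).Wait() version also blocks a thread inside each nested continuation.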

As always, I recommend you take a look at ZeroMQ. Fire the tasks and have each task write a message to an output queue if its result is valid. The main thread blocks on the queue and returns when a valid message arrives, or when all the tasks have finished.
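
The queue pattern itself can be tried in-process before bringing in ZeroMQ. The sketch below swaps in the framework's BlockingCollection<T> as the output queue, so it is not ZeroMQ code; the class name QueueFirstValid, the Func<CancellationToken, T> shape of a source, and the isValid delegate are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; BlockingCollection<T> stands in for the ZeroMQ queue.
public static class QueueFirstValid
{
    // Returns the first valid result written to the queue, or default(T) if
    // every source finished without producing one.
    public static T GetFirstValidResult<T>(
        IEnumerable<Func<CancellationToken, T>> sources,
        Func<T, bool> isValid)
    {
        var tokenSource = new CancellationTokenSource();
        var queue = new BlockingCollection<T>();

        var producers = sources.Select(source => Task.Factory.StartNew(() =>
        {
            try
            {
                T result = source(tokenSource.Token);
                // Only valid results become messages on the queue.
                if (isValid(result)) queue.Add(result);
            }
            catch (Exception)
            {
                // Assumption: a failing source simply contributes no message.
            }
        })).ToArray();

        if (producers.Length == 0) return default(T);

        // Once every producer has finished, close the queue so that a waiting
        // consumer is released even though no message arrived.
        Task.Factory.ContinueWhenAll(producers, _ => queue.CompleteAdding());

        T first;
        // Blocks until a message arrives, or returns false once the queue is
        // completed and empty.
        bool found = queue.TryTake(out first, Timeout.Infinite);

        tokenSource.Cancel(); // ask any sources still running to stop
        return found ? first : default(T);
    }
}
```

The consumer never inspects invalid results, because validation happens in the producers. The queue and token source are deliberately not disposed here, since producers may still be running when the method returns.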

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow