Can many instances of an async task share a reference to a concurrent collection and add items concurrently to it in C#?

StackOverflow https://stackoverflow.com/questions/18059904

Question

I'm just beginning to learn C# threading and concurrent collections, and am not sure of the proper terminology to pose my question, so I'll describe briefly what I'm trying to do. My grasp of the subject is rudimentary at best at this point. Is my approach below even feasible as I've envisioned it?

  1. I have 100,000 urls in a Concurrent collection that must be tested--is the link still good? I have another concurrent collection, initially empty, that will contain the subset of urls that an async request determines to have been moved (400, 404, etc errors).

  2. I want to spawn as many of these async requests concurrently as my PC and our bandwidth will allow, and was going to start at 20 async-web-request-tasks per second and work my way up from there.

Would it work if a single async task handled both things: it would make the async request and then add the url to the BadUrls collection if it encountered a 4xx error? A new instance of that task would be spawned every 50ms:

     class TestArgs {
        public ConcurrentBag<UrlInfo> myCollection { get; set; }
        public System.Uri currentUrl { get; set; }
     }

     ConcurrentQueue<UrlInfo> Urls = new ConcurrentQueue<UrlInfo>();
     // populate the Urls queue
     <snip>

     // initialize the bad urls collection
     ConcurrentBag<UrlInfo> BadUrls = new ConcurrentBag<UrlInfo>();

     // timer fires every 50ms, whereupon a new args object is created
     // and the timer callback spawns a new task; an autoEvent would
     // reset the timer and dispose of it when the queue was empty

     void SpawnNewUrlTask(){
         // if queue is empty then reset the timer
         // otherwise:
         TestArgs args = new TestArgs {
             myCollection = BadUrls,
             currentUrl = getNextUrl()  // take an item from the queue
         };
         Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);
     }



       public async Task asyncWebRequestAndConcurrentCollectionUpdater(TestArgs args) 
       {
           //make the async web request 
           // add the url to the bad collection if appropriate.  
       } 

Feasible? Way off?

Solution

The approach seems fine, but there are some issues with the specific code you've shown.

But before I get to that, there have been suggestions in the comments that Task Parallelism is the way to go. I think that's misguided. There's a common misconception that if you want to have lots of work going on in parallel, you necessarily need lots of threads. That's only true if the work is compute-bound. But the work you're doing will be IO bound - this code is going to spend the vast majority of its time waiting for responses. It will do very little computation. So in practice, even if it only used a single thread, your initial target of 20 requests per second doesn't seem like a workload that would cause a single CPU core to break into a sweat.

In short, a single thread can handle very high levels of concurrent IO. You only need multiple threads if you need parallel execution of code, and that doesn't look likely to be the case here, because there's so little work for the CPU in this particular job.

(This misconception predates await and async by years. In fact, it predates the TPL - see http://www.interact-sw.co.uk/iangblog/2004/09/23/threadless for a .NET 1.1 era illustration of how you can handle thousands of concurrent requests with a tiny number of threads. The underlying principles still apply today because Windows networking IO still basically works the same way.)

Not that there's anything particularly wrong with using multiple threads here, I'm just pointing out that it's a bit of a distraction.

Anyway, back to your code. This line is problematic:

Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);

Although you've not given us all your code, I can't see how that will compile. The overloads of StartNew that accept two arguments require the first to be an Action, an Action<object>, a Func<TResult>, or a Func<object,TResult>. In other words, it has to be a method that either takes no arguments, or takes a single argument of type object (and which may or may not return a value). Your asyncWebRequestAndConcurrentCollectionUpdater takes an argument of type TestArgs.

But the fact that it doesn't compile isn't the main problem. That's easily fixed. (E.g., change it to Task.Factory.StartNew(() => asyncWebRequestAndConcurrentCollectionUpdater(args));) The real issue is that what you're doing is a bit weird: you're using Task.Factory.StartNew to invoke a method that already returns a Task.

Task.Factory.StartNew is a handy way to take a synchronous method (i.e., one that doesn't return a Task) and run it in a non-blocking way. (It'll run on the thread pool.) But if you've got a method that already returns a Task, you don't really need Task.Factory.StartNew. The weirdness becomes more apparent if we look at what it returns (once you've fixed the compilation error):

Task<Task> t = Task.Factory.StartNew(
    () => asyncWebRequestAndConcurrentCollectionUpdater(args));

That Task<Task> reveals what's happening. You've decided to wrap a method that was already asynchronous with a mechanism that is normally used to make non-asynchronous methods asynchronous. And so you've now got a Task that produces a Task.

One of the slightly surprising upshots of this is that if you were to wait for the task returned by StartNew to complete, the underlying work would not necessarily be done:

t.Wait(); // doesn't wait for asyncWebRequestAndConcurrentCollectionUpdater to finish!

All that will actually do is wait for asyncWebRequestAndConcurrentCollectionUpdater to return a Task. And since asyncWebRequestAndConcurrentCollectionUpdater is already an async method, it will return a task more or less immediately. (Specifically, it'll return a task the moment it performs an await that does not complete immediately.)

If you want to wait for the work you've kicked off to finish, you'll need to do this:

t.Result.Wait();

or, potentially more efficiently, this:

t.Unwrap().Wait();

That says: get me the Task that my async method returned, and then wait for that. This may not be usefully different from this much simpler code:

Task t = asyncWebRequestAndConcurrentCollectionUpdater("foo");
... maybe queue up some other tasks ...
t.Wait();

You may not have gained anything useful by introducing Task.Factory.StartNew.
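The double wrapping is easy to see in a runnable console sketch (the class and method names here are illustrative, not the asker's code):

```csharp
using System;
using System.Threading.Tasks;

public static class WrappingDemo
{
    public static bool Finished;

    // Stand-in for the question's async method (name shortened).
    public static async Task SlowWorkAsync()
    {
        await Task.Delay(500);
        Finished = true;
    }

    public static void Main()
    {
        // Wrapping an already-async method produces a Task<Task>.
        Task<Task> t = Task.Factory.StartNew(() => SlowWorkAsync());

        t.Wait();                    // only waits for SlowWorkAsync to *return* its Task
        Console.WriteLine(Finished); // False - the inner work hasn't finished yet

        t.Unwrap().Wait();           // waits for the inner task as well
        Console.WriteLine(Finished); // True
    }
}
```

The first Wait returns almost immediately, because the outer task completes as soon as the delegate hands back the inner Task.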

I say "may" because there's an important qualification: it depends on the context in which you start the work. C# generates code which, by default, attempts to ensure that when an async method continues after an await, it does so in the same context in which the await was initially performed. E.g., if you're in a WPF app and you await while on the UI thread, when the code continues it will arrange to do so on the UI thread. (You can disable this with ConfigureAwait.)

So if you're in a situation in which the context is essentially serialized (either because it's single-threaded, as will be the case in a GUI app, or because it uses something resembling a rental model, e.g. the context of a particular ASP.NET request), it may actually be useful to kick an async task off via Task.Factory.StartNew because it enables you to escape the original context. However, you've just made your life harder - tracking your tasks to completion is somewhat more complex. And you might have been able to achieve the same effect simply by using ConfigureAwait inside your async method.
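To make the capture concrete, here's a small self-contained sketch. The RecordingContext class is invented purely for illustration: it's a toy SynchronizationContext standing in for a UI context, which counts how many continuations get posted back to it:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Toy context: counts posts, then runs them on the thread pool.
public class RecordingContext : SynchronizationContext
{
    public int PostCount;
    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref PostCount);
        ThreadPool.QueueUserWorkItem(_ => d(state));
    }
}

public static class ContextDemo
{
    public static async Task Capturing() => await Task.Delay(50);

    public static async Task NotCapturing() =>
        await Task.Delay(50).ConfigureAwait(false);

    public static void Main()
    {
        var ctx = new RecordingContext();
        SynchronizationContext.SetSynchronizationContext(ctx);

        Capturing().Wait();
        Console.WriteLine(ctx.PostCount);   // 1 - the continuation came back via ctx

        NotCapturing().Wait();
        Console.WriteLine(ctx.PostCount);   // still 1 - ConfigureAwait(false) skipped it
    }
}
```

(Blocking with Wait is safe here only because this toy context dispatches to the thread pool; on a real single-threaded UI context that same Wait would deadlock.)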

And it may not matter anyway - if you're only attempting to manage 20 requests a second, the minimal amount of CPU effort required to do that means that you can probably manage it entirely adequately on one thread. (Also, if this is a console app, the default context will come into play, which uses the thread pool, so your tasks will be able to run multithreaded in any case.)

But to get back to your question, it seems entirely reasonable to me to have a single async method that picks a url off the queue, makes the request, examines the response, and if necessary, adds an entry to the bad url collection. And kicking the things off from a timer also seems reasonable - that will throttle the rate at which connections are attempted without getting bogged down with slow responses (e.g., if a load of requests end up attempting to talk to servers that are offline). It might be necessary to introduce a cap for the maximum number of requests in flight if you hit some pathological case where you end up with tens of thousands of URLs in a row all pointing to a server that isn't responding. (On a related note, you'll need to make sure that you're not going to hit any per-client connection limits with whichever HTTP API you're using - that might end up throttling the effective throughput.)
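One common way to impose such a cap on in-flight requests is a SemaphoreSlim. This is a sketch under assumptions, not the asker's code: the limit of 20 is just the question's starting figure, and the request itself is simulated with a delay:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledChecker
{
    // 20 matches the question's initial target; tune as needed.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(20);

    public static async Task CheckOneAsync(Uri url)
    {
        await Gate.WaitAsync();   // asynchronously parks once 20 are in flight
        try
        {
            // Simulated request; a real version would await the HTTP call here.
            await Task.Delay(50);
        }
        finally
        {
            Gate.Release();
        }
    }

    public static void Main()
    {
        var tasks = new List<Task>();
        for (int i = 0; i < 1000; i++)
        {
            tasks.Add(CheckOneAsync(new Uri("http://example.com/" + i)));
        }
        Task.WhenAll(tasks).Wait();   // at most 20 ever run at once
    }
}
```

Because WaitAsync awaits rather than blocks, no threads are tied up while requests queue behind the cap.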

You will need to add some sort of completion handling - just kicking off asynchronous operations and not doing anything to handle the results is bad practice, because you can end up with exceptions that have nowhere to go. (In .NET 4.0, these used to terminate your process, but as of .NET 4.5, by default an unhandled exception from an asynchronous operation will simply be ignored!) And if you end up deciding that it is worth launching via Task.Factory.StartNew remember that you've ended up with an extra layer of wrapping, so you'll need to do something like myTask.Unwrap().ContinueWith(...) to handle it correctly.
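For instance, one way to attach that completion handling (assuming you did go via Task.Factory.StartNew; the failing method here is a made-up stand-in) might look like this:

```csharp
using System;
using System.Threading.Tasks;

public static class FaultDemo
{
    public static async Task FailingWorkAsync()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("simulated request failure");
    }

    public static void Main()
    {
        Task<Task> t = Task.Factory.StartNew(() => FailingWorkAsync());

        // Unwrap first, then attach the continuation to the *inner* task,
        // so exceptions thrown by the async method itself are observed.
        Task handler = t.Unwrap().ContinueWith(
            inner => Console.Error.WriteLine(inner.Exception.InnerException.Message),
            TaskContinuationOptions.OnlyOnFaulted);

        handler.Wait();   // the exception is handled rather than silently lost
    }
}
```

Attaching the continuation to the outer Task<Task> instead would miss the fault entirely, because the outer task completes successfully the moment the delegate returns its Task.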

Other suggestions

Of course you can. Concurrent collections are called 'concurrent' precisely because they can be used concurrently by multiple threads, with certain guarantees about their behaviour.

A ConcurrentQueue will ensure that each element inserted into it is extracted exactly once (concurrent threads will never extract the same item by mistake, and once the queue is empty, every item will have been extracted by exactly one thread).
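That exactly-once guarantee is easy to check empirically with a small sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ExactlyOnceDemo
{
    public static void Main()
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, 10000));
        var seen = new ConcurrentBag<int>();

        // Four workers drain the queue concurrently.
        Parallel.For(0, 4, _ =>
        {
            int item;
            while (queue.TryDequeue(out item))
            {
                seen.Add(item);
            }
        });

        Console.WriteLine(seen.Count);              // 10000 - nothing lost
        Console.WriteLine(seen.Distinct().Count()); // 10000 - nothing extracted twice
    }
}
```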

EDIT: the only thing that could go wrong is if 50ms is not enough to complete a request, in which case more and more tasks will accumulate in the task queue. If that happens your memory usage will grow, but everything would still work. So yes, it is feasible.

Anyway, I would like to underline the fact that a task is not a thread. Even if you create 100 tasks, the framework will decide how many of them will actually execute concurrently.

If you want more control over the level of parallelism, you should use asynchronous requests. In your comments you wrote "async web request", but I can't tell whether you wrote "async" just because it runs on a different thread or because you intend to use the async API. If you were using the async API, I'd expect to see a handler attached to a completion event, but I don't see one, so I assume you're issuing synchronous requests from an asynchronous task. If you're using asynchronous requests, then there's no point in using tasks; just issue the async requests from the timer, since they're already asynchronous.

When I say "asynchronous request" I'm referring to methods like WebRequest.GetResponseAsync and WebRequest.BeginGetResponse.
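For completeness, here's a hedged sketch of the same check using the Task-based WebRequest.GetResponseAsync (available from .NET 4.5). UrlInfo here is a minimal stand-in for the asker's type, and CreateRequestTo is replaced by WebRequest.Create:

```csharp
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Threading.Tasks;

// Minimal stand-in for the asker's UrlInfo type (assumption).
public class UrlInfo
{
    public Uri Uri { get; set; }
}

public static class UrlChecker
{
    // Drains the queue, awaiting one request at a time; several of these
    // loops can run concurrently to get the desired parallelism.
    public static async Task CheckNextAsync(
        ConcurrentQueue<UrlInfo> urls, ConcurrentBag<UrlInfo> badUrls)
    {
        UrlInfo info;
        while (urls.TryDequeue(out info))
        {
            WebRequest request = WebRequest.Create(info.Uri);
            try
            {
                // For HTTP, GetResponseAsync throws a WebException for
                // 4xx/5xx statuses rather than returning normally.
                using (WebResponse response = await request.GetResponseAsync())
                {
                    // A normal return means the link is good; nothing to record.
                }
            }
            catch (WebException)
            {
                badUrls.Add(info);
            }
        }
    }
}
```

Starting, say, 20 of these loops and awaiting Task.WhenAll over them gives roughly the fixed-concurrency behaviour that the callback version below achieves by hand.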

EDIT2: if you want to use asynchronous requests, you can just make the requests from the timer handler. The BeginGetResponse method takes two arguments. The first is a callback that will be invoked to report the status of the request; you can pass the same callback for all the requests. The second is a user-provided object that stores state about the request; you can use it to distinguish between requests. You can even do it without the timer. Something like:

private readonly int desiredConcurrency = 20;

struct RequestData
{
  public UrlInfo url;
  public HttpWebRequest request;
}

/// Handles the completion of an asynchronous request
/// When a request has been completed,
/// tries to issue a new request to another url.
private void AsyncRequestHandler(IAsyncResult ar)
{
  if (ar.IsCompleted)
  {
    RequestData data = (RequestData)ar.AsyncState;
    try
    {
      // EndGetResponse returns a WebResponse; note that it throws a
      // WebException for 4xx/5xx statuses rather than returning normally.
      using (HttpWebResponse resp = (HttpWebResponse)data.request.EndGetResponse(ar))
      {
        if (resp.StatusCode != HttpStatusCode.OK)
        {
          BadUrls.Add(data.url);
        }
      }
    }
    catch (WebException)
    {
      BadUrls.Add(data.url);
    }

    //A request has been completed, try to start a new one
    TryIssueRequest();
  }
}

/// If urls is not empty, dequeues a url from it
/// and issues a new request to the extracted url.
private bool TryIssueRequest()
{
  RequestData rd;
  if (urls.TryDequeue(out rd.url))
  {
    rd.request = CreateRequestTo(rd.url); //TODO implement
    rd.request.BeginGetResponse(AsyncRequestHandler, rd);
    return true;
  }
  else
  {
    return false;
  }
}

//Called by a button handler, or something like that
void StartTheRequests()
{
  for (int requestCount = 0; requestCount < desiredConcurrency; ++requestCount)
  {
    if (!TryIssueRequest()) break;
  }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow