Question

I am trying to implement the following requirements (C# 4.0):

  • One "producer" (on the UI thread - driven by user action) which submits upload requests
  • A ListView control which is databound to the collection of pending upload requests
  • One consumer which processes the requests (FIFO) and keeps the UI up to date as each item's status changes or the request is fulfilled.

So far, I haven't been able to work out how to do this without using two collections, as follows:

public void AddUploadRequest(UploadRequest uploadRequest)
{
    this.UploadRequests.Add(uploadRequest);
    this.uploadRequestsBlocking.Add(uploadRequest);
}

... where UploadRequests is an ObservableCollection<UploadRequest> and uploadRequestsBlocking is a BlockingCollection<UploadRequest>.

The full code can be found here: http://pastebin.com/620EqaY5 (ignore the gross injection of Dispatcher; this is just prototype code so far).

I notify my UI of updates as follows:

this.dispatcher.Invoke(() => this.UploadRequests.Remove(uploadRequest));

Is there a better way to achieve this functionality? Or, more importantly, are there any serious drawbacks to this approach?

One possible extension I was considering was to have multiple consumers, by using Parallel.ForEach over GetConsumingPartitioner (based on this example). Would anything about the current approach make this unsuitable? It does work OK, but I'm not 100% confident that I haven't committed some major threading faux pas somewhere along the way.


Solution

The main (only) reason you can't databind directly to the BlockingCollection is that it does not implement INotifyCollectionChanged, so the UI would not be updated when a request is added or removed. That said, there are BlockingCollection wrappers around on the net that implement INotifyCollectionChanged. Alternatively, you could expose a CollectionView over the BlockingCollection and, instead of dispatching a Remove on an ObservableCollection, dispatch a Refresh of the CollectionView:

    public ICollectionView UploadRequestsView {get;set;}

    public UploadRequester(Dispatcher dispatcher)
    {
        this.dispatcher = dispatcher;           
        this.uploadRequestsBlocking = new BlockingCollection<UploadRequest>();

        UploadRequestsView = CollectionViewSource.GetDefaultView(uploadRequestsBlocking);

        this.consumerTask = Task.Factory.StartNew(this.ConsumeUploadRequests);
    }

    public void AddUploadRequest(UploadRequest uploadRequest)
    {
        uploadRequestsBlocking.Add(uploadRequest);
        UploadRequestsView.Refresh();
    }

    private void ConsumeUploadRequests()
    {
        foreach (var uploadRequest in this.uploadRequestsBlocking.GetConsumingEnumerable())
        {
            uploadRequest.Status = "Uploading...";

            Thread.Sleep(2000);
            uploadRequest.Status = "Successfully uploaded";

            Thread.Sleep(500);
            dispatcher.Invoke(() => UploadRequestsView.Refresh());
        }
    }
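
For the wrapper route mentioned above, here is a minimal sketch of what such a type might look like. NotifyingBlockingCollection is a hypothetical name, not a library class, and the sketch simply raises a Reset notification on every add and take, posting it to the SynchronizationContext captured at construction (assumed to be the UI thread's). Note that GetConsumingEnumerable removes an item as soon as the consumer takes it, so with either this wrapper or the Refresh approach a request will no longer appear in the bound list while it is being processed.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;

// Hypothetical wrapper: the name and shape are illustrative only.
public sealed class NotifyingBlockingCollection<T> : IEnumerable<T>, INotifyCollectionChanged
{
    private readonly BlockingCollection<T> inner = new BlockingCollection<T>();

    // Captured on the constructing thread; null when there is no UI context.
    private readonly SynchronizationContext context = SynchronizationContext.Current;

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    public void Add(T item)
    {
        inner.Add(item);
        RaiseReset();
    }

    public void CompleteAdding()
    {
        inner.CompleteAdding();
    }

    // Blocks until an item is available; ends after CompleteAdding once the queue is empty.
    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var item in inner.GetConsumingEnumerable())
        {
            RaiseReset(); // the item has just been taken out of the queue
            yield return item;
        }
    }

    // Snapshot enumeration, which is what a bound view reads.
    public IEnumerator<T> GetEnumerator()
    {
        return ((IEnumerable<T>)inner).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    private void RaiseReset()
    {
        var handler = CollectionChanged;
        if (handler == null) return;

        var args = new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset);
        if (context == null)
            handler(this, args);                             // no UI context: raise inline
        else
            context.Post(_ => handler(this, args), null);    // marshal to the captured context
    }
}
```

A Reset on every change is the bluntest option; it is roughly equivalent to calling Refresh on the view each time, just driven by the collection rather than by the caller.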

OTHER TIPS

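I would use the TPL. There is no need to inject the Dispatcher, since TaskScheduler.FromCurrentSynchronizationContext() gives you a scheduler that runs tasks on the current SynchronizationContext, which is the Dispatcher's context when it is called on the main UI thread.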

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading;
using System.Threading.Tasks;

public class UploadRequestProcessor
{
    public ObservableCollection<UploadRequest> UploadRequests { get; private set; }
    private readonly BlockingCollection<UploadRequest> uploadRequestsBlocking;
    private Task consumerTask;

    // Construct on the UI thread so FromCurrentSynchronizationContext() captures its context.
    public UploadRequestProcessor()
    {
        this.UploadRequests = new ObservableCollection<UploadRequest>();
        this.uploadRequestsBlocking = new BlockingCollection<UploadRequest>();

        // The continuation runs on the UI thread, but only once the consumer task
        // has finished, i.e. after CompleteAdding() has been called on the queue.
        this.consumerTask = Task.Factory
            .StartNew<List<UploadRequest>>(this.ConsumeUploadRequests)
            .ContinueWith(nextTask => RemoveUploadRequests(nextTask.Result),
                          TaskScheduler.FromCurrentSynchronizationContext());
    }

    public void AddUploadRequest(UploadRequest uploadRequest)
    {
        this.UploadRequests.Add(uploadRequest);
        this.uploadRequestsBlocking.Add(uploadRequest);
    }

    // Signals that no more requests will arrive, which lets the consumer loop end.
    public void CompleteAdding()
    {
        this.uploadRequestsBlocking.CompleteAdding();
    }

    // Returns the processed requests so the continuation can remove them from the UI list.
    private List<UploadRequest> ConsumeUploadRequests()
    {
        var requestsToRemove = new List<UploadRequest>();
        foreach (var uploadRequest in this.uploadRequestsBlocking.GetConsumingEnumerable())
        {
            uploadRequest.Status = "Uploading...";

            Thread.Sleep(2000);
            uploadRequest.Status = "Successfully uploaded";

            Thread.Sleep(500);
            requestsToRemove.Add(uploadRequest);
        }
        return requestsToRemove;
    }

    private void RemoveUploadRequests(List<UploadRequest> requestsToRemove)
    {
        foreach (var request in requestsToRemove)
        {
            UploadRequests.Remove(request);
        }
    }
}
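
One thing to watch with a continuation attached to the consumer task: it runs only after that task finishes, which for a GetConsumingEnumerable loop means after CompleteAdding has been called and the queue has drained. If each request should disappear from the list as soon as it is done, one option is to start a small task per item on the UI scheduler. The following is a rough sketch under some assumptions: PerItemUploadProcessor, the upload delegate and the Completion property are hypothetical names, UploadRequest is a minimal stand-in for the type in the question, and the default scheduler only works when the class is constructed on the UI thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Threading;
using System.Threading.Tasks;

// Minimal stand-in for the UploadRequest type from the question.
public class UploadRequest
{
    public string Status { get; set; }
}

public class PerItemUploadProcessor
{
    private readonly BlockingCollection<UploadRequest> pending =
        new BlockingCollection<UploadRequest>();
    private readonly TaskScheduler uiScheduler;
    private readonly Action<UploadRequest> upload;

    public ObservableCollection<UploadRequest> UploadRequests { get; private set; }

    // Completes when the consumer loop has ended (hypothetical helper).
    public Task Completion { get; private set; }

    // upload: hypothetical delegate that does the real work for one request.
    // uiScheduler: defaults to the caller's SynchronizationContext, so the
    // default is only valid when this is constructed on the UI thread.
    public PerItemUploadProcessor(Action<UploadRequest> upload, TaskScheduler uiScheduler = null)
    {
        this.upload = upload;
        this.uiScheduler = uiScheduler ?? TaskScheduler.FromCurrentSynchronizationContext();
        UploadRequests = new ObservableCollection<UploadRequest>();
        Completion = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    // Call on the UI thread: ObservableCollection is not thread-safe.
    public void AddUploadRequest(UploadRequest request)
    {
        UploadRequests.Add(request);
        pending.Add(request);
    }

    // Lets the consumer loop end once the queue drains.
    public void CompleteAdding()
    {
        pending.CompleteAdding();
    }

    private void Consume()
    {
        foreach (var request in pending.GetConsumingEnumerable())
        {
            request.Status = "Uploading...";
            upload(request);
            request.Status = "Successfully uploaded";

            // Run the removal on the UI scheduler and wait for it,
            // which mirrors the synchronous Dispatcher.Invoke call.
            Task.Factory.StartNew(() => UploadRequests.Remove(request),
                CancellationToken.None, TaskCreationOptions.None, uiScheduler).Wait();
        }
    }
}
```

Because the consumer waits for each removal, the UI thread should not itself block on Completion, or the two threads would wait on each other.
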
Licensed under: CC-BY-SA with attribution