Question

I have a use-case wherein i want to insert and remove custom object (Stocks) from a blocking collection (larger picture being producer consumer queue).

The problem statement is precisely similar to this thread - update an ObservableCollection with a BlockingCollection

I dont want to use reactive extensions but want a traditional C# way to do this logic (this is a hard requirement unfortunately and completely understand the implications). My snippet of code is here

MainWindowViewModel.cs

public class MainWindow_VM : ViewModelBase
{
    public ObservableCollection<StockModel> stocks { get; set; }
    private readonly Dispatcher currentDispatcher;
    private BlockingCollection<StockModel> tasks = new BlockingCollection<StockModel>();
    #endregion

// All other standard ViewModel logic - Constructor, Command etc

    private void handlermethod(object sender, MarketDataEventArgs e)
    {
        Task.Factory.StartNew(AddUpdateObservableCollection);

        // Below thought process (maybe wrong) - How do i add the value to the BlockingCollection through a thread considering I have a ProducerConsumer class standard implementation (which has Enqueue and Dequeue Methods)
        using (ProducerConsumerQueue q = new ProducerConsumerQueue())
        {
              foreach (Stock s in e.updatedstock)
              {
                  StockModel sm = new StockModel();
                  sm.Symbol = s.Symbol;
                  sm.Bidprice = s.Bidprice;

                  q.EnqueueTask(s); 
               }
    }

    private void AddUpdateObservableCollection()
    {
        //Signalling mechanism still missing - when Stock comes into BlockingCollection - then this will start draining.
       // Also have to take care of Dispatcher stuff since you can only update ObservableCollection through Dispatcher

        foreach (StockModel sm in tasks)
        {
            if (sm != null)
            {
                if (stocks.Any(x => x.Symbol == sm.Symbol))
                {
                    var found = stocks.FirstOrDefault(x => x.Symbol == sm.Symbol);
                    int i = stocks.IndexOf(found);
                    stocks[i] = sm;
                }
                else
                {
                    stocks.Add(sm);
                }
            }
        }
    }
}
Was it helpful?

Solution

It looks like the problem is that your AddUpdateObservableCollection method sees that the tasks collection is empty, and then exits.

If you want it to be persistent, replace your foreach with:

foreach (StockModel sm in tasks.GetConsumingEnumerable())

The enumerator you get from GetConsumingEnumerable will block waiting on items to be added to the queue, and will continue to block until the other thread marks the collection as complete for adding. So when you're done adding things to the queue and want the AddUpdateObservableCollection method to exit, just call tasks.CompleteAdding(). The loop will empty the queue, see that no more items are forthcoming (because the IsAddingCompleted property is true), and exit.

I'm a little confused about how you're getting things into the blocking collection, though. Your code snippet isn't too clear. If you're having trouble with that, edit your question and explain what the problem is. The comment in the code for handlermethod is pretty cryptic.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top