Question

I wondered if it's possible to use a queue (specifically as ConcurrentQueue) as the source of an IObservable? Something like;

  Queue = new ConcurrentQueue<IMessage>();
  var xs = Queue.AsEnumerable().ToObservable();

  xs.Subscribe((IMessage msg) =>
     {
        Console.WriteLine("Msg :" + msg.subject);
     });

I guess it doesn't really make sense because nothing is being dequeued. I'm trying to implement a non-blocking process which can subscribe to "messages" being pushed to observers, hence the use of a queue. I'm sure I should be able to do this with RX, but can't seem to get my head around it!

I'd be interested in any suggestions on how this could be implemented. Thanks!

Was it helpful?

Solution

You're right, converting a Queue (concurrent or simple, doesn't matter) would only enumerate it, but not de-queue. "Real" implementation is possible, but more complex - see the link to a similar question I asked on the RX forum (which is still a better source of information on RX comparing to StackOverflow):

How to implement a single worker consumer producer queue using RX?

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top