Вопрос

Мне интересно, можно ли использовать очередь (в частности, ConcurrentQueue) в качестве источника IObservable?Что-то вроде;

  Queue = new ConcurrentQueue<IMessage>();
  var xs = Queue.AsEnumerable().ToObservable();

  xs.Subscribe((IMessage msg) =>
     {
        Console.WriteLine("Msg :" + msg.subject);
     });

Я думаю, это не имеет смысла, потому что ничего не удаляется из очереди.Я пытаюсь реализовать неблокирующий процесс, который может подписываться на «сообщения», отправляемые наблюдателям, следовательно, использование очереди.Я уверен, что смогу сделать это с помощью RX, но, похоже, не могу уложиться в этом!

Мне были бы интересны любые предложения о том, как это можно реализовать.Спасибо!

Это было полезно?

Решение

Вы правы, преобразование очереди (параллельной или простой, не имеет значения) будет только перечислять ее, но не удалять из очереди.«Реальная» реализация возможна, но более сложная — см. ссылку на аналогичный вопрос, который я задал на форуме RX (который по-прежнему является лучшим источником информации о RX по сравнению со StackOverflow):

Как реализовать единую очередь производителей-потребителей-работников с помощью RX?

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top