Question

I've got a simple messaging framework built around a single instance of "Host" attached to multiple instances of "Client". Each instance communicates with the other by pushing messages to it. Each instance then processes its own messages, in the order in which they were received, on a separate task.

Here's a simplified version of the code:

interface IMessage
{
}

class Host
{
    ConcurrentBag<Client> _Clients = new ConcurrentBag<Client>();
    BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();

    public Host()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });
    }

    public void AddClient(Client client)
    {
        _Clients.Add(client);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // consume the message and typically generate new messages
        // For now just echo the message back
        Broadcast(message);  
    }

    private void Broadcast(IMessage message)
    {
        foreach (var client in _Clients)
        {
            client.PushMessage(message);
        }
    }
}


class Client
{
    private Host _Host;
    private BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();

    public Client(Host host)
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });

        _Host = host;
        _Host.AddClient(this);
    }

    public void PushMessage(IMessage message)
    {
        _IncomingMessages.Add(message);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}

My requirements are that the instances process their messages in parallel with one another (if possible), and that each instance processes its own messages one at a time, in the order they were received.

Is it a bad idea to spawn new tasks which just pull from GetConsumingEnumerable()?

I feel like I could simplify this using Reactive Extensions and IObservable, but I'm still fairly new to Rx and am unsure how it would be structured to meet my parallel execution needs.

Any guidance would be much appreciated!

Solution

You can certainly make your life a lot easier by using Rx (specifically, IObservable) as the intermediary for passing messages to the Host, and from the Host to all the clients. There are two changes to the code (you can make either one, or both): pass an IObservable<IMessage> to Host as its input and have it expose an IObservable<IMessage> as its output, and similarly pass an IObservable<IMessage> to Client.

From what I can see, the Host and Client don't need to know about each other, which is excellent. This design will not work if the Host has to actively manage the clients (kicking them off, for example). In that case the code below would need to change so that the Host manages client subscriptions: clients would request a subscription from the Host, and the Host would hold the resulting IDisposable. That is not a significant change.

Also, the code below doesn't handle errors or unsubscription in general; both can be added without much difficulty. It is very much a skeleton with functionality equivalent to yours.

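class Host
{
    private readonly Subject<IMessage> _outbound;

    public Host(IObservable<IMessage> messages)
    {
        _outbound = new Subject<IMessage>();
        // ObserveOn (not SubscribeOn) moves the OnNext callbacks onto the scheduler
        // and delivers them one at a time, in order. SubscribeOn only controls where
        // the Subscribe call itself runs, so messages pushed before the asynchronous
        // subscription completes could be missed.
        // Scheduler.TaskPool is TaskPoolScheduler.Default in Rx 2.0 and later.
        messages.ObserveOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        _outbound.OnNext(message); // just echo
    }

    // Hide the Subject so that consumers can only subscribe, not push.
    public IObservable<IMessage> Messages { get { return _outbound.AsObservable(); } }
}

class Client
{
    public Client(IObservable<IMessage> messages)
    {
        // Each client gets its own ordered queue of notifications on the scheduler.
        messages.ObserveOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}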

Then usage is simple:

var source = new Subject<IMessage>();

var host = new Host(source);
var client1 = new Client(host.Messages);
var client2 = new Client(host.Messages);
var client3 = new Client(host.Messages);

// ExampleMessage is a hypothetical IMessage implementation, used for illustration only
source.OnNext(new ExampleMessage());
source.OnNext(new ExampleMessage());
source.OnNext(new ExampleMessage());

Of course you can use any IObservable source to drive the Host.
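
For the case mentioned earlier, where the Host has to actively manage its clients, here is a minimal sketch of how that might look. The names ManagingHost, ManagedClient, Attach, Kick and TextMessage are all made up for this illustration, and the sketch is single-threaded (no scheduler, no locking) to keep the subscription handling visible. It only relies on Subject<T> and the Subscribe overloads from System.Reactive.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

interface IMessage { }

// Hypothetical message type, used only in this sketch.
sealed class TextMessage : IMessage
{
    public string Text { get; }
    public TextMessage(string text) { Text = text; }
}

sealed class ManagedClient
{
    public List<string> Received { get; } = new List<string>();
    public Exception LastError { get; private set; }

    // Called by the host; the returned IDisposable is the subscription handle.
    public IDisposable SubscribeTo(IObservable<IMessage> messages)
    {
        return messages.Subscribe(
            ProcessIncomingMessage,        // OnNext
            error => LastError = error);   // OnError: remember why the stream failed
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        if (message is TextMessage text) Received.Add(text.Text);
    }
}

sealed class ManagingHost
{
    private readonly Subject<IMessage> _outbound = new Subject<IMessage>();
    private readonly Dictionary<ManagedClient, IDisposable> _subscriptions =
        new Dictionary<ManagedClient, IDisposable>();

    // The host creates and keeps the subscription, so it can end it later.
    public void Attach(ManagedClient client)
    {
        _subscriptions[client] = client.SubscribeTo(_outbound);
    }

    // Disposing the handle unsubscribes the client from further messages.
    public void Kick(ManagedClient client)
    {
        if (_subscriptions.TryGetValue(client, out var subscription))
        {
            subscription.Dispose();
            _subscriptions.Remove(client);
        }
    }

    public void Broadcast(IMessage message) => _outbound.OnNext(message);
}

static class Program
{
    static void Main()
    {
        var host = new ManagingHost();
        var kept = new ManagedClient();
        var kicked = new ManagedClient();
        host.Attach(kept);
        host.Attach(kicked);

        host.Broadcast(new TextMessage("first"));
        host.Kick(kicked);
        host.Broadcast(new TextMessage("second"));

        Console.WriteLine(string.Join(",", kept.Received));   // prints "first,second"
        Console.WriteLine(string.Join(",", kicked.Received)); // prints "first"
    }
}
```

If the host is used from several threads, the dictionary would need to be protected (a lock or a ConcurrentDictionary), and a scheduler call can be placed in front of Subscribe in the same way as in the skeleton.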

Note that if processing a message is long-running, you will want to pass Scheduler.NewThread instead of Scheduler.TaskPool to the scheduler calls in Host and Client. As written, the work is scheduled on the task pool.
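
To check the ordering requirement from the question, here is a small sketch showing that ObserveOn, the Rx operator that moves notification delivery onto a scheduler, hands values to a subscriber one at a time and in the order they were pushed. It assumes System.Reactive 2.0 or later, where the task pool scheduler is exposed as TaskPoolScheduler.Default; OrderingDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class OrderingDemo
{
    // Pushes 1..count through ObserveOn and returns the values
    // in the order the subscriber's handler saw them.
    public static List<int> Run(int count)
    {
        var received = new List<int>();
        using (var done = new ManualResetEventSlim())
        {
            var source = new Subject<int>();
            using (source
                .ObserveOn(TaskPoolScheduler.Default)   // handlers run on the task pool
                .Subscribe(
                    value => received.Add(value),        // invoked serially, never concurrently
                    () => done.Set()))                   // OnCompleted arrives after every OnNext
            {
                for (var i = 1; i <= count; i++) source.OnNext(i);
                source.OnCompleted();
                done.Wait();                             // block until the handler has drained the queue
            }
        }
        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", Run(5))); // prints "1,2,3,4,5"
    }
}
```

Each subscriber gets its own queue, so several clients can process in parallel with one another while each one still sees its messages sequentially.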

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow