Question

I need to split a processing sequence (like the one in the question "How to organize sequence of data processors with .net RX") across several computation units in an Azure environment.
The idea is to serialize an Observable sequence to Azure Queues (or Service Bus) and deserialize it on the other side.
If the producer or the consumer fails, the other party should be able to continue producing or consuming.

Could anyone suggest an elegant way to do this, and which to use (Azure Queues or Service Bus)?
Has anyone used the TCP Qbservable Provider (http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider) for such problems? Is it resilient to the failure of one of the parties?


Solution

Imagine you have a message queue with the following API (method bodies omitted):

class MQ {

    public MQ();

    // Send a single message to the queue identified by keyPath
    public void send(string keyPath, string msg);

    // Receive a single message from the queue identified by keyPath
    public async Task<string> receive(string keyPath);

}

To make this Rx-compatible, wrap it in a class that acts as an observer for writing and exposes an observable for reading:

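using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

class MQRX : IObserver<string> {
    readonly MQ _mq;
    readonly string _keyPath;

    public MQRX(string keyPath) {
        _mq = new MQ();
        _keyPath = keyPath;
    }

    // Each subscription awaits one message at a time and repeats indefinitely.
    public IObservable<string> Observe() {
        return Observable.Defer(() => _mq.receive(_keyPath).ToObservable()).Repeat();
    }

    public void OnNext(string msg) {
        _mq.send(_keyPath, msg);
    }

    public void OnError(Exception e) {
        // The message queue might not
        // support serializing exceptions,
        // or it might, or you might build
        // a protocol for it.
    }

    public void OnCompleted() {
        // Required by IObserver<string>; nothing to flush in this sketch.
    }
}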

To use it in a fault-tolerant way, add Retry. Note that Retry resubscribes whenever an upstream exception is delivered through OnError.

new MQRX("users/1/article/2").
    Observe().
    Retry().
    Subscribe(msg => Console.WriteLine(msg));

On the writing side you could, for example, send a message every two seconds and retry the subscription to the generator if there is an error. Observable.Interval, which just emits a value at every time interval, is unlikely to fail, but imagine reading from a file or from some other message queue instead.

var mq = new MQRX("users/1/article/2");

Observable.Interval(TimeSpan.FromSeconds(2)).
    Select(x => x.ToString()).
    Retry().
    Subscribe(mq);

Note that you should probably use the Catch extension method on IObservable rather than retrying blindly, as you might get the same error over and over again.
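The advice to prefer Catch over an unbounded Retry could be sketched roughly as follows. This is only an illustration, assuming the System.Reactive package is referenced; the class name ObservableRecovery and the extension method RetryThenFallback are hypothetical names made up for this example, not part of Rx.

```csharp
using System;
using System.Reactive.Linq;

public static class ObservableRecovery
{
    // Hypothetical helper (not part of Rx): subscribe to the source at most
    // maxAttempts times, then hand the last exception to a fallback that
    // supplies a replacement sequence instead of retrying forever.
    public static IObservable<T> RetryThenFallback<T>(
        this IObservable<T> source,
        int maxAttempts,
        Func<Exception, IObservable<T>> fallback)
    {
        return source
            .Retry(maxAttempts)             // bounded number of subscriptions
            .Catch<T, Exception>(fallback); // runs only after the final failure
    }
}
```

With the MQRX wrapper above, the reading side might then call Observe().RetryThenFallback(3, ex => Observable.Empty<string>()) to give up quietly after three failed attempts, or return a sequence that logs or reports the error instead.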

OTHER TIPS

I wrote my own UDP-to-Rx wrapper in about 30 lines of VB code, and it handles timeouts. TCP wrappers would be similar, I guess.

Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Net
Imports System.Net.Sockets

Public Module UDP
    ''' <summary>
    ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional
    ''' timeout value between results.
    ''' </summary>
    ''' <param name="timeout"></param>
    ''' <returns></returns>
    ''' <remarks></remarks>
    Public Function StreamObserver(
                localPort As Integer, 
                Optional timeout As Nullable(Of TimeSpan) = Nothing
                ) As IObservable(Of UdpReceiveResult)

        Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
            Function() New UdpClient(localPort),
            Function(udpClient)
                Dim o = Observable.Defer(Function() udpClient.
                                                                ReceiveAsync().
                                                                ToObservable())
                If Not timeout Is Nothing Then
                    Return Observable.Timeout(o.Repeat(), timeout.Value)
                Else
                    Return o.Repeat()
                End If
            End Function
        )
    End Function
End Module

You could do the same for the write side if you needed to. Then you just use normal Rx techniques to serialize your data into UDP frames and deserialize it again, for example on the read side:

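' StreamObserver is a function, so it is called directly rather than with New.
' "Error" is a reserved word in VB, so the exception parameter is named ex.
UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)).
        Select(Function(udpResult) MyDeserializeFunction(udpResult)).
        Subscribe(Sub(result) DoSomething(result),
                  Sub(ex) DoSomethingWithError(ex))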
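
For the write side mentioned above, a minimal sketch might look like the following. It is written in C# to match the first answer and assumes the System.Reactive package is referenced; the class name UdpSink and its Create method are hypothetical names invented for this illustration, and it sends each frame synchronously without any retry or timeout handling.

```csharp
using System;
using System.Net.Sockets;
using System.Reactive;

public static class UdpSink
{
    // Hypothetical helper: returns an observer that sends every byte[] it
    // receives as one UDP datagram to host:port using the supplied client.
    public static IObserver<byte[]> Create(UdpClient client, string host, int port)
    {
        return Observer.Create<byte[]>(
            frame => client.Send(frame, frame.Length, host, port));
    }
}
```

Serialization stays in the Rx pipeline: project each item to a byte array with Select (for example with Encoding.UTF8.GetBytes for strings) and then Subscribe the sink to the result.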
Licensed under: CC-BY-SA with attribution