Question

I am trying to apply Rx methods to an existing application that uses Tasks and Events. Thanks to a previous question I made significant headway but now have a scenario that I am not sure how to tackle in Rx so I thought I would ask for advice.

Here's the situation:

Step 1) Retrieve a list of items, each containing an identifier. This is a finite list which will complete (e.g. a web service call returning data). I have this as an IObservable at the moment.

Step 2) For each unique identifier from Step 1, retrieve secondary information from a different source. It is possible, and more efficient, to get this secondary info for multiple identifiers in a single call (e.g. a second web service).

Step 3) Combine the information into a single IObservable.

I do not want to make a call in step 2 for something I have already requested.

I'm sure there must be a really elegant solution using Rx but I'm not familiar enough yet with Rx to nail it down. Any ideas would be welcome.

Regards Alan


Solution

OK, so - I'm cueing off the comment you made "Being new to Rx..." in Paul's answer.

Your problem is quite easy if the initial list of results comes back all at once as a single List<T>. So I'm going to assume it doesn't: it comes back as a stream of items which may include repeated information.
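For comparison, the easy all-at-once case mentioned above can be sketched with plain LINQ. This is a minimal sketch, assuming the whole list is already in memory; SymbolsByName and LookUpBatch are hypothetical stand-ins for the secondary batched service, and Enumerable.Chunk requires .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EasyCase
{
    // Hypothetical secondary data source, in the spirit of a stock database.
    private static readonly Dictionary<string, string> SymbolsByName = new Dictionary<string, string>
    {
        ["Microsoft"] = "MSFT", ["Google"] = "GOOG", ["Yahoo"] = "YHOO", ["Dell"] = "DELL"
    };

    // Number of times the (hypothetical) batched lookup has been called.
    public static int LookupCalls;

    // Hypothetical batched service: resolves every known name in a single call.
    public static Dictionary<string, string> LookUpBatch(IReadOnlyList<string> names)
    {
        LookupCalls++;
        return names.Where(SymbolsByName.ContainsKey)
                    .ToDictionary(n => n, n => SymbolsByName[n]);
    }

    public static Dictionary<string, string> Resolve(IEnumerable<string> names, int batchSize) =>
        names.Distinct()                              // request each name only once
             .Chunk(batchSize)                        // batches of at most batchSize names
             .SelectMany(batch => LookUpBatch(batch)) // one service call per batch
             .ToDictionary(kv => kv.Key, kv => kv.Value);

    public static void Main()
    {
        var names = new List<string> { "Microsoft", "Google", "Dell", "Microsoft", "Yahoo" };
        foreach (var kv in Resolve(names, batchSize: 3))
            Console.WriteLine(kv.Key + " -> " + kv.Value);
        Console.WriteLine("Lookup calls: " + LookupCalls);
    }
}
```

Because every name is known up front, Distinct and Chunk can run before any lookup is made; the streaming case below has to do the same de-duplication and batching as the names trickle in.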

Scenario

Here's the contrived scenario: a service returns a stream of company names, which may contain repeated names. Using a secondary service, you need to look up the stock symbol for each company, but only once. You want to make the look-ups in batches for efficiency if possible. For each company you want to subscribe to its prices, using its stock symbol, from a third service.

You ultimately want to convert the stream of company names into a single stream containing the combined price ticker of all companies.

I'm not sure how practical what follows is in your case or in general, but it was an interesting diversion nonetheless, and hopefully educational!

Mocked Up Services

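Here are some mocked-up service implementations to call, which you should be able to dump into the Program class of a console app to try out. The file needs the System.Reactive (Rx) package and using directives for System, System.Collections.Generic, System.Linq, System.Threading, System.Threading.Tasks, System.Reactive.Linq, System.Reactive.Concurrency and System.Reactive.Threading.Tasks. There's a lot to set up to mock this out, but the solution at the bottom which uses all this is really quite short.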

First, here's a StockInfo type to capture company stock information, and a mini-database of examples:

public class StockInfo
{
    public static List<StockInfo> StockDatabase = new List<StockInfo> {
        new StockInfo { Symbol = "MSFT", Name = "Microsoft" },    
        new StockInfo { Symbol = "GOOG", Name = "Google" },
        new StockInfo { Symbol = "APPL", Name = "Apple" },
        new StockInfo { Symbol = "YHOO", Name = "Yahoo" },
        new StockInfo { Symbol = "DELL", Name = "Dell" },
    };

    public string Symbol { get; set; }        
    public string Name { get; set; }
}

Here's a service method that takes a list of company names and returns a list of StockInfo for them:

public static Task<List<StockInfo>> GetStockInfo(IList<string> names)
{
    return Task.Run(() => 
    {
        // Match each requested company name against the mock database;
        // names with no match are silently dropped by the join.
        var results = from n in names
                      join i in StockInfo.StockDatabase
                      on n equals i.Name
                      select i;

        return results.ToList();
    });
}

Here's a type to hold a stock price for a given StockInfo:

public class StockPrice
{
    public StockInfo StockInfo { get; set; }
    public double Price { get; set; }

    public override string ToString()
    {
        return StockInfo.Symbol + ":" + Price;
    }
}

This service call returns a stream of prices at random intervals for a given StockInfo. Note that we write each subscription request to the console; if our solution works, you'll never see a subscription request for the same company more than once:

private static Random random = new Random();

public static IObservable<StockPrice> GetPrices(StockInfo stockInfo)
{    
    return Observable.Create<StockPrice>(o =>
    {      
        Console.WriteLine("Subscribed for " + stockInfo.Symbol);

        return Scheduler.Default.ScheduleAsync(
            (Func<IScheduler, CancellationToken, Task>)(
            async (ctrl, ct) =>
        {
            double price = random.Next(1, 10);
            while(true)
            {
                await ctrl.Sleep(TimeSpan.FromSeconds(random.NextDouble() * 10));
                price += Math.Round(random.NextDouble() - 0.5d, 2);
                var stockPrice = new StockPrice {
                    StockInfo = stockInfo,
                    Price = price
                };
                o.OnNext(stockPrice);
            }
        }));                       
    });
}

Here's the initial service call to gradually return some company names, with a few duplicates (this is all so contrived - I mean why wouldn't you do the de-duping on the service side!):

public static IObservable<string> GetStockNames()
{
    var exampleResults = new List<string> {
        "Microsoft",
        "Google",
        "Apple",
        "Microsoft",
        "Google",
        "Yahoo",
        "Microsoft",
        "Apple",
        "Apple",
        "Dell"
    };

    return exampleResults.ToObservable()
        .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (x, _) => x);
}

Solution

With all that setup on board, we can proceed to a solution (add this to your Console App's Main method):

First, we decide on our batching - we will batch up to 3 companies, but not wait for more than 5 seconds for them to arrive.

const int batchSize = 3;
var maxWait = TimeSpan.FromSeconds(5);

Now we get our stream of company names:

var names = GetStockNames();  

We use Distinct() to keep only the first instance of each name. We then use Buffer to batch up the names, taking at most batchSize names or waiting at most maxWait, whichever comes first (line 2). We then project each buffer (a list) into a call to our stock info service and convert the resulting list into a stream of individual StockInfo objects (lines 3-5), flattening the whole thing into an IObservable<StockInfo>. This will then be a unique stream of StockInfo items:

var uniqueCompanies = names.Distinct()
                    .Buffer(maxWait, batchSize)      
                    .SelectMany(nameBatch => GetStockInfo(nameBatch)
                                            .ToObservable()
                                            .SelectMany(info => info));
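To see the de-duplication and batching at work without waiting on timers, the same Distinct/Buffer/SelectMany chain can be run against a synchronous source. This is a sketch assuming the System.Reactive package; it swaps in the count-only Buffer(batchSize) overload for Buffer(maxWait, batchSize) so the result is deterministic, and LookUpBatchAsync is a hypothetical stand-in for GetStockInfo that records every batch it receives.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class BatchingDemo
{
    // Every batch passed to the hypothetical lookup service, in order.
    public static readonly List<IList<string>> Batches = new List<IList<string>>();

    // Hypothetical batched lookup: records the batch and echoes each name as its "info".
    public static Task<List<string>> LookUpBatchAsync(IList<string> names)
    {
        lock (Batches) Batches.Add(names);
        return Task.FromResult(names.Select(n => n + " info").ToList());
    }

    public static IList<string> Run(IEnumerable<string> names, int batchSize)
    {
        return names.ToObservable()
            .Distinct()                                   // drop repeated names
            .Buffer(batchSize)                            // group into batches (count only)
            .SelectMany(batch => LookUpBatchAsync(batch)  // one call per batch
                .ToObservable()
                .SelectMany(infos => infos))              // flatten each List<string> into items
            .ToList()
            .Wait();                                      // block until the source completes
    }

    public static void Main()
    {
        var names = new[] { "Microsoft", "Google", "Apple", "Microsoft", "Google", "Yahoo",
                            "Microsoft", "Apple", "Apple", "Dell" };
        foreach (var info in Run(names, batchSize: 3))
            Console.WriteLine(info);
        Console.WriteLine("Lookup calls: " + Batches.Count);
    }
}
```

With the example names, the five distinct companies go out as two calls: one full batch of three and a final partial batch that Buffer flushes when the source completes.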

Finally we can project each StockInfo into a stream of prices, and flatten the lot to get a single combined price stream:

var getPrices = uniqueCompanies.SelectMany(info => GetPrices(info));

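getPrices.Subscribe(Console.WriteLine);

// Prices arrive on background threads, so keep the console app
// alive until Enter is pressed; otherwise Main returns immediately.
Console.ReadLine();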

Output will be random data at random intervals, something like:

Subscribed for MSFT
Subscribed for GOOG
Subscribed for APPL
GOOG:6.37
MSFT:7.05
GOOG:7.12
APPL:5.34
Subscribed for YHOO
Subscribed for DELL
MSFT:7.35
YHOO:8.2

Hope that's of use!

OTHER TIPS

Tada:

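// GetAListOfThings and GetSecondaryInformation are placeholders
// for steps 1 and 2 of the question.
GetAListOfThings()
    .SelectMany(list => GetSecondaryInformation(list)
        .Select(secInfo => new { list, secInfo }));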
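A concrete, runnable version of this shape might look like the following. It is a sketch assuming the System.Reactive package and that the whole list arrives as a single item; GetAListOfThings and GetSecondaryInformation are hypothetical implementations, the Distinct() call is an addition so repeated identifiers are requested only once, and a value tuple replaces the anonymous type so the pair can be returned from a method.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ListThenLookup
{
    // Hypothetical step 1: the finite list arrives as a single item, then completes.
    public static IObservable<IList<string>> GetAListOfThings() =>
        Observable.Return<IList<string>>(new List<string> { "Microsoft", "Google", "Microsoft" });

    // Hypothetical step 2: one batched call that resolves every identifier it is given.
    public static Task<Dictionary<string, string>> GetSecondaryInformation(IEnumerable<string> ids) =>
        Task.FromResult(ids.ToDictionary(id => id, id => "info for " + id));

    public static (IList<string> list, Dictionary<string, string> secInfo) Run() =>
        GetAListOfThings()
            .SelectMany(list => GetSecondaryInformation(list.Distinct()) // each id requested once
                .ToObservable()
                .Select(secInfo => (list, secInfo)))                   // step 3: pair them up
            .Wait(); // the source emits exactly one list, so this is the single result

    public static void Main()
    {
        var (list, secInfo) = Run();
        Console.WriteLine(list.Count + " items, " + secInfo.Count + " looked up");
    }
}
```

The secondary call is made once per emitted list, so this fits the all-at-once case; if several lists arrived, an identifier repeated across lists would be looked up again.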
Licensed under: CC-BY-SA with attribution