Question

Lets say I have a stream of bids - and i want to enrich it with the bidders names:

[
  { bidder: 'user/7', bet: 20  },
  { bidder: 'user/8', bet: 21 }, 
  { bidder: 'user/7', bet: 25  }, 
  /*...., 2 seconds later */
  { bidder: 'user/8', bet: 25  },
  { bidder: 'user/9', bet: 30  },
  ...

Bidder names come from a webservice:

GET '/users?id=7&id=8' =>
[{ user: 'user/7', name: 'Hugo Boss'}, { user: 'user/8', name: "Karl Lagerfeld"}

Which i wrap into a reactive read-through-cache:

IObservable<User> Users(IObservable<string> userIds);

Compose

Now I want to compose that into following output:

[
  { bidder: 'user/7', bet: 20, name: 'Hugo Boss'  },
  { bidder: 'user/8', bet: 21, name: 'Karl Lagerfeld' }, 
  { bidder: 'user/7', bet: 25, name: 'Hugo Boss'  }, 
  /*...., 2 seconds later */
  { bidder: 'user/8', bet: 25, name: 'Karl Lagerfeld'  },
  { bidder: 'user/9', bet: 30, name: 'Somebody else'  },
  ...

Step 1

I guess I need to project the stream of bids to a stream of user ids. Simple Select. Then inside the read-throug-cache I split it up in chunks with Buffer(TimeSpan, int).

Now I have a stream of bids and one of users.

Step 2

But now, how to combine the two?

Hint: For the BIDs it would make sense to keep the order - but in my real code I don't care about the order. So I'd like to have a solution that does not rely on that the user cache returns users in the right order. Then a Zip could do the job.

I rather want to release all bids into my result stream as soon as I have the user information available.

Solution?

I'm pretty sure I need to maintain some temporary state somehow (window/buffer/...). But I don't know where and how. Maybe this should be implemented as a custom operator; or maybe there is one out there already?

Any ideas?

Edit: It seems like it isn't possible to actually compose this on top of streams. Instead I need to get a promise (either Task or IObservable) for the userid->user function and leave it up to the promise to bulk load and/or cache users, if appropriate.

Was it helpful?

Solution

Something like the following should work. Represent the cache as an ISubject and you have an async cache. Internally in the cache you could buffer queries over a time window and batch request them to the server.

Some dummy classes

public struct User
{
    public string id;
}

public struct Bid
{
    public string userId;
    public int bid;
}

The cache object itself

/// <summary>
/// Represent the cache by a subject that get's notified of
/// user requests and produces users. Internally this can
/// be buffered and chunked to the server. 
/// </summary>
public static ISubject<string, User> UsersCacheSubject;

Method for dispatching and async queries to the DB

public static Task<User> UsersCache(string id)
{
    var r = UsersCacheSubject
        .Where(user=>user.id==id)
        .Replay(1)
        .Take(1);


    UsersCacheSubject.OnNext(id);

    return r.FirstAsync().ToTask();
}

try it out

 public void TryItOut()
{
    IObservable<Bid> bidderObservable = Observable.Repeat(new Bid());
     var foo = from bidder in bidderObservable
              from user in UsersCache(bidder.userId)
              where bidder.userId == user.id
              select new {bidder, user};
 }

OTHER TIPS

Isn't this just as simple as doing the following?

var query =
    from b in bids
    from u in Users(Observable.Return(b.Bidder))
    select new
    {
        b.Bidder,
        b.Bet,
        u.Name
    };
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top