Question

I'm trying to expose an observable sequence that gives observers all existing records in a database table plus any future items. For the sake of argument, lets say it's log entries. Therefore, I'd have something like this:

public class LogService
{
    private readonly Subject<LogEntry> entries;

    public LogService()
    {
        this.entries = new Subject<LogEntry>();

        this.entries
            .Buffer(...)
            .Subscribe(async x => WriteLogEntriesToDatabaseAsync(x));
    }

    public IObservable<LogEntry> Entries
    {
        get { return this.entries; }
    }

    public IObservable<LogEntry> AllLogEntries
    {
        get
        {
            // how the heck?
        }
    }

    public void Log(string message)
    {
        this.entries.OnNext(new LogEntry(message));
    }

    private async Task<IEnumerable<LogEntry>> GetLogEntriesAsync()
    {
        // reads existing entries from DB table and returns them
    }

    private async Task WriteLogEntriesToDatabaseAsync(IList<LogEntry> entries)
    {
        // writes entries to the database
    }
}

My initial thought for the implementation of AllLogEntries was something like this:

return Observable.Create<LogEntry>(
    async observer =>
    {
        var existingEntries = await this.GetLogEntriesAsync();

        foreach (var existingEntry in existingEntries)
        {
            observer.OnNext(existingEntry);
        }

        return this.entries.Subscribe(observer);
    });

But the problem with this is that there could log entries that have been buffered and not yet written to the database. Hence, those entries will be missed because they are not in the database and have already passed through the entries observable.

My next thought was to separate the buffered entries from the non-buffered and use the buffered when implementing AllLogEntries:

return Observable.Create<LogEntry>(
    async observer =>
    {
        var existingEntries = await this.GetLogEntriesAsync();

        foreach (var existingEntry in existingEntries)
        {
            observer.OnNext(existingEntry);
        }

        return this.bufferedEntries
            .SelectMany(x => x)
            .Subscribe(observer);
    });

There are two problems with this:

  1. It means clients of AllLogEntries also have to wait for the buffer timespan to pass before they receive their log entries. I want them to see log entries instantaneously.
  2. There is still a race condition in that log entries could be written to the database between the point at which I finish reading the existing ones and the point at which I return the future entries.

So my question is: how would I actually go about achieving my requirements here with no possibility of race conditions, and avoiding any major performance penalties?

Was it helpful?

Solution

To do this via the client code, you will probably have to implement a solution using polling and then look for differences between calls. Possibly combining a solution with

will give you sufficient solution.

Alternatively, I'd suggest you try to find a solution where the clients are notified when the DB/table is updated. In a web application, you could use something like SignalR to do this.

For example: http://techbrij.com/database-change-notifications-asp-net-signalr-sqldependency

If its not a web-application, a similar update mechanism via sockets may work.

See these links (these came from the accepted answer of SignalR polling database for updates):

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top