Question

I am trying to improve an event-driven processing system, which is having a few problems because the events are not guaranteed to arrive in the correct chronological sequence. This is due to batching and caching upstream which is currently out of my control.

The sequence errors are not a complete disaster for my processor, mainly because it's FSM oriented and so copes with "weird" transitions and mostly ignores them. Also the events are timestamped and expected to be delayed so we are used to reconstructing history. However there are a few cases which cause unwanted behaviour or unnecessary double-processing.

I'm looking for a technique which can either somehow queue up, group the events and sort them before processing, or perhaps identify "transactions" in the stream.

An example correct event stream would be like this, noting that all events are timestamped:

10:01 User Session Start (user A)
10:02 Device event 1
10:03 Device event 2
10:10 User Session End (user A)
10:15 User Session Start (user A)
10:16 User Session End (user A)
10:32 User Session Start (user B)
10:34 Device event 3
10:35 Device event 4
10:50 User Session End (user B)

My downstream processor keeps track of who is using a device, but also needs to correlate the other device events with the users. It does that by keeping state of the sessions, while receiving the other events.

Each event is in practice processed by different message queue workers, with a central database. So there are potential race hazards, but that's not the focus of this question.

The problems arise when the same stream arrives like this, where each ... marks a gap between the three "batches", each one being received much later than the one before.

10:10 User Session End (user A)
10:01 User Session Start (user A)
10:02 Device event 1
10:03 Device event 2
...
10:16 User Session End (user A)
10:15 User Session Start (user A)
10:34 Device event 3
10:35 Device event 4
...
10:50 User Session End (user B)
10:32 User Session Start (user B)

I am particularly interested in "the final device event in a session". So here I need the 10:10 session end plus the 10:03 Device event 2 to complete the picture. I know that any device event timeboxed between 10:01 and 10:10 is "owned" by user A, so when I receive device event 2, I can correlate it - OK. When I receive the 10:01 start event I can ignore it, as I already saw the corresponding end (just annoying). When I receive device event 1, I can't tell if it's the final one or not, so I process it. Then I receive device event 2 immediately after and re-do the same work, updating the state to presume this is the last one. I cannot predict whether any more device events are coming, so the FSM has to just remain with that assumption - which in this case is correct.

The next batch is harder to deal with. I get a second "empty" session from user A - not a problem in itself. Then I get some device events out of sequence which are for the user B session which I've not received yet. This isn't a critical problem, I can update the associated device model with this information, but cannot complete the processing yet.

Eventually the user B events arrive and I can correlate back with the device events, again ignoring "older" events where possible.

You can hopefully see this adds a lot of difficulty to the processing and is probably leading to some missing cases.

What can I do to massage this event stream to make it more processable?

I have been thinking about:

  • event sourcing (but it requires correct sequence)
  • re-buffering the queue for X minutes (but I still can't be sure how long)
  • implementing something like the Nagle Algorithm for chunking and pause/gap detection
  • Combine all the workers into one, with an FSM (mirroring the session-boxing) which then outputs the events once they've satisfied the inter-dependency sequence checks
  • Don't fix the queue and implement a random-order resilient processor

Because I can make some assumptions about the likely contents of the stream, I can consider a "transaction detector" or, making no assumptions, just a more generalised "stream re-order" approach.

I know sequence numbers would solve it easily, but as mentioned I cannot presently modify the upstream publisher.

I am not looking for an entire solution - just pointers to algorithms or techniques used for this class of problem, so I can do further research.

Update: clarification on conditions:

  • Is there an upper bound on the event delivery delay: No. Most devices report their data at the end of a user session, and the events are delivered relatively quickly. However some go offline for a while and then sync up later. In some cases the data arrives so late it's beyond the useful processing time period and we have to discard it. Often it arrives a bit late, and we have to provide live updates to administrators during time-critical and realtime workflows, to correct the processing outcomes.
  • Are events guaranteed to have correct timestamps: Yes.
  • Will events be delivered eventually: Yes.
  • Can processing output be corrected: Yes, this happens via the FSMs and is a great feature when it makes sense, but looks strange when it "goes backwards".
  • Should output be produced as soon as possible: Sometimes. There are some time constraints where the processing needs to happen before a fixed future moment. A long time before that moment we're in no rush and could queue up the data. But when that moment of decision looms, and we cross timing thresholds, we now have to make choices and trigger workflows etc. After that time being realtime is important (in minutes not seconds).
  • Is associating events with the wrong sessions acceptable: No, that would be a major functional failure. However the sequence problems don't cause this, due to various logic and validation in the processor. [update: this has been found to happen, due to a hacky technique we quickly added to solve this!]

Solution

SLAs

Service Level Agreements are perhaps not the most technically interesting approach, but they nail down how long incomplete data will be tolerated. Your data will likely be feeding downstream reporting and decision-making systems, and you need to be sure they have the correct picture from upstream.

As a consequence this gives a definitive end-state for your processing. Either:

  • an error has been detected and reporting/fudging will take place,
  • the data is correct and can now be ignored for general processing, or
  • the upstream system owners will come to you with the rest of the data, after having had a bad hair day.

Buckets

Those session events can be used to form buckets into which the other events fall. As you only care about the most recent device event in each session, the bucket need only hold the most recent message.

Track the last time the bucket was modified/checked. When it becomes sufficiently old, it's presumably stable and can be evaluated.

Incomplete buckets shouldn't be processed. (Though after some time they should flag an alert.)

If a message doesn't fall into a bucket then it needs to be placed in a sorted list. When a new bucket is created, the list can be intersected with the bucket's time range to fill the bucket.
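A minimal Python sketch of the bucket idea, replaying the out-of-order stream from the question. Everything here is an assumption made for illustration: the names `Bucket`, `BucketStore`, `hm`, and `stable`, the use of minutes since midnight as timestamps, a single device, and the rule for pairing a session start with a session end (first half-open bucket for the same user whose known boundary is on the right side of the new one).

```python
from bisect import insort
from dataclasses import dataclass
from typing import List, Optional, Tuple


def hm(text: str) -> int:
    """Convert 'HH:MM' to minutes since midnight (illustrative time unit)."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


@dataclass
class Bucket:
    user: str
    start: Optional[int] = None                   # session start, event time
    end: Optional[int] = None                     # session end, event time
    last_event: Optional[Tuple[int, str]] = None  # newest device event seen
    touched: int = 0                              # arrival time of last change/check

    def complete(self) -> bool:
        return self.start is not None and self.end is not None

    def stable(self, now: int, quiet: int) -> bool:
        # Only complete buckets that have been left alone for a while qualify.
        return self.complete() and now - self.touched >= quiet


class BucketStore:
    def __init__(self) -> None:
        self.buckets: List[Bucket] = []
        self.orphans: List[Tuple[int, str]] = []  # unmatched events, sorted by time

    def on_session(self, user: str, kind: str, ts: int, now: int) -> Bucket:
        """kind is 'start' or 'end'. Pair with a half-open bucket if one fits."""
        for b in self.buckets:
            if b.user != user or b.complete():
                continue
            if kind == "start" and b.start is None and b.end >= ts:
                b.start = ts
                break
            if kind == "end" and b.end is None and b.start <= ts:
                b.end = ts
                break
        else:  # no half-open bucket matched: open a new one
            b = Bucket(user, start=ts if kind == "start" else None,
                       end=ts if kind == "end" else None)
            self.buckets.append(b)
        b.touched = now
        if b.complete():
            self._adopt_orphans(b, now)
        return b

    def on_device_event(self, ts: int, name: str, now: int) -> None:
        for b in self.buckets:
            if b.complete() and b.start <= ts <= b.end:
                self._offer(b, ts, name, now)
                return
        insort(self.orphans, (ts, name))          # park it until a bucket appears

    def _offer(self, b: Bucket, ts: int, name: str, now: int) -> None:
        if b.last_event is None or ts > b.last_event[0]:
            b.last_event = (ts, name)             # keep only the most recent
        b.touched = now

    def _adopt_orphans(self, b: Bucket, now: int) -> None:
        remaining = []
        for ts, name in self.orphans:
            if b.start <= ts <= b.end:
                self._offer(b, ts, name, now)
            else:
                remaining.append((ts, name))
        self.orphans = remaining


# Replay the three batches from the question; `now` is an arrival clock in minutes.
store = BucketStore()
store.on_session("A", "end", hm("10:10"), now=0)
store.on_session("A", "start", hm("10:01"), now=0)
store.on_device_event(hm("10:02"), "Device event 1", now=0)
store.on_device_event(hm("10:03"), "Device event 2", now=0)
store.on_session("A", "end", hm("10:16"), now=30)
store.on_session("A", "start", hm("10:15"), now=30)
store.on_device_event(hm("10:34"), "Device event 3", now=30)
store.on_device_event(hm("10:35"), "Device event 4", now=30)
store.on_session("B", "end", hm("10:50"), now=60)
store.on_session("B", "start", hm("10:32"), now=60)
```

In this replay, device events 3 and 4 wait in the sorted list until user B's bucket is complete, and each bucket ends up holding only its latest device event. A caller would evaluate a bucket once `stable(now, quiet)` is true, with `quiet` taken from whatever tolerance the SLA gives you.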

Markov Chain Analysis

The data feed looks like a Markov Chain: several underlying models being spliced together. If you can safely assume that each model delivers its contents serially, then you can break the feed back into its constituent models and make an educated guess as to whether all of the data has been flushed.

Presumably, when a cache flushes, its contents are chronologically ordered. Hence whenever time jumps, particularly backwards, it's a different data source (aka underlying model).
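As a rough illustration of the backward-jump rule, the sketch below cuts an arrival-ordered feed into runs whose timestamps never decrease. The function name `split_on_backward_jumps` and the minutes-since-midnight timestamps are assumptions for the example. It only detects backward jumps, so it is a starting point rather than a full source detector.

```python
from typing import List, Tuple

Event = Tuple[int, str]  # (event time in minutes since midnight, description)


def split_on_backward_jumps(feed: List[Event]) -> List[List[Event]]:
    """Group consecutive arrivals into runs with non-decreasing timestamps."""
    runs: List[List[Event]] = []
    for event in feed:
        if runs and event[0] >= runs[-1][-1][0]:
            runs[-1].append(event)   # time moved forward: same guessed source
        else:
            runs.append([event])     # time went backwards: start a new run
    return runs


# The out-of-order feed from the question, in arrival order.
feed = [
    (610, "User Session End (user A)"),
    (601, "User Session Start (user A)"),
    (602, "Device event 1"),
    (603, "Device event 2"),
    (616, "User Session End (user A)"),
    (615, "User Session Start (user A)"),
    (634, "Device event 3"),
    (635, "Device event 4"),
    (650, "User Session End (user B)"),
    (632, "User Session Start (user B)"),
]
runs = split_on_backward_jumps(feed)
```

On the question's feed this yields four runs, and the 10:16 session end is attached to the run that started at 10:01 because time did not go backwards there. Forward gaps between batches would need an extra signal, such as arrival time or device identity.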

Also, presumably each cache services different devices, so when a device appears it indicates one data source. If several devices always appear in chronological order, then presumably they are from the same data source. By extension, if other details about the events are considered, some subset of them will always appear chronologically, and hence represents a single data source.

You can heuristically stipulate that all data has been received from all data sources up to a given time if each guessed data source's chronology has an event at or past that time.
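That completeness check can be sketched as tracking the latest timestamp seen per guessed source and taking the minimum across them, which is similar in spirit to what stream processors call a watermark. The class name `SourceTracker` and the source labels `cache-1` and `cache-2` are hypothetical, and how events get attributed to a source is left out. It also assumes every source has been seen at least once.

```python
from typing import Dict, Optional


class SourceTracker:
    """Tracks the newest event time seen from each guessed data source."""

    def __init__(self) -> None:
        self.latest: Dict[str, int] = {}

    def observe(self, source: str, ts: int) -> None:
        # Remember only the furthest point in time each source has reached.
        self.latest[source] = max(ts, self.latest.get(source, ts))

    def watermark(self) -> Optional[int]:
        # Every known source has reported at or past this time.
        return min(self.latest.values()) if self.latest else None

    def complete_up_to(self, t: int) -> bool:
        mark = self.watermark()
        return mark is not None and mark >= t


tracker = SourceTracker()
tracker.observe("cache-1", 610)   # hypothetical source labels
tracker.observe("cache-2", 603)
ready_603 = tracker.complete_up_to(603)
ready_610 = tracker.complete_up_to(610)
```

Because the question states there is no upper bound on delivery delay, a source that goes offline holds the minimum back indefinitely, so in practice this would be paired with the SLA timeout described above.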

Licensed under: CC-BY-SA with attribution