Question

Each message the handler receives contains the entire state of the aggregate root. The system can then perform the required operations based on this data. In my scenario, it grants access based on the data in the message, e.g. access to rooms A and B. A message contains the entire set of granted access. These messages might arrive out of order, as messaging systems like MSMQ do not guarantee ordered delivery.

Consider a scenario where message #1 grants access to rooms A and B, but message #2 grants access only to room A. If they arrive out of order, access is granted to room A and then later to rooms A and B. This is not the desired outcome: only access to room A should be granted. Each message contains a timestamp, which is set when it is published. I would like to use this timestamp to discard old messages that arrive out of order, e.g. if message #2 arrives before message #1, then message #1 should be discarded.

I could implement this filter in each handler method, but that would be tedious, so I am hoping that Rebus has something along the lines of EAI Message Filters?

I'm open to other options or implementations.


Solution

I'm not sure I understand why you would pass around an entire aggregate root inside your messages, but apart from that there's a simple way that Rebus will enable you to sequence your messages.

I suggest you let all of your messages implement a common interface that captures this particular aspect, e.g. something like

public interface IHaveSequenceNumber
{
    int SequenceNumber { get; }
}

Then, you create a simple message handler that handles IHaveSequenceNumber and aborts the handler pipeline when old messages are encountered, e.g.

public class WillDiscardOldMessages : IHandleMessages<IHaveSequenceNumber>
{
    public void Handle(IHaveSequenceNumber messageWithSequenceNumber)
    {
        if (IsTooOld(messageWithSequenceNumber))
        {
            // Abort the pipeline so that no later handlers run for this message.
            MessageContext.GetCurrent().Abort();
        }
    }
}

And then - pretty importantly :) - you ensure that your message filter is always first in the pipeline of handlers when dispatch begins:

Configure.With(...)
    .Transport(...)
    .SpecifyOrderOfHandlers(s => s.First<WillDiscardOldMessages>())
    .(...)

The implementation of the IsTooOld() method above I will leave to you - it will probably be simple if you have one single worker thread in your endpoint, whereas concurrent handling of these things is not trivial.
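As a rough illustration of what IsTooOld() could delegate to, here is a minimal sketch of an in-memory tracker that remembers the highest sequence number seen per aggregate and tolerates concurrent callers. The class name LatestSequenceTracker and the aggregateId parameter are assumptions made for this example: they are not part of Rebus, and the IHaveSequenceNumber interface above would need an extra property to identify the aggregate. A message is treated as too old only when its number is strictly lower than the highest one recorded, so a redelivery of the newest message is still let through.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not a Rebus type): tracks the highest sequence
// number seen so far for each aggregate, in memory only.
public class LatestSequenceTracker
{
    private readonly ConcurrentDictionary<string, int> _latest =
        new ConcurrentDictionary<string, int>();

    // Returns true when a strictly newer sequence number has already been
    // recorded for the aggregate. Otherwise records this number and
    // returns false.
    public bool IsTooOld(string aggregateId, int sequenceNumber)
    {
        while (true)
        {
            if (_latest.TryGetValue(aggregateId, out var current))
            {
                if (sequenceNumber < current)
                {
                    return true;
                }

                // Compare-and-swap: succeeds only if no other thread
                // changed the stored value since we read it.
                if (_latest.TryUpdate(aggregateId, sequenceNumber, current))
                {
                    return false;
                }
            }
            else if (_latest.TryAdd(aggregateId, sequenceNumber))
            {
                return false;
            }

            // Another thread won the race; read the new value and retry.
        }
    }
}
```

Note the limits of this sketch: the recorded numbers are lost when the process restarts, and recording a number does not guarantee that the handlers for the newer message finish before an older message is checked. With several worker threads or several endpoint instances, the check would probably have to live in the same store and transaction as the data being updated.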

Does that make sense?

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow