I'm not sure I understand why you would pass around an entire aggregate root inside your messages, but that aside, Rebus gives you a simple way to sequence your messages.
I suggest you let all of your messages implement a common interface that captures this particular aspect, e.g. something like
public interface IHaveSequenceNumber
{
    int SequenceNumber { get; }
}
Then you create a simple message handler that handles IHaveSequenceNumber and aborts the handler pipeline when old messages are encountered, e.g.
public class WillDiscardOldMessages : IHandleMessages<IHaveSequenceNumber>
{
    public void Handle(IHaveSequenceNumber messageWithSequenceNumber)
    {
        if (IsTooOld(messageWithSequenceNumber))
        {
            // Abort the pipeline, which makes this the last handler to run for this message
            MessageContext.GetCurrent().Abort();
        }
    }
}
And then - pretty importantly :) - you ensure that your message filter is always first in the pipeline of handlers when dispatch begins:
Configure.With(...)
    .Transport(...)
    .SpecifyOrderOfHandlers(s => s.First<WillDiscardOldMessages>())
    .(...)
I will leave the implementation of the IsTooOld() method above to you. It will probably be simple if your endpoint has one single worker thread, whereas handling these messages concurrently is not trivial.
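For the single-worker case, a minimal sketch of IsTooOld() could look like the following. Note that SequenceTracker and SampleMessage are hypothetical names made up for illustration and are not part of Rebus. The sketch also makes a few assumptions you may not share: there is only one sequence (not one per aggregate), a duplicate sequence number counts as "too old", and the state lives in memory, so it is lost when the process restarts.

```csharp
using System;

public interface IHaveSequenceNumber
{
    int SequenceNumber { get; }
}

// Hypothetical message type, only here to make the sketch self-contained.
public class SampleMessage : IHaveSequenceNumber
{
    public SampleMessage(int sequenceNumber)
    {
        SequenceNumber = sequenceNumber;
    }

    public int SequenceNumber { get; }
}

// Hypothetical helper (not a Rebus type): remembers the highest sequence number seen so far.
public class SequenceTracker
{
    private readonly object _gate = new object();
    private int _highestSeen = int.MinValue;

    // Returns true if the sequence number is lower than or equal to one already seen.
    // Otherwise it records the number as the new highest and returns false.
    public bool IsTooOld(IHaveSequenceNumber message)
    {
        lock (_gate) // makes the check-and-update atomic
        {
            if (message.SequenceNumber <= _highestSeen)
            {
                return true;
            }

            _highestSeen = message.SequenceNumber;
            return false;
        }
    }
}
```

The handler's IsTooOld() could then delegate to one shared SequenceTracker instance; if handler instances are created per message, the tracker has to live outside the handler (e.g. as a singleton). The lock only protects the counter itself. With several worker threads, a newer message can still be recorded while an older one is being handled, which is exactly the non-trivial part.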
Does that make sense?