Question

Anyone know of a message bus implementation which offers granular control over consistency guarantees? Full ACID is too slow and no ACID is too wrong.

We're currently using Rhino ESB wrapping MSMQ for our messaging. When using durable, transactional messaging with distributed transactions, MSMQ can block the commit for considerable time while it waits on I/O completion.

Our messages fall into two general categories: business logic and denormalisation. The latter account for a significant percentage of message bus traffic.

Business logic messages require the guarantees of full ACID and MSMQ has proven quite adequate for this.

Denormalisation messages:

  1. MUST be durable.
  2. MUST NOT be processed until after the originating transaction completes.
  3. MAY be processed multiple times.
  4. MAY be processed even if the originating transaction rolls back, as long as 2) is adhered to.

(In some specific cases the durability requirements could probably be relaxed, but identifying and handling those cases as exceptions to the rule adds complexity.)

All denormalisation messages are handled in-process so there is no need for IPC.

If the process is restarted, all transactions may be assumed to have completed (committed or rolled back) and all denormalisation messages not yet processed must be recovered. It is acceptable to replay denormalisation messages which were already processed.
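For concreteness, requirements 1–4 plus that restart behaviour could be met by something as small as an append-only log. The sketch below is illustrative Python with invented names, not our actual system; it only makes messages durable and replayable, while enforcing requirement 2 (process only after the originating transaction completes) is left to the caller:

```python
import json
import os

class DenormalisationLog:
    """Minimal durable message log: append inside the originating
    transaction, replay after commit and again on restart."""

    def __init__(self, path):
        self.path = path
        self.file = open(path, "a")

    def append(self, message):
        # Called inside the originating transaction. fsync makes the
        # message durable (requirement 1) before the transaction commits;
        # nothing is processed yet (requirement 2).
        self.file.write(json.dumps(message) + "\n")
        self.file.flush()
        os.fsync(self.file.fileno())

    def replay(self, handler):
        # Called after commit, and again after a restart. Every logged
        # message is handed to the (necessarily idempotent) handler,
        # possibly more than once (requirement 3), and possibly for a
        # transaction that rolled back (requirement 4).
        with open(self.path) as f:
            for line in f:
                handler(json.loads(line))
```

This is the easy half of the problem; the hard half (low-latency concurrent access, compaction, coordinating replay with commit) is exactly what I'd rather not build.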

As far as I can tell, messaging systems which deal with transactions tend to offer a choice between full ACID or nothing, and ACID carries a performance penalty. We're seeing calls to TransactionScope#Commit() taking as long as a few hundred milliseconds in some cases depending on the number of messages sent.

Using a non-transactional message queue causes messages to be processed before their originating transaction completes, resulting in consistency problems.

Another part of our system which has similar consistency requirements but lower complexity is already using a custom implementation of something akin to a transaction log, and generalising that for this use case is certainly an option, but I'd rather not implement a low-latency, concurrent, durable, transactional messaging system myself if I don't have to :P

In case anyone's wondering, the reason for requiring durability of denormalisation messages is that detecting desyncs can be extremely difficult, and fixing them extremely expensive. People do notice when something's slightly wrong and a page refresh doesn't fix it, so ignoring desyncs isn't an option.


Solution 2

It turns out that MSMQ+SQL+DTC don't even offer the consistency guarantees we need. We previously encountered a problem where messages were being processed before the distributed transaction which queued them had been committed to the database, resulting in out-of-date reads. This is a side-effect of using ReadCommitted isolation to consume the queue, since:

  1. Start transaction A.
  2. Update database table in A.
  3. Queue message in A.
  4. Request commit of A.
  5. Message queue commits A.
  6. Start transaction B.
  7. Read message in B.
  8. Read database table in B, using ReadCommitted <- gets pre-A data.
  9. Database commits A.

Our requirement is that B's read of the table block until A's commit completes, which requires Serializable transactions, and that carries its own performance penalty.
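The interleaving above can be reproduced deterministically in a toy simulation. Plain Python stands in for SQL Server, MSMQ and the DTC here; the commit ordering is forced by hand to mirror what the DTC permitted (queue commits A before the database does), and all names are illustrative:

```python
# Committed database state, and the update made inside transaction A
# (steps 1-2) that has not yet been committed.
committed = {"balance": 100}
pending_a = {"balance": 150}
queue = []

queue.append("balance-updated")   # step 3: message queued in A
# step 4: commit of A requested...
# step 5: ...and the *queue* commits A first, so the message is
# already visible to consumers.

msg = queue.pop(0)                # steps 6-7: transaction B reads the message
seen = committed["balance"]       # step 8: ReadCommitted read sees pre-A data

committed.update(pending_a)       # step 9: the *database* commits A last

assert seen == 100                # B observed stale data: the race
```

The consumer acted on the message while the database still held pre-A state, which is exactly the out-of-date read we hit in production.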

It looks like the normal thing to do is indeed to implement the necessary constraints and guarantees oneself, even though it sounds like reinventing the wheel.

Anyone got any comments on this?

OTHER TIPS

It's not exactly the answer you're looking for, but Jonathan Oliver has written extensively on how to avoid using distributed transactions in messaging and yet maintain transactional integrity:

http://blog.jonathanoliver.com/2011/04/how-i-avoid-two-phase-commit/
http://blog.jonathanoliver.com/2011/03/removing-2pc-two-phase-commit/
http://blog.jonathanoliver.com/2010/04/idempotency-patterns/

Not sure if this helps you but, hey.

If you want to do this by hand, here is a reliable approach. It satisfies (1) and (2), and it doesn't even need the liberties that you allow in (3) and (4).

  1. Producer (business logic) starts transaction A.
  2. Insert/update whatever into one or more tables.
  3. Insert a corresponding message into PrivateMessageTable (part of the domain, and unshared, if you will). This is what will be distributed.
  4. Commit transaction A. Producer has now simply and reliably performed its writes including the insertion of a message, or rolled everything back.
  5. Dedicated distributor job queries a batch of unprocessed messages from PrivateMessageTable.
  6. Distributor starts transaction B.
  7. Mark the unprocessed messages as processed, rolling back if the number of rows modified differs from the number expected (two instances running at the same time?).
  8. Insert a public representation of the messages into PublicMessageTable (a publicly exposed table, in whatever way). Assign new, strictly sequential Ids to the public representations. Because only one process is doing these inserts, this can be guaranteed. Note that the table must be on the same host to avoid 2PC.
  9. Commit transaction B. Distributor has now distributed each message to the public table exactly once, with strictly sequential Ids.
  10. A consumer (there can be several) queries the next batch of messages from PublicMessageTable with Id greater than its own LastSeenId.
  11. Consumer starts transaction C.
  12. Consumer inserts its own representation of the messages into its own table ConsumerMessageTable (thus advancing LastSeenId). Insert-ignore can help protect against multiple instances running. Note that this table can be in a completely different server.
  13. Commit transaction C. Consumer has now consumed each message exactly once, in the same order the messages were made publicly available, without ever skipping a message.
  14. We can do whatever we want based on the consumed messages.

Of course, this requires very careful implementation.
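To make the shape of it concrete, the steps above can be sketched in a single process. This is a hedged illustration using SQLite and Python rather than your MSMQ/SQL Server stack; the message table names come from the steps, while everything else (the Accounts table, column names) is invented:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE Accounts (id INTEGER PRIMARY KEY, balance INTEGER);
    CREATE TABLE PrivateMessageTable (
        id INTEGER PRIMARY KEY, body TEXT, processed INTEGER DEFAULT 0);
    CREATE TABLE PublicMessageTable (id INTEGER PRIMARY KEY, body TEXT);
    CREATE TABLE ConsumerMessageTable (id INTEGER PRIMARY KEY, body TEXT);
    INSERT INTO Accounts VALUES (1, 100);
""")

def produce(new_balance):
    # Steps 1-4: business write and message insert in ONE local
    # transaction; both commit or both roll back, with no 2PC involved.
    with db:
        db.execute("UPDATE Accounts SET balance = ? WHERE id = 1",
                   (new_balance,))
        db.execute("INSERT INTO PrivateMessageTable (body) VALUES (?)",
                   (f"balance={new_balance}",))

def distribute():
    # Steps 5-9: the single distributor moves unprocessed messages to the
    # public table; PublicMessageTable's autoincrement id supplies the
    # strictly sequential public Ids.
    rows = db.execute("SELECT id, body FROM PrivateMessageTable "
                      "WHERE processed = 0 ORDER BY id").fetchall()
    if not rows:
        return
    with db:
        placeholders = ",".join("?" * len(rows))
        cur = db.execute(
            f"UPDATE PrivateMessageTable SET processed = 1 "
            f"WHERE processed = 0 AND id IN ({placeholders})",
            [r[0] for r in rows])
        if cur.rowcount != len(rows):   # step 7: another instance raced us
            raise RuntimeError("concurrent distributor; rolling back")
        for _, body in rows:
            db.execute("INSERT INTO PublicMessageTable (body) VALUES (?)",
                       (body,))

last_seen_id = 0

def consume():
    # Steps 10-13: pull messages with id > LastSeenId, in order, exactly
    # once; INSERT OR IGNORE guards against a second instance (step 12).
    global last_seen_id
    rows = db.execute("SELECT id, body FROM PublicMessageTable "
                      "WHERE id > ? ORDER BY id", (last_seen_id,)).fetchall()
    with db:
        for mid, body in rows:
            db.execute("INSERT OR IGNORE INTO ConsumerMessageTable "
                       "VALUES (?, ?)", (mid, body))
    if rows:
        last_seen_id = rows[-1][0]      # advance LastSeenId
    return [body for _, body in rows]
```

Calling `produce(150)`, then `distribute()`, then `consume()` delivers the message exactly once; a second `consume()` returns nothing. In a real deployment the consumer's table would live on a different server and `LastSeenId` would be derived from it rather than held in memory.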

It is even suitable for database clusters, as long as there is only a single write node and both reads and writes perform causality checks. It may well be that one of those two checks alone is sufficient, but I'd have to consider the implications more carefully to make that claim.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow