Question

Please excuse any mistakes in terminology. In particular, I am using relational database terms.

There are a number of persistent key-value stores, including CouchDB and Cassandra, along with plenty of other projects.

A typical argument against them is that they do not generally permit atomic transactions across multiple rows or tables. I wonder if there's a general approach that would solve this issue.

Take for example the situation of a set of bank accounts. How do we move money from one bank account to another? If each bank account is a row, we want to update two rows as part of the same transaction, reducing the value in one and increasing the value in another.

One obvious approach is to have a separate table which describes transactions. Then, moving money from one bank account to another consists of simply inserting a new row into this table. We do not store the current balances of either of the two bank accounts and instead rely on summing up all the appropriate rows in the transactions table. It is easy to imagine that this would be far too much work, however; a bank may have millions of transactions a day and an individual bank account may quickly have several thousand 'transactions' associated with it.

A number (all?) of key-value stores will 'roll back' an action if the underlying data has changed since you last grabbed it. Possibly this could be used to simulate atomic transactions, then, as you could then indicate that a particular field is locked. There are some obvious issues with this approach.

Any other ideas? It is entirely possible that my approach is simply incorrect and I have not yet wrapped my brain around the new way of thinking.

Solution

If, taking your example, you want to atomically update the value in a single document (a row, in relational terminology), you can do so in CouchDB. You will get a conflict error when you try to commit the change if another contending client has updated the same document since you read it. You will then have to read the new value, update it, and retry the commit. You may have to repeat this process an indeterminate number of times (possibly without bound if there is a lot of contention), but if your commit ever succeeds, you are guaranteed to have a document in the database with an atomically updated balance.
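
The read, modify, retry loop described above can be sketched as follows. This is a minimal illustration, not CouchDB client code: `InMemoryStore`, `Conflict`, and `add_to_balance` are hypothetical names, and the integer `_rev` counter is an assumption that stands in for CouchDB's revision strings and its conflict response.

```python
class Conflict(Exception):
    """Raised when a write carries a stale revision."""


class InMemoryStore:
    """Hypothetical stand-in for a document store with revision checks."""

    def __init__(self):
        self._docs = {}

    def get(self, doc_id):
        # Return a copy so callers cannot mutate stored state directly.
        return dict(self._docs[doc_id])

    def put(self, doc):
        current = self._docs.get(doc["_id"])
        # Reject the write if the document changed since the caller read it.
        if current is not None and current["_rev"] != doc.get("_rev"):
            raise Conflict(doc["_id"])
        stored = dict(doc)
        stored["_rev"] = (current["_rev"] if current else 0) + 1
        self._docs[doc["_id"]] = stored
        return stored["_rev"]


def add_to_balance(store, doc_id, delta, max_attempts=10):
    """Read-modify-write one document, retrying on revision conflicts."""
    for _ in range(max_attempts):
        doc = store.get(doc_id)
        doc["balance"] += delta
        try:
            store.put(doc)
            return doc["balance"]
        except Conflict:
            continue  # another client won the race; re-read and try again
    raise RuntimeError("gave up after %d conflicting attempts" % max_attempts)


store = InMemoryStore()
store.put({"_id": "dave", "balance": 100})
print(add_to_balance(store, "dave", -30))
```

Capping the number of attempts is a design choice made here for the sketch; it turns the "possibly infinite" retry loop into an explicit failure the caller can handle.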

If you need to update two balances (i.e. a transfer from one account to another), then you need to use a separate transaction document (effectively another table where rows are transactions) that stores the amount and the two accounts (in and out). This is a common bookkeeping practice, by the way. Since CouchDB computes views only as needed, it is actually still very efficient to compute the current amount in an account from the transactions that list that account. In CouchDB, you would use a map function that emits the account number as the key and the amount of the transaction as the value (positive for incoming, negative for outgoing). Your reduce function would simply sum the values for each key, emitting the same key and total sum. You could then query the view with group=true to get the account balances, keyed by account number.
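
To make the map and grouped reduce concrete, here is a small Python emulation of what such a view computes. It is only a sketch of the idea, not CouchDB code: `map_txn` and `balances` are hypothetical helper names, and the field names `from`, `to`, and `amount` are assumed for the transaction documents.

```python
from collections import defaultdict


def map_txn(txn):
    """Emit (account, signed amount) pairs for one transaction document."""
    yield txn["from"], -txn["amount"]  # outgoing: negative
    yield txn["to"], txn["amount"]     # incoming: positive


def balances(transactions):
    """Group emitted values by key and sum them, as a grouped reduce would."""
    totals = defaultdict(int)
    for txn in transactions:
        for account, amount in map_txn(txn):
            totals[account] += amount
    return dict(totals)


log = [
    {"from": "Dave", "to": "Alex", "amount": 50},
    {"from": "Alex", "to": "Jane", "amount": 25},
]
print(balances(log))
```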

OTHER TIPS

CouchDB isn't suitable for transactional systems because it doesn't support locking and atomic operations.

In order to complete a bank transfer you must do a few things:

  1. Validate the transaction, ensuring there are sufficient funds in the source account, that both accounts are open, not locked, and in good standing, and so on
  2. Decrease the balance of the source account
  3. Increase the balance of the destination account

If changes are made to the balance or status of the accounts in between any of these steps, the transaction could become invalid after it is submitted, which is a big problem in a system of this kind.

Even if you use the approach suggested above, where you insert a "transfer" record and use a map/reduce view to calculate the final account balance, you have no way of ensuring that you don't overdraw the source account. There is still a race condition between checking the source account balance and inserting the transaction: two transactions could simultaneously be added after each has checked the balance.
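
The race can be reproduced deterministically by writing out the problematic interleaving by hand. The sketch below uses a plain Python list in place of the database; the `balance` helper and the opening deposit from a "Bank" account are assumptions made for the illustration.

```python
def balance(log, account):
    """Sum credits and debits for one account from the transfer log."""
    total = 0
    for txn in log:
        if txn["to"] == account:
            total += txn["amount"]
        if txn["from"] == account:
            total -= txn["amount"]
    return total


log = [{"from": "Bank", "to": "Alex", "amount": 100}]

# Two clients each want to move 80 out of Alex's account.
# Step 1: both check the balance before either has written anything.
client_a_ok = balance(log, "Alex") >= 80
client_b_ok = balance(log, "Alex") >= 80

# Step 2: both checks passed, so both clients insert their transfer.
if client_a_ok:
    log.append({"from": "Alex", "to": "Jane", "amount": 80})
if client_b_ok:
    log.append({"from": "Alex", "to": "Dave", "amount": 80})

print(balance(log, "Alex"))  # negative: the account is overdrawn
```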

So ... it's the wrong tool for the job. CouchDB is probably good at a lot of things, but this is something that it really cannot do.

EDIT: It's probably worth noting that actual banks in the real world use eventual consistency. If you overdraw your bank account for long enough you get an overdraft fee. If you were very good you might even be able to withdraw money from two different ATMs at almost the same time and overdraw your account because there's a race condition to check the balance, issue the money, and record the transaction. When you deposit a check into your account they bump the balance but actually hold those funds for a period of time "just in case" the source account doesn't really have enough money.

To provide a concrete example (because there is a surprising lack of correct examples online): here's how to implement an "atomic bank balance transfer" in CouchDB (largely copied from my blog post on the same subject: http://blog.codekills.net/2014/03/13/atomic-bank-balance-transfer-with-couchdb/)

First, a brief recap of the problem: how can a banking system which allows money to be transferred between accounts be designed so that there are no race conditions which might leave invalid or nonsensical balances?

There are a few parts to this problem:

First: the transaction log. Instead of storing an account's balance in a single record or document — {"account": "Dave", "balance": 100} — the account's balance is calculated by summing up all the credits and debits to that account. These credits and debits are stored in a transaction log, which might look something like this:

{"from": "Dave", "to": "Alex", "amount": 50}
{"from": "Alex", "to": "Jane", "amount": 25}

And the CouchDB map-reduce functions to calculate the balance could look something like this:

POST /transactions/balances
{
    "map": function(txn) {
        emit(txn.from, txn.amount * -1);
        emit(txn.to, txn.amount);
    },
    "reduce": function(keys, values) {
        return sum(values);
    }
}

For completeness, here is the list of balances:

GET /transactions/balances
{
    "rows": [
        {
            "key" : "Alex",
            "value" : 25
        },
        {
            "key" : "Dave",
            "value" : -50
        },
        {
            "key" : "Jane",
            "value" : 25
        }
    ],
    ...
}

But this leaves the obvious question: how are errors handled? What happens if someone tries to make a transfer larger than their balance?

With CouchDB (and similar databases) this sort of business logic and error handling must be implemented at the application level. Naively, such a function might look like this:

def transfer(from_acct, to_acct, amount):
    txn_id = db.post("transactions", {"from": from_acct, "to": to_acct, "amount": amount})
    if db.get("transactions/balances", id=from_acct) < 0:
        db.delete("transactions/" + txn_id)
        raise InsufficientFunds()

But notice that if the application crashes between inserting the transaction and checking the updated balances, the database will be left in an inconsistent state: the sender may be left with a negative balance, and the recipient with money that didn't previously exist:

# Initial balances: Alex: 25, Jane: 25
db.post("transactions", {"from": "Alex", "to": "Jane", "amount": 50})
# Current balances: Alex: -25, Jane: 75

How can this be fixed?

To make sure the system is never in an inconsistent state, two pieces of information need to be added to each transaction:

  1. The time the transaction was created (to ensure that there is a strict total ordering of transactions), and

  2. A status — whether or not the transaction was successful.

There will also need to be two views — one which returns an account's available balance (ie, the sum of all the "successful" transactions), and another which returns the oldest "pending" transaction:

POST /transactions/balance-available
{
    "map": function(txn) {
        if (txn.status == "successful") {
            emit(txn.from, txn.amount * -1);
            emit(txn.to, txn.amount);
        }
    },
    "reduce": function(keys, values) {
        return sum(values);
    }
}

POST /transactions/oldest-pending
{
    "map": function(txn) {
        if (txn.status == "pending") {
            emit(txn._id, txn);
        }
    },
    "reduce": function(keys, values) {
        var oldest = values[0];
        values.forEach(function(txn) {
            // Compare timestamps, not a timestamp against a whole document
            if (txn.timestamp < oldest.timestamp) {
                oldest = txn;
            }
        });
        return oldest;
    }
}

The list of transfers might now look something like this:

{"from": "Alex", "to": "Dave", "amount": 100, "timestamp": 50, "status": "successful"}
{"from": "Dave", "to": "Jane", "amount": 200, "timestamp": 60, "status": "pending"}

Next, the application will need to have a function which can resolve transactions by checking each pending transaction in order to verify that it is valid, then updating its status from "pending" to either "successful" or "rejected":

def resolve_transactions(target_timestamp):
    """ Resolves all transactions up to and including the transaction
        with timestamp `target_timestamp`. """
    while True:
        # Get the oldest transaction which is still pending (assuming
        # the lookup returns None when nothing is pending)
        txn = db.get("transactions/oldest-pending")
        if txn is None or txn["timestamp"] > target_timestamp:
            # Stop once all of the transactions up until the one we're
            # interested in have been resolved.
            break

        # Then check to see if that transaction is valid. "from" is a
        # reserved word in Python, so fields are read by key.
        if db.get("transactions/balance-available", id=txn["from"]) >= txn["amount"]:
            status = "successful"
        else:
            status = "rejected"

        # Then update the status of that transaction. Note that CouchDB
        # will check the "_rev" field, only performing the update if the
        # transaction hasn't already been updated.
        txn["status"] = status
        db.put(txn)

Finally, the application code for correctly performing a transfer:

import time

def transfer(from_acct, to_acct, amount):
    timestamp = time.time()
    txn = db.post("transactions", {
        "from": from_acct,
        "to": to_acct,
        "amount": amount,
        "status": "pending",
        "timestamp": timestamp,
    })
    resolve_transactions(timestamp)
    # Re-read the transaction to see how it was resolved
    txn = db.get("transactions/" + txn["_id"])
    if txn["status"] == "rejected":
        raise InsufficientFunds()

A couple of notes:

  • For the sake of brevity, this specific implementation assumes some amount of atomicity in CouchDB's map-reduce. Updating the code so it does not rely on that assumption is left as an exercise to the reader.

  • Master/master replication or CouchDB's document sync have not been taken into consideration. Master/master replication and sync make this problem significantly more difficult.

  • In a real system, using time() might result in collisions, so using something with a bit more entropy might be a good idea; maybe "%s-%s" % (time(), uuid()), or using the document's _id in the ordering. Including the time is not strictly necessary, but it helps maintain a logical ordering if multiple requests come in at about the same time.
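
For readers who want to run the pending/resolve scheme end to end, here is a self-contained in-memory sketch. It is not CouchDB code: the `Ledger` class is a hypothetical stand-in for the database and its two views, deposits are assumed to enter as pre-approved transfers from a hypothetical "Bank" account, and a logical counter replaces time() so that timestamps never collide.

```python
import itertools


class InsufficientFunds(Exception):
    pass


class Ledger:
    """Hypothetical in-memory stand-in for the transactions database."""

    def __init__(self):
        self.txns = []
        self._clock = itertools.count(1)  # logical clock: no timestamp ties

    def available_balance(self, account):
        # Mirrors the "balance-available" view: only successful transfers count.
        total = 0
        for txn in self.txns:
            if txn["status"] != "successful":
                continue
            if txn["to"] == account:
                total += txn["amount"]
            if txn["from"] == account:
                total -= txn["amount"]
        return total

    def oldest_pending(self):
        # Mirrors the "oldest-pending" view.
        pending = [t for t in self.txns if t["status"] == "pending"]
        return min(pending, key=lambda t: t["timestamp"]) if pending else None

    def resolve(self, target_timestamp):
        # Settle pending transfers in timestamp order, up to the target.
        while True:
            txn = self.oldest_pending()
            if txn is None or txn["timestamp"] > target_timestamp:
                break
            funded = self.available_balance(txn["from"]) >= txn["amount"]
            txn["status"] = "successful" if funded else "rejected"

    def deposit(self, account, amount):
        # Assumption: opening balances are pre-approved transfers from "Bank".
        self.txns.append({"from": "Bank", "to": account, "amount": amount,
                          "status": "successful",
                          "timestamp": next(self._clock)})

    def transfer(self, from_acct, to_acct, amount):
        txn = {"from": from_acct, "to": to_acct, "amount": amount,
               "status": "pending", "timestamp": next(self._clock)}
        self.txns.append(txn)
        self.resolve(txn["timestamp"])
        if txn["status"] == "rejected":
            raise InsufficientFunds()


ledger = Ledger()
ledger.deposit("Alex", 100)
ledger.transfer("Alex", "Dave", 80)
print(ledger.available_balance("Alex"), ledger.available_balance("Dave"))
```

Because a transfer only affects the available balance once it is marked "successful", a crash right after the insert leaves a pending record that changes no balance; the next call to `resolve` settles it.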

BerkeleyDB and LMDB are both key-value stores with support for ACID transactions. In BDB, transactions are optional, while LMDB only operates transactionally.

A typical argument against them is that they do not generally permit atomic transactions across multiple rows or tables. I wonder if there's a general approach that would solve this issue.

A lot of modern data stores don't support atomic multi-key updates (transactions) out of the box but most of them provide primitives which allow you to build ACID client-side transactions.

If a data store supports per-key linearizability and a compare-and-swap or test-and-set operation, then that is enough to implement serializable transactions. For example, this approach is used in Google's Percolator and in CockroachDB.
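
As a rough illustration of how a multi-key update can hinge on a single compare-and-swap, consider the sketch below. It is deliberately simplified and is not the actual protocol of Percolator or CockroachDB: `CasStore`, `read`, and `transfer` are hypothetical names, each key is assumed to hold a (committed value, write intent) pair, and there is no recovery of intents left by crashed clients and no read validation, so it shows only the atomic commit point, not full serializability.

```python
class CasStore:
    """Hypothetical per-key store exposing only get and compare-and-swap."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def cas(self, key, expected, new):
        """Set key to `new` only if it currently equals `expected`."""
        if self._data.get(key) != expected:
            return False
        self._data[key] = new
        return True


def read(store, key):
    """Return the value of key, honoring a write intent once it is committed."""
    committed, intent = store.get(key)
    if intent is not None and store.get("txn:" + intent[0]) == "committed":
        return intent[1]
    return committed


def transfer(store, txn_id, src, dst, amount):
    """Stage both writes as intents, then commit them with a single CAS."""
    record = "txn:" + txn_id
    store.cas(record, None, "pending")
    staged, ok = [], True
    for key, delta in ((src, -amount), (dst, amount)):
        committed, intent = store.get(key)
        new_value = committed + delta
        if intent is not None or new_value < 0 or not store.cas(
                key, (committed, None), (committed, (txn_id, new_value))):
            ok = False  # contended key or insufficient funds
            break
        staged.append(key)
    # Commit point: one CAS on the record decides every intent at once.
    final = "committed" if ok else "aborted"
    store.cas(record, "pending", final)
    # Cleanup: fold intents into the committed values, or discard them.
    for key in staged:
        committed, intent = store.get(key)
        value = intent[1] if final == "committed" else committed
        store.cas(key, (committed, intent), (value, None))
    return ok


store = CasStore()
store.cas("alex", None, (100, None))
store.cas("jane", None, (0, None))
print(transfer(store, "t1", "alex", "jane", 80), read(store, "alex"), read(store, "jane"))
```

The design point is that neither balance becomes visible through `read` until the transaction record flips from "pending" to "committed", so a client that dies halfway through staging leaves both accounts at their old values.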

In my blog I created a step-by-step visualization of serializable cross-shard client-side transactions, described the major use cases, and provided links to variants of the algorithm. I hope it will help you understand how to implement them for your data store.

Among the data stores which support per key linearizability and CAS are:

  • Cassandra with lightweight transactions
  • Riak with consistent buckets
  • RethinkDB
  • ZooKeeper
  • etcd
  • HBase
  • DynamoDB
  • MongoDB

By the way, if you're fine with the Read Committed isolation level, then it makes sense to take a look at RAMP transactions by Peter Bailis. They can also be implemented for the same set of data stores.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow