Monday, October 27, 2008

Consistent Multi-Master DB Replication

As explained in my CouchDB implementation notes, the current replication mechanism doesn't provide consistency guarantees. This means that if a client connects to different replicas at different times, she may see weird results, including ...
  • The client reads a document X and later reads the same document X again, but the 2nd read returns an earlier revision of X than the 1st read.
  • The client updates a document X and, after some time, reads document X again, but doesn't see her previous update.
  • A client reads a document X and, based on its value, updates document Y. Another client may see the update to document Y but not the revision of document X that the update was based on.
  • Even if a client updates document X and later updates document X a 2nd time, CouchDB may wrongly perceive a conflict between the two updates (if they land on different replicas) and resort to a user-provided resolution strategy to resolve it.
To prevent the above situations from happening, here I describe a possible extension of CouchDB that provides a "causal consistency guarantee" based on the Vector Clock Gossiping technique. The target environment is a cluster of machines.

Here are a few definitions ...

Causal Consistency
  • It is not possible to see an effect before seeing its causes. In other words, when replicas propagate their updates, each replica always applies the updates of the "causes" before applying the updates of the "effect".
  • "Effects" and "causes" are related by a "happens-before" relationship, i.e. a cause happens-before its effect.

Logical Clock
  • A monotonically increasing sequence number that is atomically increased by one whenever an "event" occurs. An event is one of the following:
      • Updating state locally
      • Sending a message
      • Receiving a message
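
A logical clock of this kind can be sketched in a few lines of Python. This is only an illustration: the class name LogicalClock and the method name tick are made up for this post, and a lock is used here as one possible way to make the increment atomic.

```python
import threading


class LogicalClock:
    """Monotonically increasing counter, advanced once per event (hypothetical name)."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()  # guards the increment so it is atomic

    def tick(self):
        """Record one event (local update, send, or receive) and return the new value."""
        with self._lock:
            self._value += 1
            return self._value

    @property
    def value(self):
        """Current clock value, without recording an event."""
        with self._lock:
            return self._value


clock = LogicalClock()
clock.tick()  # e.g. a local state update
clock.tick()  # e.g. sending a message
print(clock.value)  # 2
```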

Vector Clock
  • An array of logical clocks where each entry represents the logical clock of a different process
  • VC1 >= VC2 if for every i, VC1[i] >= VC2[i]
  • VC3 = merge(VC1, VC2) where for every i, VC3[i] = max(VC1[i], VC2[i])
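
The two vector clock operations above are small enough to write out directly. The sketch below assumes a vector clock is a plain Python list with one entry per replica and that both clocks have the same length; the function names vc_geq and vc_merge are mine.

```python
def vc_geq(vc1, vc2):
    """Return True if vc1 >= vc2, i.e. vc1[i] >= vc2[i] for every i."""
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    """Return the entry-wise maximum of two vector clocks."""
    return [max(a, b) for a, b in zip(vc1, vc2)]


a = [2, 0, 1]
b = [1, 3, 1]
print(vc_geq(a, b))                # False
print(vc_geq(b, a))                # False
print(vc_merge(a, b))              # [2, 3, 1]
print(vc_geq(vc_merge(a, b), a))   # True
```

When neither clock is >= the other, as with a and b here, the two clocks describe concurrent (causally unrelated) histories.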


The basic idea is ...
  • When the client issues a GET, the replica should reply only when it is sure that its value is at least as recent as what the client has seen before. Otherwise, it delays its response until that is the case.
  • When the client issues a PUT/POST/DELETE, the replica acknowledges the client immediately, but instead of applying the update right away, it puts the request into a queue. The update is applied once all the other updates that it depends on have been applied to the DB state.
  • In the background, replicas exchange their update logs so that every update is eventually propagated to all copies.

Each replica maintains ...
  • A "replica-VC", associated with the whole replica, which is updated when an update request is received from a proxy, or when a gossip message is sent or received.
  • A "state-VC", associated with the DB state, which is updated when a pending update from the queue is applied to the local DB.
  • A set of the other replicas' VCs, i.e. the vector clock obtained from each other replica in the last gossip message received from it.

The client always talks to the same proxy, which maintains the client's vector clock. This vector clock is what lets the proxy filter out inconsistent data when it talks to the replicas, which it can choose at random.

Read (GET) Processing
  1. When the client issues a READ, the proxy can choose any replica to forward the GET to (along with the client's vector clock).
  2. The chosen replica returns the GET result only once it is sure that its DB state is at least as up to date as what the client has seen (i.e. stateVC >= clientVC). Otherwise, it delays its response until this condition holds.
  3. The proxy may time out and contact another replica.
  4. The replica's response contains its replicaVC. The proxy then refreshes its clientVC = merge(clientVC, replicaVC).
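
The read path can be sketched as follows. This is a simplified, single-process illustration rather than CouchDB code: the Replica class, try_get and proxy_get are hypothetical names, and returning None stands in for "delay the response", so the proxy simply moves on to the next replica instead of waiting for a real timeout.

```python
def vc_geq(vc1, vc2):
    """True if vc1[i] >= vc2[i] for every i."""
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    """Entry-wise maximum of two vector clocks."""
    return [max(a, b) for a, b in zip(vc1, vc2)]


class Replica:
    """Hypothetical in-memory replica holding a DB dict and its two clocks."""

    def __init__(self, state_vc, replica_vc, db):
        self.state_vc = state_vc
        self.replica_vc = replica_vc
        self.db = db

    def try_get(self, key, client_vc):
        """Answer only if stateVC >= clientVC; None stands in for a delayed response."""
        if not vc_geq(self.state_vc, client_vc):
            return None
        return self.db.get(key), list(self.replica_vc)


def proxy_get(replicas, key, client_vc):
    """Try replicas in turn and return (value, refreshed clientVC)."""
    for replica in replicas:
        reply = replica.try_get(key, client_vc)
        if reply is not None:
            value, replica_vc = reply
            return value, vc_merge(client_vc, replica_vc)
    raise TimeoutError("no replica has caught up with the client yet")


stale = Replica([1, 0], [1, 0], {"x": "old"})
fresh = Replica([2, 1], [2, 2], {"x": "new"})
value, client_vc = proxy_get([stale, fresh], "x", [2, 0])
print(value, client_vc)  # new [2, 2]
```

The stale replica is skipped because its stateVC [1, 0] is not >= the clientVC [2, 0], so the client never reads a revision older than one it has already seen.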

Update (PUT/POST/DELETE) Processing
  1. When the client issues an UPDATE, the proxy can choose any replica to forward the UPDATE to (the request contains a uniqueId, the client's vector clock and the operation's data).
  2. For fault tolerance, the proxy may forward the update to multiple replicas (e.g. it may forward the request to M replicas and return "success" to the client once N of them have ACKed).
  3. Each chosen replica first advances its logical clock and its replicaVC.
  4. The replica computes a vector timestamp by copying the clientVC and setting its own entry to its logical clock (i.e. ts = clientVC; ts[myReplicaNo] = logicalClock).
  5. The replica attaches this timestamp to the update request and puts the UPDATE request into the queue. The update record "u" =
  6. The replica sends an ACK message containing its replicaVC to the proxy. The proxy then refreshes its clientVC = merge(clientVC, replicaVC).
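
Steps 3 to 6 on the replica side might look roughly like this. Everything here is a sketch under assumptions: the UpdateRecord layout is my own guess at a convenient record (it simply carries the fields the steps mention), and I read "advance the replicaVC" as setting the replica's own entry to its new logical clock value.

```python
from dataclasses import dataclass


def vc_merge(vc1, vc2):
    """Entry-wise maximum of two vector clocks."""
    return [max(a, b) for a, b in zip(vc1, vc2)]


@dataclass
class UpdateRecord:
    """Hypothetical record layout for a queued update."""
    unique_id: str
    client_vc: list
    ts: list
    operation: dict


class Replica:
    def __init__(self, replica_no, size):
        self.replica_no = replica_no
        self.logical_clock = 0
        self.replica_vc = [0] * size
        self.update_log = []

    def receive_update(self, unique_id, client_vc, operation):
        # Step 3: advance the logical clock and this replica's entry in replicaVC.
        self.logical_clock += 1
        self.replica_vc[self.replica_no] = self.logical_clock
        # Step 4: ts = clientVC; ts[myReplicaNo] = logicalClock
        ts = list(client_vc)
        ts[self.replica_no] = self.logical_clock
        # Step 5: queue the update together with its timestamp.
        self.update_log.append(UpdateRecord(unique_id, list(client_vc), ts, operation))
        # Step 6: the ACK carries the replicaVC back to the proxy.
        return list(self.replica_vc)


replica = Replica(replica_no=1, size=3)
client_vc = [2, 0, 0]
ack_vc = replica.receive_update("u1", client_vc, {"method": "PUT", "doc": "X"})
client_vc = vc_merge(client_vc, ack_vc)  # proxy side of step 6
print(replica.update_log[0].ts, client_vc)  # [2, 1, 0] [2, 1, 0]
```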

Applying Pending Updates
  1. A pending update "u" can be applied to the state once all the "states" that it depends on have been applied (i.e. stateVC >= u.clientVC).
  2. Periodically, the update log is scanned for updates that meet this criterion.
  3. When one is found, the replica applies the update "u" to the DB and then updates stateVC = merge(stateVC, u.ts).
  4. Note that this mechanism guarantees that updates happen in "causal order" (i.e. an "effect" is never applied before its "causes"), but it doesn't guarantee "total order". Because independent (or concurrent) updates can be applied in arbitrary order, the order in which they are applied may differ between replicas.
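
One scan of the update log could be sketched like this. The update records are plain dicts with hypothetical keys (client_vc, ts, key, value), and only a simple "set key to value" operation is modelled; the loop repeats because applying one update can unblock another.

```python
def vc_geq(vc1, vc2):
    """True if vc1[i] >= vc2[i] for every i."""
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    """Entry-wise maximum of two vector clocks."""
    return [max(a, b) for a, b in zip(vc1, vc2)]


def apply_ready_updates(db, state_vc, pending):
    """Apply every pending update whose dependencies are met.

    Returns the new stateVC; applied updates are removed from `pending`.
    """
    progress = True
    while progress:
        progress = False
        for u in list(pending):  # iterate over a copy, since we remove as we go
            if vc_geq(state_vc, u["client_vc"]):
                db[u["key"]] = u["value"]
                state_vc = vc_merge(state_vc, u["ts"])
                pending.remove(u)
                progress = True
    return state_vc


db = {}
u1 = {"client_vc": [0, 0], "ts": [1, 0], "key": "x", "value": 1}
u2 = {"client_vc": [1, 0], "ts": [1, 1], "key": "y", "value": 2}  # depends on u1
pending = [u2, u1]  # the effect arrived before its cause
state_vc = apply_ready_updates(db, [0, 0], pending)
print(db, state_vc, pending)  # {'x': 1, 'y': 2} [1, 1] []
```

Even though u2 sits first in the queue, it is held back until u1 has been applied, which is exactly the causal ordering described in step 4.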

Processing Gossip Messages

It is important that replicas exchange their request logs among themselves, so that eventually every replica has a complete picture of all the update requests regardless of where they were received.

Periodically, each replica picks some other replica to send its update log to. The target can be picked at random, based on topology (only talk to neighbors), or based on degree of staleness (the replica we haven't talked to for the longest time). Once the target replica is selected, the complete update log together with the sender's current replicaVC is sent to it.

On the other hand, when a replica receives a gossip message from another replica...
  • It merges the message's update log into its own update log, i.e. for each update record u in the message's update log, it adds u to its own update log unless its replicaVC >= u.ts (which means it has already received a later update that supersedes u).
  • It checks whether some of the pending updates are ready to be applied to the database, and adjusts the stateVC accordingly.
  • It deletes entries from the log once they have been applied to the DB and it knows that all other replicas have already got them. In other words, let c be the id of the replica where "u" was created; then "u" is removable if for every replica i, otherReplicasVC[i][c] > u.ts[c].
  • It updates replicaVC = merge(replicaVC, message.replicaVC).
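
The log merge, the removal test and the clock update can be sketched as below. The replica and the message are plain dicts with a layout I made up for illustration (replica_vc, log, other_vcs, creator), and the step that applies pending updates is left out to keep the example short.

```python
def vc_geq(vc1, vc2):
    """True if vc1[i] >= vc2[i] for every i."""
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    """Entry-wise maximum of two vector clocks."""
    return [max(a, b) for a, b in zip(vc1, vc2)]


def receive_gossip(replica, sender, message):
    """Handle one gossip message (hypothetical dict layout)."""
    # Merge the logs: keep u unless replicaVC >= u.ts.
    for u in message["log"]:
        if not vc_geq(replica["replica_vc"], u["ts"]):
            replica["log"].append(u)
    # Remember the clock the sender reported in this message.
    replica["other_vcs"][sender] = list(message["replica_vc"])
    # replicaVC = merge(replicaVC, message.replicaVC)
    replica["replica_vc"] = vc_merge(replica["replica_vc"], message["replica_vc"])


def is_removable(u, other_vcs):
    """Gossip half of the removal rule: otherReplicasVC[i][c] > u.ts[c] for every i.

    The caller must also have applied u to its own DB before deleting it.
    """
    c = u["creator"]
    return all(vc[c] > u["ts"][c] for vc in other_vcs.values())


replica = {"replica_vc": [1, 0, 0], "log": [], "other_vcs": {}}
u_new = {"creator": 1, "ts": [0, 1, 0]}
u_old = {"creator": 0, "ts": [1, 0, 0]}
receive_gossip(replica, 1, {"replica_vc": [1, 1, 0], "log": [u_new, u_old]})
print(replica["log"] == [u_new], replica["replica_vc"])  # True [1, 1, 0]
```

Here u_old is dropped because the receiving replica's clock [1, 0, 0] already covers its timestamp, while u_new is added to the log.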


Anonymous said...


Your well planned designs are exciting. What program do you use for your diagrams? And do you get paid to create these solutions?

Jared Dobson

Ricky Ho said...

I just use PowerPoint for my diagrams.
My purpose in blogging is to write down my thoughts before I forget them, and also to meet like-minded people. I don't get paid for doing this.

Anonymous said...

Oh cool, Thanks. :-)