Sunday, October 19, 2008

CouchDB Implementation

CouchDB is an Apache open source project. It is Damien Katz's brainchild and has a number of very attractive features based on very cool technologies, such as:
  • RESTful API
  • Schema-less document store (document in JSON format)
  • Multi-Version-Concurrency-Control model
  • User-defined query structured as map/reduce
  • Incremental Index Update mechanism
  • Multi-Master Replication model
  • Written in Erlang (Erlang is good)
There is a wide range of application scenarios where CouchDB can be a good fit, from an occasionally connected laptop-based application, to a high-performance data cluster, all the way up to virtual data storage in the cloud.

To understand the CouchDB design more deeply, I was very fortunate to have a conversation with Damien, who was kind enough to share many details with me. Here I want to capture what I learned from that conversation.

Underlying Storage Structure
CouchDB is a “document-oriented” database where a document is a JSON string (with optional binary attachments). The underlying structure is composed of a “storage” file as well as multiple “view indexes”. The “storage” is used to store the documents and the “view indexes” are used for query processing.

Within a storage file, there are “contiguous” regions which are used to store documents. There are 2 B+Tree indexes to speed up certain kinds of access to the documents.
  • by_id_index (which uses the document id as the key). It is mainly used to look up a document by its document id. It points to a list of revisions (or a tree of revisions in case of conflicts in the replication scenario) since the last compaction. It also keeps the revision history (which won't be affected by compaction).
  • by_seqnum_index (which uses a monotonically increasing number as the key). A seqnum is generated whenever a document is updated. (Note that all updates happen in a serial fashion, so the seqnum reflects a sequence of non-concurrent updates.) It is mainly used to keep track of the last point of replication synchronization and the last point of view index update.


Append Only Operation

All updates (creating documents, modifying documents and deleting documents) happen in an append-only fashion. Instead of modifying the existing document, a new copy is created and appended to the current region. After that, the B+Tree nodes are also modified to point to the new document location. Modification of the B+Tree nodes is also done in an append-only fashion, which means a new copy of the B+Tree node is appended to the end of the file. This in turn triggers a modification of the parent node, which causes a new copy of the parent node to be appended, and so on all the way back to the root B+Tree node. Finally, the file header is modified to point to the new root node.

That means every update triggers 1 write for the document itself (except delete) and about logN writes of B+Tree node pages per index (one new copy for each level from the leaf up to the root). So an update has O(logN) complexity.

Append-only operation provides an interesting MVCC (Multi-Version Concurrency Control) model because the file keeps a history of all the previous versions of the document state. As long as the client holds on to a previous root node of the B+Tree index, it can get a snapshot view. While updates continue to happen, the client won’t see any of the latest changes. Such a consistent snapshot is very useful for online backup as well as online compaction.
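To make the append-only idea concrete, here is a minimal sketch. It is not CouchDB's code: it uses an in-memory array as the "file" and a plain binary search tree instead of a B+Tree, and the names (file, header, put, get) are my own. It only illustrates how copying the path to the root gives a reader a stable snapshot.

```javascript
// Simplified model: an append-only "file" (an array) holding a binary search
// tree instead of a B+Tree. Records are only ever appended, never overwritten.
const file = [];
let header = null;           // offset of the current root node (null = empty)

function append(record) {
  file.push(record);
  return file.length - 1;    // offset of the newly appended record
}

// Returns the offset of a new subtree root. Only the nodes on the path from
// the root down to the changed key are copied; everything else is shared.
function insert(nodeOffset, key, docOffset) {
  if (nodeOffset === null) {
    return append({ key, doc: docOffset, left: null, right: null });
  }
  const node = file[nodeOffset];
  if (key === node.key) {
    return append({ ...node, doc: docOffset });
  }
  if (key < node.key) {
    return append({ ...node, left: insert(node.left, key, docOffset) });
  }
  return append({ ...node, right: insert(node.right, key, docOffset) });
}

function put(key, body) {
  const docOffset = append({ body });        // one write for the document
  header = insert(header, key, docOffset);   // one write per tree level
}

// Reads start from whichever root the caller holds on to.
function get(rootOffset, key) {
  let offset = rootOffset;
  while (offset !== null) {
    const node = file[offset];
    if (key === node.key) return file[node.doc].body;
    offset = key < node.key ? node.left : node.right;
  }
  return undefined;
}

put("b", "b v1");
put("a", "a v1");
const snapshot = header;     // a reader keeps the root as of this moment
put("a", "a v2");            // a later update appends new nodes only

console.log(get(snapshot, "a")); // the snapshot still sees "a v1"
console.log(get(header, "a"));   // the current root sees "a v2"
```

The old nodes are still in the file after the second update to "a", which is why the file keeps growing until compaction.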

Note that while read operations are performed concurrently with other reads and writes, write operations are performed in serial order across documents. In other words, at any time only one document update can be in progress (however, writes of attachments within a document can happen in parallel).

GET document

When a client issues an HTTP REST GET call to CouchDB, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the document location
  • Reads the document and returns it to the client

PUT document (modification)

When a client issues an HTTP REST PUT call to CouchDB, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the leaf node as well as the document location
  • Reads the document and compares the revision, throwing an error if they don’t match.
  • If they match, figures out the old seqnum of the current revision.
  • Generates a new (monotonically increasing) seqnum as well as a new revision
  • Finds the last region to see if this document can fit in. If not, allocates another contiguous region.
  • Writes the document (with the new revision) into the new region
  • Modifies the by_id B+Tree to point to the new document location
  • Modifies the by_seqnum B+Tree to add the new entry (of the new seqnum) and remove the old entry (of the old seqnum).
Note that the by_seqnum B+Tree index always points to the latest revision; previous revisions are automatically forgotten.
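The update steps above can be sketched roughly as follows. This is only an illustration under my own assumptions: plain Maps stand in for the two on-disk B+Trees, revisions are simple counters rather than CouchDB's real revision ids, and putDocument is a hypothetical name.

```javascript
// Hypothetical in-memory stand-ins for the two indexes.
const byId = new Map();      // doc id -> { rev, seq, body }
const bySeq = new Map();     // seqnum -> doc id (latest revision only)
let lastSeq = 0;

function putDocument(id, body, expectedRev) {
  const current = byId.get(id);
  const currentRev = current ? current.rev : undefined;
  // Compare the revision supplied by the client with the stored one.
  if (currentRev !== expectedRev) {
    throw new Error("conflict: revision does not match");
  }
  const seq = ++lastSeq;                    // new monotonically increasing seqnum
  const rev = (currentRev || 0) + 1;        // new revision (a counter in this sketch)
  if (current) bySeq.delete(current.seq);   // remove the entry of the old seqnum
  byId.set(id, { rev, seq, body });         // by_id now points to the new copy
  bySeq.set(seq, id);                       // add the entry of the new seqnum
  return rev;
}

const rev1 = putDocument("doc1", { n: 1 });          // creation: no revision yet
const rev2 = putDocument("doc1", { n: 2 }, rev1);    // update with a matching rev

let rejected = false;
try {
  putDocument("doc1", { n: 3 }, rev1);               // stale revision is refused
} catch (e) {
  rejected = true;
}
console.log(rev2, rejected, [...bySeq.entries()]);
```

After the second call, by_seqnum holds a single entry for doc1, which matches the note that older revisions are forgotten by that index.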

PUT / POST document (creation)

When a client issues an HTTP REST PUT or POST call to CouchDB, the DBServer …
  • Generates a new (monotonically increasing) seqnum as well as a new revision (and a new document id if the client did not supply one)
  • Find the last region to see if this document can fit in. If not, allocate another contiguous region.
  • Write the document (with the new revision) into the new region
  • Modify the by_id b+tree to point to the new document location
  • Modify the by_seqnum b+tree to add the new entry (of the new seqnum)

DELETE document (modify)
When a client issues an HTTP REST DELETE call to CouchDB, the DBServer …
  • Look at the file header to find the root node of the by_id B+Tree index
  • Traverse down the B+tree to figure out the leaf node as well as the document location
  • Read the document. Compare the revision, throw an error if they don’t match.
  • If they match, figure out the old seqnum of the current revision.
  • Generate a new (monotonic increasing) seqnum as well as a new revision
  • Modify the by_id b+tree revision history to show this revision path is deleted
  • Modify the by_seqnum b+tree to add the new entry (of the new seqnum) and remove the old entry (of the old seqnum).
Online Compaction

Because all operations are append-only, the storage file will grow over time. So we need to compact the file regularly.
  • Open a new storage file
  • Walk the by_seqnum B+Tree index (which only points to the latest revisions) and locate each document
  • Copy the document to the new storage file (which automatically updates the corresponding B+Tree indexes in the new storage file).
Note that because of the characteristics of MVCC, the compaction gets a consistent snapshot and can run concurrently without interference from the updates that keep arriving after the start of compaction. However, if the rate of update is too high, the compaction process can never catch up with the updates, which keep appending to the file. There is a throttling mechanism under development to slow down the client update rate.
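The compaction walk can be sketched like this. Again this is only a toy model with names of my own choosing (oldFile, oldBySeq, compact): the file is an array of appended revisions and Maps replace the B+Tree indexes.

```javascript
// Hypothetical model of the old storage file: an array of appended
// revisions. Offsets 0 and 1 are stale copies left behind by later updates.
const oldFile = [
  { id: "a", rev: 1, body: "a1" },
  { id: "b", rev: 1, body: "b1" },
  { id: "a", rev: 2, body: "a2" },
  { id: "b", rev: 2, body: "b2" },
  { id: "c", rev: 1, body: "c1" },
];
// by_seqnum stand-in: seqnum -> offset of the latest revision only.
const oldBySeq = new Map([[3, 2], [4, 3], [5, 4]]);

function compact(file, bySeq) {
  const newFile = [];
  const newById = new Map();   // doc id -> offset in the new file
  const newBySeq = new Map();  // seqnum -> offset in the new file
  // Walk by_seqnum in order; it only references the latest revisions.
  const seqs = [...bySeq.keys()].sort((x, y) => x - y);
  for (const seq of seqs) {
    const doc = file[bySeq.get(seq)];
    newFile.push(doc);                       // copy the document
    const offset = newFile.length - 1;
    newById.set(doc.id, offset);             // rebuild both indexes as we go
    newBySeq.set(seq, offset);
  }
  return { file: newFile, byId: newById, bySeq: newBySeq };
}

const compacted = compact(oldFile, oldBySeq);
console.log(compacted.file.map((d) => d.body)); // only the latest bodies remain
```

The stale copies "a1" and "b1" are simply never visited, so they do not make it into the new file.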

View Indexes

CouchDB supports the concept of a “view” of the database. A view is effectively the result of user-defined processing of the underlying document repository. The user-defined processing has to be organized as two steps, “map” and “reduce” (note that the reduce semantics are very different from Google’s Map/Reduce model). map() is a user-defined function which transforms each document into zero, one or multiple intermediate objects, while reduce() is another user-defined function which consolidates the intermediate objects into the final result.

The intermediate objects of map() and reduce() are stored in the view indexes. As the storage gets updated, the previous results stored in the view indexes are no longer valid and have to be updated. CouchDB uses an incremental update mechanism so that the refresh of the view indexes is highly efficient.

View definitions are grouped into a design document.

Each view is defined by one “map” function and an optional “reduce” function.

map = function(doc) {
 …
 emit(k1, v1)
 …
 emit(k2, v2)
 …
}

reduce = function(keys, values) {
 …
 return result
}
The reduce() function needs to be commutative and associative so that the order of reduction can be arbitrary.
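Here is a small, hedged example of why that property matters. The documents, the runMap helper and the explicit emit argument are my own illustration (in CouchDB, emit is provided by the view server), and I pass the third rereduce flag that CouchDB hands to reduce functions. The point is only that reducing in pieces and then rereducing gives the same answer as reducing everything at once.

```javascript
const docs = [
  { _id: "o1", customer: "ann", amount: 10 },
  { _id: "o2", customer: "bob", amount: 5 },
  { _id: "o3", customer: "ann", amount: 7 },
];

// Collects the emit() calls made by a map function for one document.
function runMap(mapFn, doc) {
  const rows = [];
  mapFn(doc, (key, value) => rows.push({ id: doc._id, key, value }));
  return rows;
}

const map = (doc, emit) => {
  if (doc.amount) emit(doc.customer, doc.amount);
};

// A sum is commutative and associative, so it works for reduce and rereduce.
const reduce = (keys, values, rereduce) => values.reduce((a, b) => a + b, 0);

const rows = docs.flatMap((doc) => runMap(map, doc));

// First-level reduce over a set of map rows.
const reduceRows = (rs) =>
  reduce(rs.map((r) => [r.key, r.id]), rs.map((r) => r.value), false);

const allAtOnce = reduceRows(rows);
const partA = reduceRows(rows.slice(0, 1));
const partB = reduceRows(rows.slice(1));
// Rereduce combines earlier reduce results, in any order.
const combined = reduce(null, [partB, partA], true);

console.log(allAtOnce, combined); // 22 22
```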

The views defined within each design document are materialized in a view file.


Initially, the view file is empty (no index has been built yet). The view is built lazily when the first query is made.
  1. CouchDB walks the by_seqnum B+Tree index of the storage file.
  2. Based on that, CouchDB gets the latest revisions of all existing documents.
  3. CouchDB remembers the last seqnum and then feeds each document to the View Server using “map_doc”.
  4. The View Server invokes the map(doc) function; for each emit(key, value) call, an entry is created.
  5. Finally, a set of entries is computed and returned to CouchDB.
  6. CouchDB adds those entries into the B+Tree index, with key = emit_key + doc_id.
  7. For each B+Tree leaf node, CouchDB sends all the map entries it contains back to the View Server using “reduce”.
  8. The View Server invokes the reduce(keys, values) function.
  9. The reduce result is computed and returned to CouchDB.
  10. CouchDB updates the leaf B+Tree node to point to the reduce value of the map results it contains.
  11. After that, CouchDB moves up one level to the parent of the leaf B+Tree node. For each B+Tree parent node, CouchDB sends the corresponding reduce results of its child nodes to the View Server using “rereduce”.
  12. The View Server invokes the reduce(keys, values) function again.
  13. Finally, a rereduce result is computed and returned to CouchDB.
  14. CouchDB updates the parent B+Tree node to point to the rereduce value.
CouchDB continues to move up one level at a time and repeats the calculation of the rereduce result. Finally, the rereduce result of the root node is also updated.
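The bottom-up build can be sketched as follows. This is a simplification under my own assumptions: the "tree" has just two levels (leaves and a root), buildIndex and leafSize are hypothetical names, and the map entries are given directly instead of coming from a view server.

```javascript
const sum = (keys, values, rereduce) => values.reduce((a, b) => a + b, 0);

// Entries are ordered by emit key first, then by document id.
function compareEntries(x, y) {
  if (x.key !== y.key) return x.key < y.key ? -1 : 1;
  if (x.id !== y.id) return x.id < y.id ? -1 : 1;
  return 0;
}

// Builds a tiny two-level index: each leaf holds sorted map entries plus a
// cached reduce value; the root holds the rereduce of the leaf values.
function buildIndex(entries, reduce, leafSize) {
  const sorted = [...entries].sort(compareEntries);
  const leaves = [];
  for (let i = 0; i < sorted.length; i += leafSize) {
    const chunk = sorted.slice(i, i + leafSize);
    leaves.push({
      entries: chunk,
      reduced: reduce(
        chunk.map((e) => [e.key, e.id]),
        chunk.map((e) => e.value),
        false
      ),
    });
  }
  return {
    children: leaves,
    reduced: reduce(null, leaves.map((leaf) => leaf.reduced), true),
  };
}

const entries = [
  { key: "ann", id: "o1", value: 10 },
  { key: "bob", id: "o2", value: 5 },
  { key: "ann", id: "o3", value: 7 },
  { key: "cat", id: "o4", value: 1 },
];
const index = buildIndex(entries, sum, 2);
console.log(index.children.map((leaf) => leaf.reduced), index.reduced);
```

With a leaf size of 2, the two "ann" entries land in the first leaf, and the root value is the rereduce of the two leaf values.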


When done, the view index will look something like this …



Incremental View Update

CouchDB updates the view indexes lazily and incrementally. That means, when documents are updated, CouchDB will not refresh the view index until the next query reaches CouchDB.

CouchDB then refreshes the index in the following way.
  • CouchDB walks the by_seqnum B+Tree index of the storage file, starting from the last seqnum it remembered.
  • CouchDB extracts all the documents changed since the last view query, feeds them to the View Server’s map function, and gets back a set of map results.
  • CouchDB updates the B+Tree index with the map results; some of the leaf B+Tree nodes will be updated.
  • For each updated leaf B+Tree node, CouchDB resends all the map entries it contains to the View Server to recompute the reduce value, then stores the reduce value inside the B+Tree node.
  • For all the parents of the updated leaf B+Tree nodes, CouchDB needs to recompute the rereduce value and store it inside the B+Tree node, all the way up to the root node.
Because of the consistent snapshot characteristic, a long-running view query can run concurrently (without interference) with the ongoing updates of the DB. However, the query needs to wait for the completion of the view index update before seeing a consistent result. There is also an option (under development) to immediately return a stale copy of the view in case the client can tolerate that.
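The map side of this incremental refresh might look like the sketch below. The db and view objects, save and refresh are hypothetical names of mine, and the reduce caching is left out to keep the example short. It only shows how remembering the last seqnum lets the second refresh touch just the changed document.

```javascript
// Hypothetical database: by_seqnum maps seqnum -> latest revision of a doc.
const db = { lastSeq: 0, bySeq: new Map() };

function save(doc) {
  // Drop the old seqnum entry of this document, then add the new one.
  for (const [seq, d] of db.bySeq) {
    if (d._id === doc._id) db.bySeq.delete(seq);
  }
  db.bySeq.set(++db.lastSeq, doc);
}

// View state: remembers how far it has read and which rows each doc emitted.
const view = { lastSeq: 0, rowsByDoc: new Map(), mapCalls: 0 };
const map = (doc, emit) => emit(doc.type, 1);

function refresh(view, db) {
  const seqs = [...db.bySeq.keys()]
    .filter((seq) => seq > view.lastSeq)    // only changes since the last refresh
    .sort((a, b) => a - b);
  for (const seq of seqs) {
    const doc = db.bySeq.get(seq);
    const rows = [];
    map(doc, (key, value) => rows.push({ key, value }));
    view.mapCalls += 1;
    view.rowsByDoc.set(doc._id, rows);      // replaces rows of the old revision
    view.lastSeq = seq;
  }
}

save({ _id: "d1", type: "post" });
save({ _id: "d2", type: "comment" });
refresh(view, db);                // first query: maps both documents
save({ _id: "d2", type: "post" });
refresh(view, db);                // second query: maps only the changed d2
console.log(view.mapCalls);       // 3
```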

Query processing

When a client retrieves the result of a view, there are the following scenarios.

Query on Map-only view
In this case, there is no reduce phase in the view index update. To perform the query processing, CouchDB simply searches the B+Tree to locate the starting point of the key (note that the index key is prefixed by the emit_key) and then returns all the map results of that key.

Query on Map with reduce
There are 2 cases. If the query is on the final reduce value over the whole view, then CouchDB just returns the rereduce value pointed to by the root of the B+Tree of the view index.

If the query is on the reduce value of each key (group_by_key = true), then CouchDB tries to locate the boundary of each key. Since this range probably does not fall exactly along B+Tree node boundaries, CouchDB needs to figure out the edges at both ends, locate the partially matched leaf B+Tree nodes and resend their map results (with that key) to the View Server. This reduce result is then merged with the existing rereduce results to compute the final reduce result of this key.


e.g. If the key spans leaf nodes A to F, then the key falls only partially in node A and node F, so their matching map results need to be sent to reduce() again. The result will be rereduced with node E’s existing reduce value and node P’s existing rereduce value.
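A rough sketch of that grouped query, assuming a flat list of leaves with cached reduce values (makeLeaf and reduceForKey are names I made up, and a real index would seek to the key range instead of scanning every leaf):

```javascript
const sum = (keys, values, rereduce) => values.reduce((a, b) => a + b, 0);

// A leaf holds its map entries plus the cached reduce value over all of them.
function makeLeaf(entries) {
  return { entries, reduced: sum(null, entries.map((e) => e.value), false) };
}

const leaves = [
  makeLeaf([{ key: "a", value: 1 }, { key: "b", value: 2 }]),
  makeLeaf([{ key: "b", value: 3 }, { key: "b", value: 4 }]),
  makeLeaf([{ key: "b", value: 5 }, { key: "c", value: 6 }]),
];

function reduceForKey(leaves, key, reduce) {
  const partials = [];
  let reusedLeaves = 0;
  for (const leaf of leaves) {
    const matching = leaf.entries.filter((e) => e.key === key);
    if (matching.length === 0) continue;            // leaf is outside the range
    if (matching.length === leaf.entries.length) {
      partials.push(leaf.reduced);                  // fully covered: reuse cache
      reusedLeaves += 1;
    } else {
      // Partially covered edge leaf: reduce only the matching entries again.
      partials.push(
        reduce(matching.map((e) => e.key), matching.map((e) => e.value), false)
      );
    }
  }
  // Merge the fresh edge results with the cached values via rereduce.
  return { value: reduce(null, partials, true), reusedLeaves };
}

const result = reduceForKey(leaves, "b", sum);
console.log(result.value, result.reusedLeaves); // 14 1
```

Only the two edge leaves are reduced again; the middle leaf, which holds nothing but key "b", contributes its cached value.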


DB Replication

CouchDB supports multiple DB replicas running on different machines and provides a mechanism to synchronize their data. This is useful in 2 common scenarios.
  • Occasionally connected applications (e.g. PDA). In this case, the user can work in a disconnected mode for a period of time and store his data changes locally. Later on, when he connects back to his corporate network, he can synchronize his changes back to his corporate DB.
  • Mission-critical apps (e.g. clusters). In this case, the DB will be replicated across multiple machines so that reliability can be achieved through redundancy and high performance can be achieved through load balancing.
Underneath, there is a replicator process which accepts replication commands. The command specifies the source DB and the target DB. The replicator then asks the source DB for all the documents updated after a particular seq_num. In other words, the replicator needs to keep track of the last seq_num. Then it sends a request to the target DB to pull the current revision history of all these documents and checks whether the revision history of the target is older than that of the source. If so, it pushes the changed documents to the target; otherwise, it skips sending the doc.

At the target DB, conflicts can be detected when the document has also been updated in the target DB. The conflict will then be flagged in the revision tree pointed to by the by_id index.

Until this conflict is resolved, CouchDB will consider the revision with the longest path to be the winner and will show that in the views. However, CouchDB expects a separate process (maybe a manual one) to fix the conflict.
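To illustrate the replicator and the "longest path wins" rule, here is a toy model. Everything in it is an assumption of mine: each document is stored as a list of revision branches, the revision ids such as "2-a" are only illustrative, and makeDb, write, winner and replicate are hypothetical names rather than CouchDB's API.

```javascript
function makeDb() {
  // docs: id -> list of branches; each branch lists revision ids, oldest first
  return { seq: 0, changes: new Map(), docs: new Map() };
}

const isPrefix = (short, long) =>
  short.length <= long.length && short.every((rev, i) => rev === long[i]);

// Stores a revision history; returns false if the db already contains it.
function write(db, id, history) {
  const branches = db.docs.get(id) || [];
  if (branches.some((b) => isPrefix(history, b))) return false; // nothing new
  const kept = branches.filter((b) => !isPrefix(b, history));   // extended ones
  kept.push(history);                    // more than one branch left = conflict
  db.docs.set(id, kept);
  for (const [seq, d] of db.changes) if (d === id) db.changes.delete(seq);
  db.changes.set(++db.seq, id);
  return true;
}

// The branch with the longest revision path is shown as the winner.
function winner(db, id) {
  return db.docs.get(id).reduce((best, b) => (b.length > best.length ? b : best));
}

// Pushes everything changed in source after `since`; returns the new checkpoint.
function replicate(source, target, since) {
  let last = since;
  const seqs = [...source.changes.keys()].filter((s) => s > since).sort((a, b) => a - b);
  for (const seq of seqs) {
    const id = source.changes.get(seq);
    for (const branch of source.docs.get(id)) write(target, id, branch);
    last = seq;
  }
  return last;
}

const a = makeDb();
const b = makeDb();
write(a, "doc", ["1-x"]);
const checkpoint1 = replicate(a, b, 0);       // b now has revision 1-x
write(a, "doc", ["1-x", "2-a", "3-a"]);       // two more updates on a
write(b, "doc", ["1-x", "2-b"]);              // a concurrent update on b
const checkpoint2 = replicate(a, b, checkpoint1);

console.log(b.docs.get("doc").length);        // 2 branches: a conflict on b
console.log(winner(b, "doc"));                // the longer a-side branch wins
```

The loser branch stays in the revision tree, so an application-level process can still look at it and merge it later.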

Now, building a multi-master replica model based on bi-directional data synchronization on top of the replicator is pretty straightforward.

For example, we can have a pair-wise "gossip" process that runs periodically (or is triggered by certain events). The process will do the following ...
  1. Copy the changes from source = replica A to target = replica B
  2. Reverse the direction and copy the changes from source = replica B to target = replica A
  3. Pick randomly between replica A and replica B, and call it the winner.
  4. Call a user-provided merge(doc_revA) function to fix the revision tree, basically running app-specific logic to bring the tree back to a list.
  5. Copy the changes back from the winner to the loser. This will replicate the fixes.


Data Consistency Considerations

CouchDB doesn’t have a transaction concept, nor does it keep track of the inter-dependencies between documents. It is important to make sure that data integrity constraints don’t span more than one document.

For example, data integrity may become an issue if your application reads document-X and, based on what it read, updates document-Y. It is possible that after you read document-X, some other application has changed document-X into something else that you are not aware of, and you update document-Y based on a stale value. CouchDB cannot detect this kind of conflict because it happens across two different documents.

Additional data consistency issues arise in the data replication setup. Since the data synchronization happens in the background, there will be some latency before you see the latest changes if they happened in other replicas. If the client connects to the replicas in a non-deterministic way, then the following scenarios can happen …
  • The client reads a document and later reads the same document again, but the 2nd read returns an earlier revision than the 1st read.
  • The client updates a document and later reads the document again, but it doesn’t see its own update.
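
The second scenario can be reproduced with a toy model of two replicas. The Maps, the numeric revisions and the replicate helper below are my own simplification, not CouchDB code; the sketch only shows a read hitting a replica before background synchronization has run.

```javascript
// Two hypothetical replicas: doc id -> { rev, body }.
const replicaA = new Map();
const replicaB = new Map();

// Background synchronization: copy over anything the target has not seen yet.
function replicate(from, to) {
  for (const [id, doc] of from) {
    const current = to.get(id);
    if (!current || current.rev < doc.rev) to.set(id, doc);
  }
}

replicaA.set("doc", { rev: 1, body: "v1" });
replicate(replicaA, replicaB);                  // both replicas now hold v1

replicaA.set("doc", { rev: 2, body: "v2" });    // the client's update lands on A
const readBeforeSync = replicaB.get("doc").body; // the client then reads from B

replicate(replicaA, replicaB);                  // synchronization catches up
const readAfterSync = replicaB.get("doc").body;

console.log(readBeforeSync, readAfterSync);     // v1 v2
```

One way an application can avoid this is to keep sending a given client to the same replica, at the cost of less flexible load balancing.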

20 comments:

Cosmin Lehene said...

Hi Ricky,

One of the big changes with CouchDB since it's part of Apache and since Damien Katz is part of IBM is that it won't be written in Erlang anymore. It's being ported to Java.

http://damienkatz.net/2008/04/couchdb_language_change.html

Ricky Ho said...

You know this is Damien's joke.

Batok said...

A very good explanation of couchdb.

Just one clarification. You can add any number of attachments to a single document.

Domingo Aguilera.

Anonymous said...

Ricky, have a look at the date :-)

Unknown said...

Thanks for the excellent post! Perfect timing.

ryan said...

Interesting parallel: The update and snapshot mechanisms you describe sound a lot like what Jeff Bonwick described for how ZFS works at the ACM Reflections Projections conference in Urbana Illinois. http://www.acm.uiuc.edu/conference/2008/speakers#JeffBonwick

Инженер said...

Excellent work, Ricky! Thank you.

Pierre Lindenbaum said...

Thank you for this very interesting post !

Amr Ellafy said...

the Java thing was an April foul ! couchDB will keep with Erlang because java doesn't fit for such job (concurrency and parallel computing )

Anonymous said...

Thanks Ricky for posting this CouchDB Implementation blog entry. It covers just about everything to know about the project. I was wondering what it meant till a quick Google search led me to your blog

In fact, I find your blog interesting and have bookmarked it :)

Anonymous said...

Thanks for these interesting highlights about CouchDB =)

Anonymous said...

Thanks for this post.

robert said...

It covers just about everything to know about the project.

Yousuf Fauzan said...

Lovely.

However, I was wondering if the docid index update could be explained a bit more. Would really appreciate if you could amplify it a little.

J.F. Zarama said...

I am just learning CouchDB and I found this blog-entry very informative and useful;

N.B. The included diagrams are ver clear and useful to understand the subject; could you share with us what technology you use to produce them; thanks;

Anonymous said...

Thanks for this post.

Question : Please Exemple Implement Project with CouchDB ?

Anonymous said...

Hi, thank you for sharing this great info. Was just browsing through the net in my office and happened upon your blog. It is really very well written and quit comprehensive in explaining with a very simple language.

Unknown said...

Excellent braindump :)

Unknown said...

Hi Ricky

Question: If there are at most 2 values with the same key after grouping, will rereduce happen?

Detailed here: http://stackoverflow.com/questions/27008751/rereduce-and-group-true-in-couchdb

Unknown said...

Hi Ricky

http://stackoverflow.com/questions/27008751/rereduce-and-group-true-in-couchdb

If there are at most 2 values per each key after grouping, will rereduce happens in this case?