Monday, April 2, 2012

MongoDb Architecture

NOSQL has become a very heated topic for large web-scale deployments, where scalability and semi-structured data drive the DB requirements towards NOSQL. Many NOSQL products have evolved over the last couple of years. In my past blogs, I have covered the underlying distributed system theory of NOSQL, as well as some specific products such as CouchDB and Cassandra/HBase.

Last Friday I was very lucky to meet Jared Rosoff from 10gen at a technical conference and discuss the technical architecture of MongoDb. I found the information very useful and want to share it with more people.

One thing that impresses me about MongoDb is that it is extremely easy to use, and the underlying architecture is also very easy to understand.

Here are some simple admin steps to start/stop the MongoDb server.
# Install MongoDB and create the default data directory
mkdir -p /data/db

# Start Mongod server
.../bin/mongod # data stored in /data/db

# Start the command shell
.../bin/mongo
> show dbs
> show collections

# Remove collection
> db.person.drop()

# Stop the Mongod server from shell
> use admin
> db.shutdownServer()

Major difference from RDBMS
MongoDb differs from an RDBMS in the following ways:
  • Unlike an RDBMS record, which is "flat" (a fixed number of fields of simple data types), the basic unit of MongoDb is the "document", which is "nested" and can contain multi-valued fields (arrays, hashes).
  • Unlike an RDBMS, where all records stored in a table must conform to the table schema, documents of any structure can be stored in the same collection.
  • There is no "join" operation in the query. Overall, data is encouraged to be organized in a more denormalized manner, and more of the burden of ensuring data consistency is pushed to the application developers.
  • There is no concept of "transaction" in MongoDb. "Atomicity" is guaranteed only at the document level (no partial update of a document will occur).
  • There is no concept of "isolation": any data read by one client may have its value modified by another concurrent client.
By removing some of the features that a classical RDBMS provides, MongoDb can be more lightweight and more scalable in processing big data.
Query processing
MongoDb belongs to the document-oriented type of DB. In this model, data is organized as JSON documents and stored in collections. A collection can be thought of as equivalent to a table, and a document as equivalent to a record, in the RDBMS world.

Here are some basic examples.
# create a doc and save into a collection
> p = {firstname:"Dave", lastname:"Ho"}
> db.person.save(p)
> db.person.insert({firstname:"Ricky", lastname:"Ho"})

# Show all docs within a collection
> db.person.find()

# Iterate result using cursor
> var c = db.person.find()
> p1 = c.next()
> p2 = c.next()

To specify the search criteria, provide an example document containing the fields that need to be matched.
> p3 = db.person.findOne({lastname:"Ho"})
Notice that in the query, the value portion needs to be determined before the query is made (in other words, it cannot be based on other attributes of the document). For example, if we have a collection of "Person", it is not possible to express a query that returns the persons whose weight is larger than 10 times their height.
# Return a subset of fields (ie: projection)
> db.person.find({lastname:"Ho"}, {firstname:true})

# Delete some records
> db.person.remove({firstname:"Ricky"})
To speed up queries, indexes can be used. In MongoDb, an index is stored as a BTree structure (so range queries are automatically supported). Since the document itself is a tree, the index can be specified as a path that drills into a deep nesting level inside the document.
# To build an index for a collection
> db.person.ensureIndex({firstname:1})

# To show all existing indexes
> db.person.getIndexes()

# To remove an index
> db.person.dropIndex({firstname:1})

# An index can be built on a path of the doc.
> db.person.ensureIndex({"":1})

# A composite key can be used to build index
> db.person.ensureIndex({lastname:1, firstname:1})
An index can also be built on a multi-valued attribute such as an array. In this case, each element in the array will have a separate node in the BTree.
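
To make the path index and the multi-valued index a bit more concrete, here is a rough sketch in plain JavaScript that runs without a MongoDB server. It is only an illustration: the helper names `valuesAtPath` and `indexEntries` and the sample documents are hypothetical, and a sorted array stands in for the real BTree.

```javascript
// Resolve a dotted path such as "address.city" inside a nested document.
// Arrays fan out: every element contributes its own value.
function valuesAtPath(doc, path) {
  let current = [doc];
  for (const field of path.split(".")) {
    const next = [];
    for (const node of current) {
      if (node === null || typeof node !== "object") continue;
      const value = node[field];
      if (value === undefined) continue;
      if (Array.isArray(value)) next.push(...value);
      else next.push(value);
    }
    current = next;
  }
  return current;
}

// Build sorted (key, docId) entries; the sorted array stands in for the BTree.
function indexEntries(docs, path) {
  const entries = [];
  for (const doc of docs) {
    for (const key of valuesAtPath(doc, path)) {
      entries.push({ key, id: doc._id });
    }
  }
  entries.sort((a, b) => (a.key < b.key ? -1 : a.key > b.key ? 1 : 0));
  return entries;
}

// Hypothetical sample documents.
const people = [
  { _id: 1, firstname: "Ricky", address: { city: "San Jose" }, tags: ["db", "java"] },
  { _id: 2, firstname: "Dave", address: { city: "Austin" }, tags: ["db"] },
];

console.log(indexEntries(people, "address.city")); // one entry per document
console.log(indexEntries(people, "tags"));         // one entry per array element
```

Because the entries are kept sorted by key, a range query only needs to locate the first matching entry and walk forward from there.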

Building an index can be done in either offline foreground mode or online background mode. Foreground mode proceeds much faster, but the DB cannot be accessed during the index build. If the system is running in a replica set (described below), it is recommended to rotate each member DB offline and build the index in foreground.

When there are multiple selection criteria in a query, MongoDb attempts to use the single best index to select a candidate set and then sequentially iterates through the candidates to evaluate the other criteria.

When there are multiple indexes available for a collection, MongoDb handles a query the first time by creating multiple execution plans (one for each available index) and letting them take turns (within a certain number of ticks) to execute until the fastest plan finishes. The result of the fastest executor is returned, and the system remembers the index it used. Subsequent queries use the remembered index until a certain number of updates have happened in the collection; then the system repeats the process to figure out which index is best at that time.
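
The plan race described above can be sketched in plain JavaScript. This is a toy model, not MongoDb's actual optimizer: the names `scanPlan`, `racePlans` and `planCache` are hypothetical, each `yield` stands for one "tick" of work, and the candidate id lists stand in for what each index would return.

```javascript
// Each candidate plan is modeled as a generator: every `yield` is one unit of
// work, and the generator's return value is the plan's result set.
function* scanPlan(docs, candidateIds, predicate) {
  const results = [];
  for (const id of candidateIds) {
    yield; // one unit of work per candidate document examined
    if (predicate(docs[id])) results.push(docs[id]);
  }
  return results;
}

// Advance all plans in turn until the first one finishes.
function racePlans(plans) {
  const running = Object.entries(plans).map(([name, gen]) => ({ name, gen }));
  if (running.length === 0) throw new Error("no plans to race");
  for (;;) {
    for (const { name, gen } of running) {
      const step = gen.next();
      if (step.done) return { winner: name, results: step.value };
    }
  }
}

const docs = [
  { lastname: "Ho", age: 25 },
  { lastname: "Ho", age: 28 },
  { lastname: "Ho", age: 29 },
  { lastname: "Ho", age: 41 },
  { lastname: "Lee", age: 22 },
  { lastname: "Lee", age: 52 },
];
const matches = (d) => d.lastname === "Ho" && d.age > 30;

// Query shape -> remembered index name, reused until enough updates happen.
const planCache = new Map();

const outcome = racePlans({
  lastnameIndex: scanPlan(docs, [0, 1, 2, 3], matches), // candidates with lastname "Ho"
  ageIndex: scanPlan(docs, [3, 5], matches),            // candidates with age > 30
});
planCache.set("lastname+age", outcome.winner);
console.log(outcome.winner, outcome.results);
```

Here the age index wins because it produces a smaller candidate set, so its plan needs fewer ticks to finish.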

Since only one index will be used, it is important to look at the search or sorting criteria of the query and build additional composite indexes to match the query better. Maintaining an index is not without cost: indexes need to be updated when docs are created, deleted and updated, which adds overhead to the update operations. To maintain an optimal balance, we need to periodically measure the effectiveness of each index (e.g. the read/write ratio) and delete the less efficient ones.

Storage Model
Written in C++, MongoDB uses memory-mapped files that directly map an on-disk data file to an in-memory byte array, where data access logic is implemented using pointer arithmetic. Each document collection is stored in one namespace file (which contains metadata information) as well as multiple extent data files (whose sizes grow exponentially, doubling each time).

The data structure uses doubly-linked lists extensively. Each collection of data is organized as a linked list of extents, each of which represents a contiguous disk space. Each extent points to the head/tail of another linked list of docs. Each doc contains links to other documents as well as the actual data encoded in BSON format.

Data modification happens in place. If the modification increases the size of a record beyond its originally allocated space, the whole record is moved to a bigger region with some extra padding bytes. The padding bytes are used as a growth buffer so that future expansion doesn't necessarily require moving the data again. The amount of padding is dynamically adjusted per collection based on its modification statistics. The space occupied by the original doc is then freed up and tracked in a set of free lists, one for each size range.

As we can imagine, holes will be created over time as objects are created, deleted or modified. This fragmentation hurts performance because less data is read or written per disk I/O. Therefore, we need to run the "compact" command periodically, which copies the data to a contiguous space. This "compact" operation, however, is an exclusive operation and has to be done offline. Typically this is done in a replica setting by rotating each member offline, one at a time, to perform the compaction.

Indexes are implemented as BTrees. Each BTree node contains a number of keys (within this node), as well as a pointer to the left child BTree node of each key.

Data update and Transaction
To update an existing doc, we can do the following:
var p1 = db.person.findOne({lastname:"Ho"})
p1["address"] = "San Jose"
db.person.save(p1)

// Do the same in one command
db.person.update({lastname:"Ho"},
              {$set:{address:"San Jose"}})
Writes by default don't wait. There are various wait options with which the client can specify what conditions to wait for before the call returns (this is also achievable by a follow-up "getlasterror" call), such as whether the change has been persisted on disk, or whether the change has been propagated to sufficient members of the replica set. MongoDb also provides a sophisticated way to assign tags to members of a replica set to reflect their physical topology, so that a customized write policy can be made for each collection based on its reliability requirement.

In an RDBMS, "Serializability" is a very fundamental concept: the net effect of concurrently executing work units is equivalent to arranging these work units in some order of sequential execution (one at a time). Therefore, each client can act as if the DB were exclusively available to it. The underlying implementation of the DB server may use locks or multi-versioning to provide the isolation. However, this concept is not available in MongoDb (nor in many other NOSQL stores).

In MongoDb, all data you read should be treated as a snapshot of the past, which means that by the time you look at it, it may have been changed in the DB. Therefore, if you are making a modification based on some previously read data, by the time you send the modification request, the condition your modification is based on may have changed. If this is not acceptable for your application's consistency requirement, you may need to re-validate the condition at the time you request the modification (ie: a "conditional_modify" should be made).

Under this scheme, a "condition" is attached to the modification request so that the DB server can validate the condition before applying the modification (of course, the condition checking and the modification must be atomic so that no update can happen in between). In MongoDb, this can be achieved by the "findAndModify" call.
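// The collection name "account" is assumed here
var account = db.account.findOne({id:1234})
var old_bal = account['balance']
var new_bal = old_bal + fund

// Pre-condition is specified in search criteria
var prev = db.account.findAndModify({
                   query: {id:1234, balance:old_bal},
                   update: {$set: {balance: new_bal}}})

// Check if the prev command succeeded (null means no doc matched)
var success = (prev != null)

if (!success) {
  // the balance changed in between: retry from the beginning
}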
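
To see why the pre-condition matters, here is a small self-contained simulation in plain JavaScript (no MongoDB server needed). The `Collection` class and the `deposit` helper are hypothetical stand-ins I made up for illustration; this `findAndModify` takes a query and a set of fields, which is a simplification and not the real shell signature.

```javascript
// A tiny in-memory stand-in for a collection. JavaScript runs each method to
// completion without interleaving, which models the atomic check-and-update.
class Collection {
  constructor(docs) {
    this.docs = docs;
  }
  findOne(query) {
    const hit = this.docs.find((d) => Object.keys(query).every((k) => d[k] === query[k]));
    return hit || null;
  }
  // Returns the pre-modification doc, or null when the precondition fails.
  findAndModify(query, set) {
    const doc = this.findOne(query);
    if (doc === null) return null;
    const before = { ...doc };
    Object.assign(doc, set);
    return before;
  }
}

// Read, compute, then write only if the balance is still what we read.
function deposit(accounts, id, fund, maxRetries = 5) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const account = accounts.findOne({ id });
    if (account === null) throw new Error("no such account");
    const oldBal = account.balance;
    const prev = accounts.findAndModify({ id, balance: oldBal }, { balance: oldBal + fund });
    if (prev !== null) return oldBal + fund;
  }
  throw new Error("too many conflicts");
}

const accounts = new Collection([{ id: 1234, balance: 100 }]);
const snapshot = accounts.findOne({ id: 1234 }).balance;  // client A reads the balance
accounts.findAndModify({ id: 1234 }, { balance: 150 });   // client B changes it meanwhile
const stale = accounts.findAndModify(
  { id: 1234, balance: snapshot },
  { balance: snapshot + 10 }
);
console.log(stale);                        // null: the precondition no longer holds
console.log(deposit(accounts, 1234, 10));  // the retry loop re-reads and then succeeds
```

Without the balance in the search criteria, client A's write would silently overwrite client B's update.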
The concept of "transaction" is also missing in MongoDb. While MongoDb guarantees that each document will be atomically modified (so no partial update will happen within a doc), if the update modifies multiple documents, there is no guarantee of atomicity across documents.

Therefore, it is the application developer's responsibility to implement multi-update atomicity across multiple documents. We describe a common design pattern to achieve that. This technique is not specific to MongoDb and is applicable to any other NOSQL store that can at least guarantee atomicity at the single record level.

The basic idea is to first create a separate document (called a transaction) that links together all the documents that you want to modify, and then create a reverse link from each document (to be modified) back to the transaction. By carefully designing the sequence of updates to the documents and the transaction, we can achieve atomicity when modifying multiple documents.

MongoDb's web site also describes a similar technique (based on the same concept, but with a slightly different implementation).
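
Here is a minimal in-memory sketch of the transaction-document idea in plain JavaScript. It is one possible sequencing under my own assumptions, not necessarily the exact one from the diagrams: the state names ("initial", "pending", "applied", "done"), the `pendingTxns` field and all function names are hypothetical. Each helper touches exactly one document, mirroring the single-document atomicity that the store guarantees.

```javascript
const accounts = new Map([
  ["A", { balance: 100, pendingTxns: [] }],
  ["B", { balance: 20, pendingTxns: [] }],
]);
const txns = new Map();

// Apply a delta to one account unless this transaction was already applied.
function applyToAccount(accountId, txnId, delta) {
  const acct = accounts.get(accountId);
  if (acct.pendingTxns.includes(txnId)) return; // already applied: safe to re-run
  acct.balance += delta;
  acct.pendingTxns.push(txnId); // reverse link back to the transaction
}

// Remove the reverse link once the transaction has been fully applied.
function clearFromAccount(accountId, txnId) {
  const acct = accounts.get(accountId);
  acct.pendingTxns = acct.pendingTxns.filter((t) => t !== txnId);
}

// Drive a transaction forward from whatever state it is in. Running this again
// after a crash resumes the work instead of repeating it.
function runTransfer(txnId) {
  const txn = txns.get(txnId);
  if (txn.state === "initial") txn.state = "pending";
  if (txn.state === "pending") {
    applyToAccount(txn.from, txnId, -txn.amount);
    applyToAccount(txn.to, txnId, +txn.amount);
    txn.state = "applied";
  }
  if (txn.state === "applied") {
    clearFromAccount(txn.from, txnId);
    clearFromAccount(txn.to, txnId);
    txn.state = "done";
  }
}

txns.set("t1", { from: "A", to: "B", amount: 30, state: "initial" });
runTransfer("t1");
runTransfer("t1"); // re-running a finished transaction changes nothing
console.log(accounts.get("A").balance, accounts.get("B").balance, txns.get("t1").state);
```

The reverse link is what makes recovery safe: a recovery process that finds a "pending" transaction can tell which accounts have already been touched and only apply the missing half.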

Replication Model
High availability is achieved in MongoDb via a Replica Set, which provides data redundancy across multiple physical servers, including a single primary DB as well as multiple secondary DBs. For data consistency, all modification requests (inserts / updates / deletes) go to the primary DB, where the modification is made and then asynchronously replicated to the secondary DBs.

Within the replica set, members are interconnected with each other to exchange heartbeat messages. A crashed server with missing heartbeats will be detected by the other members and removed from the replica set membership. After the dead secondary recovers, it can rejoin the cluster by connecting to the primary to catch up on the updates made since it crashed. If the crash lasted so long that the change log on the primary doesn't cover the whole crash period, then the recovered secondary needs to reload the whole data set from the primary (as if it were a brand new server).
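
The choice between catching up and a full reload can be sketched as follows, in plain JavaScript. This is only a model under my assumptions: the change log is a bounded array whose entries carry an increasing `seq` number, and the name `recoveryPlan` is hypothetical.

```javascript
// The primary keeps a bounded change log, oldest entry first.
// lastAppliedSeq is the last change the recovering secondary had applied.
function recoveryPlan(oplog, lastAppliedSeq) {
  const oldest = oplog.length > 0 ? oplog[0].seq : Infinity;
  if (lastAppliedSeq + 1 < oldest) {
    // Some changes have already been dropped from the log: there is a gap.
    return { mode: "full-resync", ops: [] };
  }
  return { mode: "catch-up", ops: oplog.filter((e) => e.seq > lastAppliedSeq) };
}

// Hypothetical log that still covers changes 5 through 9.
const oplog = [5, 6, 7, 8, 9].map((seq) => ({ seq, op: `change-${seq}` }));

console.log(recoveryPlan(oplog, 6)); // short outage: replay changes 7, 8 and 9
console.log(recoveryPlan(oplog, 2)); // long outage: changes 3 and 4 are gone
```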

In case the primary DB crashes, a leader election protocol will be run among the remaining members to nominate the new primary, based on many factors such as node priority, node uptime, etc. After getting a majority vote, the new primary server takes over. Notice that due to async replication, the newly elected primary DB doesn't necessarily have all the latest updates from the crashed DB.

The client lib provides the API for the app to access the MongoDB server. At startup, the client lib connects to some member of the replica set (based on a seed list) and issues an "isMaster" command to gather the current picture of the set (which member is the primary and which are secondaries). After that, the client lib connects to the single primary (where it will send all DB modification requests) and some number of secondaries (where it will send read-only queries). The client library periodically re-runs the "isMaster" command to detect whether any new members have joined the set. When an existing member of the set crashes, its connections to all existing clients are dropped, which forces a resynchronization of the latest picture.

There is also a special secondary DB called a slave delay, which guarantees that data is propagated with a certain time lag behind the master. This is used mainly to recover data after an accidental deletion.

For a data modification request, the client sends the request to the primary DB. By default the request returns once it is written to the primary. An optional parameter can be specified to indicate that a certain number of secondaries need to receive the modification before the call returns, so the client can ensure that a majority of the members have got the request. Of course there is a tradeoff between latency and reliability.
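
The latency side of this tradeoff can be illustrated with a small model in plain JavaScript. The function name `writeLatency`, the millisecond figures, and the assumption that the primary replicates to all secondaries in parallel are mine, not taken from MongoDb itself.

```javascript
// Latency of one write, given how many members (primary included) must
// acknowledge it before the call returns.
function writeLatency(primaryMs, secondaryMs, w) {
  if (w < 1 || w > secondaryMs.length + 1) throw new RangeError("w out of range");
  if (w === 1) return primaryMs; // default: return once the primary has the write
  const sorted = [...secondaryMs].sort((a, b) => a - b);
  return primaryMs + sorted[w - 2]; // wait for the (w - 1) fastest secondaries
}

const secondaries = [12, 3, 40, 7]; // hypothetical replication delays in ms
const members = secondaries.length + 1;
const majority = Math.floor(members / 2) + 1;

for (const w of [1, 2, majority, members]) {
  console.log(`w=${w}: ${writeLatency(2, secondaries, w)} ms`);
}
```

Waiting for a majority only costs as much as the slowest member within the fastest majority, while waiting for every member is bounded by the slowest secondary.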

For a query request, by default the client contacts the primary, which has the latest data. Optionally, the client can specify its willingness to read from any secondary and tolerate that the returned data may be outdated. This provides an opportunity to load balance read requests across all secondaries. Notice that in this case, a read following a write may not see the update.

For a read-mostly application, reading from any secondary can be a big performance improvement. To select the fastest secondary DB member for a query, the client driver periodically pings the members and favors the one with the lowest latency. Notice that a read request is issued to only one node; there is no quorum read or read from multiple nodes in MongoDb.

The main purpose of a replica set is to provide data redundancy and to load balance read requests. It doesn't provide load balancing for write requests, since all modifications still have to go to the single master.

Another benefit of a replica set is that members can be taken offline on a rotating basis to perform expensive operations such as compaction, indexing or backup, without impacting online clients that use the remaining live members.

Sharding Model
To load balance write requests, we can use MongoDb shards. In a sharding setup, a collection can be partitioned (by a partition key) into chunks (each of which is a key range), and the chunks are distributed across multiple shards (each shard being a replica set). MongoDb sharding effectively provides an unlimited size for a data collection, which is important for any big data scenario.

To reiterate, in the sharding model, a single partition key is specified for each collection stored in the sharding cluster. The key space of the partition key is divided into contiguous key ranges called chunks, which are hosted by the corresponding shards.

// To define the partition key
db.runCommand({shardCollection: "testdb.person",
         key: {firstname:1, lastname:1}})
In the shard setting, the client lib connects to a stateless routing server "MongoS", which behaves as if it were a "MongoD". The routing server plays an important role in forwarding each client request to the appropriate shard server according to the request characteristics.

For an insert/delete/update request containing the partition key, the routing server uses the chunk/shard mapping information (obtained from the config server and cached locally) to forward the request to the primary server hosting the chunk whose key range covers the partition key of the modified doc. Given a particular partition key, the primary server containing that chunk can be unambiguously determined.
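
The lookup from partition key to shard can be sketched in plain JavaScript as a binary search over a sorted chunk table. The chunk boundaries, shard names and the function name `findChunk` are hypothetical, and a single string key is used instead of a composite key to keep the sketch short.

```javascript
// Chunks are contiguous, non-overlapping key ranges [min, max), sorted by min.
const chunks = [
  { min: "", max: "g", shard: "shard0" },
  { min: "g", max: "n", shard: "shard1" },
  { min: "n", max: "t", shard: "shard0" },
  { min: "t", max: "\uffff", shard: "shard2" },
];

// Binary search for the single chunk whose range covers the key.
function findChunk(chunkTable, key) {
  let lo = 0;
  let hi = chunkTable.length - 1;
  while (lo <= hi) {
    const mid = (lo + hi) >> 1;
    const c = chunkTable[mid];
    if (key < c.min) hi = mid - 1;
    else if (key >= c.max) lo = mid + 1;
    else return c;
  }
  throw new Error(`no chunk covers key ${key}`);
}

for (const key of ["dave", "henry", "ricky", "zed"]) {
  console.log(key, "->", findChunk(chunks, key).shard);
}
```

Because the ranges never overlap and together cover the whole key space, exactly one chunk (and therefore one shard) matches any given key.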

In the case of a query request, the routing server examines whether the partition key is part of the selection criteria and, if so, only "routes" the request to the corresponding shard (primary or secondary). However, if the partition key is not part of the selection criteria, the routing server "scatters" the request to every shard (picking one member of each shard), each of which computes its local search; the results are then gathered at the routing server and returned to the client. When the query requires the result to be sorted and the partition key is involved in the sort order, the routing server routes the request sequentially to multiple shards as the client iterates through the result. If the sort involves another key which is not the partition key, the routing server scatters the request to all shards, each of which performs its local sort, and then merges the results at the routing server (basically a distributed merge-sort).
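
The scatter/gather path with a sort on a non-partition key can be sketched in plain JavaScript as a local filter-and-sort on each shard followed by a merge at the router. The function names `mergeSorted` and `scatterGather` and the sample data are hypothetical; each shard is modeled as a plain array.

```javascript
// Merge already-sorted per-shard result lists into one sorted list.
function mergeSorted(shardResults, compare) {
  const positions = shardResults.map(() => 0);
  const merged = [];
  for (;;) {
    let best = -1;
    for (let s = 0; s < shardResults.length; s++) {
      if (positions[s] >= shardResults[s].length) continue; // this shard is drained
      if (
        best === -1 ||
        compare(shardResults[s][positions[s]], shardResults[best][positions[best]]) < 0
      ) {
        best = s;
      }
    }
    if (best === -1) return merged; // every shard is drained
    merged.push(shardResults[best][positions[best]++]);
  }
}

// Scatter: every shard filters and sorts locally. Gather: the router merges.
function scatterGather(shards, predicate, compare) {
  const partial = shards.map((docs) => docs.filter(predicate).sort(compare));
  return mergeSorted(partial, compare);
}

const shards = [
  [{ name: "Ann", age: 31 }, { name: "Bob", age: 45 }],
  [{ name: "Cid", age: 27 }, { name: "Dee", age: 38 }],
  [{ name: "Eve", age: 52 }],
];
const byAge = (a, b) => a.age - b.age;

console.log(scatterGather(shards, (d) => d.age > 30, byAge).map((d) => d.name));
```

The router only ever compares the head of each shard's list, so it can stream merged results to the client without holding everything in memory.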

As data is inserted into a chunk and the chunk gets close to its full capacity, we need to split it. The routing server can detect this situation statistically, based on the number of requests it forwards as well as the number of other routing servers that exist. The routing server then contacts the primary server of the shard that contains the full chunk and requests a chunk split. The shard server computes the mid point of the key range that evenly distributes the data, splits the chunk, and updates the configuration server with the split point. Notice that so far there is no data movement, as the data still resides on the same shard server.
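
A chunk split can be sketched in plain JavaScript as a pure metadata operation. The name `splitChunk`, the choice of the median key as the split point, and the sample keys are my assumptions for illustration.

```javascript
// Split a chunk at the median key so both halves hold about the same number
// of documents. Only metadata changes; both halves stay on the same shard.
function splitChunk(chunk, keysInChunk) {
  const sorted = [...keysInChunk].sort();
  const splitPoint = sorted[Math.floor(sorted.length / 2)];
  if (splitPoint === undefined || splitPoint <= chunk.min) {
    return [chunk]; // nothing sensible to split on
  }
  return [
    { min: chunk.min, max: splitPoint, shard: chunk.shard },
    { min: splitPoint, max: chunk.max, shard: chunk.shard },
  ];
}

const full = { min: "a", max: "n", shard: "shard1" };
const keys = ["gus", "bob", "eve", "cid", "fay", "dee"];

console.log(splitChunk(full, keys));
```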

On the other hand, there is another "balancer" process (running in one of the routing servers) whose job is to make sure each shard carries approximately the same number of chunks. When an unbalanced condition is detected, the balancer contacts the busy shard to trigger a chunk migration process. This migration happens online: the origination shard contacts the destination shard to initiate a data transfer, and data starts to be copied from the origination to the destination. This process may take some time (depending on the data volume), during which updates can continue to happen at the origination. These changes are tracked at the origination, and when the copy finishes, the delta is transferred and applied to the destination. After multiple rounds of applying deltas, the migration enters the final round, and the origination halts and withholds all requests coming from the routing server. After the last round of changes has been applied to the destination, the destination updates the configuration server with the new shard configuration and notifies the origination shard (which is still withholding the requests) to return a StaleConfigException to the routing server, which then re-reads the latest configuration from the config server and re-submits the previous requests. At some future point in time, the data at the origination will be physically deleted.
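
The balancer's decision can be sketched in plain JavaScript as counting chunks per shard and proposing one migration at a time. The function name `proposeMigration`, the threshold of 2 chunks and the rule of moving the first chunk found on the busiest shard are all assumptions of mine; the copy and delta rounds of the actual migration are not modeled here.

```javascript
// Count chunks per shard and propose one migration when the spread is too wide.
function proposeMigration(chunkTable, shardNames, threshold) {
  const counts = new Map(shardNames.map((s) => [s, 0]));
  for (const c of chunkTable) counts.set(c.shard, counts.get(c.shard) + 1);

  let busiest = shardNames[0];
  let idlest = shardNames[0];
  for (const s of shardNames) {
    if (counts.get(s) > counts.get(busiest)) busiest = s;
    if (counts.get(s) < counts.get(idlest)) idlest = s;
  }
  if (counts.get(busiest) - counts.get(idlest) < threshold) return null; // balanced enough

  const chunk = chunkTable.find((c) => c.shard === busiest);
  return { chunk, from: busiest, to: idlest };
}

const table = [
  { min: "a", max: "e", shard: "shard0" },
  { min: "e", max: "j", shard: "shard0" },
  { min: "j", max: "p", shard: "shard0" },
  { min: "p", max: "z", shard: "shard1" },
];
const shardNames = ["shard0", "shard1", "shard2"];

const move = proposeMigration(table, shardNames, 2);
if (move !== null) {
  // In the real system the metadata flips only after the data copy completes.
  move.chunk.shard = move.to;
}
console.log(move);
console.log(proposeMigration(table, shardNames, 2)); // null once the spread is small
```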

It is possible that under a high frequency of updates, the changes being applied to the destination are unable to catch up with the update rate at the origination. When this situation is detected, the whole migration process is aborted. The routing server may pick a different chunk to start a migration afterwards.

Map/Reduce Execution
MongoDb provides a Map/Reduce framework to perform parallel data processing. The concept is similar to Hadoop Map/Reduce, but with the following small differences ...
  • It takes input from the query result of a collection rather than an HDFS directory
  • The reduce output can be appended to an existing collection rather than written to an empty HDFS directory
Map/Reduce in Mongo works in a slightly different way as follows
  1. The client defines a map function, a reduce function, a query that scopes the input data, and an output collection that stores the output result.
  2. The client sends the request to the MongoS routing server.
  3. MongoS forwards the request to the appropriate shards (route or scatter, depending on whether the partition key appears in the query). Notice that MongoS picks one member of each shard, currently always the primary DB.
  4. The primary DB of each shard executes the query and pipes the output to the user-defined map function, which emits a bunch of key-value pairs stored in a memory buffer. When the memory buffer is full, the user-defined reduce function is invoked to partially reduce the key-value pairs in the memory buffer, and the result is stored in a local collection.
  5. When step (4) completes, the reduce function is executed on all the previous partially reduced results to merge them into a single reduced result on this server.
  6. When step (5) finishes, MongoS notifies the shard servers that will store the output collection (if the output collection is non-partitioned, only a single shard is notified; otherwise all shards are notified).
  7. The primary DB of the shard(s) storing the final collection calls every shard to collect the partially reduced data computed previously. It only asks for the results that fall within its own key range.
  8. The primary DB runs the reduce() function again on the list of partially reduced results, then stores the final reduced result locally. If the user provides a finalize function, it is invoked as well.
Here is a simple example that builds an inverted index from topics to documents.
// "book" is an assumed collection name; out:{inline:1} is an assumed output option
db.book.insert({title:"NOSQL",
             about:["software", "db"]})
db.book.insert({title:"Java programming",
             about:["software", "program"]})
db.book.insert({title:"Mongo",
             about:["db", "technology"]})
db.book.insert({title:"Oracle",
             about:["db", "software"]})

m = function() {
 for (var i in this.about) {
     emit(this.about[i], this.title)
 }
}

r = function(k, vals) {
 return({topic:k, title:vals})
}

db.book.mapReduce(m, r, {query:{},
             out:{inline:1}})
Overall, I found MongoDb very powerful and easy to use. I look forward to using MongoDb with Node.js and will share my experience in future blogs.

Special thanks to Jared Rosoff, who provided me with a lot of details on how MongoDb is implemented.


Jiacheng Yang said...

Thanks for sharing the notes. I didn't use MongoDB before. But your explanation is clear enough for me to understand how it works.

Jagbir said...

Its so delightful, impressive and flawless reading about MongoDB. Thanks for lot for taking time and penning down all this.

Artur Ejsmont said...

Very good article. Great diagrams :) thanks a lot for sharing.

Suraj Tamang said...

Awesome blog about mongoDB arch. Thanks for sharing....

matro said...

very interesting post, thank you.

Scott said...

Slight correction: mongos is not stateless.

It creates cursor ids (mapping them to shards for queries) and other state related to a session, like authentication on the connection.

Richard Mareš said...

Nice article, thanks.

NinethSense said...

It is nice that I happen to read this. Very informative. Thanks for sharing

BI_Buff said...

A very nice and detailed Post on MongoDB.
Thank you

Felipe Antunes said...

Thank you, mr. Ho. For this awesome and very complete article.

BryanChance said...

Fantastic write-up on MongoDB and NOSQL in general. I've played around with MongoDB and CouchDB but never used in production environment. Thank you so much.

mdmd said...

The most important point is that MongoDB 'leaves it to the application developer' to implement durability or consistency. The management level forces you to use it, but expects the same features as they are used from a RDBMS. I can tell this is hell; you have to come up with solutions for that, and it is never taken into account on the project plan, that it is a major task to implement the feature set of any RDBMS, beside the rest of the application use cases. In the end you lose any advantage of NoSQL; it is getting slower and slower the more durable, high-available and consistent your data has to be...

Unknown said...

I have been a fan of "Pragmatic Programming Techniques" since I read the post on NoSQL concepts, HBase. Written what you need to read. Good Job.

Sambit Tripathy

Unknown said...

Thanks for sharing information. It’s quite interesting. You can also get complete info on MangoDB here

Unknown said...

can please someone tell me the complete architecture of load balancer in mongodb and who it work? where it placed to do the balancing work? what are the techniques it used to balance the data among the replicas and shards.
I'm working on balancer of mongodb and i want to improve the working of balancer.