Thursday, October 30, 2008

CouchDB Cluster

Let's look at how one can layer a cluster on top of CouchDB.

Couch Cluster

A “Couch Cluster” is composed of multiple “partitions”. Each partition is composed of multiple replicated DB instances. We call each replica a “virtual node”, which is a DB instance hosted inside a “physical node”, i.e. a CouchDB process running on a machine. A virtual node can migrate across physical nodes for various reasons, such as …
  • when a physical node crashes
  • when more physical nodes are provisioned
  • when the workload of the physical nodes is unbalanced
  • when there is a need to reduce latency by migrating closer to the client

Proxy


The "Couch Cluster" is fronted by a "Proxy", which intercepts all client calls and forwards each one to the corresponding "virtual node". To do so, the proxy keeps a "configuration DB" that stores the topology, i.e. how the virtual nodes are distributed across physical nodes. The configuration DB is updated as DBs are created or destroyed. Changes to the configuration DB are replicated among the proxies so that each of them eventually shares the same picture of the cluster topology.


The diagram shows a single DB that is split into 2 partitions (the blue and orange partitions). Each partition has 3 replicas, of which one is the primary and the other two are secondaries.

Create DB
  1. Client calls the Proxy with URL=http://proxy/dbname; HTTP_Command = PUT /dbname
  2. Proxy needs to determine how many partitions and how many replicas are needed. Let's say we have 2 partitions and each partition has 3 copies, so there will be 6 virtual nodes: v1-1, v1-2, v1-3, v2-1, v2-2, v2-3.
  3. Proxy also needs to determine which virtual node is the primary of each partition. Let's say v1-1 and v2-1 are primaries and the rest are secondaries.
  4. Proxy then needs to determine which physical node hosts each virtual node, say M1 (v1-1, v2-2), M2 (v1-2, v2-3), M3 (v1-3, v2-1).
  5. Proxy records its decisions in the configuration DB.
  6. Proxy calls M1 with URL=http://M1/dbname_p1; HTTP_Command = PUT /dbname_p1, and then calls M1 again with URL=http://M1/dbname_p2; HTTP_Command = PUT /dbname_p2.
  7. Proxy repeats step 6 for M2 and M3.
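
Steps 2 to 6 can be sketched in a few lines of Python. This is only an illustration: the function names and the "rotate by partition number" placement rule are my own assumptions (the rule happens to reproduce the M1/M2/M3 layout of the example), and a real proxy would persist the result in its configuration DB.

```python
def plan_topology(dbname, machines, num_partitions, num_replicas):
    """Return {virtual_node: {"db", "machine", "primary"}} for a new clustered DB.

    Hypothetical placement rule: replica r of partition p goes to machine
    (r - p) mod len(machines), and replica 1 of each partition is the primary.
    """
    topology = {}
    for p in range(1, num_partitions + 1):
        for r in range(1, num_replicas + 1):
            # Rotating by partition number spreads the primaries over machines.
            machine = machines[(r - p) % len(machines)]
            topology[f"v{p}-{r}"] = {
                "db": f"{dbname}_p{p}",
                "machine": machine,
                "primary": r == 1,
            }
    return topology


def create_requests(topology):
    """List the (method, url) calls the proxy issues, one per virtual node."""
    return [("PUT", f"http://{v['machine']}/{v['db']}") for v in topology.values()]


topology = plan_topology("dbname", ["M1", "M2", "M3"], num_partitions=2, num_replicas=3)
requests = create_requests(topology)
```

The rule only keeps replicas of one partition on distinct machines as long as the number of replicas does not exceed the number of machines.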

List all DBs
  1. Client calls the Proxy with URL=http://proxy/_all_dbs; HTTP_Command = GET /_all_dbs
  2. Proxy looks up the configuration DB to determine all the DBs
  3. Proxy returns the list to the client

Get DB info
  1. Client calls the Proxy with URL=http://proxy/dbname; HTTP_Command = GET /dbname
  2. Proxy looks up the configuration DB for all partitions of the DB. For each partition, it locates the virtual node that hosts the primary copy (v1-1, v2-1). It also identifies the physical nodes that host these virtual nodes (M1, M3).
  3. For each physical node, say M1, the proxy calls it with URL=http://M1/dbname_p1; HTTP_Command = GET /dbname_p1
  4. Proxy does the same for M3 (with dbname_p2)
  5. Proxy combines the results from M1 and M3 and then forwards them to the client

Delete DB
  1. Client calls the Proxy with URL=http://proxy/dbname; HTTP_Command = DELETE /dbname
  2. Proxy looks up which machines are hosting the clustered DB and finds M1, M2, M3.
  3. Proxy calls M1 with URL=http://M1/dbname_p1; HTTP_Command = DELETE /dbname_p1. Then Proxy calls M1 again with URL=http://M1/dbname_p2; HTTP_Command = DELETE /dbname_p2.
  4. Proxy does the same for M2 and M3

Get all documents of a DB
  1. Client calls the Proxy with URL=http://proxy/dbname/_all_docs; HTTP_Command = GET /dbname/_all_docs
  2. Proxy looks up the configuration DB for all partitions of the DB. For each partition, it randomly picks a virtual node that hosts a copy (e.g. v1-2, v2-2). It also identifies the physical nodes that host these virtual nodes (M2, M1).
  3. Proxy calls M1 with URL=http://M1/dbname_p2/_all_docs; HTTP_Command = GET /dbname_p2/_all_docs.
  4. Proxy does the same for M2 (with dbname_p1)
  5. Proxy combines the results from M1 and M2 and then forwards them to the client

Create / Update a document
  1. Client calls the Proxy with URL=http://proxy/dbname/docid; HTTP_Command = PUT /dbname/docid
  2. Proxy invokes "select_partition(docid)" to determine the partition, and then looks up the primary copy of that partition (e.g. v1-1). It also identifies the physical node (e.g. M1) that hosts this virtual node.
  3. The proxy calls M1 with URL=http://M1/dbname_p1/docid; HTTP_Command = PUT /dbname_p1/docid
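
The post does not say how "select_partition(docid)" is computed. One simple possibility, shown here as a Python sketch, is to hash the document id and take it modulo the number of partitions. The MD5-based hash, the shape of the `config` dictionary and the `route_write` helper are all assumptions made for illustration.

```python
import hashlib


def select_partition(docid, num_partitions):
    """Map a document id to a partition number in 1..num_partitions."""
    digest = hashlib.md5(docid.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions + 1


def route_write(config, dbname, docid):
    """Return the URL of the primary copy that should receive the write."""
    db = config[dbname]
    p = select_partition(docid, db["num_partitions"])
    machine = db["primaries"][p]  # physical node hosting the primary of p
    return f"http://{machine}/{dbname}_p{p}/{docid}"


# Hypothetical configuration matching the example: v1-1 on M1, v2-1 on M3.
config = {"dbname": {"num_partitions": 2, "primaries": {1: "M1", 2: "M3"}}}
url = route_write(config, "dbname", "docid")
```

The important property is that the function is deterministic, so every proxy sends a given docid to the same partition. A plain modulo also means that changing the number of partitions remaps most documents.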

Get a document
  1. Client calls the Proxy with URL=http://proxy/dbname/docid; HTTP_Command = GET /dbname/docid
  2. Proxy invokes "select_partition(docid)" to determine the partition, and then randomly picks a copy of that partition (e.g. v1-3). It also identifies the physical node (e.g. M3) that hosts this virtual node.
  3. The proxy calls M3 with URL=http://M3/dbname_p1/docid; HTTP_Command = GET /dbname_p1/docid

Delete a document
  1. Client calls the Proxy with URL=http://proxy/dbname/docid?rev=1234; HTTP_Command = DELETE /dbname/docid?rev=1234
  2. Proxy invokes "select_partition(docid)" to determine the partition, and then looks up the primary copy of that partition (e.g. v1-1). It also identifies the physical node (e.g. M1) that hosts this virtual node.
  3. The proxy calls M1 with URL=http://M1/dbname_p1/docid?rev=1234; HTTP_Command = DELETE /dbname_p1/docid?rev=1234

Create a View design doc
  1. Client calls the Proxy with URL=http://proxy/dbname/_design/viewid; HTTP_Command = PUT /dbname/_design/viewid
  2. Proxy determines all the virtual nodes of this DB, and identifies all the physical nodes (e.g. M1, M2, M3) that host these virtual nodes.
  3. The proxy calls M1 with URL=http://M1/dbname_p1/_design/viewid; HTTP_Command = PUT /dbname_p1/_design/viewid. Then the proxy calls M1 again with URL=http://M1/dbname_p2/_design/viewid; HTTP_Command = PUT /dbname_p2/_design/viewid.
  4. Proxy does the same for M2 and M3

Query a View
  1. Client calls the Proxy with URL=http://proxy/dbname/_view/viewid/attrid; HTTP_Command = GET /dbname/_view/viewid/attrid
  2. Proxy determines all the partitions of "dbname", and for each partition it randomly picks a copy (e.g. v1-3, v2-2). It also identifies the physical nodes (e.g. M3, M1) that host these virtual nodes.
  3. The proxy calls M1 with URL=http://M1/dbname_p2/_view/viewid/attrid; HTTP_Command = GET /dbname_p2/_view/viewid/attrid
  4. The proxy does the same for M3 (with dbname_p1)
  5. The proxy combines the results from M1 and M3. If "attrid" is a map-only view, the proxy just concatenates all the results. If "attrid" has a reduce function defined, the proxy invokes the view engine's reduce() function with rereduce = true. The proxy then returns the combined result to the client.
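
Step 5 could look roughly like the following Python sketch. The function name and the shape of the partial results are assumptions. For the map-only case it merges the already key-sorted row lists instead of doing a plain concatenation, which is a small variation that keeps the combined rows in key order.

```python
from heapq import merge


def combine_view_results(partials, reduce_fn=None):
    """Combine per-partition view results at the proxy.

    partials: for a map-only view, one key-sorted list of (key, value) rows
    per partition; for a reduce view, one already-reduced value per partition.
    """
    if reduce_fn is None:
        # Map-only: merge the sorted row lists so the output stays key-ordered.
        return list(merge(*partials, key=lambda row: row[0]))
    # Reduce view: re-reduce the partial results (no keys on a rereduce call).
    return reduce_fn(None, list(partials), True)


def sum_reduce(keys, values, rereduce):
    """Example reducer: a sum re-reduces to the sum of the partial sums."""
    return sum(values)


rows = combine_view_results([[("a", 1), ("c", 3)], [("b", 2)]])
total = combine_view_results([4, 6], sum_reduce)
```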

Replication within the Cluster
  1. Periodically, the proxies replicate the changes of the configuration DB among themselves. This ensures that all the proxies have the same picture of the topology.
  2. Periodically, a proxy picks a DB, picks one of its partitions, and replicates the changes from the primary to all the secondaries. This makes sure that all the copies of each partition of the DB stay in sync.

Client data sync

Let's say the client also has a local DB, which is replicated from the cluster. This is important for the occasionally connected scenario, where the client may disconnect from the cluster and work with the local DB for a while. Later on, when the client connects back to the cluster, the data in the local DB and in the cluster need to be synchronized.

To replicate changes from the local DB to the cluster ...
  1. Client starts a replicator and sends POST /_replicate with {source: "http://localhost/localdb", target: "http://proxy/dbname"}
  2. The replicator, which has remembered the last seq_num of the source from the previous replication, extracts all the changes of the localDB since then.
  3. The replicator pushes these changes to the proxy.
  4. The proxy examines the list of changes. For each changed document, it calls "select_partition(docid)" to determine the partition, and then looks up the primary copy of that partition and the physical node that hosts this virtual node.
  5. The proxy pushes the changed document to that physical node. In other words, the primary copies in the cluster receive the changes from the localDB first. These changes are replicated to the secondary copies at a later time.
  6. When complete, the replicator updates the seq_num for the next replication.
To replicate the changes from the cluster to the localDB ...
  1. Client starts the replicator, which has remembered the last "seq_num" array of the cluster. The seq_num array contains the seq_num of each virtual node of the cluster. It is an opaque data structure whose content the replicator doesn't need to understand.
  2. The replicator sends a request to the proxy to extract the latest changes, along with the seq_num array
  3. The proxy first looks up the primary of each partition, and then extracts the changes from each of them using the appropriate seq_num from the seq_num array.
  4. The proxy consolidates the changes from the primary copy of each partition, and sends them back to the replicator, along with the updated seq_num array.
  5. The replicator applies these changes to the localDB, and then stores the updated seq_num array for the next replication.
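
As a rough Python sketch of steps 3 and 4 on the proxy side: the change feed of each primary is modelled as an in-memory list, and the seq_num array as a dictionary keyed by partition. Both representations and the function name are assumptions for illustration only.

```python
def pull_changes(primaries, seq_nums):
    """Collect changes from each partition's primary since its last seq_num.

    primaries: {partition: [(seq_num, docid, doc), ...]} ordered by seq_num,
               standing in for the change feed of each primary copy.
    seq_nums:  {partition: last seq_num the replicator has already seen}.
    Returns (changes, updated seq_num array).
    """
    changes, new_seq_nums = [], dict(seq_nums)
    for partition, feed in primaries.items():
        since = seq_nums.get(partition, 0)
        for seq, docid, doc in feed:
            if seq > since:
                changes.append((docid, doc))
                new_seq_nums[partition] = seq  # feed is ordered, so this ends at the max
    return changes, new_seq_nums


primaries = {
    1: [(1, "a", {"v": 1}), (2, "b", {"v": 1})],
    2: [(1, "c", {"v": 1})],
}
changes, seq_nums = pull_changes(primaries, {1: 1})
again, _ = pull_changes(primaries, seq_nums)
```

The replicator only stores whatever comes back as the second value and hands it in again next time, which is what keeps the seq_num array opaque to it.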

Monday, October 27, 2008

Consistent Multi-Master DB Replication

As explained in my CouchDB implementation notes, the current replication mechanism doesn't provide consistency guarantees. This means that if the client connects to different replicas at different times, it may see weird results, including ...
  • The client reads a document X and later reads the same document X again, but the 2nd read returns an earlier revision of X than the 1st read.
  • The client updates a document X and after some time reads the document X again, but doesn't see its previous update.
  • The client reads a document X and, based on its value, updates document Y. Another client may see the update of document Y but not the version of document X that the update of Y was based on.
  • Even if a client first updates document X and later updates document X a 2nd time, CouchDB may wrongly perceive a conflict between the two updates (if they land on different replicas) and fall back on a user-provided resolution strategy to resolve it.
To prevent the above situations from happening, here I describe a possible extension of CouchDB that provides a "causal consistency guarantee" based on the Vector Clock Gossiping technique. The target environment is a cluster of machines.

Here are a few definitions ...

Causal Consistency
  • It is not possible to see an effect before seeing its causes. In other words, when different replicas propagate their updates, each replica always applies the updates of the "causes" before applying the update of the "effect".
  • "Effects" and "Causes" are related by a "happens-before" relationship, i.e. the causes happen before the effect.

Logical Clock
  • A monotonically increasing sequence number that is atomically increased by one whenever an "event" occurs.
Event
  • Update a state locally
  • Sending a message
  • Receiving a message

Vector Clock
  • An array of logical clocks where each entry represents the logical clock of a different process
  • VC1 >= VC2 if for every i, VC1[i] >= VC2[i]
  • VC3 = merge(VC1, VC2) where for every i, VC3[i] = max(VC1[i], VC2[i])
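
The two vector clock operations translate directly into Python. In this sketch a vector clock is a plain list with one entry per process, and both clocks are assumed to have the same length. The `concurrent` helper is my addition: it names the case where neither clock is >= the other.

```python
def vc_geq(vc1, vc2):
    """True if vc1 >= vc2, i.e. vc1[i] >= vc2[i] for every i."""
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    """Entry-wise maximum of two vector clocks."""
    return [max(a, b) for a, b in zip(vc1, vc2)]


def concurrent(vc1, vc2):
    """Neither clock dominates the other: the events are causally unrelated."""
    return not vc_geq(vc1, vc2) and not vc_geq(vc2, vc1)


vc1, vc2 = [2, 0, 1], [1, 3, 1]
vc3 = vc_merge(vc1, vc2)
```

Note that >= on vector clocks is only a partial order, which is why the merged clock is needed: it is the smallest clock that is >= both inputs.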

Architecture

The basic idea is ...
  • When the client issues a GET, the replica should only reply when it is sure that its state is at least as recent as what the client has seen before. Otherwise, it delays its response until that is the case.
  • When the client issues a PUT/POST/DELETE, the replica immediately acknowledges the client, but instead of applying the update immediately it puts the request into a queue. The update is applied once all the other updates it depends on have been applied to the DB state.
  • In the background, the replicas exchange their update logs so that all the updates are propagated to all copies.

Each replica maintains ...
  • A "replica-VC", associated with the whole replica, which is updated when an update request is received from a proxy, or when a gossip message is sent or received.
  • A "state-VC", associated with the DB state, which is updated when a pending update from the queue is applied to the local DB.
  • A set of the other replicas' VCs, i.e. the vector clock obtained from each other replica in the last gossip message received from it.

The client always talks to the same proxy, which maintains the client's vector clock. This vector clock is needed to filter out inconsistent data when the proxy talks to the replicas, which it can choose randomly.

Read (GET) Processing
  1. When the client issues a READ, the proxy can choose any replica to forward the GET to (along with the client's vector clock).
  2. The chosen replica returns the GET result only when it is sure that its DB has reached a state at least as up to date as what the client has seen (i.e. stateVC >= clientVC). Otherwise, it delays its response until this condition holds.
  3. The proxy may time out and contact another replica.
  4. The response of the replica contains its replicaVC. The proxy refreshes its clientVC = merge(clientVC, replicaVC)
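
Here is a minimal Python sketch of the read path. The class and function names are assumptions, and "delay the response" is simplified to "return None so the proxy tries the next replica", which stands in for the timeout in step 3.

```python
def vc_geq(vc1, vc2):
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    return [max(a, b) for a, b in zip(vc1, vc2)]


class Replica:
    def __init__(self, num_replicas):
        self.state_vc = [0] * num_replicas
        self.replica_vc = [0] * num_replicas
        self.db = {}

    def try_get(self, docid, client_vc):
        """Return (value, replicaVC), or None if this replica is behind the client."""
        if not vc_geq(self.state_vc, client_vc):
            return None  # a real replica would hold the response until it catches up
        return self.db.get(docid), list(self.replica_vc)


def proxy_get(replicas, docid, client_vc):
    """Try replicas in turn and return (value, refreshed clientVC)."""
    for replica in replicas:
        reply = replica.try_get(docid, client_vc)
        if reply is not None:
            value, replica_vc = reply
            return value, vc_merge(client_vc, replica_vc)
    raise TimeoutError("no replica has caught up with the client yet")


stale, fresh = Replica(2), Replica(2)
fresh.state_vc, fresh.replica_vc = [1, 0], [1, 0]
fresh.db["x"] = "v2"
value, client_vc = proxy_get([stale, fresh], "x", [1, 0])
```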

Update (PUT/POST/DELETE) Processing
  1. When the client issues an UPDATE, the proxy can choose any replica to forward the UPDATE to (the request contains a uniqueId, the client's vector clock and the operation's data).
  2. For fault tolerance, the proxy may pick multiple replicas to forward its update to (e.g. it may forward the request to M replicas and return "success" to the client when N replicas have ACKed).
  3. Each chosen replica first advances its logical clock and its replicaVC.
  4. The replica computes a vector timestamp by copying the clientVC and setting its own entry to its logical clock (i.e. ts = clientVC; ts[myReplicaNo] = logicalClock).
  5. The replica attaches this timestamp to the update request and puts the UPDATE request into the queue. The update record "u" =
  6. The replica sends an ACK message containing its replicaVC to the proxy. The proxy refreshes its clientVC = merge(clientVC, replicaVC)
Applying Pending Updates
  1. A pending update "u" can be applied to the state when all the "states" that it depends on have been applied (i.e. stateVC >= u.clientVC).
  2. Periodically, the update log is scanned for updates that meet this criterion.
  3. When one is found, the replica applies the update "u" to the DB and then updates the stateVC = merge(stateVC, u.ts)
  4. Note that while this mechanism guarantees that updates happen in "causal order" (i.e. an "effect" is never applied before its "causes"), it doesn't guarantee a "total order". Because independent (or concurrent) updates can be applied in arbitrary order, the order in which they happen may differ between replicas.
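
The update path and the apply loop can be sketched together in Python. The class layout and the fields of the update record (id, client_vc, ts, docid, value, applied) are my assumptions, chosen to match the u.clientVC and u.ts used above.

```python
def vc_geq(vc1, vc2):
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    return [max(a, b) for a, b in zip(vc1, vc2)]


class Replica:
    def __init__(self, replica_no, num_replicas):
        self.no = replica_no
        self.logical_clock = 0
        self.replica_vc = [0] * num_replicas
        self.state_vc = [0] * num_replicas
        self.log = []  # update records, pending or applied
        self.db = {}

    def receive_update(self, uid, client_vc, docid, value):
        """Timestamp the update, queue it, and return the replicaVC as the ACK."""
        self.logical_clock += 1
        self.replica_vc[self.no] = self.logical_clock
        ts = list(client_vc)
        ts[self.no] = self.logical_clock
        self.log.append({"id": uid, "client_vc": list(client_vc), "ts": ts,
                         "docid": docid, "value": value, "applied": False})
        return list(self.replica_vc)

    def apply_pending(self):
        """Apply every update whose causes are already reflected in stateVC."""
        progress = True
        while progress:  # one applied update may unblock another
            progress = False
            for u in self.log:
                if not u["applied"] and vc_geq(self.state_vc, u["client_vc"]):
                    self.db[u["docid"]] = u["value"]
                    self.state_vc = vc_merge(self.state_vc, u["ts"])
                    u["applied"] = True
                    progress = True


r = Replica(replica_no=0, num_replicas=2)
# The client has already seen update 1 of replica 1, which r does not have yet.
ack = r.receive_update("u2", [0, 1], "y", "effect")
r.apply_pending()
blocked = "y" not in r.db
# The cause arrives later (e.g. through gossip) and unblocks the queued update.
r.log.append({"id": "u1", "client_vc": [0, 0], "ts": [0, 1],
              "docid": "x", "value": "cause", "applied": False})
r.apply_pending()
```
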
Processing Gossip Messages

It is important that the replicas exchange their update logs among themselves, so that eventually every replica has a complete picture of all the update requests regardless of where they were received.

Periodically, each replica picks some other replica to send its update log to. The strategy for picking the peer can be based on random selection, on topology (only talk to neighbors), or on degree of staleness (the one we haven't talked to for the longest time). Once the target replica is selected, the complete update log, together with the sender's current replicaVC, is sent to the target replica.

On the other hand, when a replica receives a gossip message from another replica, it will ...
  • Merge the update log of the message into its own update log, i.e. for each update record u in the message's update log, add u to its own update log unless its replicaVC >= u.ts (which means it has already received a later update that succeeds u).
  • Check whether some of the pending updates are ready to be applied to the database, and adjust the stateVC accordingly.
  • Delete entries from the log once they have been applied to the DB and all other replicas are known to have them. In other words, let c be the id of the replica where "u" was created; then "u" is removable if for every replica i, otherReplicasVC[i][c] > u.ts[c]
  • Update the replicaVC = merge(replicaVC, message.replicaVC)
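
A possible Python sketch of the receiving side is shown below. The dictionary layout of a replica and of a gossip message, the "origin" field of an update record and the duplicate check by record id are all assumptions. Applying the pending updates is left out here.

```python
def vc_geq(vc1, vc2):
    return all(a >= b for a, b in zip(vc1, vc2))


def vc_merge(vc1, vc2):
    return [max(a, b) for a, b in zip(vc1, vc2)]


def receive_gossip(replica, message):
    """Merge a gossip message into `replica` (a plain dict, mutated in place)."""
    known = {u["id"] for u in replica["log"]}
    for u in message["log"]:
        # Skip records we already hold or that our replicaVC already covers.
        if u["id"] not in known and not vc_geq(replica["replica_vc"], u["ts"]):
            replica["log"].append(u)
    # Remember the sender's clock, then fold it into our own replicaVC.
    replica["other_vcs"][message["sender"]] = list(message["replica_vc"])
    replica["replica_vc"] = vc_merge(replica["replica_vc"], message["replica_vc"])


def removable(u, other_vcs):
    """Removal test from the text: every known replica clock has passed u."""
    c = u["origin"]
    return all(vc[c] > u["ts"][c] for vc in other_vcs.values())


replica = {"replica_vc": [1, 0], "log": [], "other_vcs": {}}
u1 = {"id": "u1", "origin": 1, "ts": [0, 1]}
u2 = {"id": "u2", "origin": 1, "ts": [0, 2]}
message = {"sender": 1, "replica_vc": [0, 2], "log": [u1, u2]}
receive_gossip(replica, message)
receive_gossip(replica, message)  # receiving the same message twice is harmless
```

A record should of course only be dropped when it is both removable and already applied to the local DB.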

Sunday, October 19, 2008

CouchDB Implementation

CouchDB is an Apache open source project. It is Damien Katz's brainchild and has a number of very attractive features based on very cool technologies, such as ...
  • RESTful API
  • Schema-less document store (document in JSON format)
  • Multi-Version-Concurrency-Control model
  • User-defined query structured as map/reduce
  • Incremental Index Update mechanism
  • Multi-Master Replication model
  • Written in Erlang (Erlang is good)
There is a wide range of application scenarios where CouchDB can be a good fit, from an occasionally connected laptop-based application, to a high performance data cluster, all the way up to virtual data storage in the cloud.

To understand CouchDB's design more deeply, I was very fortunate to have a conversation with Damien, who was kind enough to share many details with me. Here I want to capture what I learnt from this conversation.

Underlying Storage Structure
CouchDB is a “document-oriented” database where each document is a JSON string (with optional binary attachments). The underlying structure is composed of a “storage” as well as multiple “view indexes”. The “storage” is used to store the documents and the “view indexes” are used for query processing.

Within a storage file, there are “contiguous” regions which are used to store documents. There are 2 B+Tree indexes to speed up certain kinds of access to the documents.
  • by_id_index (which uses the document id as the key). It is mainly used to look up a document by its document id. It points to a list of revisions (or a tree of revisions in case of conflicts in the replication scenario) since the last compaction. It also keeps the revision history (which won't be affected by compaction).
  • by_seqnum_index (which uses a monotonically increasing number as the key). A seqnum is generated whenever a document is updated. (Note that all updates happen in a serial fashion, so the seqnum reflects a sequence of non-concurrent updates.) It is mainly used to keep track of the last point of replication synchronization and the last point of view index update.


Append Only Operation

All updates (creating, modifying and deleting documents) happen in an append-only fashion. Instead of modifying the existing document, a new copy is created and appended to the current region. After that, the B+Tree nodes are also modified to point to the new document location. Modification of the B+Tree nodes is also done in an append-only fashion, which means a new copy of the B+Tree node is appended to the end of the file. This in turn triggers a modification of the parent node, which causes a new copy of the parent node to be written, and so on all the way back to the root B+Tree node. Finally, the file header is modified to point to the new root node.

That means every update triggers 1 write of the document (except for delete) and one write per B+Tree level, i.e. about logN B+Tree node writes. So the cost is O(logN).

Append-only operation provides an interesting MVCC (Multi-Version Concurrency Control) model because the file keeps a history of all previous versions of the document state. As long as the client holds on to a previous root node of the B+Tree index, it gets a snapshot view: while updates continue to happen, the client won’t see any of the latest changes. Such a consistent snapshot is very useful for online backup as well as online compaction.
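
The "copy the path up to the root" idea can be illustrated with a tiny Python sketch. To keep it short it uses an immutable binary search tree in memory rather than a B+Tree in a file, so it only demonstrates the principle, not CouchDB's actual on-disk format.

```python
def insert(node, key, value):
    """Return the root of a new tree that contains key -> value.

    Nodes are immutable tuples (key, value, left, right). Only the nodes on
    the path from the root to the change are copied; the rest is shared.
    """
    if node is None:
        return (key, value, None, None)
    k, v, left, right = node
    if key == k:
        return (k, value, left, right)
    if key < k:
        return (k, v, insert(left, key, value), right)
    return (k, v, left, insert(right, key, value))


def lookup(node, key):
    while node is not None:
        k, v, left, right = node
        if key == k:
            return v
        node = left if key < k else right
    return None


root_v1 = None
for docid in ("doc-b", "doc-a", "doc-c"):
    root_v1 = insert(root_v1, docid, "rev 1")

snapshot = root_v1                           # a reader holds on to the old root
root_v2 = insert(root_v1, "doc-a", "rev 2")  # a writer creates a new root
```

The reader holding `snapshot` still sees "rev 1" of doc-a, while the subtree that was not on the update path (doc-c) is shared between both versions.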

Note that while read operations are performed concurrently with other reads and writes, write operations are performed in serial order across documents. In other words, at any time only one document update can be in progress (however, writes of attachments within a document can happen in parallel).

GET document

When a client issues an HTTP REST GET call to CouchDB, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the document location
  • Reads the document and returns it to the client

PUT document (modification)

When a client issues an HTTP REST PUT call to CouchDB, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the leaf node as well as the document location
  • Reads the document and compares the revision, throwing an error if they don’t match.
  • If they match, figures out the old seqnum of the current revision.
  • Generates a new (monotonically increasing) seqnum as well as a new revision
  • Checks whether the document fits into the last region. If not, allocates another contiguous region.
  • Writes the document (with the new revision) into that region
  • Modifies the by_id B+Tree to point to the new document location
  • Modifies the by_seqnum B+Tree to add the new entry (for the new seqnum) and remove the old entry (for the old seqnum).
Note that the by_seqnum B+Tree index always points to the latest revision; previous revisions are automatically forgotten.
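
The bookkeeping above can be sketched in Python with dictionaries standing in for the two B+Trees and a list standing in for the append-only file. The class name and the use of plain integers as revisions are simplifications of my own.

```python
class RevisionConflict(Exception):
    pass


class Storage:
    def __init__(self):
        self.file = []        # append-only list of (docid, rev, body) records
        self.by_id = {}       # docid -> (rev, seqnum, offset into self.file)
        self.by_seqnum = {}   # seqnum -> docid, latest revision only
        self.last_seqnum = 0

    def put(self, docid, body, rev=None):
        """Create (rev=None) or modify (rev=current revision) a document."""
        current = self.by_id.get(docid)
        current_rev = current[0] if current else None
        if rev != current_rev:
            raise RevisionConflict(f"expected rev {current_rev}, got {rev}")
        if current:
            del self.by_seqnum[current[1]]  # the old seqnum entry is forgotten
        self.last_seqnum += 1
        new_rev = (rev or 0) + 1
        self.file.append((docid, new_rev, body))  # append, never overwrite
        self.by_id[docid] = (new_rev, self.last_seqnum, len(self.file) - 1)
        self.by_seqnum[self.last_seqnum] = docid
        return new_rev


s = Storage()
rev1 = s.put("x", {"n": 1})
rev2 = s.put("x", {"n": 2}, rev=rev1)
```

After the second put the file holds both revisions, but by_seqnum only knows about the latest one, which is exactly what compaction and replication rely on.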

PUT / POST document (creation)

When a client issues an HTTP REST PUT or POST call to CouchDB, the DBServer …
  • Generates a new (monotonically increasing) seqnum as well as a new revision (and a new document id when the client did not supply one)
  • Checks whether the document fits into the last region. If not, allocates another contiguous region.
  • Writes the document (with the new revision) into that region
  • Modifies the by_id B+Tree to point to the new document location
  • Modifies the by_seqnum B+Tree to add the new entry (for the new seqnum)

DELETE document (modify)
When a client issues an HTTP REST DELETE call to CouchDB, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the leaf node as well as the document location
  • Reads the document and compares the revision, throwing an error if they don’t match.
  • If they match, figures out the old seqnum of the current revision.
  • Generates a new (monotonically increasing) seqnum as well as a new revision
  • Modifies the by_id B+Tree revision history to show that this revision path is deleted
  • Modifies the by_seqnum B+Tree to add the new entry (for the new seqnum) and remove the old entry (for the old seqnum).
Online Compaction

Since all operations are append-only, the storage file grows over time, so we need to compact the file regularly.
  • Open a new storage file
  • Walk the by_seqnum B+Tree index (which only points to the latest revisions) and locate each document
  • Copy the document to the new storage file (which automatically updates the corresponding B+Tree indexes in the new storage file).
Note that because of MVCC, the compaction works on a consistent snapshot and can run concurrently, without interference from the updates that continue after the start of compaction. However, if the update rate is too high, the compaction process can never catch up with the updates which keep appending to the file. There is a throttling mechanism under development to slow down the client update rate.
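
In Python, the core of the compaction walk might look like this. The list and dictionary are stand-ins for the storage file and the by_seqnum index, and the function name is an assumption.

```python
def compact(old_file, by_seqnum):
    """Copy only the latest revisions into a new file, in seqnum order.

    old_file:  append-only list of document records.
    by_seqnum: {seqnum: offset into old_file} for the latest revision of
               each document.
    Returns (new_file, new_by_seqnum).
    """
    snapshot = dict(by_seqnum)  # freeze the index as of the start of compaction
    new_file, new_by_seqnum = [], {}
    for seqnum in sorted(snapshot):
        new_file.append(old_file[snapshot[seqnum]])
        new_by_seqnum[seqnum] = len(new_file) - 1
    return new_file, new_by_seqnum


old_file = [("x", 1), ("y", 1), ("x", 2)]  # (docid, rev); ("x", 1) is obsolete
by_seqnum = {2: 1, 3: 2}
new_file, new_by_seqnum = compact(old_file, by_seqnum)
```

Updates that arrive while the copy is running are not in the snapshot, so a real implementation has to repeat the walk from the last copied seqnum until it has caught up.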

View Indexes

CouchDB supports the concept of a “view” of the database. A view is effectively the result of user-defined processing of the underlying document repository. The user-defined processing has to be organized in two steps, “map” and “reduce” (note that the reduce semantics are very different from Google’s Map/Reduce model). map() is a user-defined function which transforms each document into zero, one or multiple intermediate objects, while reduce() is another user-defined function that consolidates the intermediate objects into the final result.

The intermediate objects produced by map() and reduce() are stored in the view indexes. As the storage gets updated, the previous results stored in the view indexes are no longer valid and have to be updated. CouchDB uses an incremental update mechanism so that the refresh of the view indexes is highly efficient.

View definitions are grouped into a design document.

Each view is defined by one “map” function and an optional “reduce” function.

map = function(doc) {
 …
 emit(k1, v1)
 …
 emit(k2, v2)
 …
}

reduce = function(keys, values) {
 …
 return result
}
The reduce() function needs to be commutative and associative so that the order of reduction can be arbitrary.
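
To see why this matters, here is a small Python illustration (Python stand-ins, not CouchDB view code): a sum gives the same answer no matter how the values are grouped and ordered, while a naive average does not and therefore cannot be used directly as a reduce function.

```python
def reduce_sum(keys, values, rereduce=False):
    """A sum works the same on map values and on partial sums."""
    return sum(values)


def reduce_mean(keys, values, rereduce=False):
    """A naive average: NOT safe, a mean of means is not the overall mean."""
    return sum(values) / len(values)


values = [3, 1, 4, 1, 5, 9]

# Reduce everything at once, or reduce two groups and re-reduce them in
# the opposite order: the sum does not care.
sum_once = reduce_sum(None, values)
sum_steps = reduce_sum(
    None, [reduce_sum(None, values[2:]), reduce_sum(None, values[:2])], rereduce=True
)

mean_once = reduce_mean(None, values)
mean_steps = reduce_mean(
    None, [reduce_mean(None, values[:2]), reduce_mean(None, values[2:])], rereduce=True
)
```

A common way around this is to have the reduce function return something that does combine cleanly, such as a (sum, count) pair, and compute the average at the very end.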

The views defined within each design document are materialized in a view file.


Initially, the view file is empty (no index has been built yet). The view is built lazily when the first query is made.
  1. CouchDB walks the by_seqnum B+Tree index of the storage file.
  2. Based on that, CouchDB gets the latest revisions of all existing documents.
  3. CouchDB remembers the last seqnum and then feeds each document to the View Server using “map_doc”.
  4. The View Server invokes the map(doc) function; for each emit(key, value) call, an entry is created.
  5. Finally, a set of entries is computed and returned to CouchDB.
  6. CouchDB adds those entries to the B+Tree index, with key = emit_key + doc_id.
  7. For each B+Tree leaf node, CouchDB sends all the map entries it contains back to the View Server using “reduce”.
  8. The View Server invokes the reduce(keys, values) function.
  9. The reduce result is computed and returned to CouchDB.
  10. CouchDB updates the leaf B+Tree node to point to the reduce value of the map results it contains.
  11. After that, CouchDB moves up one level to the parents of the leaf B+Tree nodes. For each B+Tree parent node, CouchDB sends the corresponding reduce results of its child nodes to the View Server using “rereduce”.
  12. The View Server invokes the reduce(keys, values) function again.
  13. Finally, a rereduce result is computed and returned to CouchDB.
  14. CouchDB updates the parent B+Tree node to point to the rereduce value.
CouchDB continues to move up one level at a time and repeats the calculation of the rereduce result, until finally the rereduce result of the root node is also updated.
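
Steps 7 to 14 amount to computing one reduce value per leaf and then rereducing level by level up to the root. The Python sketch below models the B+Tree as fixed-size chunks of the sorted map entries, which is a simplification of my own, as are the function names and the three-argument reduce signature.

```python
def build_reduce_tree(entries, reduce_fn, fanout=2):
    """Build the levels of cached reduce values over key-sorted map entries.

    Returns a list of levels: levels[0] holds one reduce value per leaf,
    each higher level holds the rereduce values of `fanout` children, and
    levels[-1] holds the single root value.
    """
    leaves = [entries[i:i + fanout] for i in range(0, len(entries), fanout)]
    level = [
        reduce_fn([k for k, _ in leaf], [v for _, v in leaf], False)
        for leaf in leaves
    ]
    levels = [level]
    while len(level) > 1:
        # Parents never see the map entries, only their children's values.
        level = [
            reduce_fn(None, level[i:i + fanout], True)
            for i in range(0, len(level), fanout)
        ]
        levels.append(level)
    return levels


def count_reduce(keys, values, rereduce):
    """Count entries: count on the first pass, add up counts on rereduce."""
    return sum(values) if rereduce else len(values)


entries = [("a", "d1"), ("a", "d2"), ("b", "d3"), ("c", "d4"), ("c", "d5")]
levels = build_reduce_tree(entries, count_reduce)
```

The count example also shows why reduce() needs to know whether it is being called on map values or on earlier reduce results.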


When done, the view index will look something like this …



Incremental View Update

CouchDB updates the view indexes lazily and incrementally. That means that when documents are updated, CouchDB does not refresh the view index until the next query reaches CouchDB.

CouchDB then refreshes the index in the following way.
  • CouchDB walks the by_seqnum B+Tree index of the storage file, starting from the last seqnum.
  • CouchDB extracts all the documents changed since the last view query, feeds them to the view server’s map function, and gets back a set of map results.
  • CouchDB writes the map results into the B+Tree index, so some of the leaf B+Tree nodes are updated.
  • For each updated leaf B+Tree node, CouchDB resends all the map entries it contains to the view server to recompute the reduce value, and then stores the reduce value inside the B+Tree node.
  • For all the parents of the updated leaf B+Tree nodes, CouchDB needs to recompute the rereduce value and store it inside the B+Tree node, all the way up to the root node.
Because of the consistent snapshot characteristic, a long-running view query can run concurrently (without interference) with the ongoing updates of the DB. However, the query needs to wait for the completion of the view index update before seeing a consistent result. There is also an option (under development) to immediately return a stale copy of the view in case the client can tolerate that.

Query processing

When a client retrieves the result of a view, there are the following scenarios.

Query on Map-only view
In this case, there is no reduce phase in the view index update. To process the query, CouchDB simply searches the B+Tree to locate the starting point of the key (note that the index key is prefixed by the emit_key) and then returns all the map results for that key.

Query on Map with reduce
There are 2 cases. If the query is for the final reduce value over the whole view, then CouchDB just returns the rereduce value pointed to by the root of the B+Tree of the view index.

If the query is for the reduce value of each key (group_by_key = true), then CouchDB tries to locate the boundaries of each key. Since this range probably does not align exactly with the B+Tree nodes, CouchDB needs to figure out the edges at both ends, locate the partially matched leaf B+Tree nodes and resend their map results (for that key) to the View Server. This reduce result is then merged with the existing rereduce results to compute the final reduce result for this key.


e.g. if the key spans leaf nodes A to F, then the map results of that key that fall partially in node A and in node F need to be sent to reduce() again. The result is then rereduced with node E’s existing reduce value and node P’s existing rereduce value.


DB Replication

CouchDB supports multiple DB replicas running on different machines and provides a mechanism to synchronize their data. This is useful in 2 common scenarios
  • Occasionally connected applications (e.g. PDA). In this case, the user can work in disconnected mode for a while and store his data changes locally. Later on, when he connects back to his corporate network, he can synchronize his changes back to his corporate DB.
  • Mission critical apps (e.g. clusters). In this case, the DB is replicated across multiple machines so that reliability can be achieved through redundancy and high performance through load balancing.
Underneath there is a replicator process which accepts replication commands. The command specifies the source DB and the target DB. The replicator then asks the source DB for all the documents updated after a particular seq_num. In other words, the replicator needs to keep track of the last seq_num. Then it sends a request to the target DB to pull the current revision history of all these documents and checks whether the revision history of the target is older than that of the source. If so, it pushes the changed documents to the target; otherwise, it skips sending the doc.
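
A much simplified Python sketch of one replication run is shown below. The in-memory representation of a DB, the revision ids and the "target history is a strict prefix of the source history" test for "older" are assumptions, and conflict handling is left out entirely.

```python
def replicate(source, target, last_seq):
    """Push documents changed on `source` since `last_seq` to `target`.

    Each DB is a dict with "docs": {docid: (rev_history, body)} and
    "changes": [(seq_num, docid), ...] ordered by seq_num. rev_history is a
    list of revision ids, oldest first.
    Returns the new last_seq to remember for the next run.
    """
    for seq, docid in source["changes"]:
        if seq <= last_seq:
            continue
        src_history, body = source["docs"][docid]
        tgt_history = target["docs"].get(docid, ([], None))[0]
        # "Older" here means: the target's history is a strict prefix of ours.
        is_older = (len(tgt_history) < len(src_history)
                    and src_history[:len(tgt_history)] == tgt_history)
        if is_older:
            next_seq = target["changes"][-1][0] + 1 if target["changes"] else 1
            target["docs"][docid] = (list(src_history), body)
            target["changes"].append((next_seq, docid))
        last_seq = seq
    return last_seq


source = {"docs": {"x": (["1-a", "2-b"], {"n": 2})}, "changes": [(2, "x")]}
target = {"docs": {"x": (["1-a"], {"n": 1})}, "changes": [(1, "x")]}
last_seq = replicate(source, target, 0)
last_seq_again = replicate(source, target, last_seq)
```

When neither history is a prefix of the other, both sides have changed the document, which is the conflict case.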

At the target DB, a conflict is detected when the document has also been updated in the target DB. The conflict is then flagged in the revision tree pointed to by the by_id index.

Until this conflict is resolved, CouchDB considers the revision with the longest path to be the winner and shows that one in the views. However, CouchDB expects a separate process (maybe a manual one) to fix the conflict.

Now, building a multi-master replica model based on bi-directional data synchronization on top of the replicator is pretty straightforward.

For example, we can have a pair-wise "gossip" process that runs periodically (or is triggered by certain events). The process does the following ...
  1. Copy the changes from source = replica A to target = replica B
  2. Reverse the direction and copy the changes from source = replica B to target = replica A
  3. Pick randomly between replica A and replica B, and call it the winner.
  4. Call a user-provided merge(doc_revA) function on the winner to fix the revision tree. This basically runs app-specific logic to bring the tree back to a list.
  5. Copy the changes from the winner back to the loser. This replicates the fixes.


Data Consistency Considerations

CouchDB has no concept of transactions, nor does it keep track of inter-dependencies between documents. It is therefore important to make sure that data integrity constraints don’t span more than one document.

For example, data integrity may become an issue if your application reads document-X and, based on what it read, updates document-Y. It is possible that after you read document-X, some other application changes document-X into something else that you are not aware of, and you update document-Y based on a stale value. CouchDB cannot detect this kind of conflict because it happens across two different documents.

Additional data consistency issues arise in the data replication setup. Since data synchronization happens in the background, there is a latency before changes made on other replicas become visible. If the client connects to the replicas in a nondeterministic way, then the following scenarios can happen …
  • The client reads a document and later reads the same document again, but the 2nd read returns an earlier revision than the 1st read.
  • The client updates a document and later reads the document again, but it doesn’t see its own update.

Tuesday, August 12, 2008

Distributed Storage

Here we explore the consistency aspect of a distributed storage model. The motivation for using distributed storage is scalability, fault resiliency and cost. The architecture is based on a large number of inexpensive (and unreliable) machines.

At the software level, we need to deal with
  • Partitioning -- How to partition our data among different servers
  • Replication -- How do we maintain copies of data in a consistent way

Distributed storage architecture


Supported Operations

We support REST-based CRUD operations ...
  • put(key, value)
  • post(key, value) -- Semantics equivalent to "append"
  • delete(key)
  • get(key)

Consistency Models

Three models will be discussed

Full consistency
  • An update request will not return until the changes have been applied
  • A read request will always return the latest successfully updated data
Eventual consistency
  • A READ request may return an outdated copy, but will never return an inconsistent copy (one which doesn't exist in any serializable history)
  • All updates will eventually be processed and become visible. Also, given enough silence (no update for some period of time), GET will eventually return the latest value.
Read-what-I-wrote
  • READ request may return a copy which is equal to or later than the version of the last update of the same user
  • For UPDATE request, same behavior as "eventual consistency"

Algorithms for Processing

Full consistency

There is no need for the operation queue in this case. Let's skip the operation queue and update the persistent state directly.
A version number is attached to the data value of each key. The version number is advanced when an update succeeds.

PUT processing
  • Make parallel write requests to R replicas and wait for Q success responses within the timeout period, then return success.
  • Otherwise return failure. The data value is now inconsistent and no operation can proceed for this key until the consistency issue is manually fixed (let's take a naive approach for now). The probability of failure can be reduced by increasing the values of R and Q.

GET processing
  • Make parallel read requests to R replicas and wait for Q responses that carry the same version number, then return the data value. Otherwise return failure.

Background replica synchronization
  • Exchange version numbers periodically with the remaining (R-1) replicas. If my version differs from that of the quorum Q, update myself to match it.
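
To make the quorum rule concrete, here is a minimal sketch of the PUT and GET processing above. The Replica class, quorum_put() and quorum_get() are hypothetical names for illustration; the requests are issued one after another rather than in parallel, there is no timeout handling, and a failed GET is signalled by returning None.

```python
from collections import Counter


class Replica:
    def __init__(self, up=True):
        self.up = up      # False simulates an unreachable replica
        self.store = {}   # key -> (version, value)

    def write(self, key, version, value):
        if not self.up:
            return False
        self.store[key] = (version, value)
        return True

    def read(self, key):
        return self.store.get(key) if self.up else None


def quorum_put(replicas, q, key, version, value):
    # Succeed only if at least q replicas acknowledge the write
    acks = sum(r.write(key, version, value) for r in replicas)
    return acks >= q


def quorum_get(replicas, q, key):
    # Succeed only if at least q replicas agree on the same (version, value)
    answers = [a for a in (r.read(key) for r in replicas) if a is not None]
    if not answers:
        return None
    (version, value), votes = Counter(answers).most_common(1)[0]
    return value if votes >= q else None
```

With three replicas of which one is down, a quorum of 2 still succeeds for both PUT and GET, while a quorum of 3 fails.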

Eventual consistency

We need the operation queue. A background thread asynchronously processes the operation queue to update the persistent state.

PUT processing
  • Make parallel write requests to R replicas and wait for M success responses within the timeout period, then return success. (When it receives a write request, a replica reads the current version number V of the state and attaches version number V+1 to the update operation.)
  • Otherwise return failure. The data value is inconsistent. Again, the probability of failure can be reduced by increasing the value of R.

GET processing
  • Make parallel read requests to R replicas, wait for the first response and return its data value. Otherwise return failure.

Background replica synchronization
  • We need a more sophisticated conflict resolution algorithm to merge operations which have the same version number. The following is what comes to my mind (without analyzing it in depth)
  • Starting from the M replicas, the operation request is propagated among replicas in the background.
  • When Q replicas have got the same operation request, each applies the operation to its persistent state and updates its version number.

Read-what-I-wrote


PUT processing
  • Same as Eventual Consistency model
  • After successful update, store the version number (latest updated) in the user session

GET processing
  • Make parallel read requests to R replicas and wait for the first response whose version number is at least as high as the one stored in the user session, then return its data value and update the version in the user session.
  • Otherwise, wait a moment and resend the READ request. (The user request timeout should be set higher than the expected latency of background replica data synchronization.)

Background replica synchronization
  • Same as Eventual consistency model
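
The session-based GET can be sketched as below. Everything here is an assumption for illustration: replicas are plain dicts mapping a key to a (version, value) pair, the session is a dict mapping a key to the last version this user wrote, and session_get() polls the replicas in turn instead of reading them in parallel.

```python
import time


def session_get(replicas, session, key, retries=3, delay=0.01):
    """Return a value at least as new as the session's last write to key."""
    min_version = session.get(key, 0)
    for _ in range(retries):
        for replica in replicas:
            entry = replica.get(key)   # (version, value) or None
            if entry and entry[0] >= min_version:
                session[key] = entry[0]   # remember what this user has seen
                return entry[1]
        time.sleep(delay)   # give the background synchronization time to catch up
    raise TimeoutError(f"no replica has caught up on {key!r}")
```

A user who just wrote version 2 will skip a replica that is still at version 1, whereas a user with an empty session accepts whichever replica answers first.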

Saturday, July 5, 2008

Branch and Bound Algorithm

Branch and Bound is a tree pruning technique for solving optimization problems by search.

An optimization problem tries to find a path which maximizes (or minimizes) the profit. The solution space can be represented as a state-space tree in which the branches at each node represent all the possible next steps. The goal is to find a path which returns the maximum profit.



Knapsack problem

We have n kinds of items, 1 through n. Each item j has a value pj and a weight wj. The maximum weight that we can carry in the bag is c. How should we decide which items to pick such that the sum of values is maximized (optimized) while the sum of weights does not exceed c (fulfilling the constraint)?


Minimax algorithm

Two players make moves in turn. It is now Player 1's turn. How should he choose his move in order to minimize his maximum loss when looking ahead N steps?



Brute force approach
One naive approach is to lay out all the possible combinations of steps and calculate the profit of each path. This has exponential complexity.

So a tree pruning mechanism is usually employed to reduce the exponential explosion. Branch and bound is such a mechanism. The basic idea is to have a cheap way to compute an upper bound and a lower bound on the profit of a subtree. If a subtree is known to have an upper bound lower than the lower bound of another subtree, then it can be pruned, because its best result is worse than the worst result of the other subtree.



Knapsack solution
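
A minimal sketch of branch and bound applied to the knapsack problem, under a few assumptions of mine: each item can be taken at most once (the 0/1 variant), all weights are positive, the upper bound of a subtree is computed by greedily filling the remaining room while allowing a fraction of one item, and the lower bound is simply the best complete selection found so far. The function name knapsack() is hypothetical.

```python
def knapsack(values, weights, capacity):
    """0/1 knapsack by depth-first branch and bound; returns the best total value."""
    # Consider items in decreasing value density so that the bound is tight
    items = sorted(zip(values, weights), key=lambda vw: vw[0] / vw[1], reverse=True)

    def upper_bound(i, value, room):
        # Optimistic estimate: fill the room greedily, allowing one fractional item
        for v, w in items[i:]:
            if w <= room:
                value, room = value + v, room - w
            else:
                return value + v * room / w
        return value

    best = 0

    def branch(i, value, room):
        nonlocal best
        best = max(best, value)   # every node is itself a feasible selection
        if i == len(items) or upper_bound(i, value, room) <= best:
            return                # prune: this subtree cannot beat the best so far
        v, w = items[i]
        if w <= room:
            branch(i + 1, value + v, room - w)   # take item i
        branch(i + 1, value, room)               # skip item i

    branch(0, 0, capacity)
    return best
```

For example, with values [60, 100, 120], weights [10, 20, 30] and capacity 50, the subtree that skips the first two items has an upper bound of 120 and is pruned as soon as a selection worth 220 has been found.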




Minimax solution (Alpha-Beta search)
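
A minimal sketch of minimax with alpha-beta pruning. The game is abstracted behind two caller-supplied functions, children(node) and evaluate(node), which are assumptions of this example. Here alpha is the best score the maximizing player can already guarantee and beta the best score the minimizing player can already guarantee; a subtree is abandoned as soon as alpha >= beta.

```python
import math


def alphabeta(node, depth, alpha, beta, maximizing, children, evaluate):
    kids = children(node)
    if depth == 0 or not kids:
        return evaluate(node)
    if maximizing:
        best = -math.inf
        for child in kids:
            best = max(best, alphabeta(child, depth - 1, alpha, beta,
                                       False, children, evaluate))
            alpha = max(alpha, best)
            if alpha >= beta:
                break   # the minimizer already has a better option elsewhere
        return best
    best = math.inf
    for child in kids:
        best = min(best, alphabeta(child, depth - 1, alpha, beta,
                                   True, children, evaluate))
        beta = min(beta, best)
        if alpha >= beta:
            break       # the maximizer already has a better option elsewhere
    return best
```

On the two-level tree [[3, 5], [2, 9]] with the maximizer to move, the first subtree guarantees 3; once the second subtree reveals the leaf 2, the leaf 9 is never evaluated.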




Tuesday, June 10, 2008

Exploring Erlang with Map/Reduce

Under the category of "Concurrency Oriented Programming", Erlang has received a good deal of attention recently, thanks to successes declared by Facebook engineers in using Erlang in large scale applications. Tempted to figure out the underlying ingredients of Erlang, I decided to spend some time learning the language.


Multi-threading Problem

Multiple threads of execution are a common programming model in modern languages because they enable more efficient use of the computing resources provided by multi-core and multi-machine architectures. One question to be answered, though, is how these parallel threads of execution interact and cooperate to solve the application problem.

There are basically two models for communication between concurrent executions. One is the "Shared Memory" model, in which one thread of execution writes information into a shared place from which other threads read. Java's thread model is based on such "shared memory" semantics. The typical problem with this model is that concurrent updates require a very sophisticated protection scheme; otherwise uncoordinated access can result in inconsistent data.

Unfortunately, this protection scheme is very hard to analyze once multiple threads start to interact in a combinatorially exploding number of ways. Hard-to-debug deadlock problems frequently pop up. To reduce the complexity, a coarse-grained locking model is usually recommended, but this may reduce concurrency.

Erlang has picked the other model, based on "message passing". In this model, any information that needs to be shared is "copied" into a message and sent to other executions. Each thread of execution keeps its state "completely local" (not visible to other threads of execution). Its local state is updated when it learns what is going on in other threads by receiving their messages. This model mirrors how people interact with each other in real life.
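
To illustrate the message passing style outside of Erlang, here is a small sketch using Python threads and queues. It only mimics the idea: the counter() worker and the message tuples are hypothetical, and unlike Erlang, Python does not copy messages, so the sketch sticks to immutable values.

```python
import queue
import threading


def counter(mailbox):
    total = 0   # local state, never touched by any other thread
    while True:
        msg = mailbox.get()
        if msg[0] == "add":
            total += msg[1]
        elif msg[0] == "get":
            msg[1].put(total)   # reply by sending a message back
        elif msg[0] == "stop":
            return


def demo():
    mailbox, reply = queue.Queue(), queue.Queue()
    worker = threading.Thread(target=counter, args=(mailbox,))
    worker.start()
    for n in (1, 2, 3):
        mailbox.put(("add", n))
    mailbox.put(("get", reply))
    result = reply.get(timeout=5)
    mailbox.put(("stop",))
    worker.join()
    return result
```

No lock is needed because the running total is only ever read and written by the worker itself; other threads learn its value by asking for it in a message.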


Erlang Sequential Processing

Coming from an object oriented, imperative programming background, I have a couple of things to unlearn/learn in Erlang.

Erlang is a functional programming language and has no OO concepts. Erlang code is structured with the "function" as the basic unit, grouped under a "module". Each "function" takes a number of input parameters and produces an output value. Like many functional programming languages, Erlang encourages the use of "pure functions", which are "side-effect-free" and "deterministic". "Side-effect-free" means there are no state changes during the execution of the function. "Deterministic" means the same output will always be produced from the same input.

Erlang has a very different concept of variable assignment in that all variables in Erlang are immutable. In other words, every variable can only be assigned once and can never be changed from then on. So I cannot do X = X + 1; I have to use a new variable and assign it the changed value, e.g. Y = X + 1. This "immutability" characteristic simplifies debugging a lot because I don't need to worry about how the value of X changes at different points of execution (it simply won't change).

Another uncommon thing about Erlang is that there is no "while loop" construct in the language. To achieve a looping effect, you need to code the function recursively, putting in a terminal clause to check for the exit condition and carefully structuring the logic in a tail recursive fashion. Otherwise, you may run out of memory if the stack grows too much. A tail recursive function is one in which the recursive call is the last thing the function does, so that its result is returned as-is rather than used in a further expression. Erlang is smart enough to optimize tail calls across multiple functions, such as when funcA calls funcB, which calls funcC, which calls funcA. Tail recursion is especially important when writing a server daemon, which typically makes a self-recursive call after processing a request.


Erlang Parallel Processing

The execution thread in Erlang is called a "Process". Don't confuse it with an OS-level process: an Erlang process is extremely light-weight, much lighter than a Java thread. A process is created by a spawn(Node, Module, Function, Arguments) function call and it terminates when that function returns.

Erlang processes communicate with each other by passing messages. Process ids are used by the sender to specify the recipient's address. The send call is asynchronous and returns immediately. The receiving process makes a synchronous receive call and specifies a number of matching patterns. An arriving message that matches one of the patterns is delivered to the receiving process; otherwise it stays in the queue forever. Therefore, it is good practice to have a match-all pattern to clean up garbage messages. The receive call also accepts a timeout parameter so that it returns if no matching message arrives within the timeout period.

Error handling in Erlang is also quite different from other programming languages. Although Erlang provides a try/catch model, it is not the preferred approach. Instead of catching the error and handling it within the local process, the process should simply die and let another process take care of what should be done after its crash. Erlang has the concept of processes being "linked" to each other so that they can monitor each other's life status. By default, a dying process propagates an exit signal to all the processes it is linked to (links are bi-directional). So there is a chaining effect: when one process dies, the whole chain of processes dies. However, a process can redefine its behavior on receiving the exit signal. Instead of "dying", a process can choose to handle the error (perhaps by restarting the dead process).


Other Erlang Features
Pattern matching is a common programming construct in many places in Erlang, namely "Function calls", "Variable assignment", "Case statements" and "Receive messages". It takes some time to get used to this style. After that, I find this construct to be very powerful.

Another cool feature that Erlang provides is code hot swapping. By specifying the module name when making the function call, a running Erlang process can execute the latest code without restarting itself. This is a powerful feature for code evolution because you don't need to shut down the VM when deploying new code.

Since a function itself can be passed as a message to a remote process, executing code remotely is extremely easy in Erlang. The problem of installation and deployment is pretty much non-existent in Erlang.

Map/Reduce using Erlang

After learning the basic concepts, my next step was to find a problem and get some hands-on experience with the language. Being based on a model of work partitioning, aggregation and parallel processing, Map/Reduce aligns very nicely with Erlang's parallel processing model. So I picked as my project the implementation of a simple Map/Reduce framework in Erlang.

Here is the Erlang implementation ...




First of all, I need some helper functions

-module(mapreduce).
-export([reduce_task/2, map_task/2,
        map_reduce/6, test_map_reduce/0,
        repeat_exec/2]).

%%% Execute the function N times,
%%%   and put the result into a list
repeat_exec(N,Func) ->
 lists:map(Func, lists:seq(0, N-1)).
 

%%% Identify the reducer process by
%%%   using the hashcode of the key
find_reducer(Processes, Key) ->
 Index = erlang:phash(Key, length(Processes)),
 lists:nth(Index, Processes).

%%% Identify the mapper process by random.
%%% random:uniform(N) returns an integer
%%%   between 1 and N, so it can be used
%%%   directly as a list index
find_mapper(Processes) ->
 N = random:uniform(length(Processes)),
 lists:nth(N, Processes).

%%% Collect result synchronously from
%%%   a reducer process
collect(Reduce_proc) ->
 Reduce_proc ! {collect, self()},
 receive
   {result, Result} ->
     Result
 end.


Main function
The map_reduce() function is the entry point of the system.
  1. It first starts all R Reducer processes
  2. It starts all M Mapper processes, passing them the process ids of the R reducers
  3. For each line of input data, it randomly picks one of the M mapper processes and sends the line to it
  4. It waits for the processing to finish (the code below simply sleeps for 2 seconds)
  5. It collects the results from the R reducer processes
  6. It returns the collected results
The corresponding Erlang code is as follows ...
%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func,
          Reduce_func, Acc0, List) ->

 %% Start all the reducer processes
 Reduce_processes =
   repeat_exec(R,
     fun(_) ->
       spawn(mapreduce, reduce_task,
             [Acc0, Reduce_func])
     end),

 io:format("Reduce processes ~w are started~n",
           [Reduce_processes]),

 %% Start all mapper processes
 Map_processes =
   repeat_exec(M,
     fun(_) ->
       spawn(mapreduce, map_task,
             [Reduce_processes, Map_func])
     end),

 io:format("Map processes ~w are started~n",
           [Map_processes]),

 %% Send the data to the mapper processes
 Extract_func =
   fun(N) ->
     Extracted_line = lists:nth(N+1, List),
     Map_proc = find_mapper(Map_processes),
     io:format("Send ~w to map process ~w~n",
               [Extracted_line, Map_proc]),
     Map_proc ! {map, Extracted_line}
   end,

 repeat_exec(length(List), Extract_func),

 timer:sleep(2000),

 %% Collect the result from all reducer processes
 io:format("Collect all data from reduce processes~n"),
 All_results =
   repeat_exec(length(Reduce_processes),
     fun(N) ->
       collect(lists:nth(N+1, Reduce_processes))
     end),
 lists:flatten(All_results).


Map Process

The Map processes, once started, will perform the following ...
  1. Receive the input line
  2. Execute the user-provided Map function to turn it into a list of key, value pairs
  3. For each key and value, select a reducer process and send the key, value to it
The corresponding Erlang code is as follows ...

%%% The mapper process
map_task(Reduce_processes, MapFun) ->
 receive
   {map, Data} ->
     IntermediateResults = MapFun(Data),
     io:format("Map function produce: ~w~n",
               [IntermediateResults ]),
     lists:foreach(
       fun({K, V}) ->
         Reducer_proc =
           find_reducer(Reduce_processes, K),
         Reducer_proc ! {reduce, {K, V}}
       end, IntermediateResults),

     map_task(Reduce_processes, MapFun)
 end.


Reduce Process
On the other hand, the reducer processes will execute as follows ...
  1. Receive the key, value from the Mapper process
  2. Get the current accumulated value by the key. If no accumulated value is found, use the initial accumulated value
  3. Invoke the user provided reduce function to calculate the new accumulated value
  4. Store the new accumulated value under the key

The corresponding Erlang code will be as follows ...

%%% The reducer process
reduce_task(Acc0, ReduceFun) ->
 receive
   {reduce, {K, V}} ->
     Acc = case get(K) of
             undefined ->
               Acc0;
             Current_acc ->
               Current_acc
           end,
     put(K, ReduceFun(V, Acc)),
     reduce_task(Acc0, ReduceFun);
   {collect, PPid} ->
     PPid ! {result, get()},
     reduce_task(Acc0, ReduceFun)
 end.

Word Count Example
Here is a test of the Map/Reduce framework using a word count example ...

%%% Testing of Map reduce using word count
test_map_reduce() ->
 M_func = fun(Line) ->
            lists:map(
              fun(Word) ->
                {Word, 1}
              end, Line)
          end,

 R_func = fun(V1, Acc) ->
            Acc + V1
          end,

 map_reduce(3, 5, M_func, R_func, 0,
            [[this, is, a, boy],
             [this, is, a, girl],
             [this, is, lovely, boy]]).

This is the result of executing the test program.

Erlang (BEAM) emulator version 5.6.1 [smp:2] [async-threads:0]

Eshell V5.6.1  (abort with ^G)
1> c (mapreduce).
{ok,mapreduce}
2>
2> mapreduce:test_map_reduce().
Reduce processes [<0.37.0>,<0.38.0>,<0.39.0>,<0.40.0>,<0.41.0>] are started
Map processes [<0.42.0>,<0.43.0>,<0.44.0>] are started
Send [this,is,a,boy] to map process <0.42.0>
Send [this,is,a,girl] to map process <0.43.0>
Map function produce: [{this,1},{is,1},{a,1},{boy,1}]
Send [this,is,lovely,boy] to map process <0.44.0>
Map function produce: [{this,1},{is,1},{a,1},{girl,1}]
Map function produce: [{this,1},{is,1},{lovely,1},{boy,1}]
Collect all data from reduce processes
[{is,3},{this,3},{boy,2},{girl,1},{a,2},{lovely,1}]
3>


The complete Erlang code is attached here ...

-module(mapreduce).
-export([reduce_task/2, map_task/2,
        map_reduce/6, test_map_reduce/0,
        repeat_exec/2]).

%%% Execute the function N times,
%%%   and put the result into a list
repeat_exec(N,Func) ->
 lists:map(Func, lists:seq(0, N-1)).
 

%%% Identify the reducer process by
%%%   using the hashcode of the key
find_reducer(Processes, Key) ->
 Index = erlang:phash(Key, length(Processes)),
 lists:nth(Index, Processes).

%%% Identify the mapper process by random.
%%% random:uniform(N) returns an integer
%%%   between 1 and N, so it can be used
%%%   directly as a list index
find_mapper(Processes) ->
 N = random:uniform(length(Processes)),
 lists:nth(N, Processes).

%%% Collect result synchronously from
%%%   a reducer process
collect(Reduce_proc) ->
 Reduce_proc ! {collect, self()},
 receive
   {result, Result} ->
     Result
 end.


%%% The reducer process
reduce_task(Acc0, ReduceFun) ->
 receive
   {reduce, {K, V}} ->
     Acc = case get(K) of
             undefined ->
               Acc0;
             Current_acc ->
               Current_acc
           end,
     put(K, ReduceFun(V, Acc)),
     reduce_task(Acc0, ReduceFun);
   {collect, PPid} ->
     PPid ! {result, get()},
     reduce_task(Acc0, ReduceFun)
 end.

%%% The mapper process
map_task(Reduce_processes, MapFun) ->
 receive
   {map, Data} ->
     IntermediateResults = MapFun(Data),
     io:format("Map function produce: ~w~n",
               [IntermediateResults ]),
     lists:foreach(
       fun({K, V}) ->
         Reducer_proc =
           find_reducer(Reduce_processes, K),
         Reducer_proc ! {reduce, {K, V}}
       end, IntermediateResults),

     map_task(Reduce_processes, MapFun)
 end.


%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func,
          Reduce_func, Acc0, List) ->

 %% Start all the reducer processes
 Reduce_processes =
   repeat_exec(R,
     fun(_) ->
       spawn(mapreduce, reduce_task,
             [Acc0, Reduce_func])
     end),

 io:format("Reduce processes ~w are started~n",
           [Reduce_processes]),

 %% Start all mapper processes
 Map_processes =
   repeat_exec(M,
     fun(_) ->
       spawn(mapreduce, map_task,
             [Reduce_processes, Map_func])
     end),

 io:format("Map processes ~w are started~n",
           [Map_processes]),

 %% Send the data to the mapper processes
 Extract_func =
   fun(N) ->
     Extracted_line = lists:nth(N+1, List),
     Map_proc = find_mapper(Map_processes),
     io:format("Send ~w to map process ~w~n",
               [Extracted_line, Map_proc]),
     Map_proc ! {map, Extracted_line}
   end,

 repeat_exec(length(List), Extract_func),

 timer:sleep(2000),

 %% Collect the result from all reducer processes
 io:format("Collect all data from reduce processes~n"),
 All_results =
   repeat_exec(length(Reduce_processes),
     fun(N) ->
       collect(lists:nth(N+1, Reduce_processes))
     end),
 lists:flatten(All_results).

%%% Testing of Map reduce using word count
test_map_reduce() ->
 M_func = fun(Line) ->
            lists:map(
              fun(Word) ->
                {Word, 1}
              end, Line)
          end,

 R_func = fun(V1, Acc) ->
            Acc + V1
          end,

 map_reduce(3, 5, M_func, R_func, 0,
            [[this, is, a, boy],
             [this, is, a, girl],
             [this, is, lovely, boy]]).
  


Summary

From this exercise of implementing a simple Map/Reduce model using Erlang, I found that Erlang is very powerful in developing distributed systems.

Sunday, May 25, 2008

Parallel data processing language for Map/Reduce

In my previous post, I introduced the Map/Reduce model as a powerful model for parallelism. However, although Map/Reduce is simple and powerful and provides a good opportunity to parallelize algorithms, it is based on a rigid procedural structure that requires injection of custom user code, so it is not easy to understand the big picture from a high level. You need to drill into the implementation code of the map and reduce functions in order to figure out what is going on.

It would be desirable to have a higher level declarative language that describes the parallel data processing model. This is similar to the idea of a SQL query, where the user specifies the "what" and leaves the "how" to the underlying processing engine. In this post, we will explore the possibility of such a declarative language. We will start from the Map/Reduce model and see how it can be generalized into a "Parallel data processing model".

Let's revisit Map/Reduce in a more abstract sense.

The Map/Reduce processing model is composed of the following steps ...
  • From many distributed data stores, the InputReader extracts data tuples A = <a1,a2,...> and feeds them randomly into the many Map tasks.
  • For each tuple A, the Map task emits zero to many tuples A'
  • The output A' is sorted by its key, and all A' with the same key reach the same Reduce task
  • The Reduce task aggregates over the group of tuples A' (of the same key) and turns them into a tuple B = reduce(array<A'>)
  • The OutputWriter stores the data tuple B into the distributed data store.
Parallelizing a more sophisticated algorithm typically involves multiple Map/Reduce phases, each of which may have a different Map task and Reduce task.
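
The abstract model above fits in a few lines when run on a single machine. The following is only a sequential sketch with hypothetical names (map_reduce, map_func, reduce_func); a real framework would distribute the map and reduce tasks and read from and write to a distributed store.

```python
from collections import defaultdict


def map_reduce(records, map_func, reduce_func):
    groups = defaultdict(list)
    for record in records:                      # Map phase
        for key, value in map_func(record):     # zero to many tuples per record
            groups[key].append(value)           # same key reaches the same group
    return {key: reduce_func(key, values)       # Reduce phase, in key order
            for key, values in sorted(groups.items())}
```

A word count, for instance, passes a map_func that emits (word, 1) for every word in a line and a reduce_func that sums the values of each key.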


Looking at the abstract Map/Reduce model, there are some similarities with the SQL query model. We can express the above Map/Reduce model using a SQL-like query language.

INSERT INTO A FROM InputReader("dfs:/data/myInput")

INSERT INTO A'
 SELECT flatten(map(*)) FROM A

INSERT INTO B
 SELECT reduce(*) FROM A' GROUP BY A'.key

INSERT INTO  "dfs:/data/myOutput"  FROM B

Similarly, SQL queries can also be expressed by different forms of map() and reduce() functions. Let's look at a couple of typical SQL query examples.

Simple Query
SELECT a1, a2 FROM A
 WHERE a3 > 5 AND a4 < 6

Here are the corresponding Map and Reduce functions
def map(tuple)
 # tuple is implemented as a map, keyed by attribute name
 if  (tuple["a3"] > 5  &&  tuple["a4"] < 6)
   key = random()
   emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
 end
end

def reduce(tuples)
 tuples.each do |tuple|
   store tuple
 end
end

Query with Grouping
SELECT sum(a1), avg(a2) FROM A
 GROUP BY a3, a4
   HAVING count() < 10
Here are the corresponding Map and Reduce functions
def map(tuple)
 key = [tuple["a3"], tuple["a4"]]
 emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
end

def reduce(tuples)
 sums = {"a1" => 0, "a2" => 0}
 count = 0

 tuples.each do |tuple|
   count += 1
   sums.each_key do |attr|
     sums[attr] += tuple[attr]
   end
 end

 if count < 10
   # omit denominator check for simplicity
   store("type" => "B", "b1" => sums["a1"], "b2" => sums["a2"] / count)
 end
end
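
For readers who want something they can run, here is the same grouping query as a single-process Python sketch. The function name run_group_query() is hypothetical and the rows are assumed to be dicts keyed by attribute name; the map and reduce steps are collapsed into two loops.

```python
from collections import defaultdict


def run_group_query(rows):
    """SELECT sum(a1), avg(a2) FROM A GROUP BY a3, a4 HAVING count() < 10"""
    groups = defaultdict(list)
    for row in rows:   # map: compute the GROUP BY key and emit a1, a2
        groups[(row["a3"], row["a4"])].append((row["a1"], row["a2"]))
    result = {}
    for key, pairs in groups.items():   # reduce: aggregate, then apply HAVING
        if len(pairs) < 10:
            a1s, a2s = zip(*pairs)
            result[key] = {"b1": sum(a1s), "b2": sum(a2s) / len(a2s)}
    return result
```

Groups with ten or more rows are dropped by the HAVING check and never reach the output.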

Query with Join
SELECT a2, p2
 FROM A JOIN P
         ON A.a1 = P.p1
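Here are the corresponding Map and Reduce functions
def map(tuple)
 # Tag each emitted tuple with its source table so that
 # reduce() can tell the two sides of the join apart
 if (tuple["type"] == "A")
   key = tuple["a1"]
   emit key, "type" => "A", "a2" => tuple["a2"]
 elsif (tuple["type"] == "P")
   key = tuple["p1"]
   emit key, "type" => "P", "p2" => tuple["p2"]
 end
end

def reduce(tuples)
 all_A_tuples = []
 all_P_tuples = []

 tuples.each do |tuple|
   if (tuple["type"] == "A")
     all_A_tuples << tuple
     # Pair the new A tuple with every P tuple seen so far
     all_P_tuples.each do |p_tuple|
       joined_tuple = p_tuple.merge(tuple)
       joined_tuple["type"] = "B"
       store joined_tuple
     end
   elsif (tuple["type"] == "P")
     all_P_tuples << tuple
     # Pair the new P tuple with every A tuple seen so far
     all_A_tuples.each do |a_tuple|
       joined_tuple = a_tuple.merge(tuple)
       joined_tuple["type"] = "B"
       store joined_tuple
     end
   end
 end
end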
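
The join can be tried out on one machine with the following Python sketch. It assumes the two tables arrive as separate lists of dicts and uses a hypothetical function name, run_join_query(); grouping by the join key stands in for the shuffle between map and reduce.

```python
from collections import defaultdict


def run_join_query(a_rows, p_rows):
    """SELECT a2, p2 FROM A JOIN P ON A.a1 = P.p1"""
    groups = defaultdict(lambda: {"A": [], "P": []})
    for row in a_rows:   # map: key each A tuple by a1
        groups[row["a1"]]["A"].append(row["a2"])
    for row in p_rows:   # map: key each P tuple by p1
        groups[row["p1"]]["P"].append(row["p2"])
    joined = []
    for key in sorted(groups):   # reduce: cross product within each key
        for a2 in groups[key]["A"]:
            for p2 in groups[key]["P"]:
                joined.append({"a2": a2, "p2": p2})
    return joined
```

Keys that appear on only one side of the join produce an empty cross product and therefore no output rows.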

As you can see, transforming a SQL query into Map/Reduce functions is pretty straightforward.

We put the following logic inside the map() function
  • Select the columns that appear in the SELECT clause
  • Evaluate the WHERE clause and filter out tuples that don't match the condition
  • Compute the key for the JOIN clause or the GROUP BY clause
  • Emit the tuple

On the other hand, we put the following logic inside the reduce() function
  • Compute the aggregate values of the columns that appear in the SELECT clause
  • Evaluate the HAVING clause and filter things out
  • Compute the cartesian product of the JOIN clause
  • Store the final tuple
We have now seen the potential of using a "SQL-like" declarative language to express parallel data processing and a Map/Reduce model to execute it. The open source Hadoop community is working on a project called Pig to develop such a language.

PIG is similar to SQL in the following ways.
  • PIG's tuple is the same as a SQL record, containing multiple fields
  • PIG has define its own set
  • Like a SQL optimizer, which compiles the query into an execution plan, the PIG compiler compiles its query into Map/Reduce tasks.

However, there are a number of important differences between PIG (in its current form) and the SQL language.
  • While fields within a SQL record must be atomic (contain a single value), fields within a PIG tuple can be multi-valued, e.g. a collection of other PIG tuples, or a map whose keys are atomic data and whose values can be anything
  • Unlike the relational model, where each DB record must have a unique combination of data fields, a PIG tuple doesn't require uniqueness.
  • Unlike a SQL query, where the input data needs to be physically loaded into DB tables, PIG extracts the data directly from its original data sources during execution.
  • PIG is lazily executed. It uses a backtracking mechanism from its "store" statement to determine which statements need to be executed.
  • PIG is procedural and SQL is declarative. In fact, PIG looks a lot like a SQL query execution plan.
  • PIG enables easy plug-in of user defined functions
For more details, please refer to PIG's project site.