Monday, December 15, 2008

Does Cloud Computing make sense for large enterprises

While "cloud computing" provides an obviously cost-attractive operating model for technology startups and SMEs, the question is always raised whether the same argument applies to large enterprises that already have an army of highly skilled system administrators running multiple geographically distributed 24 x 7 data centers. Why would they believe that Amazon can run a more effective data center operation than they can?

I believe such large enterprises can still benefit from leveraging a "cloud provider", but under very different circumstances from the startups.

"Cloud" as an overflow buffer

For most enterprise applications, workload fluctuates according to seasonal traffic patterns or changes in user behavior. To ensure satisfactory performance over the whole period, the enterprise needs to provision equipment for the peak load, even though the excess equipment will sit idle most of the time.

By leveraging a cloud provider, the enterprise no longer needs to budget for the peak load. It just needs to provision equipment for the average load, so that this equipment is fully utilized most of the time. When a peak load arrives that exceeds the existing equipment capacity, the enterprise can programmatically start up new machines in the cloud and redirect the extra traffic to them.
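As a rough illustration of the overflow idea, the sketch below computes how many cloud machines would be needed for each hour of traffic. The function name, the capacities and the load figures are all made up for the example; no real provisioning API is involved.

```python
import math

def cloud_instances_needed(load, inhouse_capacity, per_instance_capacity):
    """Number of cloud instances needed to absorb load above in-house capacity."""
    overflow = max(0, load - inhouse_capacity)
    return math.ceil(overflow / per_instance_capacity)

# Hypothetical hourly request rates; in-house equipment is sized near the average.
hourly_load = [400, 450, 500, 1200, 900, 480]
plan = [cloud_instances_needed(load, 500, 250) for load in hourly_load]
print(plan)  # [0, 0, 0, 3, 2, 0]
```

Only the two peak hours rent anything from the cloud; the rest of the time the in-house machines carry the whole load.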

Dropping the equipment budget from what the peak load requires to what the average load requires can be a big saving for the IT budget.

"Cloud" as a new idea playground

In most large enterprises, the IT department struggles to support the business departments' rapidly changing needs. To stay competitive, product and service groups have to come up with new ideas quickly and be able to test them with their customers quickly as well. However, making any change to existing IT infrastructure that has production applications running on it is very risky. A lot of impact analysis, change planning, equipment purchasing and approvals have to be gone through. This significantly delays the testing of a new idea. In fact, unless the idea is mature enough to get significant management buy-in, it won't even pass the approval stage and simply dies before it is tested.

By leveraging a cloud provider, the business department no longer needs to go to the IT department to set up a testing environment. It can just run its new "experimental" services in the cloud and watch customer acceptance in a completely separate environment, without worrying about any impact on existing production applications.

After the new idea is tested and proven, the business department can then provision equipment in internal IT and migrate the services from the cloud back to the inhouse data center. At this stage approval is much easier because the idea is already proven.

Hybrid Cloud

Therefore, for a large enterprise it will not be an "all inhouse" or "all cloud" setup. In most cases, I believe it will be a hybrid environment where some parts of the application run in the data center while other parts run inside the cloud. These different components need to interact with each other in a transparent way without sacrificing security, performance and control. In fact, having the ability to migrate application components seamlessly between the "inhouse data center" and the "public cloud" enables the enterprise to put its functionality wherever the cost is lowest, and hence further optimize its cost efficiency.

Unfortunately, cloud providers are not motivated to encourage their customers to run in a hybrid environment (they want all your business, not just part of it). They are also not interested in providing an easy way for their customers to migrate applications from the cloud back to internal IT. This is a gap that I think new cloud technology startups may fill.

Virtualizing internal data center

How about running some virtualization software (e.g. VMware, Xen, etc.) in house to make your data center look very similar to the cloud?

While "virtualization" (which encourages sharing) is useful in a generic sense regardless of whether the equipment resides inhouse or in the cloud, the cost dynamics are quite different, so a different scheduling policy is needed.

For example, because the cloud provider charges whenever a VM instance is running, it probably won't make sense to start the VM too early. Therefore I expect most cloud deployment scenarios for large enterprises to be about launching multiple machines, doing the heavy-duty processing, and then shutting down all the machines. In other words, the machines in the cloud will not be kept idle for long.

However, for inhouse equipment, there is no such cost involved. The enterprise would like all available resources (e.g. employees' desktops, which can be used after they have gone home) to be registered to the pool as soon as they are available. So the deployment scenario becomes: register resources whenever they are available, let them sit idle waiting for job allocation, do the allocated job, and then go back to the pool to wait for the next one.

Tuesday, November 25, 2008

Hadoop Map/Reduce Implementation

In my previous post, I talked about the methodology of transforming a sequential algorithm into a parallel one. After that, we can implement the parallel algorithm; one of the popular frameworks we can use is the open source Apache Hadoop Map/Reduce framework.

Functional Programming

Multithreading is one of the popular ways of doing parallel programming, but the major complexity of multi-threaded programming is coordinating the access of each thread to the shared data. We need things like semaphores and locks, and we must use them with great care, otherwise deadlocks will result.

If we can eliminate the shared state completely, then the complexity of coordination disappears. This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values, which can only be changed by the active function at that moment. Imagine functions connected to each other via a directed acyclic graph (DAG). Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyzing the parallelism is much easier when there is no hidden dependency from shared state.
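To make the DAG idea concrete, here is a minimal sketch in Python. The four functions and their names are invented for illustration; the point is only that the two middle functions share no state and neither is an ancestor of the other, so they can safely be submitted to a thread pool at the same time.

```python
from concurrent.futures import ThreadPoolExecutor

# Each function depends only on its parameters; there is no shared state.
def clean(text):
    return text.strip().lower()

def count_words(text):
    return len(text.split())

def count_chars(text):
    return len(text)

def summarize(words, chars):
    return {"words": words, "chars": chars}

def run_dag(raw):
    cleaned = clean(raw)  # ancestor of both counting steps
    with ThreadPoolExecutor(max_workers=2) as pool:
        # count_words and count_chars are not ancestors of each other,
        # so the order in which they run does not matter.
        words = pool.submit(count_words, cleaned)
        chars = pool.submit(count_chars, cleaned)
        return summarize(words.result(), chars.result())
```

Calling run_dag("  Hello Parallel World ") gives the same result no matter how the pool schedules the two counting steps, which is exactly what the absence of hidden dependencies buys us.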

Map/Reduce functions

Map/reduce is a special form of such a DAG that is applicable to a wide range of use cases. It is organized as a “map” function, which transforms a piece of data into some number of key/value pairs. These pairs are then sorted by key so that all pairs with the same key reach the same node, where a “reduce” function is used to merge the values (of the same key) into a single result.

map(input_record) {
    emit(k1, v1)
    emit(k2, v2)
}

reduce(key, values) {
    aggregate = initialize()
    while (values.has_next) {
        aggregate = merge(aggregate, values.next)
    }
    collect(key, aggregate)
}

The Map/Reduce DAG is organized in this way.

A parallel algorithm is usually structured as multiple rounds of Map/Reduce.
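The map and reduce functions described above can be tried out in a single process. The following is a toy word count, not Hadoop code: run_map_reduce is a hypothetical helper that stands in for the framework's shuffle and sort step by grouping values per key in memory.

```python
from collections import defaultdict

def map_fn(line):
    # Emit one (word, 1) pair per word in the input record.
    for word in line.split():
        yield word, 1

def reduce_fn(key, values):
    # Merge all values of the same key into a single result.
    return key, sum(values)

def run_map_reduce(records, map_fn, reduce_fn):
    groups = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            groups[key].append(value)
    # Keys are processed in sorted order, as in the shuffle/sort step.
    return [reduce_fn(key, groups[key]) for key in sorted(groups)]

result = run_map_reduce(["a b a", "b c"], map_fn, reduce_fn)
```

A multi-round algorithm would simply feed the result of one run_map_reduce call in as the records of the next.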

Distributed File Systems

The distributed file system is designed to handle large files (multi-GB) with sequential read/write operations. Each file is broken into chunks, which are stored across multiple DataNodes as local OS files.

There is a master “NameNode” that keeps track of the overall file directory structure and the placement of chunks. This NameNode is the central control point and may redistribute replicas as needed.

To read a file, the client API calculates the chunk index based on the offset of the file pointer and makes a request to the NameNode. The NameNode replies with the DataNodes that have a copy of that chunk. From this point, the client contacts the DataNode directly without going through the NameNode.
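The chunk index calculation on the read path is simple integer arithmetic. In this sketch the 64 MB chunk size, the file path and the DataNode names are assumptions for illustration, and the dictionary is only a stand-in for the NameNode's placement table.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # assumed chunk size, for illustration only

def locate(offset, chunk_size=CHUNK_SIZE):
    """Return (chunk index, offset within that chunk) for a file offset."""
    return divmod(offset, chunk_size)

# Hypothetical stand-in for the NameNode's chunk placement table.
chunk_locations = {("/logs/2008-11.log", 2): ["datanode1", "datanode4", "datanode7"]}

chunk_index, chunk_offset = locate(150 * 1024 * 1024)
replicas = chunk_locations[("/logs/2008-11.log", chunk_index)]
```

With the chunk index in hand, the client needs the NameNode only for the lookup; the bytes themselves come from one of the returned replicas.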

To write a file, the client API first contacts the NameNode, which designates one of the replicas as the primary (by granting it a lease). The NameNode's response identifies the primary and the secondary replicas. The client then pushes its changes to all DataNodes in any order, and each DataNode stores the change in a buffer. After the changes are buffered at all DataNodes, the client sends a “commit” request to the primary, which determines an update order and pushes this order to all the secondaries. After all secondaries complete the commit, the primary responds to the client with the success.

All changes to chunk distribution and metadata are written to an operation log file at the NameNode. This log file maintains an ordered list of operations, which is important for the NameNode to recover its view after a crash. The NameNode also maintains its persistent state by regularly checkpointing to a file.

If the NameNode crashes, all lease-granting operations fail, so any write operation effectively fails as well. Read operations should continue to work as long as the client program has a handle to the DataNode. To recover from a NameNode crash, a new NameNode can take over after restoring the state from the last checkpoint file and replaying the operation log.
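Recovery from a checkpoint plus an operation log can be sketched as follows. The operation format ("add" or "remove", chunk id, DataNode name) is invented for this example and is not the actual on-disk log format.

```python
def apply_op(state, op):
    """Apply one logged operation to the chunk -> replica-set map."""
    kind, chunk, node = op
    replicas = state.setdefault(chunk, set())
    if kind == "add":
        replicas.add(node)
    elif kind == "remove":
        replicas.discard(node)
    return state

def recover(checkpoint, log):
    # Start from a copy of the last checkpoint, then replay the log in order.
    state = {chunk: set(nodes) for chunk, nodes in checkpoint.items()}
    for op in log:
        apply_op(state, op)
    return state

checkpoint = {"c1": {"dn1", "dn2"}}
log = [("add", "c1", "dn3"), ("remove", "c1", "dn1"), ("add", "c2", "dn2")]
state = recover(checkpoint, log)
```

Because the log is ordered, replaying it on top of the checkpoint reproduces the view the NameNode had just before the crash.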

When a DataNode crashes, the NameNode detects it after missing its heartbeat for a while. The NameNode removes the crashed DataNode from the cluster and spreads its chunks to other surviving DataNodes. This way, the replication factor of each chunk is maintained across the cluster.

Later, when the DataNode recovers and rejoins the cluster, it reports all its chunks to the NameNode at boot time. Each chunk has a version number, which is advanced at each update. Therefore, the NameNode can easily figure out whether any of the DataNode's chunks have become stale. Those stale chunks will be garbage collected at a later time.
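The stale-chunk check amounts to comparing version numbers. A minimal sketch, with made-up chunk ids and plain dictionaries standing in for the NameNode's records and the DataNode's boot-time report:

```python
def find_stale(master_versions, reported):
    """Chunks whose reported version is older than the NameNode's version."""
    return sorted(
        chunk
        for chunk, version in reported.items()
        if version < master_versions.get(chunk, version)
    )

master_versions = {"c1": 3, "c2": 5, "c3": 1}
reported = {"c1": 3, "c2": 4}  # the node missed one update to c2 while it was down
stale = find_stale(master_versions, reported)
```

Chunks the NameNode has no record of are left alone in this sketch; how they are treated is a policy decision outside its scope.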

Job Execution

Hadoop MapRed is based on a “pull” model where multiple “TaskTrackers” poll the “JobTracker” for tasks (either map task or reduce task).

The job execution starts when the client program uploads three files to an HDFS location (specified by the “mapred.system.dir” property in the “hadoop-default.xml” file): “job.xml” (the job config, including the map, combine and reduce functions and the input/output data paths, etc.), “job.split” (which specifies how many splits there are and their ranges, based on dividing files into roughly 16 to 64 MB pieces), and “job.jar” (the actual Mapper and Reducer implementation classes). Then the client program notifies the JobTracker about the job submission. The JobTracker returns a job id to the client program and starts allocating map tasks to idle TaskTrackers when they poll for tasks.

Each TaskTracker has a defined number of "task slots" based on the capacity of the machine. A heartbeat protocol lets the JobTracker know how many free slots each TaskTracker has. The JobTracker determines appropriate tasks for the TaskTrackers based on how busy they are and their network proximity to the data sources (preferring the same node, then the same rack, then the same network switch). The assigned TaskTracker forks a MapTask (a separate JVM process) to execute the map phase processing. The MapTask extracts the input data from the splits using the “RecordReader” and “InputFormat”, and invokes the user-provided “map” function, which emits a number of key/value pairs into a memory buffer.

When the buffer is full, the output collector spills the memory buffer to disk. To optimize network bandwidth, an optional “combine” function can be invoked to partially reduce the values of each key. Afterwards, the “partition” function is invoked on each key to calculate its reducer node index. The memory buffer is eventually flushed into two files: an index file that contains an offset pointer for each partition, and a data file that contains all records sorted by partition and then by key.
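The spill step can be approximated in a few lines. This is a simplified in-memory sketch, not Hadoop's implementation: partition uses a CRC32 hash as an assumed partition function, and spill returns lists and dictionaries where the real thing writes an index file and a data file.

```python
import zlib
from itertools import groupby

def partition(key, num_reducers):
    # Stable hash, so the same key always maps to the same reducer index.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def spill(buffer, num_reducers, combine=sum):
    """Sort buffered (key, value) pairs by (partition, key) and combine per key.

    Returns (index, data): index maps each partition to the offset of its
    first record in data, mimicking the index file + data file pair.
    """
    ordered = sorted(buffer, key=lambda kv: (partition(kv[0], num_reducers), kv[0]))
    index, data = {}, []
    for key, group in groupby(ordered, key=lambda kv: kv[0]):
        part = partition(key, num_reducers)
        index.setdefault(part, len(data))
        data.append((part, key, combine(value for _, value in group)))
    return index, data

index, data = spill([("b", 1), ("a", 1), ("b", 1), ("c", 1)], num_reducers=2)
```

Combining before the data leaves the map node is what saves network bandwidth: the two ("b", 1) pairs travel as a single ("b", 2) record.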

When the map task has finished processing all input records, it starts the commit process. It first flushes the in-memory buffer (even if it is not full) to an index + data file pair. Then a merge sort over all index + data file pairs is performed to create a single index + data file pair.

The index + data file pair is then split into R local directories, one for each partition. After all the MapTasks complete (all splits are done), the TaskTracker notifies the JobTracker, which keeps track of the overall progress of the job. The JobTracker also provides a web interface for viewing the job status.

When the JobTracker notices that some map tasks are completed, it starts allocating reduce tasks to subsequent polling TaskTrackers (R TaskTrackers will be allocated for reduce tasks). These allocated TaskTrackers remotely download the region files (according to the assigned reducer index) from the completed map phase nodes and concatenate (merge sort) them into a single file. Whenever more map tasks complete afterwards, the JobTracker notifies these allocated TaskTrackers to download more region files (and merge them with the previous file). In this manner, downloading region files is interleaved with the map task progress. The reduce phase has not started yet at this point.

Eventually all the map tasks are completed. The JobTracker then notifies all the allocated TaskTrackers to proceed to the reduce phase. Each allocated TaskTracker forks a ReduceTask (a separate JVM) to read the downloaded file (which is already sorted by key) and invoke the “reduce” function, which collects the key/aggregatedValue into the final output file (one per reducer node). Note that each reduce task (and each map task as well) is single-threaded, and this thread invokes the reduce(key, values) function in ascending (or descending) order of the keys assigned to this reduce task. This provides an interesting property: all entries written by the reduce() function are in sorted order. The output of each reducer is written to a temp output file in HDFS. When the reducer finishes processing all keys, the temp output file is renamed atomically to its final output filename.
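The merge of already sorted region files followed by an in-order reduce can be imitated with Python's standard library. Here each "region file" is just a list of (key, value) pairs sorted by key, and reduce_phase is a hypothetical name for the illustration.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def reduce_phase(region_files, reduce_fn):
    """Merge sorted region files and call reduce_fn once per key, in key order."""
    merged = heapq.merge(*region_files, key=itemgetter(0))
    for key, group in groupby(merged, key=itemgetter(0)):
        yield key, reduce_fn(key, [value for _, value in group])

region_files = [
    [("a", 1), ("c", 2)],   # downloaded from map node 1
    [("a", 3), ("b", 1)],   # downloaded from map node 2
]
output = list(reduce_phase(region_files, lambda key, values: sum(values)))
```

Because the merged stream is ordered by key, the output comes out sorted without any extra work, which is the property noted above.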

The Map/Reduce framework is resilient to crashes of any component. TaskTracker nodes periodically report their status to the JobTracker, which keeps track of the overall job progress. If the JobTracker hasn’t heard from a TaskTracker node for a long time, it assumes the TaskTracker node has crashed and reassigns its tasks to other TaskTracker nodes. The map phase result is stored on local disk, so it is not available once the TaskTracker node crashes. Therefore, when a map-phase TaskTracker node crashes, its MapTasks (regardless of whether they are complete or not) are reassigned to a different TaskTracker node, which reruns all the assigned splits. However, the reduce phase result is stored in HDFS, which is available even if the TaskTracker node crashes. Therefore, when a reduce-phase TaskTracker node crashes, only the incomplete ReduceTasks need to be reassigned to a different TaskTracker node and re-run.

The job submission process is asynchronous. The client program can poll for the job status at any time by supplying the job id.

Sunday, November 23, 2008

Amazon Web Services

"Cloud computing" is one of the hot topics these days. Especially with the economic downturn, enterprises are looking for every opportunity to reduce their spending. Cloud computing provides a number of attractions ...
  • Since resource capacity can be provisioned in just a few clicks, enterprises no longer need to budget for their peak load. They just need to budget for the average load and rent extra resources from the cloud during peak traffic periods. This is a huge saving for them.
  • By outsourcing the operating environment, enterprises pass the problems of load balancing and failure recovery to the cloud provider. They no longer need to maintain the inhouse IT expertise required for running their web sites. This is also a huge saving.
There are primarily two architectural models by which cloud providers deliver their services: the IaaS model and the PaaS model.

IaaS – Infrastructure as a Service

The IaaS model takes a very bottom-up approach and focuses on just providing a “virtual machine”. It gives users all the freedom to pick any technology that suits their organization's skill sets, budget and particular deployment scenarios. Since the existing technology stack can be used in the cloud without any change, existing applications can be migrated to the cloud almost transparently. This is very attractive to enterprises that want to enjoy the cost benefits of cloud computing without making a lot of changes to their existing applications. It is also very easy to migrate back from the cloud to internal IT, so there is less concern about lock-in.

However, the freedom also comes at a cost. Since a lot of infrastructure decisions (such as load balancing and failure recovery) still need to be figured out, the user’s organization needs to retain pretty much the same amount of IT expertise to make these decisions and operate the result.

PaaS – Platform as a Service

The PaaS model takes a different approach: it covers not just the lowest machine layer but also provides rich technology in the upper layers of the stack, based on the vendor's experience in running large scale websites. Enterprises can keep their focus on developing business logic and not worry about how the infrastructure should be set up.

The downside of PaaS is that enterprises have to write their applications in a specific language and API predefined by the cloud provider. That means existing applications have to be rewritten before they can run inside PaaS. Also, to a high degree, the written application will be locked in to the cloud provider’s particular technology stack.

Predicting adoption

We predict the PaaS model will be more attractive to small companies and startups, which are primarily doing green field development and want to deliver their features quickly without worrying too much about infrastructure setup. On the other hand, larger enterprises that have more legacy systems, companies that just want to test out cloud computing with minimum effort, and those concerned about vendor lock-in will find IaaS much more attractive.

Amazon Web Services

Amazon is the current leader among cloud providers based on the IaaS model. At the heart of its technology stack is the virtual machine layer called EC2. Amazon also provides a set of surrounding services for data storage, metadata storage, message queues, payment processing, content caching, etc.

EC2 – Elastic Compute Cloud

Amazon has procured a large number of commodity Intel boxes running the Xen virtualization software. On top of Xen, Linux or Windows can be run as the guest OS. The guest operating system can have many variations with different sets of software packages installed.

Each configuration is bundled as a custom machine image (called an AMI). Amazon hosts a catalog of AMIs for users to choose from. Some AMIs are free while others require a usage charge. Users can also customize their own setup by starting from a standard AMI, making their special configuration changes, and then creating a specific AMI customized for their needs. The AMIs are stored in Amazon’s storage subsystem, S3.

Amazon also classifies its machines in terms of their capacity (number of cores, memory and disk size) and charges for their usage at different rates. These machines can be run in different network topologies specified by the users. There is an “availability zone” concept, which is basically a logical data center. Availability zones have no interdependency and are therefore very unlikely to fail at the same time. To achieve high availability, users should consider putting their EC2 instances in different availability zones.

“Security Group” is the virtual firewall of the Amazon EC2 environment. EC2 instances can be grouped under a “security group”, which specifies which ports are open to which ranges of incoming IP addresses. So EC2 instances running applications with various levels of security requirements can be put into appropriate security groups and managed using ACLs (access control lists). This is very similar to how network administrators configure their firewalls.

A user can start a virtual machine (called an EC2 instance) by specifying the AMI, the machine size, the security group, and its authentication key via the command line or an HTTP/XML message. So it is very easy to start up the virtual machine and start running the user’s application. When the application completes, the user can also shut down the EC2 instance via the command line or an HTTP/XML message. The user is only charged for the actual time when the EC2 instance is running.

One of the issues with an extremely dynamic machine configuration (such as EC2) is that a lot of configuration settings are transient and do not survive across reboots. For example, the node name and IP address may have changed, and all the data stored in local files is lost. Latency and network bandwidth between machines may also have changed. Fortunately, Amazon provides a number of ways to mitigate these issues.
  • By paying a charge, users can reserve a stable IP address, called an “elastic IP”, which can be attached to an EC2 instance after it boots up. External-facing machines are typically set up this way.
  • To deal with data persistence, Amazon also provides a logical network disk, called “Elastic Block Store” (EBS), to store the data. By paying a charge, EBS is reserved for the user and survives across EC2 reboots. Users can attach the EBS to EC2 instances after the reboot.

S3 – Simple Storage Service

Amazon S3 provides an HTTP/XML service to save and retrieve content. It provides a file system-like metaphor where “objects” are grouped under “buckets”. Based on a REST design, each object and bucket has its own URL.

With HTTP verbs (PUT, GET, DELETE, POST), users can create a bucket, list all the objects within a bucket, create an object within a bucket, retrieve an object, remove an object, remove a bucket, etc.

Under S3, each object has a unique URI which serves as its key. There is no query mechanism in S3; the user has to look up an object by its key. Each object is stored as an opaque byte array with a maximum size of 5GB. S3 also provides an interesting partial object retrieval mechanism: the client specifies a byte range in the request (using the HTTP Range header).

However, partial put is not currently supported, but it can be simulated by breaking the large object into multiple small objects and then doing the assembly at the app level. Breaking down the object also helps to speed up upload and download by doing the data transfer in parallel.
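The app-level split and reassembly could look roughly like this. The sketch is purely local: a Python dictionary stands in for the bucket, and the part naming scheme and the manifest object are conventions made up for this example, not anything S3 prescribes.

```python
def put_in_parts(bucket, key, data, part_size):
    """Store data as several small objects plus a manifest holding the part count."""
    count = 0
    for start in range(0, len(data), part_size):
        bucket[f"{key}.part{count:05d}"] = data[start:start + part_size]
        count += 1
    bucket[f"{key}.manifest"] = str(count).encode("ascii")
    return count

def get_from_parts(bucket, key):
    """Reassemble the original bytes from the parts listed in the manifest."""
    count = int(bucket[f"{key}.manifest"].decode("ascii"))
    return b"".join(bucket[f"{key}.part{i:05d}"] for i in range(count))

bucket = {}  # stand-in for an S3 bucket: object key -> bytes
parts = put_in_parts(bucket, "video", b"abcdefghij", part_size=4)
restored = get_from_parts(bucket, "video")
```

Since every part is an independent object, the uploads and downloads in the two loops could be issued in parallel, and a change to one region of the data only requires rewriting the affected part.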

Within Amazon S3, each object is replicated across 2 (or more) data centers and also cached at the edge for fast retrieval.

Amazon S3 is based on an “eventually consistent” model, which means it is possible that an application won’t see a change it just made. Therefore, the application needs some degree of tolerance for an inconsistent view. Applications should avoid having two concurrent modifications to the same object, should wait for some time between updates, and should expect that any data they read is potentially stale by a few seconds.

There is also no versioning concept in S3, but it is not hard to build one on top of S3.

EBS – Elastic Block Store

Based on RAID disks, EBS provides a persistent block storage device that the user can attach to a running EC2 instance within the same availability zone. EBS is typically used as a file system mounted on an EC2 instance, or as a raw device for a database.

Although EBS is a network device to the EC2 instance, benchmarks from Amazon show that it has higher performance than local disk access. Unlike S3, which is based on an eventually consistent model, EBS provides strict consistency, where the latest updates are immediately available.

SimpleDB – queriable data storage

Unlike S3, where data has to be looked up by key, SimpleDB provides a semi-structured data store with querying capability. Each object can be stored as a number of attributes, and the user can search for objects by attribute.

Similar to the concepts of “buckets” and “objects” in S3, SimpleDB is organized as a set of “items” grouped by “domains”. However, each item can have a number of “attributes” (up to 256). Each attribute can store one or multiple values, and each value must be a string (or a string array in the case of a multi-valued attribute). Each attribute can store up to 1K bytes, so it is not appropriate for storing binary content.

SimpleDB is typically used as a metadata store in conjunction with S3, where the actual data is stored. SimpleDB is also schema-less. Each item can define its own set of attributes and is free to add or remove attributes at runtime.

SimpleDB provides a query capability which is quite different from SQL. The “where” clause can only match an attribute value against a constant, not against other attributes. In addition, the query result only returns the names of the matched items but not their attributes, which means a subsequent lookup by item name is needed. Also, there is no equivalent of “order by”, and the returned query result is unsorted.

Since all attributes are stored as strings (even numbers, dates, etc.), all comparison operations are done in lexical order. Therefore, data types such as dates and numbers need a special string encoding to make sure comparisons are done correctly.
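One common way to make lexical order agree with numeric and chronological order is to add an offset and zero-pad numbers, and to write dates in a fixed-width ISO 8601 form. The sketch below is one possible encoding, not something SimpleDB mandates; the value range is an assumption chosen for the example.

```python
from datetime import datetime, timezone

OFFSET = 10**9  # assumption: all values lie strictly between -10**9 and 10**9
WIDTH = 10      # enough digits for any value of n + OFFSET

def encode_int(n):
    """Shift into the non-negative range and zero-pad to a fixed width."""
    if not -OFFSET < n < OFFSET:
        raise ValueError("value outside the assumed range")
    return str(n + OFFSET).zfill(WIDTH)

def encode_date(dt):
    """Fixed-width UTC timestamp; expects a timezone-aware datetime."""
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

encoded = [encode_int(n) for n in (-5, 9, 10)]
```

Without the encoding, the string "9" sorts after "10"; with it, the encoded strings sort in the same order as the numbers they represent.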

SimpleDB is also based on an eventual consistency model like S3.

SQS – Simple Queue Service

Amazon provides a queue service for applications to communicate with each other asynchronously. Messages (up to 256KB in size) can be sent to queues. Each queue is replicated across multiple data centers.

Enterprises use the HTTP protocol to send messages to a queue. “At least once” semantics are provided, which means that when the sender gets back a 200 OK response, SQS guarantees that the message will be received by at least one receiver.

Receiving messages from a queue is done by polling rather than through an event-driven calling interface. Since messages are replicated across queues asynchronously, it is possible that a receiver only gets some (but not all) of the messages sent to the queue. But if the receiver keeps polling the queue, it will eventually get all messages sent to the queue. On the other hand, a message can be delivered out of order or delivered more than once. So the message processing logic needs to be idempotent as well as independent of message arrival order.

Once a message is taken by a receiver, the message is invisible to other receivers for a period of time, but it is not gone yet. The original receiver is supposed to process the message and make an explicit call to remove the message permanently from the queue. If such a “removal” request is not made within the timeout period, the message becomes visible in the queue again and will be picked up by subsequent receivers.
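The visibility timeout behaviour is easy to model in memory. The class below is a toy simulation, not the SQS API: VisibilityQueue and its method names are invented, and the current time is passed in explicitly so the behaviour is deterministic.

```python
import itertools

class VisibilityQueue:
    """Toy in-memory queue with a visibility timeout (not the SQS API)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self._ids = itertools.count(1)
        self._messages = {}  # message id -> (body, time it becomes visible)

    def send(self, body):
        msg_id = next(self._ids)
        self._messages[msg_id] = (body, 0.0)
        return msg_id

    def receive(self, now):
        for msg_id, (body, visible_at) in self._messages.items():
            if visible_at <= now:
                # Hide the message from other receivers until the timeout expires.
                self._messages[msg_id] = (body, now + self.visibility_timeout)
                return msg_id, body
        return None

    def delete(self, msg_id):
        # Explicit removal once processing has succeeded.
        self._messages.pop(msg_id, None)

queue = VisibilityQueue(visibility_timeout=30)
job_id = queue.send("resize image 42")
first = queue.receive(now=0)          # taken by receiver A
hidden = queue.receive(now=10)        # receiver B sees nothing
redelivered = queue.receive(now=31)   # A never deleted it, so it reappears
queue.delete(job_id)
```

The redelivery at time 31 is why processing has to be idempotent: if receiver A was merely slow rather than dead, the same message ends up being handled twice.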

CloudWatch -- Monitoring Services

CloudWatch provides an API to extract system-level metrics for each VM (e.g. CPU, network I/O and disk I/O) as well as for each load balancer service (e.g. response time, request rate). The collected metrics are modeled as a multi-dimensional data cube and can therefore be queried and aggregated (e.g. min/max/avg/sum/count) along different dimensions, such as by time or by machine group (by AMI, by machine class, by particular machine instance id, by auto-scaling group).

These metrics are also used to drive the auto-scaling service (described below). Note that the metrics are predefined by Amazon; custom metrics (application-level metrics) are not supported at this moment.

Load Balancing Services

A load balancer provides a way to group identical VMs into a pool. Amazon provides a way to create a software load balancer in a region and then attach EC2 instances (of the same region) to it. The EC2 instances under a particular load balancer can be in different availability zones, but they have to be in the same region.

Auto-Scaling Services

Auto-scaling allows the user to group a number of EC2 instances (typically behind the same load balancer) and specify a set of triggers to grow and shrink the group. A trigger defines a condition that compares the metrics collected from CloudWatch against some threshold values. When the condition matches, the associated action can be to grow or shrink the group.
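A trigger of this kind boils down to a threshold comparison. The sketch below is a hypothetical model: the Trigger class, its field names and the one-instance step size are assumptions for illustration and do not mirror the actual auto-scaling API.

```python
from dataclasses import dataclass

@dataclass
class Trigger:
    upper: float   # grow the group when the metric rises above this value
    lower: float   # shrink the group when the metric falls below this value
    min_size: int
    max_size: int

    def next_size(self, current_size, avg_metric):
        """Return the new group size for the latest aggregated metric."""
        if avg_metric > self.upper:
            return min(current_size + 1, self.max_size)
        if avg_metric < self.lower:
            return max(current_size - 1, self.min_size)
        return current_size

cpu_trigger = Trigger(upper=70.0, lower=30.0, min_size=2, max_size=5)
grown = cpu_trigger.next_size(current_size=2, avg_metric=85.0)
shrunk = cpu_trigger.next_size(current_size=3, avg_metric=10.0)
```

Keeping a gap between the upper and lower thresholds avoids growing and shrinking the group on every small fluctuation of the metric.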

Auto-scaling allows resource capacity (the number of EC2 instances) to be adjusted automatically to the actual workload. This way the user can automatically spawn more VMs as the workload increases and shut down VMs as the load decreases.

Elastic Map/Reduce

Amazon provides an easy way to run Hadoop Map/Reduce in the EC2 environment. It provides a web UI to start/stop a Hadoop cluster and submit jobs to it. For details of how Hadoop works, see here.

Under Elastic MR, both input and output data are stored in S3 rather than HDFS. This means data needs to be loaded into S3 before the Hadoop processing can start. Elastic MR also provides a job flow definition so the user can chain multiple Map/Reduce jobs together. Elastic MR supports programs written in Java (jar) or any other programming language (via Hadoop streaming), as well as Pig and Hive.

Relational DB Services

RDS is basically MySQL running in EC2.

Virtual Private Cloud

VPC is a VPN solution that lets users extend their data center to include EC2 instances running in the Amazon cloud. Notice that this is an "elastic data center" because its size can grow and shrink as the user starts and stops EC2 instances.

A user can create a VPC object, which represents an isolated virtual network in the Amazon cloud environment, and can create multiple virtual subnets under a VPC. When starting an EC2 instance, the subnet id needs to be specified so that the EC2 instance is put into that subnet under the corresponding VPC.

EC2 instances under the VPC are completely isolated from the rest of Amazon's infrastructure at the network packet routing level (of course, the isolation is implemented in software). Then a pair of gateway objects (a VPN gateway on the Amazon side and a customer gateway on the data center side) needs to be created. Finally, a connection object is created that binds these 2 gateway objects together, and it is attached to the VPC object.

After these steps, the two gateways do the appropriate routing between your data center and the Amazon VPC, with VPN technologies used underneath to protect the network traffic.

Things to watch out

While Amazon AWS provides a very transparent model for enterprises to migrate their existing IT infrastructure, there are a number of limitations that need attention …
  • Multicast communication is not supported between EC2 instances. This means applications have to communicate using point-to-point TCP. Some cluster replication frameworks based on IP multicast simply don’t work in the EC2 environment.
  • EBS can currently be attached to only a single EC2 instance. This means applications (e.g. Oracle cluster) that rely on multiple machines accessing a shared disk simply won’t work in the EC2 environment.
  • Apart from EC2, using any of the other APIs that Amazon provides means being locked in to Amazon’s technology stack. This issue may be somewhat mitigated because there are open source clones (e.g. Eucalyptus) of the AWS services.

Sunday, November 16, 2008

Design for parallelism

There has been a lot of interest in parallel computing recently. One of the main reasons is that, as we all know, Moore's law (which promises to double the CPU power on a single chip every 18 months) has reached its limit. We can no longer expect the speed of a single CPU to go much further. Instead of attempting to advance the clock rate of a CPU, many chip manufacturers have shifted their development focus to multi-core machines.

On the other hand, highly scalable system based on large pool of inexpensive commodity hardware has demonstrated significant success. Google has published the Map/Reduce model which is their underlying computing infrastructure and there are open source clone like Apache Hadoop. All these provides a very rich framework for implementing massively parallel system.

However, most software algorithms that we are using today are sequential in nature. We need to refactor them in order to fit into the parallel computing architecture

How do we do that ?

There are two different approaches to restructure a sequential algorithm into parallel, “functional decomposition” is typically used to deal with complex logic flow; and “map reduce” is used to deal with algorithm with large volume of input data with simple logic flow.

Functional Decomposition

This model attempts to break down the sequential algorithm into multiple “work units” from a functionality perspective and see if different work units can be executed in parallel. The whole analysis and design will typically go through the following steps.


The purpose of this step is to identify the function boundary of each work unit, which is the basic unit of execution that runs sequentially on a specific machine.
  • Analyze the processing steps from a functionality boundary perspective. Break down the whole processing into a sequence of work units where each work unit represents a focused function.
  • At this stage, we typically break down to the finest level of granularity so that we have more flexibility in the design stage to maximize the degree of parallelism.
Dependency analysis

After we break down the whole process into the finest grain of work units, we analyze the sequential dependency between different work units.

Let's say workUnitB follows workUnitA in the sequential version of the algorithm, and R(B) and W(B) represent the read set and write set of work unit B. Then workUnitB is directly dependent on workUnitA if any of the following conditions is true
  • W(B) and W(A) overlap
  • R(B) and W(A) overlap
  • W(B) and R(A) overlap
If we represent each work unit as a node and each “directly dependent” relationship as an arc, we end up with a DAG (directed acyclic graph). The DAG gives us a good picture of the maximum parallelism that we can obtain. The critical path of the DAG provides the lower bound of the total execution time.
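
The three overlap conditions and the critical-path bound can be sketched in a few lines of Python. This is only an illustration under assumed inputs: the work unit names, their read/write sets, and their costs are made up, and build_dag() and critical_path_length() are hypothetical helper names.

```python
from itertools import combinations

def build_dag(units):
    """units: list of (name, read_set, write_set, cost) in sequential order.
    Returns a dict mapping each unit name to the set of units it directly depends on."""
    deps = {name: set() for name, _, _, _ in units}
    # combinations() preserves list order, so unit a always precedes unit b.
    for (a, ra, wa, _), (b, rb, wb, _) in combinations(units, 2):
        if (wb & wa) or (rb & wa) or (wb & ra):
            deps[b].add(a)
    return deps

def critical_path_length(units, deps):
    """Length of the longest cost-weighted path through the DAG.
    The sequential order of `units` is already a valid topological order."""
    finish = {}
    for name, _, _, cost in units:
        start = max((finish[d] for d in deps[name]), default=0)
        finish[name] = start + cost
    return max(finish.values())

# Made-up example: (name, read set, write set, cost in arbitrary time units)
units = [
    ("load",   set(),            {"raw"},     2),
    ("parse",  {"raw"},          {"rows"},    3),
    ("stats",  {"rows"},         {"summary"}, 4),
    ("index",  {"rows"},         {"idx"},     1),
    ("report", {"summary", "idx"}, {"out"},   2),
]
deps = build_dag(units)
sequential_time = sum(cost for _, _, _, cost in units)
parallel_lower_bound = critical_path_length(units, deps)
```

Here "stats" and "index" only read the same data, so neither depends on the other and they can run in parallel; the critical path runs through load, parse, stats and report.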

Analyzing communication overhead
However, data needs to be fed from an upstream work unit to its downstream work units, and this communication is not free: it consumes bandwidth and adds latency. In fact, parallelism introduces communication and coordination overhead. The purpose of this step is to understand the communication cost incurred when data flows between work units.

Depending on the chosen framework technology, the communication mechanism can be one of the following …
  • TCP point-to-point: Persistent TCP connections are maintained between machines and are used to pass data between the work units residing on them.
  • Multicast pub/sub: Downstream work units subscribe to the upstream work units they are interested in, and a multicast mechanism is used to deliver the data. The implementation of multicast can be based on IP multicast or on epidemic message spreading over an overlay network.
  • Queue: Upstream work units put their results into a queue, which is polled by the downstream work units. FIFO semantics are provided.
  • DFS: Upstream work units put their results into a distributed file system, from which they are consumed by downstream work units. Unlike a queue, the communicating work units need to synchronize their access to the DFS themselves.

Aggregating work units

The purpose of this step is to regroup the work units into a coarser granularity to reduce communication overhead.

For example, if workUnitA is feeding a large amount of data into workUnitB, then both work units should be put on the same machine to reduce network bandwidth consumption. When multiple work units reside on the same machine, they can be further aggregated into a larger unit. This aggregation reduces the number of nodes in the dependency graph and hence makes the scheduling more straightforward.

Another DAG is produced at the end of this step where each node represents the work aggregate.

Schedule execution

The work aggregates eventually need to be executed on some machines in the network. It is the responsibility of the scheduler to ship the jobs to available processors and to synchronize their execution.

A node (in the DAG) is ready for execution when all of its preceding nodes have completed. There is also a pool of idle processors. A simple-minded scheduler will assign a ready-to-execute node to a randomly picked processor from the idle pool. After the processor finishes executing a node, it reports back to the scheduler, which updates the DAG and the idle processor pool. The cycle repeats.

A more sophisticated scheduler will consider more factors, such as the network bandwidth between processors and the estimated execution time of each node, in order to provide an optimal schedule where network bandwidth consumption is minimized.
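
The simple-minded scheduler can be simulated as below. This is a rough sketch rather than a real scheduler: schedule() is a hypothetical name, execution is simulated with made-up costs instead of shipping jobs to machines, and the random pick is seeded so the run is repeatable.

```python
import heapq
import random

def schedule(deps, cost, processors, seed=0):
    """Simulate the simple scheduler. deps maps node -> set of preceding nodes,
    cost maps node -> execution time. Returns (total_time, node -> processor)."""
    rng = random.Random(seed)
    remaining = {node: set(d) for node, d in deps.items()}  # unmet dependencies
    ready = [node for node, d in remaining.items() if not d]
    idle = list(processors)
    running = []        # heap of (finish_time, node, processor)
    assignment = {}
    now = 0
    while ready or running:
        # Hand each ready node to a randomly picked idle processor.
        while ready and idle:
            node = ready.pop(0)
            proc = idle.pop(rng.randrange(len(idle)))
            assignment[node] = proc
            heapq.heappush(running, (now + cost[node], node, proc))
        # Wait for the next processor to report back, then update the DAG
        # and the idle pool.
        now, node, proc = heapq.heappop(running)
        idle.append(proc)
        for other, unmet in remaining.items():
            if node in unmet:
                unmet.remove(node)
                if not unmet:
                    ready.append(other)
    return now, assignment

# Made-up DAG and costs, for illustration only.
deps = {
    "load": set(),
    "parse": {"load"},
    "stats": {"parse"},
    "index": {"parse"},
    "report": {"stats", "index"},
}
cost = {"load": 2, "parse": 3, "stats": 4, "index": 1, "report": 2}

time_two_procs, placement = schedule(deps, cost, ["p1", "p2"])
time_one_proc, _ = schedule(deps, cost, ["p1"])
```

With two processors the simulated run finishes in the critical-path time, while a single processor degenerates to the sequential time.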

Map Reduce

For data intensive applications, a large amount of data needs to be processed within a single work unit, although the DAG itself is simple. In this model, just running different work units in parallel is not sufficient; the execution within a work unit also needs to be parallelized and run across multiple machines.

The design methodology is different here. Instead of focusing on the flow between work units, we need to focus on the input data pattern of a single work unit. The Map/Reduce model is a common choice to handle this scenario. The analysis and design will typically go through the following steps.
  1. Identify the repetition in the input data and determine the basic unit of an input record. i.e. input
  2. Identify the selection criteria for each input record. i.e. the select() function
  3. For each input record, determine how many entries are to be emitted and how the emitted entries should be grouped and processed together. i.e. the handle_map(), key(), value() functions
  4. Determine the aggregation logic for the grouped entries. i.e. the handle_reduce() function
  5. Identify the selection criteria for each aggregated result. i.e. the having() function
With a Map/Reduce framework such as Hadoop, these functions slot into the framework's map() and reduce() functions: map() applies select() and handle_map() to each input record, and reduce() applies handle_reduce() and having() to each group of entries.
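
One way these five pieces might fit together is sketched below in plain Python rather than against Hadoop's actual Java API. The run_map_reduce() driver is a hypothetical single-process stand-in for the framework, and the key() and value() functions are folded into handle_map(), which yields (key, value) pairs directly.

```python
from collections import defaultdict

def run_map_reduce(records, select, handle_map, handle_reduce, having):
    """Single-process stand-in for a Map/Reduce framework."""
    groups = defaultdict(list)
    # map(): filter each input record, then emit zero or more (key, value) entries.
    for record in records:
        if not select(record):
            continue
        for key, value in handle_map(record):
            groups[key].append(value)
    # reduce(): aggregate each group, then filter the aggregated results.
    results = {}
    for key, values in groups.items():
        aggregated = handle_reduce(key, values)
        if having(key, aggregated):
            results[key] = aggregated
    return results

# Example: count words, keeping only words that occur at least twice.
lines = ["the quick fox", "", "the lazy dog", "the fox"]

word_counts = run_map_reduce(
    lines,
    select=lambda line: line.strip() != "",               # skip blank records
    handle_map=lambda line: ((w, 1) for w in line.split()),
    handle_reduce=lambda word, ones: sum(ones),
    having=lambda word, count: count >= 2,
)
```

In a real framework the grouping step is the shuffle between the map and reduce phases, and each phase runs across many machines.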


By following a systematic methodology to transform a sequential application into a parallel one, we can take advantage of parallelism to make the application more scalable.

Thursday, October 30, 2008

CouchDB Cluster

Let's look at how one can layer a cluster on top of CouchDB.

Couch Cluster

A “Couch Cluster” is composed of multiple “partitions”. Each partition is composed of multiple replicated DB instances. We call each replica a “virtual node”, which is basically a DB instance hosted inside a “physical node”, which is a CouchDB process running on a machine. A “virtual node” can migrate across physical nodes for various reasons, such as …
  • when a physical node crashes
  • when more physical nodes are provisioned
  • when the workload of the physical nodes is unbalanced
  • when there is a need to reduce latency by migrating closer to the client


The "Couch Cluster" is fronted by a "Proxy", which intercepts all of the client's calls and forwards them to the corresponding "virtual node". To do so, the proxy has a "configuration DB" which stores the topology, so it knows how the virtual nodes are distributed across physical nodes. The configuration DB is updated as DBs are created or destroyed. Changes to the configuration DB are replicated among the proxies so each of them will eventually share the same picture of the cluster topology.

The diagram shows a single DB, which is split into 2 partitions (the blue and orange partitions). Each partition has 3 replicas, where one of them is the primary and the other two are secondaries.

Create DB
  1. Client calls Proxy with URL=http://proxy/dbname; HTTP_Command = PUT /dbname
  2. Proxy needs to determine how many partitions and how many replicas are needed. Let's say we have 2 partitions and each partition has 3 copies, so there will be 6 virtual nodes: v1-1, v1-2, v1-3, v2-1, v2-2, v2-3.
  3. Proxy also needs to determine which virtual node is the primary of its partition. Let's say v1-1 and v2-1 are primaries and the rest are secondaries.
  4. Proxy then needs to determine which physical node hosts each of these virtual nodes, say M1 (v1-1, v2-2), M2 (v1-2, v2-3), M3 (v1-3, v2-1).
  5. Proxy records its decision in the configuration DB
  6. Proxy calls M1 with URL=http://M1/dbname_p1; HTTP_Command = PUT /dbname_p1. It then calls M1 again with URL=http://M1/dbname_p2; HTTP_Command = PUT /dbname_p2.
  7. Proxy repeats step 6 for M2 and M3

List all DBs
  1. Client calls Proxy with URL=http://proxy/_all_dbs; HTTP_Command = GET /_all_dbs
  2. Proxy looks up the configuration DB to determine all the DBs
  3. Proxy returns the list to the client

Get DB info
  1. Client calls Proxy with URL=http://proxy/dbname; HTTP_Command = GET /dbname
  2. Proxy looks up the configuration DB for all of the DB's partitions. For each partition, it locates the virtual node that hosts the primary copy (v1-1, v2-1). It also identifies the physical nodes that host these virtual nodes (M1, M3).
  3. For each physical node, say M1, the proxy calls it with URL=http://M1/dbname_p1; HTTP_Command = GET /dbname_p1
  4. Proxy does the same for M3 (with dbname_p2)
  5. Proxy combines the results from M1 and M3 and then forwards them to the client

Delete DB
  1. Client calls Proxy with URL=http://proxy/dbname; HTTP_Command = DELETE /dbname
  2. Proxy looks up which machines are hosting the clustered DB and finds M1, M2, M3.
  3. Proxy calls M1 with URL=http://M1/dbname_p1; HTTP_Command = DELETE /dbname_p1. Then Proxy calls M1 again with URL=http://M1/dbname_p2; HTTP_Command = DELETE /dbname_p2.
  4. Proxy does the same for M2 and M3

Get all documents of a DB
  1. Client calls Proxy with URL=http://proxy/dbname/_all_docs; HTTP_Command = GET /dbname/_all_docs
  2. Proxy looks up the configuration DB for all of the DB's partitions. For each partition, it randomly picks a virtual node that hosts a copy (say v1-2, v2-2). It also identifies the physical nodes that host these virtual nodes (M2, M1).
  3. Proxy calls M2 with URL=http://M2/dbname_p1/_all_docs; HTTP_Command = GET /dbname_p1/_all_docs.
  4. Proxy does the same for M1 (with dbname_p2)
  5. Proxy combines the results from M1 and M2 and then forwards them to the client

Create / Update a document
  1. Client calls Proxy with URL=http://proxy/dbname/docid; HTTP_Command = PUT /dbname/docid
  2. Proxy invokes "select_partition(docid)" to determine the partition, and then looks up the primary copy of that partition (e.g. v1-1). It also identifies the physical node (e.g. M1) that hosts this virtual node.
  3. The proxy calls M1 with URL=http://M1/dbname_p1/docid; HTTP_Command = PUT /dbname_p1/docid

Get a document
  1. Client calls Proxy with URL=http://proxy/dbname/docid; HTTP_Command = GET /dbname/docid
  2. Proxy invokes "select_partition(docid)" to determine the partition, and then randomly picks a copy of that partition (e.g. v1-3). It also identifies the physical node (e.g. M3) that hosts this virtual node.
  3. The proxy calls M3 with URL=http://M3/dbname_p1/docid; HTTP_Command = GET /dbname_p1/docid

Delete a document
  1. Client calls Proxy with URL=http://proxy/dbname/docid?rev=1234; HTTP_Command = DELETE /dbname/docid?rev=1234
  2. Proxy invokes "select_partition(docid)" to determine the partition, and then looks up the primary copy of that partition (e.g. v1-1). It also identifies the physical node (e.g. M1) that hosts this virtual node.
  3. The proxy calls M1 with URL=http://M1/dbname_p1/docid?rev=1234; HTTP_Command = DELETE /dbname_p1/docid?rev=1234
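
The document-level routing rules above (writes and deletes go to the primary, reads go to a randomly picked copy) can be sketched as follows. The post does not say how select_partition() is implemented; hashing the docid with MD5 is purely an assumption here, and TOPOLOGY and route() are hypothetical names that mirror the example placement used in this post.

```python
import hashlib
import random

# partition number -> list of (virtual node, physical node); the first entry is the primary.
TOPOLOGY = {
    1: [("v1-1", "M1"), ("v1-2", "M2"), ("v1-3", "M3")],
    2: [("v2-1", "M3"), ("v2-2", "M1"), ("v2-3", "M2")],
}

def select_partition(docid, num_partitions=len(TOPOLOGY)):
    """Assumed implementation: a stable hash of the docid, modulo the partition count."""
    digest = hashlib.md5(docid.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_partitions + 1

def route(method, dbname, docid, rng=random):
    """Return the request the proxy would forward to a physical node."""
    partition = select_partition(docid)
    replicas = TOPOLOGY[partition]
    if method == "GET":
        _vnode, machine = rng.choice(replicas)   # any copy can serve a read
    else:
        _vnode, machine = replicas[0]            # PUT / DELETE go to the primary
    return f"{method} http://{machine}/{dbname}_p{partition}/{docid}"

put_request = route("PUT", "dbname", "doc42")
get_request = route("GET", "dbname", "doc42")
```

Because the hash is stable, every proxy sharing the same topology sends updates for a given docid to the same primary.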

Create a View design doc
  1. Client calls Proxy with URL=http://proxy/dbname/_design/viewid; HTTP_Command = PUT /dbname/_design/viewid
  2. Proxy determines all the virtual nodes of this DB, and identifies all the physical nodes (e.g. M1, M2, M3) that host these virtual nodes.
  3. The proxy calls M1 with URL=http://M1/dbname_p1/_design/viewid; HTTP_Command = PUT /dbname_p1/_design/viewid. Then the proxy calls M1 again with URL=http://M1/dbname_p2/_design/viewid; HTTP_Command = PUT /dbname_p2/_design/viewid.
  4. Proxy does the same for M2 and M3

Query a View
  1. Client calls Proxy with URL=http://proxy/dbname/_view/viewid/attrid; HTTP_Command = GET /dbname/_view/viewid/attrid
  2. Proxy determines all the partitions of "dbname", and for each partition, it randomly picks a copy of that partition (e.g. v1-3, v2-2). It also identifies the physical nodes (e.g. M3, M1) that host these virtual nodes.
  3. The proxy calls M3 with URL=http://M3/dbname_p1/_view/viewid/attrid; HTTP_Command = GET /dbname_p1/_view/viewid/attrid
  4. The proxy does the same for M1 (with dbname_p2)
  5. The proxy combines the results from M1 and M3. If "attrid" is a map-only view, the proxy just concatenates all the results together. But if "attrid" has a reduce function defined, then the proxy invokes the view engine's reduce() function with rereduce = true. Then the proxy returns the combined result to the client.

Replication within the Cluster
  1. Periodically, the proxies replicate the changes of the configuration DB among themselves. This ensures all the proxies have the same picture of the topology.
  2. Periodically, a proxy picks a DB, picks one of its partitions, and replicates the changes from the primary to all the secondaries. This makes sure all the copies of each partition of the DB are in sync.

Client data sync

Let's say the client also has a local DB, which is replicated from the cluster. This is important for the occasionally connected scenario, where the client may disconnect from the cluster for a period of time and work with the local DB for a while. Later on, when the client connects back to the cluster, the data in the local DB and in the cluster needs to be synchronized.

To replicate changes from the local DB to the cluster ...
  1. Client starts a replicator and sends POST /_replicate with {"source": "http://localhost/localdb", "target": "http://proxy/dbname"}
  2. The replicator, which has remembered the last seq_num of the source from the previous replication, extracts all the changes in the local DB since then.
  3. The replicator pushes these changes to the proxy.
  4. The proxy examines the list of changes. For each changed document, it calls "select_partition(docid)" to determine the partition, and then looks up the primary copy of that partition and the physical node that hosts this virtual node.
  5. The proxy pushes the changed document to that physical node. In other words, the primary copy in the cluster first receives the changes from the local DB. These changes are replicated to the secondary copies at a later time.
  6. When complete, the replicator updates the seq_num for the next replication.
To replicate the changes from the cluster to the local DB ...
  1. Client starts the replicator, which has remembered the last "seq_num" array of the cluster. The seq_num array contains the seq_num of each virtual node of the cluster. The array is an opaque data structure that the replicator does not need to interpret.
  2. The replicator sends a request to the proxy to extract the latest changes, along with the seq_num array
  3. The proxy first looks up the primary of each partition, and then extracts the changes from each of them using the appropriate seq_num from the seq_num array.
  4. The proxy consolidates the changes from the primary copy of each partition and sends them back to the replicator, along with the updated seq_num array.
  5. The replicator applies these changes to the local DB, and then updates the seq_num array for the next replication.

Monday, October 27, 2008

Consistent Multi-Master DB Replication

As explained in my CouchDB implementation notes, the current replication mechanism doesn't provide consistency guarantees. This means that if a client connects to different replicas at different times, it may see weird results, including ...
  • The client reads a document X and later reads the same document X again, but the 2nd read returns an earlier revision of X than the 1st read.
  • The client updates a document X and, after some time, reads document X again, but doesn’t see its previous update.
  • The client reads a document X and, based on its value, updates document Y. Another client may see the update to document Y but not the version of document X that the update to Y was based on.
  • Even if a client first updates document X and later updates document X a 2nd time, CouchDB may wrongly perceive a conflict between the two updates (if they land on different replicas) and resort to a user-provided resolution strategy to resolve the conflict.
To prevent the above situations from happening, here I describe a possible extension of CouchDB that provides a "causal consistency guarantee" based on the Vector Clock Gossiping technique. The target environment is a cluster of machines.

Here are a few definitions ...

Causal Consistency
  • It is not possible to see an effect before seeing its causes. In other words, when different replicas propagate their updates, each replica always applies the updates of the "causes" before applying the updates of the "effect".
  • "Effects" and "causes" are related by a "happens-before" relationship. i.e. a cause happens-before its effect.

Logical Clock
  • A monotonically increasing sequence number that is atomically increased by one whenever an "event" occurs. An event is any of the following:
    • Updating a state locally
    • Sending a message
    • Receiving a message

Vector Clock
  • An array of logical clocks where each entry represents the logical clock of a different process
  • VC1 >= VC2 if for every i, VC1[i] >= VC2[i]
  • VC3 = merge(VC1, VC2) where for every i, VC3[i] = max(VC1[i], VC2[i])
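
These two operations are small enough to write out directly. The sketch below represents a vector clock as a plain Python list with one entry per process; the function names vc_geq(), vc_merge() and vc_concurrent() are illustrative, and the notion of "concurrent" clocks is added here only to show what happens when neither clock dominates the other.

```python
def vc_geq(vc1, vc2):
    """VC1 >= VC2 if, for every i, VC1[i] >= VC2[i]."""
    return all(a >= b for a, b in zip(vc1, vc2))

def vc_merge(vc1, vc2):
    """Entry-wise maximum of two vector clocks."""
    return [max(a, b) for a, b in zip(vc1, vc2)]

def vc_concurrent(vc1, vc2):
    """Neither clock dominates the other, so the events are causally unrelated."""
    return not vc_geq(vc1, vc2) and not vc_geq(vc2, vc1)

later = [2, 1, 0]
earlier = [1, 1, 0]
left = [2, 0, 0]
right = [0, 1, 0]
merged = vc_merge(left, right)
```

Note that >= is only a partial order: for "left" and "right" above, neither vc_geq(left, right) nor vc_geq(right, left) holds, while their merge dominates both.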


The basic idea is ...
  • When the client issues a GET, the replica should only reply when it is sure that it has a value at least as recent as what the client has seen before. Otherwise, it delays its response until that is the case.
  • When the client issues a PUT/POST/DELETE, the replica immediately acknowledges the client, but instead of applying the update immediately, it puts the request into a queue. Once all the other updates that this update depends on have been applied to the DB state, this update is applied.
  • Replicas exchange their update logs in the background so that all the updates are propagated to all copies.

Each replica maintains ...
  • A "replica-VC" associated with the whole replica, which is updated when an update request is received from a proxy, or when a gossip message is sent or received.
  • A "state-VC" associated with the state, which is updated when a pending update from the queue is applied to the local DB
  • A set of the other replicas' VCs, i.e. the vector clock obtained from each other replica in the last gossip message received from it

The client always talks to the same proxy, which maintains the client's vector clock. This vector clock is important for filtering out inconsistent data when the proxy talks to the replicas, which it can choose randomly.

Read (GET) Processing
  1. When the client issues a READ, the proxy can choose any replica to forward the GET to (along with the client's vector clock).
  2. The chosen replica returns the GET result only when it has made sure its DB state is at least as up to date as what the client has seen (i.e. stateVC >= clientVC). Otherwise, it delays its response until this condition holds.
  3. The proxy may time out and contact another replica
  4. The response of the replica contains its replicaVC. The proxy refreshes its clientVC = merge(clientVC, replicaVC)

Update (PUT/POST/DELETE) Processing
  1. When the client issues an UPDATE, the proxy can choose any replica to forward the UPDATE to (the request contains a uniqueId, the client's vector clock and the operation's data).
  2. For fault tolerance, the proxy may pick multiple replicas to forward its updates to (e.g. it may pick M replicas to forward its request to and return "success" to the client when N replicas ACK back).
  3. Each chosen replica first advances its logical clock and its replicaVC.
  4. The replica computes a vector timestamp by copying the clientVC and setting its own entry to its logical clock. (i.e. ts = clientVC; ts[myReplicaNo] = logicalClock)
  5. The replica attaches this timestamp to the update request and puts the UPDATE request into the queue. The update record "u" carries at least the fields used below, u.clientVC and u.ts, along with the request's uniqueId and operation data.
  6. The replica sends an ACK message containing its replicaVC to the proxy. The proxy refreshes its clientVC = merge(clientVC, replicaVC)
Applying Pending Updates
  1. A pending update "u" can be applied to the state when all the "states" that it depends on have been applied. (i.e. stateVC >= u.clientVC)
  2. Periodically, the update log is scanned for updates that meet the above criterion
  3. When one does, the replica applies the update "u" to the DB and then updates stateVC = merge(stateVC, u.ts)
  4. Note that while this mechanism guarantees that updates happen in "causal order" (i.e. an "effect" will not be applied before its "causes"), it doesn't guarantee "total order". Because independent (or concurrent) updates can be applied in arbitrary order, the order in which they are applied may differ between replicas.
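
The update path and the rule for applying pending updates can be put together in a small in-memory sketch. This is an illustration of the scheme described above, not CouchDB code: the Replica class, its method names and the dict-based update record are all assumptions, gossip is left out, and a read that cannot be served yet simply returns None instead of delaying the response.

```python
def geq(vc1, vc2):
    return all(a >= b for a, b in zip(vc1, vc2))

def merge(vc1, vc2):
    return [max(a, b) for a, b in zip(vc1, vc2)]

class Replica:
    def __init__(self, replica_no, num_replicas):
        self.no = replica_no
        self.logical_clock = 0
        self.replica_vc = [0] * num_replicas
        self.state_vc = [0] * num_replicas
        self.db = {}
        self.update_log = []

    def receive_update(self, client_vc, key, value):
        """Queue an update and return the ACK (this replica's replicaVC)."""
        self.logical_clock += 1
        self.replica_vc[self.no] = self.logical_clock
        ts = list(client_vc)
        ts[self.no] = self.logical_clock          # ts = clientVC; ts[myReplicaNo] = logicalClock
        self.update_log.append({"client_vc": list(client_vc), "ts": ts,
                                "key": key, "value": value, "applied": False})
        return list(self.replica_vc)

    def apply_pending(self):
        """Apply every queued update whose dependencies are already in the DB state."""
        progress = True
        while progress:
            progress = False
            for u in self.update_log:
                if not u["applied"] and geq(self.state_vc, u["client_vc"]):
                    self.db[u["key"]] = u["value"]
                    self.state_vc = merge(self.state_vc, u["ts"])
                    u["applied"] = True
                    progress = True

    def read(self, client_vc, key):
        """Return (value, replicaVC), or None if the state is older than what the client saw."""
        if not geq(self.state_vc, client_vc):
            return None
        return self.db.get(key), list(self.replica_vc)

# A client writes X on replica 0, then writes Y on replica 1.
r0, r1 = Replica(0, 2), Replica(1, 2)
client_vc = [0, 0]
client_vc = merge(client_vc, r0.receive_update(client_vc, "X", 1))
too_early = r0.read(client_vc, "X")      # update is queued but not applied yet
r0.apply_pending()
after_apply = r0.read(client_vc, "X")
r1.receive_update(client_vc, "Y", 2)     # depends on X, which replica 1 has not seen
r1.apply_pending()
```

In this run the update to Y stays pending on replica 1 because its clientVC records a dependency on replica 0's update; it would only become applicable after that update arrives through gossip.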
Processing Gossip Messages

It is important that the replicas exchange their request logs among themselves so that eventually every replica has a complete picture of all the update requests, regardless of where they were received.

Periodically, each replica picks some other replica to send its update log to. The strategy for picking who to communicate with can be based on random selection, on topology (only talk to neighbors), or on degree of outdatedness (pick the one we haven't talked to for the longest time). Once the target replica is selected, the complete update log, together with the sender's current replicaVC, is sent to the target replica.

On the other hand, when a replica receives a gossip message from another replica ...
  • It merges the update log of the message with its own update log. i.e. for each update record u in the message's update log, it adds u to its own update log unless its replicaVC >= u.ts (which means it has already received a later update that succeeds u)
  • It checks whether some of the pending updates are ready to be applied to the database, and adjusts the stateVC accordingly
  • It deletes entries from the log once they have been applied to the DB and it knows that all other replicas have already got them. In other words, let c be the id of the replica where "u" was created; then "u" is removable if for every replica i, otherReplicasVC[i][c] > u.ts[c]
  • It updates replicaVC = merge(replicaVC, message.replicaVC)

Sunday, October 19, 2008

CouchDB Implementation

CouchDB is an Apache open source project. It is Damien Katz's brainchild and has a number of very attractive features based on very cool technologies, such as ...
  • RESTful API
  • Schema-less document store (document in JSON format)
  • Multi-Version-Concurrency-Control model
  • User-defined query structured as map/reduce
  • Incremental Index Update mechanism
  • Multi-Master Replication model
  • Written in Erlang (Erlang is good)
There is a wide range of application scenarios where CouchDB can be a good fit, from an occasionally connected laptop-based application, to a high performance data cluster, all the way up to virtual data storage in the cloud.

To understand CouchDB's design more deeply, I was very fortunate to have a conversation with Damien, who was kind enough to share many details with me. Here I want to capture what I learnt from this conversation.

Underlying Storage Structure
CouchDB is a “document-oriented” database where a document is a JSON string (with an optional binary attachment). The underlying structure is composed of a “storage” as well as multiple “view indexes”. The “storage” is used to store the documents and the “view indexes” are used for query processing.

Within a storage file, there are “contiguous” regions which are used to store documents. There are 2 B+Tree indexes to speed up certain kinds of access to the documents.
  • by_id_index (which uses the document id as the key). It is mainly used to look up a document by its document id. It points to a list of revisions (or a tree of revisions in case of conflicts in the replication scenario) since the last compaction. It also keeps the revision history (which won't be affected by compaction).
  • by_seqnum_index (which uses a monotonically increasing number as the key). A seqnum is generated whenever a document is updated. (Note that all updates happen in a serial fashion, so the seqnum reflects a sequence of non-concurrent updates.) It is mainly used to keep track of the last point of replication synchronization and the last point of view index update.

Append Only Operation

All updates (creating documents, modifying documents and deleting documents) happen through an append-only mechanism. Instead of modifying the existing document, a new copy is created and appended to the current region. After that, the B+Tree nodes are also modified to point to the new document location. Modifications to the B+Tree nodes are also done in an append-only fashion, which means a new copy of the B+Tree node is appended to the end of the file. This in turn triggers a modification to the parent of that node, which causes a new copy of the parent node to be appended … all the way back to the root B+Tree node. Finally, the file header is modified to point to the new root node.

That means every update triggers 1 write of the document (except for a delete) and about log N writes of B+Tree node pages, one per level from leaf to root. So an update has O(log N) complexity.

Append-only operation provides an interesting MVCC (Multi-Version Concurrency Control) model because the file keeps a history of all the previous versions of the document state. As long as the client holds on to a previous root node of the B+Tree index, it has a snapshot view. While updates continue to happen, the client won’t see any of the latest changes. Such a consistent snapshot is very useful for online backup as well as online compaction.
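
The path-copying idea behind this can be illustrated with a toy store. The sketch below is a deliberate simplification and not CouchDB's file format: a Python list stands in for the append-only file, an unbalanced binary search tree stands in for the B+Tree, and the AppendOnlyStore class and its method names are hypothetical.

```python
class AppendOnlyStore:
    def __init__(self):
        self.file = []      # append-only list of records; the index is the "file offset"
        self.root = None    # offset of the current root node (the "file header")

    def _append(self, record):
        self.file.append(record)
        return len(self.file) - 1

    def put(self, key, doc):
        doc_off = self._append(("doc", doc))             # 1 write for the document
        self.root = self._insert(self.root, key, doc_off)

    def _insert(self, node_off, key, doc_off):
        """Append new copies of every node on the path; old nodes are never modified."""
        if node_off is None:
            return self._append(("node", key, doc_off, None, None))
        _, k, d, left, right = self.file[node_off]
        if key == k:
            return self._append(("node", k, doc_off, left, right))
        if key < k:
            return self._append(("node", k, d, self._insert(left, key, doc_off), right))
        return self._append(("node", k, d, left, self._insert(right, key, doc_off)))

    def get_at(self, root, key):
        """Look up a key starting from any root offset, current or old."""
        node_off = root
        while node_off is not None:
            _, k, d, left, right = self.file[node_off]
            if key == k:
                return self.file[d][1]
            node_off = left if key < k else right
        return None

    def get(self, key):
        return self.get_at(self.root, key)

store = AppendOnlyStore()
store.put("b", {"rev": 1})
store.put("a", {"rev": 1})
snapshot_root = store.root          # a reader holds on to the old root
store.put("a", {"rev": 2})          # a later update appends new copies only
```

A reader that keeps snapshot_root still sees the first revision of "a", while store.get("a") follows the new root to the second revision.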

Note that while read operations are performed concurrently with other reads and writes, write operations are performed in serial order across documents. In other words, at any time only one document update can be in progress (however, writes of attachments within a document can happen in parallel).

GET document

When a client issues an HTTP REST GET call to CouchDB, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the document location
  • Reads the document and returns it to the client

PUT document (modification)

When a client issues an HTTP REST PUT call to CouchDB to modify an existing document, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the leaf node as well as the document location
  • Reads the document and compares the revision, throwing an error if they don’t match.
  • If they match, figures out the old seqnum of the current revision.
  • Generates a new (monotonically increasing) seqnum as well as a new revision
  • Checks the last region to see if this document can fit in. If not, allocates another contiguous region.
  • Writes the document (with the new revision) into the new region
  • Modifies the by_id B+Tree to point to the new document location
  • Modifies the by_seqnum B+Tree to add the new entry (of the new seqnum) and remove the old entry (of the old seqnum).
Note that the by_seqnum B+Tree index always points to the latest revision; previous revisions are automatically forgotten.

PUT / POST document (creation)

When a client issues an HTTP REST PUT or POST call to CouchDB to create a document, the DBServer …
  • Generates a new (monotonically increasing) seqnum as well as a new document id and revision
  • Checks the last region to see if this document can fit in. If not, allocates another contiguous region.
  • Writes the document (with the new revision) into the new region
  • Modifies the by_id B+Tree to point to the new document location
  • Modifies the by_seqnum B+Tree to add the new entry (of the new seqnum)

DELETE document (modify)
When a client issues an HTTP REST DELETE call to CouchDB, the DBServer …
  • Looks at the file header to find the root node of the by_id B+Tree index
  • Traverses down the B+Tree to figure out the leaf node as well as the document location
  • Reads the document and compares the revision, throwing an error if they don’t match.
  • If they match, figures out the old seqnum of the current revision.
  • Generates a new (monotonically increasing) seqnum as well as a new revision
  • Modifies the by_id B+Tree revision history to show that this revision path is deleted
  • Modifies the by_seqnum B+Tree to add the new entry (of the new seqnum) and remove the old entry (of the old seqnum).
Online Compaction

Because every operation is append-only, the storage file grows over time, so we need to compact the file regularly.
  • Open a new storage file
  • Walk the by_seqnum B+Tree index (which only points to the latest revisions) and locate each document
  • Copy the document to the new storage file (which automatically updates the corresponding B+Tree indexes in the new storage file).
Note that because of the characteristics of MVCC, the compaction gets a consistent snapshot and can happen concurrently, without interference from the updates that continue after the start of compaction. However, if the rate of update is too high, the compaction process can never catch up with the updates that keep appending to the file. There is a throttling mechanism under development to slow down the client update rate.

View Indexes

CouchDB supports the concept of a “view” of the database. A view is effectively the result of user-defined processing of the underlying document repository. The user-defined processing has to be organized as a two-step process, “map” and “reduce” (note that the reduce semantics is very different from Google’s Map/Reduce model). map() is a user-defined function which transforms each document into zero, one or multiple intermediate objects, while reduce() is another user-defined function which consolidates the intermediate objects into the final result.

The intermediate objects of map() and reduce() are stored in the view indexes. As the storage gets updated, the previous results stored in the view indexes are no longer valid and have to be updated. CouchDB uses an incremental update mechanism so that refreshing the view indexes is highly efficient.

Views definitions are grouped into a design document.

Each view is defined by one “map” function and an optional “reduce” function.

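map = function(doc) {
  // emit zero, one or more (key, value) entries per document
  emit(k1, v1);
  emit(k2, v2);
}

reduce = function(keys, values) {
  // consolidate the values into a single result
  return result;
}

The reduce() function needs to be commutative and associative so that the order of reduction can be arbitrary.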

Views defined within each design document is materialized in a view file.

Initially, the view file is empty (no index has been built yet). The view is built lazily when the first query is made.
  1. CouchDB walks the by_seqnum B+Tree index of the storage file.
  2. Based on that, CouchDB gets the latest revisions of all existing documents.
  3. CouchDB remembers the last seqnum and then feeds each document to the View Server using “map_doc”.
  4. The View Server invokes the map(doc) function; for each emit(key, value) call, an entry is created.
  5. Finally, the set of entries is computed and returned to CouchDB.
  6. CouchDB adds those entries to the B+Tree index, with key = emit_key + doc_id.
  7. For each B+Tree leaf node, CouchDB sends all the map entries it contains back to the View Server using “reduce”.
  8. The View Server invokes the reduce(keys, values) function.
  9. The reduce result is computed and returned to CouchDB.
  10. CouchDB updates the leaf B+Tree node to point to the reduce value of the map results it contains.
  11. After that, CouchDB moves up one level to the parents of the leaf B+Tree nodes. For each parent node, CouchDB sends the corresponding reduce results of its child nodes to the View Server using “rereduce”.
  12. The View Server invokes the reduce(keys, values) function again.
  13. Finally, a rereduce result is computed and returned to CouchDB.
  14. CouchDB updates the parent B+Tree node to point to the rereduce value.
CouchDB continues to move up one level at a time, repeating the rereduce calculation, until the rereduce result of the root node is also updated.
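The bottom-up pass in steps 6 to 14 can be sketched as follows. This is a simplified model, assuming a fixed fan-out and plain arrays instead of a real B+Tree; `buildReduceTree`, `FANOUT`, and the node layout are hypothetical names used only for illustration. The same sum function serves both the reduce and the rereduce pass here.

```javascript
const FANOUT = 2; // assumed number of entries (or children) per node

// Sum reduce; used for both the reduce and the rereduce pass.
function reduce(keys, values) {
  return values.reduce(function (a, b) { return a + b; }, 0);
}

// Split a list into consecutive groups of at most `size` items.
function chunk(items, size) {
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// `entries` must already be sorted by key, as they are in the index.
function buildReduceTree(entries) {
  // Leaf level: reduce the map entries held by each leaf.
  let level = chunk(entries, FANOUT).map(function (group) {
    return {
      entries: group,
      reduced: reduce(
        group.map(function (e) { return e.key; }),
        group.map(function (e) { return e.value; })
      )
    };
  });
  // Upper levels: rereduce the children's values until one root remains.
  while (level.length > 1) {
    level = chunk(level, FANOUT).map(function (children) {
      return {
        children: children,
        reduced: reduce(null, children.map(function (c) { return c.reduced; }))
      };
    });
  }
  return level[0];
}

const root = buildReduceTree([
  { key: "a", value: 1 }, { key: "a", value: 2 }, { key: "b", value: 3 },
  { key: "c", value: 4 }, { key: "d", value: 5 }
]);
```

With these five entries the leaves hold 3, 7, and 5, the next level holds 10 and 5, and `root.reduced` is 15.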

When done, the view index will look something like this …

Incremental View Update

CouchDB updates the view indexes lazily and incrementally. That means, when the documents are updated, CouchDB will not refresh the view index until the next query reaches CouchDB.

CouchDB then refreshes the index in the following way.
  • CouchDB walks the by_seqnum B+Tree index of the storage file, starting from the last seqnum.
  • CouchDB extracts all the documents changed since the last view query, feeds them to the view server’s map function, and gets back a set of map results.
  • CouchDB writes the map results into the B+Tree index; some of the leaf B+Tree nodes will be updated.
  • For each updated leaf B+Tree node, CouchDB resends all the map entries it contains to the view server to recompute the reduce value, then stores the reduce value inside the B+Tree node.
  • For all the parents of the updated leaf B+Tree nodes, CouchDB needs to recompute the rereduce value and store it inside the B+Tree node, all the way up to the root node.
Because of the consistent snapshot characteristic, a long-running view query can run concurrently (without interference) with ongoing updates to the DB. However, the query needs to wait for the view index update to complete before seeing a consistent result. There is also an option (under development) to immediately return a stale copy of the view in case the client can tolerate that.
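The seqnum bookkeeping behind the lazy, incremental refresh can be sketched like this. It is a toy model, assuming an in-memory change log in place of the by_seqnum index and leaving out the B+Tree reduce cache entirely; `bySeq`, `view`, `saveDoc`, `mapDoc`, and `queryTotal` are hypothetical names. The point is only that a query re-maps the documents changed since the last query, not the whole database.

```javascript
// Hypothetical in-memory stand-ins for the storage file and one view index.
const bySeq = [];                         // append-only change log: {seq, doc}
const view = { lastSeq: 0, rows: new Map(), mapCalls: 0 };

function saveDoc(doc) {
  bySeq.push({ seq: bySeq.length + 1, doc: doc });
}

// Example map function: returns [key, value] pairs for a document.
function mapDoc(doc) {
  return doc.deleted ? [] : [[doc.type, doc.amount]];
}

// Called at query time: only changes after view.lastSeq are re-mapped.
function refreshView() {
  const latest = new Map();               // newest change per document id
  bySeq
    .filter(function (c) { return c.seq > view.lastSeq; })
    .forEach(function (c) { latest.set(c.doc._id, c); });
  latest.forEach(function (change, id) {
    view.mapCalls += 1;
    view.rows.set(id, mapDoc(change.doc)); // replaces the doc's old entries
  });
  if (bySeq.length > 0) {
    view.lastSeq = bySeq[bySeq.length - 1].seq;
  }
}

function queryTotal() {
  refreshView();                          // lazy: the index is updated on read
  let total = 0;
  view.rows.forEach(function (pairs) {
    pairs.forEach(function (pair) { total += pair[1]; });
  });
  return total;
}

saveDoc({ _id: "x", type: "sale", amount: 10 });
saveDoc({ _id: "y", type: "sale", amount: 5 });
const first = queryTotal();               // maps both documents
saveDoc({ _id: "y", type: "sale", amount: 7 });
const second = queryTotal();              // maps only the changed document "y"
```

After the two queries, `view.mapCalls` is 3 rather than 4, because document "x" was not touched by the second refresh.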

Query processing

When a client retrieves the result of a view, there are the following scenarios.

Query on Map-only view
In this case, there is no reduce phase in the view index update. To process the query, CouchDB simply searches the B+Tree to locate the starting point of the key (note that the index key is prefixed by the emit_key) and then returns all the map results for that key.

Query on Map with reduce
There are 2 cases. If the query is on the final reduce value over the whole view, then CouchDB just returns the rereduce value pointed to by the root of the view index B+Tree.

If the query is on the reduce value of each key (group_by_key = true), then CouchDB tries to locate the boundaries of each key. Since this range probably does not align exactly with the B+Tree nodes, CouchDB needs to find the edges at both ends, locate the partially matched leaf B+Tree nodes, and resend their map results (for that key) to the View Server. This reduce result is then merged with the existing rereduce results to compute the final reduce result for this key.

e.g. If the key spans leaf nodes A to F, then the entries for that key that fall partially in node A and node F need to be sent to reduce() again. The result will be rereduced with node E’s existing reduce value and node P’s existing rereduce value.
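A rough sketch of the per-key case: leaves fully covered by the key reuse their cached reduce value, while the edge leaves are reduced again over just the matching entries. This assumes a flat list of leaves rather than a tree descent (so cached rereduce values of interior nodes are not modeled), and `makeLeaf`, `reduceForKey`, and `recomputedLeaves` are hypothetical names.

```javascript
function sum(values) {
  return values.reduce(function (a, b) { return a + b; }, 0);
}

// Hypothetical leaf node: sorted map entries plus the cached reduce value.
function makeLeaf(entries) {
  return {
    entries: entries,
    reduced: sum(entries.map(function (e) { return e.value; }))
  };
}

const leaves = [
  makeLeaf([{ key: "a", value: 1 }, { key: "b", value: 2 }]),
  makeLeaf([{ key: "b", value: 3 }, { key: "b", value: 4 }]),
  makeLeaf([{ key: "b", value: 5 }, { key: "c", value: 6 }])
];

let recomputedLeaves = 0;

function reduceForKey(key) {
  const partials = [];
  leaves.forEach(function (leaf) {
    const hits = leaf.entries.filter(function (e) { return e.key === key; });
    if (hits.length === 0) {
      return;                             // leaf lies outside the key range
    }
    if (hits.length === leaf.entries.length) {
      partials.push(leaf.reduced);        // fully covered: reuse cached value
    } else {
      recomputedLeaves += 1;              // edge leaf: reduce matching entries again
      partials.push(sum(hits.map(function (e) { return e.value; })));
    }
  });
  return sum(partials);                   // final rereduce over the partial results
}

const totalForB = reduceForKey("b");
```

For key "b" only the first and last leaves are recomputed; the middle leaf contributes its stored value unchanged.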

DB Replication

CouchDB supports multiple DB replicas running on different machines and provides a mechanism to synchronize their data. This is useful in 2 common scenarios:
  • Occasionally connected applications (e.g. PDA). In this case, the user can work in disconnected mode for a period of time and store data changes locally. Later, when reconnected to the corporate network, the user can synchronize those changes back to the corporate DB.
  • Mission-critical apps (e.g. clusters). In this case, the DB is replicated across multiple machines so that reliability can be achieved through redundancy and high performance through load balancing.
Underneath, there is a replicator process that accepts replication commands. A command specifies the source DB and the target DB. The replicator then asks the source DB for all the documents updated after a particular seq_num; in other words, the replicator needs to keep track of the last seq_num. It then sends a request to the target DB to pull the current revision history of all these documents and checks whether the revision history on the target is older than on the source. If so, it pushes the changed documents to the target; otherwise, it skips sending the doc.

At the target DB, a conflict is detected when the document has also been updated in the target DB. The conflict is then flagged in the revision tree pointed to by the by_id index.

Until the conflict is resolved, CouchDB considers the revision with the longest path to be the winner and shows it in the views. However, CouchDB expects a separate process (possibly manual) to fix the conflict.

Now, building a multi-master replica model based on bi-directional data synchronization on top of the replicator is pretty straightforward.

For example, we can have a pair-wise "gossip" process that runs periodically (or is triggered by certain events). The process does the following ...
  1. Copy the changes from source = replica A to target = replica B.
  2. Reverse the direction and copy the changes from source = replica B to target = replica A.
  3. Pick randomly between replica A and replica B, and call it the winner.
  4. Call a user-provided merge(doc_revA) function to fix the revision tree, basically running app-specific logic to bring the tree back to a list.
  5. Copy the changes back from the winner to the loser. This replicates the fixes.
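The five steps can be sketched as below. This is a loose model under several assumptions: each document carries a flat list of the revision ids in its history instead of a real revision tree, the merge function takes the two conflicting values, and the winner is chosen by a caller-supplied function so the example is repeatable (the text above picks at random). `makeReplica`, `isAncestor`, `replicate`, and `gossip` are hypothetical names, not CouchDB APIs.

```javascript
// Hypothetical replica model: id -> { revs: [revision ids in its history], value }.
function makeReplica() {
  return { docs: new Map(), conflicts: new Map() };
}

// True when every revision known to `older` is also known to `newer`.
function isAncestor(older, newer) {
  return older.revs.every(function (r) { return newer.revs.includes(r); });
}

// One-way replication: push source documents the target has not seen yet.
function replicate(source, target) {
  source.docs.forEach(function (doc, id) {
    const existing = target.docs.get(id);
    if (!existing || isAncestor(existing, doc)) {
      target.docs.set(id, doc);           // target is missing or older
      target.conflicts.delete(id);
    } else if (!isAncestor(doc, existing)) {
      target.conflicts.set(id, doc);      // divergent histories: flag a conflict
    }                                     // otherwise the target is newer: skip
  });
}

// Pair-wise gossip following the five steps; `merge` is application-specific.
function gossip(a, b, merge, pickWinner) {
  replicate(a, b);                        // step 1
  replicate(b, a);                        // step 2
  const winner = pickWinner(a, b);        // step 3
  const loser = winner === a ? b : a;
  winner.conflicts.forEach(function (other, id) {   // step 4
    const mine = winner.docs.get(id);
    const revs = Array.from(new Set(mine.revs.concat(other.revs)));
    revs.push("merge-" + revs.length);    // new revision descending from both
    winner.docs.set(id, { revs: revs, value: merge(mine.value, other.value) });
  });
  winner.conflicts.clear();
  replicate(winner, loser);               // step 5
}

const a = makeReplica();
const b = makeReplica();
a.docs.set("cart", { revs: ["1", "2a"], value: ["milk"] });
b.docs.set("cart", { revs: ["1", "2b"], value: ["eggs"] });

gossip(
  a, b,
  function (x, y) { return Array.from(new Set(x.concat(y))).sort(); }, // app-specific merge
  function (first) { return first; }      // fixed winner for a repeatable example
);
```

After the run both replicas hold the same merged "cart" document and neither has a flagged conflict. A random winner could be supplied as `function (x, y) { return Math.random() < 0.5 ? x : y; }`.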

Data Consistency Considerations

CouchDB has no concept of transactions, nor does it keep track of inter-dependencies between documents. It is important to make sure that data integrity constraints do not span more than one document.

For example, data integrity may become an issue if your application reads document-X and, based on what it read, updates document-Y. It is possible that after you read document-X, some other application has changed document-X into something else that you are not aware of, so you update document-Y based on a stale value. CouchDB cannot detect this kind of conflict because it involves two different documents.

Additional data consistency issues arise in a data replication setup. Since data synchronization happens in the background, there is a latency before the latest changes made on other replicas become visible. If the client connects to replicas in a nondeterministic way, then the following scenarios can happen …
  • The client reads a document and later reads the same document again, but the 2nd read returns an earlier revision than the 1st read.
  • The client updates a document and later reads the document again, but does not see its own update.

Tuesday, August 12, 2008

Distributed Storage

Here we explore the consistency aspect of a distributed storage model. The motivations for using distributed storage are scalability, fault resiliency, and cost. The architecture is based on a large amount of inexpensive (and unreliable) hardware.

At the software level, we need to deal with
  • Partitioning -- How to partition our data among different servers
  • Replication -- How do we maintain copies of data in a consistent way

Distributed storage architecture

Supported Operations

We support REST-based CRUD operations ...
  • put(key, value)
  • post(key, value) -- Semantics equivalent to "append"
  • delete(key)
  • get(key)

Consistency Models

Three models will be discussed

Full consistency
  • An update request does not return until the changes have been applied
  • A read request always returns the latest successfully updated data
Eventual consistency
  • A READ request may return an outdated copy, but will never return an inconsistent copy (one that doesn't exist in any serializable history)
  • All updates will eventually be processed and become viewable. Also, given enough silence (no update for some period of time), GET will eventually return the latest value.
Read-your-writes consistency
  • A READ request returns a copy that is equal to or later than the version of the last update by the same user
  • For an UPDATE request, same behavior as "eventual consistency"

Algorithms for Processing

Full consistency

There is no need for the operation queue in this case. Let's skip the operation queue and directly update the persistent state.
A version is attached to the data value per key. The version number is advanced when the update is successful.

PUT processing
  • Make parallel write requests to R replicas and wait for Q success responses within the timeout period; if they arrive, return success.
  • Otherwise, return failure. The data value is inconsistent and no operation can proceed for this key until the consistency issue is manually fixed (let's take a naive approach for now). The probability of failing can be reduced by increasing the value of R and Q.

GET processing
  • Make parallel read requests to R replicas and wait for Q responses that have the same version number; return their data value, otherwise return failure.

Background replica synchronization
  • Exchange version numbers periodically with the remaining (R-1) replicas; if my version is different from that of the quorum Q, update myself to match.
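The three procedures can be sketched together as follows. This is a synchronous, in-memory toy, assuming that an unreachable replica is modeled by an `up` flag and that the coordinator derives the next version from the replicas it can reach; `makeReplica`, `put`, `get`, `quorumVersion`, and `syncReplica` are hypothetical names. Note that a failed PUT still leaves the reachable replicas updated, which is the inconsistent state described above.

```javascript
// Hypothetical replica holding one versioned value for a single key.
function makeReplica() {
  return { up: true, version: 0, value: null };
}

// PUT: send the write to every replica, succeed only with at least Q acks.
// Assumption: the coordinator picks the next version from the replicas it can reach.
function put(replicas, quorum, value) {
  const reachable = replicas.filter(function (r) { return r.up; });
  if (reachable.length === 0) {
    return false;
  }
  const next = Math.max.apply(null, reachable.map(function (r) { return r.version; })) + 1;
  reachable.forEach(function (r) {
    r.version = next;
    r.value = value;
  });
  return reachable.length >= quorum;
}

// Highest version held by at least Q reachable replicas, or null if none.
function quorumVersion(replicas, quorum) {
  const counts = new Map();
  replicas.forEach(function (r) {
    if (r.up) {
      counts.set(r.version, (counts.get(r.version) || 0) + 1);
    }
  });
  let best = null;
  counts.forEach(function (count, version) {
    if (count >= quorum && (best === null || version > best)) {
      best = version;
    }
  });
  return best;
}

// GET: return the value only when Q replicas agree on the version.
function get(replicas, quorum) {
  const version = quorumVersion(replicas, quorum);
  if (version === null) {
    throw new Error("no quorum: replicas disagree on the version");
  }
  return replicas.find(function (r) { return r.up && r.version === version; }).value;
}

// Background sync: a replica that differs from the quorum copies the quorum state.
function syncReplica(me, replicas, quorum) {
  const version = quorumVersion(replicas, quorum);
  if (version !== null && me.version !== version) {
    const peer = replicas.find(function (r) { return r.up && r.version === version; });
    me.version = peer.version;
    me.value = peer.value;
  }
}

const replicas = [makeReplica(), makeReplica(), makeReplica()]; // R = 3
const Q = 2;
put(replicas, Q, "v1");
replicas[2].up = false;                   // one replica misses the next write
const ok = put(replicas, Q, "v2");
replicas[2].up = true;
const value = get(replicas, Q);           // two replicas agree on version 2
syncReplica(replicas[2], replicas, Q);    // the stale replica catches up
```

The example uses Q greater than R/2 so that two different versions can never both reach a quorum.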

Eventual consistency

We need the operation queue. A background thread asynchronously processes the operation queue to update the persistent state.

PUT processing
  • Make parallel write requests to R replicas and wait for M success responses within the timeout period; if they arrive, return success. (When receiving a write request, the replica reads the current version number V of the state and attaches version number V+1 to the update operation.)
  • Otherwise, return failure. The data value is inconsistent. Again, the probability of failing can be reduced by increasing the value of R.

GET processing
  • Make parallel read requests to R replicas, wait for the first response, and return its data value; otherwise return failure.

Background replica synchronization
  • We need a more sophisticated conflict resolution algorithm to merge operations that have the same version number. The following is what comes to my mind (without analyzing in depth):
  • Starting from the M replicas, the operation request is propagated among replicas in the background.
  • When Q replicas have received the same operation request, each applies the operation to its persistent state and updates its version number.


Read-your-writes consistency

PUT processing
  • Same as the eventual consistency model
  • After a successful update, store the version number (latest updated) in the user session

GET processing
  • Make parallel read requests to R replicas and wait for the first response whose version number is at least as high as the one stored in the user session; then return its data value and update the version in the user session.
  • Otherwise, wait a moment and resend the READ request. (The user request timeout should be set higher than the expected latency of background replica data synchronization.)

Background replica synchronization
  • Same as the eventual consistency model
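The session-based read can be sketched as follows. It is a synchronous toy, assuming the order in which replicas respond is passed in explicitly and that "wait a moment" is represented by a callback; `recordWrite`, `sessionGet`, `order`, and `waitForSync` are hypothetical names. A response is accepted when its version is not older than the version remembered in the session.

```javascript
// Hypothetical replicas for one key; replica 1 lags behind replica 0.
const replicas = [
  { version: 2, value: "new" },
  { version: 1, value: "old" }
];

// PUT side: after a successful update, remember the version in the session.
function recordWrite(session, version) {
  session.version = Math.max(session.version, version);
}

// GET side: accept the first response that is not older than the session.
// `order` lists replica indexes in the order their responses arrive;
// `waitForSync` stands in for "wait a moment" before resending.
function sessionGet(session, order, waitForSync, maxAttempts) {
  for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
    for (const i of order) {
      const r = replicas[i];
      if (r.version >= session.version) {
        session.version = r.version;
        return r.value;
      }
    }
    waitForSync();
  }
  throw new Error("timed out waiting for replica synchronization");
}

const session = { version: 0 };
recordWrite(session, 2);                  // this user just wrote version 2

// The stale replica answers first and is skipped; replica 0 is accepted.
const direct = sessionGet(session, [1, 0], function () {}, 1);

// Only the stale replica is reachable: retry after background sync catches up.
const afterRetry = sessionGet(session, [1], function () {
  replicas[1].version = replicas[0].version;
  replicas[1].value = replicas[0].value;
}, 2);
```

In both calls the user sees their own update, either from an up-to-date replica or after one retry.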