Showing posts with label scalability. Show all posts
Showing posts with label scalability. Show all posts

Sunday, August 17, 2014

Lambda Architecture Principles

"Lambda Architecture" (introduced by Nathan Marz) has gained a lot of traction recently.  Fundamentally, it is a set of design patterns of dealing with Batch and Real time data processing workflow that fuel many organization's business operations.  Although I don't realize any novice ideas has been introduced, it is the first time these principles are being outlined in such a clear and unambiguous manner.

In this post, I'd like to summarize the key principles of the Lambda architecture, focus more in the underlying design principles and less in the choice of implementation technologies, which I may have a different favors from Nathan.

One important distinction of Lambda architecture is that it has a clear separation between the batch processing pipeline (ie: Batch Layer) and the real-time processing pipeline (ie: Real-time Layer).  Such separation provides a means to localize and isolate complexity for handling data update.  To handle real-time query, Lambda architecture provide a mechanism (ie: Serving Layer) to merge/combine data from the Batch Layer and Real-time Layer and return the latest information to the user.

Data Source Entry

At the very beginning, data flows in Lambda architecture as follows ...
  • Transaction data starts streaming in from OLTP system during business operations.  Transaction data ingestion can be materialized in the form of records in OLTP systems, or text lines in App log files, or incoming API calls, or an event queue (e.g. Kafka)
  • This transaction data stream is replicated and fed into both the Batch Layer and Realtime Layer
Here is an overall architecture diagram for Lambda.



Batch Layer

For storing the ground truth, "Master dataset" is the most fundamental DB that captures all basic event happens.  It stores data in the most "raw" form (and hence the finest granularity) that can be used to compute any perspective at any given point in time.  As long as we can maintain the correctness of master dataset, every perspective of data view derived from it will be automatically correct.

Given maintaining the correctness of master dataset is crucial, to avoid the complexity of maintenance, master dataset is "immutable".  Specifically data can only be appended while update and delete are disallowed.  By disallowing changes of existing data, it avoids the complexity of handling the conflicting concurrent update completely.

Here is a conceptual schema of how the master dataset can be structured.  The center green table represents the old, traditional-way of storing data in RDBMS.  The surrounding blue tables illustrates the schema of how the master dataset can be structured, with some key highlights
  • Data are partitioned by columns and stored in different tables.  Columns that are closely related can be stored in the same table
  • NULL values are not stored
  • Each data record is associated with a time stamp since then the record is valid




Notice that every piece of data is tagged with a time stamp at which the data is changed (or more precisely, a change record that represents the data modification is created).  The latest state of an object can be retrieved by extracting the version of the object with the largest time stamp.

Although master dataset stores data in the finest granularity and therefore can be used to compute result of any query, it usually take a long time to perform such computation if the processing starts with such raw form.  To speed up the query processing, various data at intermediate form (called Batch View) that aligns closer to the query will be generated in a periodic manner.  These batch views (instead of the original master dataset) will be used to serve the real-time query processing. 

To generate these batch views, the "Batch Layer" use a massively parallel, brute force approach to process the original master dataset.  Notice that since data in master data set is timestamped, the data candidate can be identified simply from those that has the time stamp later than the last round of batch processing.  Although less efficient, Lambda architecture advocates that at each round of batch view generation, the previous batch view should just be simply discarded and the new batch view is computed  from master dataset.  This simple-mind, compute-from-scratch approach has some good properties in stopping error propagation (since error cannot be accumulated), but the processing may not be optimized and may take a longer time to finish.  This can increase the "staleness" of the batch view.

Real time Layer

As discussed above, generating the batch view requires scanning a large volume of master dataset that takes few hours.  The batch view will therefore be stale for at least the processing time duration (ie: between the start and end of the Batch processing).  But the maximum staleness can be up to the time period between the end of this Batch processing and the end of next Batch processing (ie: the batch cycle).  The following diagram illustrate this staleness.


Even the batch view is stale period, business operates as usual and transaction data will be streamed in continuously.  To answer user's query with the latest, up-to-date information.  The business transaction records need to be captured and merged into the real-time view.  This is the responsibility of the Real-time Layer.  To reduce the latency of latest information availability close to zero, the merge mechanism has to be done in an incremental manner such that no batching delaying the processing will be introduced.  This requires the real time view update to be very different from the batch view update, which can tolerate a high latency.  The end goal is that the latest information that is not captured in the Batch view will be made available in the Realtime view.

The logic of doing the incremental merge on Realtime view is application specific.  As a common use case, lets say we want to compute a set of summary statistics (e.g. mean, count, max, min, sum, standard deviation, percentile) of the transaction data since the last batch view update.  To compute the sum, we can simply add the new transaction data to the existing sum and then write the new sum back to the real-time view.  To compute the mean, we can multiply the existing count with existing mean, adding the transaction sum and then divide by the existing count plus one.  To implement this logic, we need to READ data from the Realtime view, perform the merge and WRITE the data back to the Realtime view.  This requires the Realtime serving DB (which host the Realtime view) to support both random READ and WRITE.  Fortunately, since the realtime view only need to store the stale data up to one batch cycle, its scale is limited to some degree.
Once the batch view update is completed, the real-time layer will discard the data from the real time serving DB that has time stamp earlier than the batch processing.  This not only limit the data volume of Realtime serving DB, but also allows any data inconsistency (of the realtime view) to be clean up eventually.  This drastically reduce the requirement of sophisticated multi-user, large scale DB.  Many DB system support multiple user random read/write and can be used for this purpose.

Serving Layer

The serving layer is responsible to host the batch view (in the batch serving database) as well as hosting the real-time view (in the real-time serving database).  Due to very different accessing pattern, the batch serving DB has a quite different characteristic from the real-time serving DB.

As mentioned in above, while required to support efficient random read at large scale data volume, the batch serving DB doesn't need to support random write because data will only be bulk-loaded into the batch serving DB.  On the other hand, the real-time serving DB will be incrementally (and continuously) updated by the real-time layer, and therefore need to support both random read and random write.

To maintain the batch serving DB updated, the serving layer need to periodically check the batch layer progression to determine whether a later round of batch view generation is finished.  If so, bulk load the batch view into the batch serving DB.  After completing the bulk load, the batch serving DB has contained the latest version of batch view and some data in the real-time view is expired and therefore can be deleted.  The serving layer will orchestrate these processes.  This purge action is especially important to keep the size of the real-time serving DB small and hence can limit the complexity for handling real-time, concurrent read/write.

To process a real-time query, the serving layer disseminates the incoming query into 2 different sub-queries and forward them to both the Batch serving DB and Realtime serving DB, apply application-specific logic to combine/merge their corresponding result and form a single response to the query.  Since the data in the real-time view and batch view are different from a timestamp perspective, the combine/merge is typically done by concatenate the results together.  In case of any conflict (same time stamp), the one from Batch view will overwrite the one from Realtime view.

Final Thoughts

By separating different responsibility into different layers, the Lambda architecture can leverage different optimization techniques specifically designed for different constraints.  For example, the Batch Layer focuses in large scale data processing using simple, start-from-scratch approach and not worrying about the processing latency.  On the other hand, the Real-time Layer covers where the Batch Layer left off and focus in low-latency merging of the latest information and no need to worry about large scale.  Finally the Serving Layer is responsible to stitch together the Batch View and Realtime View to provide the final complete picture.

The clear demarcation of responsibility also enable different technology stacks to be utilized at each layer and hence can tailor more closely to the organization's specific business need.  Nevertheless, using a very different mechanism to update the Batch view (ie: start-from-scratch) and Realtime view (ie: incremental merge) requires two different algorithm implementation and code base to handle the same type of data.  This can increase the code maintenance effort and can be considered to be the price to pay for bridging the fundamental gap between the "scalability" and "low latency" need.

Nathan's Lambda architecture also introduce a set of candidate technologies which he has developed and used in his past projects (e.g. Hadoop for storing Master dataset, Hadoop for generating Batch view, ElephantDB for batch serving DB, Cassandra for realtime serving DB, STORM for generating Realtime view).  The beauty of Lambda architecture is that the choice of technologies is completely decoupled so I intentionally do not describe any of their details in this post.  On the other hand, I have my own favorite which is different and that will be covered in my future posts.

Sunday, August 28, 2011

Scale Independently in the Cloud

Deploying a large scale system nowadays is quite different from before when data center is the only choice. A traditional deployment exercise typically involve a intensive performance modeling exercise to accurately predict the resource requirement for the production system. The accuracy is very important because it is expensive and slow to make changes after deploy.

This performance modeling typically involve the following steps.
  1. Build a graph model based on the component interaction.
  2. Express the mathematical relationship between input traffic, the resource consumption at the processing node (CPU and Memory based on the processing algorithm), and the output traffic (which will become the input of downstream processing nodes)
  3. Model external workload as random variable (with a workload distribution function)
  4. Run a simulation exercise to compute the corresponding workload distribution function for the workload of each link and node, such workload unit includes CPU, Memory and Network requirement (latency and bandwidth).
  5. Based on business requirement, pick a peak external load target (say 95%). Vary the external workload from 0 to the max workload and compute the corresponding range of workload at each node and link in the graph.
  6. The max CPU, Memory, I/O of each node defines capacity needed to provision for that node. The max value of each link defines the network bandwidth / latency requirement of that link


Notice that the resource are typically provisioned at the peak load target which means the resources are idle most of the time, impacting the efficiency of the overall system. On the other hand, SaaS based system introduce a more dynamic relationship (anyone can call anyone) between components which makes this tradition way of performance modeling more challenging. The performance modeling exercise need to be conducted whenever new clients or new services are introduced into the system, resulting in a non-trivial on going maintenance cost.

Thanks for the cloud computing phenomenon the underlying dynamics and economics has shifted quite significantly over the last few years and now doing capacity planning is quite different from before.

First of all, making a wrong capacity estimation is less costly when deploying additional resources are talking about minutes rather than month. Instead to attempting to construct the fully picture of the system, the cloud practices is to focus at each individual component to make sure each can "scale independently". The steps are as follows ...
  1. Each component scale independently using horizontal scaling. ie: f(a.x) = a.f(x)
  2. Instead of establish a formal mathematical model, just deploy the system in the cloud, adjust the input workload and measure the utilization at each node and link (e.g. AWS Cloudwatch)
  3. Based on the utility measurement, define the initial deployment capacity based on average load (not peak load).
  4. Use auto-scaling to adjust pool size of independent components according to runtime workload.
  5. Sync workload is typically frontend by Load balancer. Async workload will be frontend by scalable queues. Output can be a callout, stored in queue, or stored in scalable storage


By focusing in "scale independently", each component can plug and play much easier with other component due to less assumption is made on each other as each component can dynamically adjusted its capacity according to run-time need. This results in not only a more scalable, but also more flexible system.

Sunday, December 5, 2010

BI at large scale

As more and more data being collected everywhere from pretty much everything a user do, such as transactions activities, social interactions, information search ... enterprises has been actively looking into ways to turn these vast amount of raw data into useful information.

BI process flow

It include the following stages of processing
  1. ETL: Extract operational data (inside enterprise or external sources) into data warehouse (typically organized in Star/Snowflake schema with Fact and Dimension tables).
  2. Data exploration: Get insight into data using simple visualization tools (e.g. histogram, summary statistics) or sophisticated OLAP tools (slice, dice, rollup, drilldown)
  3. Report generation: Produce executive reports
  4. Data mining: Extract patterns of the underlying data to form models (e.g. bayesian networks, linear regression, neural networks, decision trees, support vector machines, nearest neighbors, association rules, principal component analysis)
  5. Feedback: The model will be used to assist business decision making (predicting the future)
The gap of processing BIG data
Many data mining and machine learning algorithms are available in both commercial packages (e.g. SAS, SPSS) as well as open source libraries (e.g. Weka, R). Nevertheless, most of these ML algorithms implementation are based on fitting al data in memory and not designed to process big data (e.g. Tera byte data volume).

On the other hand, massively parallel processing platform such as Hadoop, Map/Reduce, over the last few years, has been proven in processing Terabyte or even Petabyte range of data. Although many sequential algorithm can be restructured to run in map reduce, including a big portion of machine learning algorithm, there isn't a corresponding parallel implementation of ML available in massively parallel form.

Approach 1: Apache Mahout
One approach is to "re-implement" the ML algorithm in Map/Reduce and this is the path of Apache Mahout project. Mahout seems to have implemented an impressive list of algorithms although I haven't used them for my projects yet.

Approach 2: Ensemble of parallel independent learners
This is an alternative path that doesn't require re-implementation of existing algorithms. It works in the following way.
  1. Draw samples from the Big data into many sample data sets, which can fit into the memory of a single, individual learner.
  2. Assign each sample data set to an individual learner, who use existing algorithms to learn the model. After learning, each individual learner keep their own learned model
  3. When a decision / prediction request is received, each individual learner will come up with its own prediction and then combine their results in some ways. (e.g. for classification task, the learners will vote for the predicted class and the majority wins. for regression, the average of the estimate values will be used to predict the output value)

I also found this approach can smoothly fade out outdated model. As user's behavior may change over time, same happens to the validity of a learned model. With this ensemble approach, I can have multiple learners each learn their model periodically. Everytime when a prediction is needed, I will pick the latest k models and combine the final prediction based on a time-decayed weighted voting model. Outdated model will automatically slide out the k-size window automatically.

One gotchas of sampling approach is the handling of rare events (since you may lost those rare events in sampling). In this case, stratified sampling (instead of simple random sampling) should be used.

Friday, November 5, 2010

Map Reduce and Stream Processing

Hadoop Map/Reduce model is very good in processing large amount of data in parallel. It provides a general partitioning mechanism (based on the key of the data) to distribute aggregation workload across different machines. Basically, map/reduce algorithm design is all about how to select the right key for the record at different stage of processing.

However, "time dimension" has a very different characteristic compared to other dimensional attributes of data, especially when real-time data processing is concerned. It presents a different set of challenges to the batch oriented, Map/Reduce model.
  1. Real-time processing demands a very low latency of response, which means there isn't too much data accumulated at the "time" dimension for processing.
  2. Data collected from multiple sources may not have all arrived at the point of aggregation.
  3. In the standard model of Map/Reduce, the reduce phase cannot start until the map phase is completed. And all the intermediate data is persisted in the disk before download to the reducer. All these added to significant latency of the processing.
Here is a more detail description of this high latency characteristic of Hadoop.

Although Hadoop Map/Reduce is designed for batch-oriented work load, certain application, such as fraud detection, ad display, network monitoring requires real-time response for processing large amount of data, have started to looked at various way of tweaking Hadoop to fit in the more real-time processing environment. Here I try to look at some technique to perform low-latency parallel processing based on the Map/Reduce model.


General stream processing model

In this model, data are produced at various OLTP system, which update the transaction data store and also asynchronously send additional data for analytic processing. The analytic processing will write the output to a decision model, which will feed back information to the OLTP system for real-time decision making.

Notice the "asynchronous nature" of the analytic processing which is decoupled from the OLTP system, this way the OLTP system won't be slow down waiting for the completion of the analytic processing. Nevetheless, we still need to perform the analytic processing ASAP, otherwise the decision model will not be very useful if it doesn't reflect the current picture of the world. What latency is tolerable is application specific.

Micro-batch in Map/Reduce


One approach is to cut the data into small batches based on time window (e.g. every hour) and submit the data collected in each batch to the Map Reduce job. Staging mechanism is needed such that the OLTP application can continue independent of the analytic processing. A job scheduler is used to regulate the producer and consumer so each of them can proceed independently.

Continuous Map/Reduce

Here lets imagine some possible modification of the Map/Reduce execution model to cater for real-time stream processing. I am not trying to worry about the backward compatibility of Hadoop which is the approach that Hadoop online prototype (HOP) is taking.

Long running
The first modification is to make the mapper and reducer long-running. Therefore, we cannot wait for the end of the map phase before starting the reduce phase as the map phase never ends. This implies the mapper push the data to the reducer once it complete its processing and let the reducer to sort the data. A downside of this approach is that it offers no opportunity to run the combine() function on the map side to reduce the bandwidth utilization. It also shift more workload to the reducer which now needs to do the sorting.

Notice there is a tradeoff between latency and optimization. Optimization requires more data to be accumulated at the source (ie: the Mapper) so local consolidation (ie: combine) can be performed. Unfortunately, low latency requires the data to be sent ASAP so not much accumulation can be done.

HOP suggest an adaptive flow control mechanism such that data is pushed out to reducer ASAP until the reducer is overloaded and push back (using some sort of flow control protocol). Then the mapper will buffer the processed message and perform combine() before it send to the reducer. This approach automatically shift back and forth the aggregation workload between the reducer and the mapper.

Time Window: Slice and Range
This is a "time slice" concept and a "time range" concept. "Slice" defines a time window where result is accumulated before the reduce processing is executed. This is also the minimum amount of data that the mapper should accumulate before sending to the reducer.

"Range" defines the time window where results are aggregated. It can be a landmark window where it has a well-defined starting point, or a jumping window (consider a moving landmark scenario). It can also be a sliding window where is a fixed size window from the current time is aggregated.

After receiving a specific time slice from every mapper, the reducer can start the aggregation processing and combine the result with the previous aggregation result. Slice can be dynamically adjusted based on the amount of data sent from the mapper.

Incremental processing
Notice that the reducer need to compute the aggregated slice value after receive all records of the same slice from all mappers. After that it calls the user-defined merge() function to merge the slice value with the range value. In case the range need to be refreshed (e.g. reaching a jumping window boundary), the init() functin will be called to get a refreshed range value. If the range value need to be updated (when certain slice value falls outside a sliding range), the unmerge() function will be invoked.

Here is an example of how we keep tracked of the average hit rate (ie: total hits per hour) within a 24 hour sliding window with update happens per hour (ie: an one-hour slice).
# Call at each hit record
map(k1, hitRecord) {
site = hitRecord.site
# lookup the slice of the particular key
slice = lookupSlice(site)
if (slice.time - now > 60.minutes) {
# Notify reducer whole slice of site is sent
advance(site, slice)
slice = lookupSlice(site)
}
emitIntermediate(site, slice, 1)
}

combine(site, slice, countList) {
hitCount = 0
for count in countList {
hitCount += count
}
# Send the message to the downstream node
emitIntermediate(site, slice, hitCount)
}

# Called when reducer receive full slice from all mappers
reduce(site, slice, countList) {
hitCount = 0
for count in countList {
hitCount += count
}
sv = SliceValue.new
sv.hitCount = hitCount
return sv
}

# Called at each jumping window boundary
init(slice) {
rangeValue = RangeValue.new
rangeValue.hitCount = 0
return rangeValue
}

# Called after each reduce()
merge(rangeValue, slice, sliceValue) {
rangeValue.hitCount += sliceValue.hitCount
}

# Called when a slice fall out the sliding window
unmerge(rangeValue, slice, sliceValue) {
rangeValue.hitCount -= sliceValue.hitCount
}

Friday, October 15, 2010

Scalable System Design Patterns

Looking back after 2.5 years since my previous post on scalable system design techniques, I've observed an emergence of a set of commonly used design patterns. Here is my attempt to capture and share them.

Load Balancer

In this model, there is a dispatcher that determines which worker instance will handle the request based on different policies. The application should best be "stateless" so any worker instance can handle the request.

This pattern is deployed in almost every medium to large web site setup.



Scatter and Gather

In this model, the dispatcher multicast the request to all workers of the pool. Each worker will compute a local result and send it back to the dispatcher, who will consolidate them into a single response and then send back to the client.

This pattern is used in Search engines like Yahoo, Google to handle user's keyword search request ... etc.



Result Cache

In this model, the dispatcher will first lookup if the request has been made before and try to find the previous result to return, in order to save the actual execution.

This pattern is commonly used in large enterprise application. Memcached is a very commonly deployed cache server.



Shared Space

This model also known as "Blackboard"; all workers monitors information from the shared space and contributes partial knowledge back to the blackboard. The information is continuously enriched until a solution is reached.

This pattern is used in JavaSpace and also commercial product GigaSpace.



Pipe and Filter

This model is also known as "Data Flow Programming"; all workers connected by pipes where data is flow across.

This pattern is a very common EAI pattern.



Map Reduce

The model is targeting batch jobs where disk I/O is the major bottleneck. It use a distributed file system so that disk I/O can be done in parallel.

This pattern is used in many of Google's internal application, as well as implemented in open source Hadoop parallel processing framework. I also find this pattern can be used in many many application design scenarios.



Bulk Synchronous Parellel

This model is based on lock-step execution across all workers, coordinated by a master. Each worker repeat the following steps until the exit condition is reached, when there is no more active workers.
  1. Each worker read data from input queue
  2. Each worker perform local processing based on the read data
  3. Each worker push local result along its direct connection
This pattern has been used in Google's Pregel graph processing model as well as the Apache Hama project.



Execution Orchestrator

This model is based on an intelligent scheduler / orchestrator to schedule ready-to-run tasks (based on a dependency graph) across a clusters of dumb workers.

This pattern is used in Microsoft's Dryad project



Although I tried to cover the whole set of commonly used design pattern for building large scale system, I am sure I have missed some other important ones. Please drop me a comment and feedback.

Also, there is a whole set of scalability patterns around data tier that I haven't covered here. This include some very basic patterns underlying NOSQL. And it worths to take a deep look at some leading implementations.

Friday, October 30, 2009

Notes on Memcached

Some notes about Memcached. Here is its architecture.


How it works ?
Memcached is organized as a farm of N servers. The storage model can be considered as a huge HashTable partitioned among these N servers.

Every API request takes a "key" parameter. There is a 2-step process at the client lib ...
  • Given the key, locate the server
  • Forward the request to that server
The server receiving the request will do a local lookup for that key. The servers within the farm doesn't gossip with each other at all. Each server use asynchronous, non-blocking I/O and one thread can be used to handle large number of incoming TCP sockets. Actually a thread pool is being used but the number of threads is independent of the number of incoming sockets. This architecture is highly scalable for large number of incoming network connections.

API
Memcached provide a HashTable-like interface, so it has ...
  • get(key)
  • set(key, value)
Memcached also provides a richer "multi-get" so that one read request can retrieve values for multiple keys. The client library will issue different requests to multiple servers and doing the lookup in parallel.
  • get_multi(["k1", "k2", "k3"])
Some client lib offers a "master-key" concept such that a key contains 2 parts, the prefix master-key and the suffix key. In this model, the client lib only use the prefix to located the server (rather than looking at the whole key) and then pass the suffix key to that server. So user can group entries to be stored by the same server by using the same prefix key.
  • get_multi(["user1:k1", "user1:k2", "user1:k3"]) -- This request just go to the server hosting all keys of "user1:*"

For updating data, Memcached provides a number of variations.
  • set(key, value, expiration) -- Memcached guarantees the item will never be staying in the cache once the expiration time is reached. (Note that it is possible that the item being kicked out before expiration due to cache full)
  • add(key, value, expiration) -- Success only when no entry of the key exist.
  • replace(key, value, expiration) -- Success only when an entry of the key already exist.
Server Crashes
When one of the server crashes, all entries owned by that server is lost. Higher resilience can be achieved by storing redundant copies of data in different servers. Memcached has no support for data replication. This has to be taken care by the application (or client lib).

Note that the default server hashing algorithm doesn't handle the growth and shrink of the number of servers very well. When the number of servers changes, the ownership equation (key mod N) will all be wrong. In other words, if the crashed server needs to be taken out from the pool, the total number of servers will be decreased by one and all the existing entries needs to be redistributed to different server. Effectively, the whole cache (among all server) is invalidated even when just one server crashes.

So one approach to address this problem is to retain the number of Memcached servers across system crashes. We can have a monitor server to detect the heartbeat of all Memcached server and in case any crashes is detected, start a new server with the same IP address as the dead server. In this case, although the new server will still lost all the entries and has to repopulate the cache, the ownership of the keys are unchanged and data within the surviving node doesn't need to be redistributed.

Another approach is to run logical servers within a farm of physical machines. When a physical machine crashes, its logical servers will be re-start in the surviving physical machines. In other words, the number of logical servers is unchanged even when crashes happens. This logical server approach is also good when the underlying physical machines has different memory capacity. We can start more Memcached process in the machine with more memory and proportionally spread the cache according to memory capacity.

We also can use a more sophisticated technique called "consistent hashing", which localize the ownership changes to just the neighbor of the crashed server. Under this schema, each server is assigned with an id under the same key space. The ownership of a key is determined by the closest server whose key is the first one encountered when walking in the anti-clockwise direction. When a server crashes, its immediate upstream neighbor server (walking along the anti-clockwise direction) will adopt the key ownership of the dead server, while all other servers has the same ownership of key range unchanged.


Atomicity

Each request to Memcached is atomic by itself. But there is no direct support for atomicity across multiple requests. However, App can implement its own locking mechanism by using the "add()" operation provide by Memcached as follows ...
success = add("lock", null, 5.seconds)
if success
  set("key1", value1)
  set("key2", value2)
  cache.delete("lock")
else
  raise UpdateException.new("fail to get lock")
end

Memcached also support a "check-and-set" mechanism that can be used for optimistic concurrency control. The basic idea is to get a version stamp when getting an object and pass that version stamp in the set method. The system will verify the version stamp to make sure the entry hasn't been modified by something else or otherwise, fail the update.
data, stamp = get_stamp(key)
...
check_and_set(key, value1, stamp)

What Memcached doesn't do ?
Memcached's design goal is centered at performance and scalability. By design, it doesn't deal with the following concerns.
  • Authentication for client request
  • Data replication between servers for fault resilience
  • Key > 250 chars
  • Large object > 1MB
  • Storing collection objects
Data Replication DIY
First of all, think carefully about whether you really need to have data replication at the cache level, given that cache data should always be able to recreated from the original source (although at a higher cost).

The main purpose of using a cache is for "performance" reason. If your system cannot tolerate data lost at the cache level, rethink your design !

Although Memcached doesn't provide data replication, it can easily be done by the client lib or at the application level, based on a similar idea described below.

At the client side, we can use multiple keys to represent different copies of the same data. A monotonically increasing version number is also attached with the data. This version number is used to identify the most up-to-date copy and will be incremented for each update.

When doing update, we update all the copies of the same data via different keys.
def reliable_set(key, versioned_value)
  key_array = [key+':1', key+':2', key+':3']
  new_value = versioned_value.value
  new_version = versioned_value.version + 1
  new_versioned_value =
          combine(new_value, new_version)

  for k in key_array
      set(k, new_versioned_value)
  end
end
For reading the data from cache, use "multi-get" for multiple keys (one for each copy) and return the copy which has the latest version. If any discrepancy is detected (ie: some copies have a lacking version, or some copies are missing), start a background thread to write the latest version back to all copies.
def reliable_get(key)
  key_array = [key+':1', key+':2', key+':3']
  value_array = get_multi(key_array)

  latest_version = 0
  latest_value = nil
  need_fix = false

  for v in value_array
      if (v.version > latest_verson)
          if (!need_fix) && (latest_version > 0)
              need_fix = true
          end
          latest_version = v.version
          latest_value = v.value
      end
  end
  versioned_value =
          combine(latest_value, latest_version)

  if need_fix
      Thread.new do
          reliable_set(key, versioned_value)
      end
  end

  return versioned_value
end
When we delete the data, we don't actually remove it. Instead, we mark the data as deleted but keep it in the cache and let it expire.

User Throttling
An interesting use case other than caching is to throttle user that is too active. Basically you want to disallow user request that is too frequent.
user = get(userId)
if user != null
  disallow request and warn user
else
  add(userId, anything, inactive_period)
  handle request
end

Monday, April 28, 2008

Parallelism with Map/Reduce

We explore the Map/Reduce approach to turn sequential algorithm into parallel

Map/Reduce Overview

Since the "reduce" operation need to accumulate results for the whole job, as well as communication overhead in sending and collecting data, Map/Reduce model is more suitable for long running, batch-oriented jobs.

In the Map/Reduce model, "parallelism" is achieved via a "split/sort/merge/join" process and is described as follows.
  • A MapReduce Job starts from a predefined set of Input data (usually sitting in some directory of a distributed file system). A master daemon (which is a central co-ordinator) is started and get the job configuration.
  • According to the job config, the master daemon will start multiple Mapper daemons as well as Reducer daemons in different machines. And then it start the input reader to read data from some DFS directory. The input reader will chunk the read data accordingly and send them to "randomly" chosen Mapper. This is the "split" phase and begins the parallelism.
  • After getting the data chunks, the mapper daemon will run a "user-supplied map function" and produce a collection of (key, value) pairs. Each item within this collection will be sorted according to the key and then send to the corresponding Reducer daemon. This is the "sort" phase.
  • All items with the same key will come to the same Reducer daemon, which collect all the items of that key and invoke a "user-supplied reduce function" and produce a single entry (key, aggregatedValue) as a result. This is the "merge" phase.
  • The output of reducer daemon will be collected by the Output writer, which is effective the "join" phase and ends the parallelism.
Here is an simple word-counting example ...






















Friday, April 11, 2008

REST design pattern

Based on the same architectural pattern of the web, "REST" has a growing dominance of the SOA (Service Oriented Architecture) implementation these days. In this article, we will discuss some basic design principles of REST.

SOAP : The Remote Procedure Call Model

Before the REST become a dominance, most of SOA architecture are built around WS* stack, which is fundamentally a RPC (Remote Procedure Call) model. Under this model, "Service" is structured as some "Procedure" exposed by the system.

For example, WSDL is used to define the procedure call syntax (such as the procedure name, the parameter and their structure). SOAP is used to define how to encode the procedure call into an XML string. And there are other WS* standards define higher level protocols such as how to pass security credentials around, how to do transactional procedure call, how to discover the service location ... etc.

Unfortunately, the WS* stack are getting so complicated that it takes a steep learning curve before it can be used. On the other hand, it is not achieving its original goal of inter-operability (probably deal to different interpretation of what the spec says).

In the last 2 years, WS* technology development has been slowed down and the momentum has been shifted to another model; REST.

REST: The Resource Oriented Model

REST (REpresentation State Transfer) is introduced by Roy Fielding when he captured the basic architectural pattern that make the web so successful. Observing how the web pages are organized and how they are linked to each other, REST is modeled around a large number of "Resources" which "link" among each other. As a significant difference with WS*, REST raises the importance of "Resources" as well as its "Linkage", on the other hand, it push down the importance of "Procedures".

Unlike the WS* model, "Service" in the REST is organized as large number of "Resources". Each resource will have a URI that make it globally identifiable. A resource is represented by some format of "Representation" which is typically extracted by an idempotent HTTP GET. The representation may embed other URI which refers to other resources. This emulates an HTML link between web pages and provide a powerful way for the client to discover other services by traversing its links. It also make building SOA search engine possible.

On the other hand, REST down play the "Procedure" aspect and define a small number of "action" based on existing HTTP Methods. As we discussed above, HTTP GET is used to get a representation of the resource. To modify a resource, REST use HTTP PUT with the new representation embedded inside the HTTP Body. To delete a resource, REST use HTTP DELETE. To get metadata of a resource, REST use HTTP HEAD. Notice that in all these cases, the HTTP Body doesn't carry any information about the "Procedure". This is quite different from WS* SOAP where the request is always made using HTTP POST.

At the first glance, it seems REST is quite limiting in terms of the number of procedures that it can supported. It turns out this is not the case, REST allows any "Procedure" (which has a side effect) to use HTTP POST. Effectively, REST categorize the operations by its nature and associate well-defined semantics with these categories (ie: GET for read-only, PUT for update, DELETE for remove, all above are idempotent) while provide an extension mechanism for application-specific operations (ie: POST for application procedures which may be non-idempotent).


URI Naming Convention

Since resource is usually mapped to some state in the system, analyzing its lifecycle is an important step when designing how a resource is created and how an URI should be structured.

Typically there are some eternal, singleton "Factory Resource" which create other resources. Factory resource typically represents the "type" of resources. Factory resource usually have a static, well-known URI, which is suffixed by a plural form of the resource type. Some examples are ...
http://xyz.com/books
http://xyz.com/users
http://xyz.com/orders

"Resource Instance", which are created by the "Factory Resource" usually represents an instance of that resource type. "Resource instances" typically have a limited life span. Their URI typically contains some unique identifier so that the corresponding instance of the resource can be located. Some examples are ...
http://xyz.com/books/4545
http://xyz.com/users/123
http://xyz.com/orders/2008/04/10/1001

If this object is a singleton object of that type, the id is not needed.
http://www.xyz.com/library

"Dependent Resource" are typically created and owned by an existing resource during part of its life cycle. Therefore "dependent resource" has an implicit life-cycle dependency on its owning parent. When a parent resource is deleted, all the dependent resource it owns will be deleted automatically. Dependent resource use an URI which has prefix of its parent resource URI. Some examples are ...
http://xyz.com/books/4545/tableofcontent
http://xyz.com/users/123/shopping_cart

Creating Resource

HTTP PUT is also used to create the object if the caller has complete control of assigning the object id, the request body contains the representation of the Object after successful creation.
PUT /library/books/668102 HTTP/1.1
Host: www.xyz.com
Content-Type: application/xml
Content-Length: nnn

<book>
<title>Restful design</title>
<author>Ricky</author>
</book>
HTTP/1.1 201 Created

If the caller has no control in the object id, HTTP POST is made to the object's parent container with the request body contains the representation of the Object. The response body should contain a reference to the URL of the created object.
POST /library/books HTTP/1.1
Host: www.xyz.com
Content-Type: application/xml
Content-Length: nnn

<book>
<title>Restful design</title>
<author>Ricky</author>
</book>
HTTP/1.1 301 Moved PermanentlyLocation: /library/books/668102

To create a resource instance of a particular resource type, make an HTTP POST to the Factory Resource URI. If the creation is successful, the response will contain a URI of the resource that has been created.

To create a book ...
POST /books HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<book>
<title>...</title>
<author>Ricky Ho</author>
</book>
HTTP/1.1 201 Created
Content-Type: application/xml; charset=utf-8
Location: /books/4545

<ref>http://xyz.com/books/4545</ref>

To create a dependent resource, make an HTTP POST (or PUT) to its owning resource's URI

To upload the content of a book (using HTTP POST) ...
POST  /books/4545  HTTP/1.1
Host: example.org
Content-Type: application/pdf
Content-Length: nnnn

{pdf data}
HTTP/1.1 201 Created
Content-Type: application/pdf
Location: /books/4545/content

<ref>http://xyz.com/books/4545/tableofcontent</ref>

HTTP POST is typically used to create a resource when its URI is unknown to the client before its creation. However, if the URI is known to the client, then an idempotent HTTP PUT should be used with the URI of the resource to be created. For example, the

To upload the content of a book (using HTTP PUT) ...
PUT  /books/4545/tableofcontent  HTTP/1.1
Host: example.org
Content-Type: application/pdf
Content-Length: nnnn

{pdf data}
HTTP/1.1 200 OK

Finding Resources

Make an HTTP GET to the factory resource URI, criteria pass in as parameters.
(Note that it is up to the factory resource to interpret the query parameter).

To search for books with a certain author ...
GET /books?author=Ricky HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
HTTP/1.1 200 OK
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<books>
<book>
<ref>http://xyz.com/books/4545</ref>
<title>...</title>
<author>Ricky</author>
</book>
<book>
<ref>http://xyz.com/books/4546</ref>
<title>...</title>
<author>Ricky</author>
</book>
</books>

Another school of thoughts is to embed the criteria in the URI path, such as ...
http://xyz.com/books/author/Ricky

I personally prefers the query parameters mechanism because it doesn't imply any order of search criteria.


Lookup a particular resource

Make an HTTP GET to the resource object URI

Lookup a particular book...
GET /books/4545 HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
HTTP/1.1 200 OK
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<book>
<title>...</title>
<author>Ricky Ho</author>
</book>

In case the resource have multiple representation format. The client should specify within the HTTP header "Accept" of its request what format she is expecting.


Lookup a dependent resource

Make an HTTP GET to the dependent resource object URI

Download the table of content of a particular book...
GET /books/4545/tableofcontent HTTP/1.1
Host: xyz.com
Content-Type: application/pdf
HTTP/1.1 200 OK
Content-Type: application/pdf
Content-Length: nnn

{pdf data}

Modify a resource

Make an HTTP PUT to the resource object URI, pass in the new object representation in the HTTP body

Change the book title ...
PUT /books/4545 HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<book>
<title>Changed title</title>
<author>Ricky Ho</author>
</book>
HTTP/1.1 200 OK

Delete a resource

Make an HTTP DELETE to the resource object URI

Delete a book ...
DELETE /books/4545 HTTP/1.1
Host: xyz.com
HTTP/1.1 200 OK

Resource Reference

In some cases, we do not want to create a new resource, but we want to add a "reference" to an existing resource. e.g. consider a book is added into a shopping cart, which is another resource.

Add a book into the shopping cart ...
POST  /users/123/shopping_cart  HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<?xml version="1.0" ?>
<add>
<ref>http://xyz.com/books/4545</ref>
</add>
HTTP/1.1 200 OK

Show all items of the shopping cart ...
GET  /users/123/shopping_cart  HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
HTTP/1.1 200 OK
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<?xml version="1.0" ?>
<shopping_cart>
<ref>http://xyz.com/books/4545</ref>
...
<shopping_cart>
Note that the shopping cart resource contains "resource reference" which acts as links to other resources (which is the books). Such linkages create a resource web so that client can discovery and navigate across different resources.


Remove a book from the shopping cart ...
POST  /users/123/shopping_cart  HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<?xml version="1.0" ?>
<remove>
<ref>http://xyz.com/books/4545</ref>
</remove>
HTTP/1.1 200 OK
Note that we are using HTTP POST rather than HTTP DELETE to remove a resource reference. This is because we are remove a link but not the actual resource itself. In this case, the book still exist after it is taken out from the shopping cart.

Note that what the book is deleted, that all the shopping cart that refers to that book need to be fixed in an application specific way. One way is to do lazy checking. In other words, wait until the shopping cart checking out to check the book existence and fix it at that point.

Checkout the shopping cart ...
POST  /orders  HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<?xml version="1.0" ?>
<ref>http://xyz.com/users/123/shopping_cart</ref>
HTTP/1.1 201 Created
Content-Type: application/xml; charset=utf-8
Location: /orders/2008/04/10/1001

<?xml version="1.0" ?>
<ref>http://xyz.com/orders/2008/04/10/1001</ref>
Note that here the checkout is implemented by creating another resource "Order" which is used to keep track of the fulfillment of the purchase.

Asynchronous Request

In case when the operation takes a long time to complete, an asynchronous mode should be used. In a polling approach, a transient transaction resource is return immediately to the caller. The caller can then use GET request to poll for the result of the operation

We can also use a notification approach. In this case, the caller pass along a callback URI when making the request. The server will invoke the callback URI to POST the result when it is done.

The basic idea is to immediately create a "Transaction Resource" to return back to the client. While the actual processing happens asynchronously in the background, the client at any time, can poll the "Transaction Resource" for the latest processing status.

Lets look at an example to request for printing a book, which may take a long time to complete

Print a book

POST  /books/123  HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

?xml version="1.0" ?>
<print>http://xyz.com/printers/abc</print>
HTTP/1.1 200 OK
Content-Type: application/xml; charset=utf-8
Location: /transactions/1234

<?xml version="1.0" ?>
<ref>http://xyz.com/transactions/1234</ref>
Note that a response is created immediately which contains the URI of a transaction resource, even before the print job is started. Client can poll the transaction resource to obtain the latest status of the print job.

Check the status of the print Job ...
GET /transactions/1234 HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
HTTP/1.1 200 OK
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

<transaction>
<type>PrintJob</type>
<status>In Progress</status>
</transaction>
It is also possible to cancel the transaction if it is not already completed.

Cancel the print job

POST  /transactions/1234  HTTP/1.1
Host: xyz.com
Content-Type: application/xml; charset=utf-8
Content-Length: nnn

?xml version="1.0" ?>
<cancel/>
HTTP/1.1 200 OK


Conclusion
The Resource Oriented Model that REST advocates provides a more natural fit for our service web. Therefore, I suggest that SOA implementation should take the REST model as a default approach.

Thursday, March 6, 2008

Web Site Scalability

A classical large scale web site typically have multiple data centers in geographically distributed locations. Each data center will typically have the following tiers in its architecture
  • Web tier : Serving static contents (static pages, photos, videos)
  • App tier : Serving dynamic contents and execute the application logic (dynamic pages, order processing, transaction processing)
  • Data tier: Storing persistent states (Databases, Filesystems)

















Content Delivery

Dynamic Content
  • Most of the content display is dynamic content. Some application logic will be executed at the web server which generate an HTML for the client browser. The efficiency of application logic will have a huge impact on the overall site's scalability. This is our main topic here.
  • Sometimes it is possible to pre-generate dynamic content and store it as static content. When the real request comes in, instead of re-running the application logic to generate the page, we just need to lookup the pre-generated page, which can be much faster
Static Content
  • Static content are typically the images, videos embedded inside the dynamic pages.
  • A typical HTML pages typically contains many static contents where the browser will make additional HTTP network round trips to fetch. So fetching static content efficiency also has a big impact to the overall response of dynamic page
  • Content Delivery Network is an effective solution for delivering static contents. CDN provider will cache the static content in their network and will return the cached copy for subsequent HTTP fetch request. This reduce the overall hits to your web site as well as improving the user's response time (because their cache is in closer proximity to the user)
Request dispatching and Load balancing

There are 2 layers of dispatching for a Client who is making an HTTP request to reach the application server

DNS Resolution based on user proximity
  • Depends on the location of the client (derived from the IP address), the DNS server can return an ordered list of sites according to the proximity measurement. Therefore client request will be routed to the data center closest to him/her
  • After that, the client browser will cache the server IP
Load balancer
  • Load balancer (hardware-based or software-based) will be sitting in front of a pool of homogeneous servers which provide same application services. The load balancer's job is to decide which member of the pool should handle the request
  • The decision can be based on various strategy, simple one include round robin or random, more sophisticated one involves tracking the workload of each member (e.g. by measuring their response time) and dispatch request to the least busy one
  • Members of the pool can also monitor its own workload and mark itself down (by not responding to the ping request of the load balancer)

Client communication

This is concerned about designing an effective mechanism to communicate with the client, which is typically the browser making some HTTP call (maybe AJAX as well)

Designing the granularity of service call
  • Reduce the number of round trips by using a coarse grain API model so your client is making one call rather than many small calls
  • Don't send back more data than your client need
  • Consider using an incremental processing model. Just send back sufficient result for the first page. Use a cursor model to compute more result for subsequent pages in case the client needs it. But it is good to calculate an estimation of the total matched result to return to the client.
Designing message format
  • If you have control on the client side (e.g. I provide the JavaScript library which is making the request), then you can choose a more compact encoding scheme and not worry about compatibility.
  • If not, you have to use a standard encoding mechanism such as XML. You also need to publish the XML schema of the message (the contract is the message format)
Consider data compression
  • If the message size is big, then we can apply compression technique (e.g. gzip) to the message before sending it.
  • You are trading off CPU for bandwidth savings, better to measure whether this is a gain first
Asynchronous communication
  • AJAX fits very well here. User can proceed to do other things while the server is working on the request
  • Consider not sending the result at all. Rather than sending the final order status to the client who is sending an order placement request, consider sending an email acknowledgment.
Session state handling
Typical web transaction involves multiple steps. Session state need to be maintained across multiple interactions

Memory-based session state with Load balancer affinity
  • One way is to store the state in the App Server's local memory. But we need to make sure subsequent request land on the same App Server instance otherwise it cannot access the previous stored session state
  • Load balancer affinity need to be turned on. Typically request with the same cookie will be routed to the same app server
Memory replication session state across App servers
  • Another way to have the App server sharing a global session state by replicating its changes to each other
  • Double check the latency of replication so we can make sure there is enough time for the replication to complete before subsequent request is made
Persist session state to a DB
  • Store the session state into a DB which can be accessed by any App Server inside the pool
On-demand session state migration
  • Under this model, the cookie will be used to store the IP address of the last app server who process the client request
  • When the next request comes in, the dispatcher is free to forward to any members of the pool. The app server which receive this request will examine the IP address of the last server and pull over the session state from there.
Embed session state inside cookies
  • If the session state is small, you don't need to store at the server side at all. You can just embed all information inside a cookie and send back to the client.
  • You need to digitally sign the cookie so that modification cannot happen

Caching
Remember the previous result can reuse them for future request can drastically reduce the workload of the system. But don't cache request which modifies the backend state

Sunday, March 2, 2008

Database Scalability

Database is typically the last piece of the puzzle of the scalability problem. There are some common techniques to scale the DB tire

Indexing

Make sure appropriate indexes is built for fast access. Analyze the frequently-used queries and examine the query plan when it is executed (e.g. use "explain" for MySQL). Check whether appropriate index exist and being used.

Data De-normalization

Table join is an expensive operation and should be reduced as much as possible. One technique is to de-normalize the data such that certain information is repeated in different tables.

DB Replication


For typical web application where the read/write ratio is high, it will be useful to maintain multiple read-only replicas so that read access workload can be spread across. For example, in a 1 master/N slaves case, all update goes to master DB which send a change log to the replicas. However, there will be a time lag for replication.


Table Partitioning

You can partition vertically or horizontally.

Vertical partitioning is about putting different DB tables into different machines or moving some columns (rarely access attributes) to a different table. Of course, for query performance reason, tables that are joined together inside a query need to reside in the same DB.

Horizontally partitioning is about moving different rows within a table into a separated DB. For example, we can partition the rows according to user id. Locality of reference is very important, we should put the rows (from different tables) of the same user together in the same machine if these information will be access together.



Transaction Processing

Avoid mixing OLAP (query intensive) and OLTP (update intensive) operations within the same DB. In the OLTP system, avoid using long running database transaction and choose the isolation level appropriately. A typical technique is to use optimistic business transaction. Under this scheme, a long running business transaction is executed outside a database transaction. Data containing a version stamp is read outside the database trsnaction. When the user commits the business transaction, a database transaction is started at that time, the lastest version stamp of the corresponding records is re-read from the DB to make sure it is the same as the previous read (which means the data is not modified since the last read). Is so, the changes is pushed to the DB and transaction is commited (with the version stamp advanced). In case the version stamp is mismatched, the DB transaction as well as the business transaction is aborted.

Object / Relational Mapping
Although O/R mapping layer is useful to simplify persistent logic, it is usually not friendly to scalability. Consider the performance overhead carefully when deciding to use O/R mapping.

There are many tuning parameters in O/R mapping. Consider these ...
  • When an object is dereferenced, how deep the object will be retrieved
  • If a collection is dereferenced, does the O/R mapper retrieve all the object contained in the collection ?
  • When an object is expanded, choose carefully between multiple "single-join" queries and single "multiple join" query