Sunday, December 5, 2010

BI at large scale

As more and more data being collected everywhere from pretty much everything a user do, such as transactions activities, social interactions, information search ... enterprises has been actively looking into ways to turn these vast amount of raw data into useful information.

BI process flow

It include the following stages of processing
  1. ETL: Extract operational data (inside enterprise or external sources) into data warehouse (typically organized in Star/Snowflake schema with Fact and Dimension tables).
  2. Data exploration: Get insight into data using simple visualization tools (e.g. histogram, summary statistics) or sophisticated OLAP tools (slice, dice, rollup, drilldown)
  3. Report generation: Produce executive reports
  4. Data mining: Extract patterns of the underlying data to form models (e.g. bayesian networks, linear regression, neural networks, decision trees, support vector machines, nearest neighbors, association rules, principal component analysis)
  5. Feedback: The model will be used to assist business decision making (predicting the future)
The gap of processing BIG data
Many data mining and machine learning algorithms are available in both commercial packages (e.g. SAS, SPSS) as well as open source libraries (e.g. Weka, R). Nevertheless, most of these ML algorithms implementation are based on fitting al data in memory and not designed to process big data (e.g. Tera byte data volume).

On the other hand, massively parallel processing platform such as Hadoop, Map/Reduce, over the last few years, has been proven in processing Terabyte or even Petabyte range of data. Although many sequential algorithm can be restructured to run in map reduce, including a big portion of machine learning algorithm, there isn't a corresponding parallel implementation of ML available in massively parallel form.

Approach 1: Apache Mahout
One approach is to "re-implement" the ML algorithm in Map/Reduce and this is the path of Apache Mahout project. Mahout seems to have implemented an impressive list of algorithms although I haven't used them for my projects yet.

Approach 2: Ensemble of parallel independent learners
This is an alternative path that doesn't require re-implementation of existing algorithms. It works in the following way.
  1. Draw samples from the Big data into many sample data sets, which can fit into the memory of a single, individual learner.
  2. Assign each sample data set to an individual learner, who use existing algorithms to learn the model. After learning, each individual learner keep their own learned model
  3. When a decision / prediction request is received, each individual learner will come up with its own prediction and then combine their results in some ways. (e.g. for classification task, the learners will vote for the predicted class and the majority wins. for regression, the average of the estimate values will be used to predict the output value)

I also found this approach can smoothly fade out outdated model. As user's behavior may change over time, same happens to the validity of a learned model. With this ensemble approach, I can have multiple learners each learn their model periodically. Everytime when a prediction is needed, I will pick the latest k models and combine the final prediction based on a time-decayed weighted voting model. Outdated model will automatically slide out the k-size window automatically.

One gotchas of sampling approach is the handling of rare events (since you may lost those rare events in sampling). In this case, stratified sampling (instead of simple random sampling) should be used.

Friday, November 5, 2010

Map Reduce and Stream Processing

Hadoop Map/Reduce model is very good in processing large amount of data in parallel. It provides a general partitioning mechanism (based on the key of the data) to distribute aggregation workload across different machines. Basically, map/reduce algorithm design is all about how to select the right key for the record at different stage of processing.

However, "time dimension" has a very different characteristic compared to other dimensional attributes of data, especially when real-time data processing is concerned. It presents a different set of challenges to the batch oriented, Map/Reduce model.
  1. Real-time processing demands a very low latency of response, which means there isn't too much data accumulated at the "time" dimension for processing.
  2. Data collected from multiple sources may not have all arrived at the point of aggregation.
  3. In the standard model of Map/Reduce, the reduce phase cannot start until the map phase is completed. And all the intermediate data is persisted in the disk before download to the reducer. All these added to significant latency of the processing.
Here is a more detail description of this high latency characteristic of Hadoop.

Although Hadoop Map/Reduce is designed for batch-oriented work load, certain application, such as fraud detection, ad display, network monitoring requires real-time response for processing large amount of data, have started to looked at various way of tweaking Hadoop to fit in the more real-time processing environment. Here I try to look at some technique to perform low-latency parallel processing based on the Map/Reduce model.


General stream processing model

In this model, data are produced at various OLTP system, which update the transaction data store and also asynchronously send additional data for analytic processing. The analytic processing will write the output to a decision model, which will feed back information to the OLTP system for real-time decision making.

Notice the "asynchronous nature" of the analytic processing which is decoupled from the OLTP system, this way the OLTP system won't be slow down waiting for the completion of the analytic processing. Nevetheless, we still need to perform the analytic processing ASAP, otherwise the decision model will not be very useful if it doesn't reflect the current picture of the world. What latency is tolerable is application specific.

Micro-batch in Map/Reduce


One approach is to cut the data into small batches based on time window (e.g. every hour) and submit the data collected in each batch to the Map Reduce job. Staging mechanism is needed such that the OLTP application can continue independent of the analytic processing. A job scheduler is used to regulate the producer and consumer so each of them can proceed independently.

Continuous Map/Reduce

Here lets imagine some possible modification of the Map/Reduce execution model to cater for real-time stream processing. I am not trying to worry about the backward compatibility of Hadoop which is the approach that Hadoop online prototype (HOP) is taking.

Long running
The first modification is to make the mapper and reducer long-running. Therefore, we cannot wait for the end of the map phase before starting the reduce phase as the map phase never ends. This implies the mapper push the data to the reducer once it complete its processing and let the reducer to sort the data. A downside of this approach is that it offers no opportunity to run the combine() function on the map side to reduce the bandwidth utilization. It also shift more workload to the reducer which now needs to do the sorting.

Notice there is a tradeoff between latency and optimization. Optimization requires more data to be accumulated at the source (ie: the Mapper) so local consolidation (ie: combine) can be performed. Unfortunately, low latency requires the data to be sent ASAP so not much accumulation can be done.

HOP suggest an adaptive flow control mechanism such that data is pushed out to reducer ASAP until the reducer is overloaded and push back (using some sort of flow control protocol). Then the mapper will buffer the processed message and perform combine() before it send to the reducer. This approach automatically shift back and forth the aggregation workload between the reducer and the mapper.

Time Window: Slice and Range
This is a "time slice" concept and a "time range" concept. "Slice" defines a time window where result is accumulated before the reduce processing is executed. This is also the minimum amount of data that the mapper should accumulate before sending to the reducer.

"Range" defines the time window where results are aggregated. It can be a landmark window where it has a well-defined starting point, or a jumping window (consider a moving landmark scenario). It can also be a sliding window where is a fixed size window from the current time is aggregated.

After receiving a specific time slice from every mapper, the reducer can start the aggregation processing and combine the result with the previous aggregation result. Slice can be dynamically adjusted based on the amount of data sent from the mapper.

Incremental processing
Notice that the reducer need to compute the aggregated slice value after receive all records of the same slice from all mappers. After that it calls the user-defined merge() function to merge the slice value with the range value. In case the range need to be refreshed (e.g. reaching a jumping window boundary), the init() functin will be called to get a refreshed range value. If the range value need to be updated (when certain slice value falls outside a sliding range), the unmerge() function will be invoked.

Here is an example of how we keep tracked of the average hit rate (ie: total hits per hour) within a 24 hour sliding window with update happens per hour (ie: an one-hour slice).
# Call at each hit record
map(k1, hitRecord) {
site = hitRecord.site
# lookup the slice of the particular key
slice = lookupSlice(site)
if (slice.time - now > 60.minutes) {
# Notify reducer whole slice of site is sent
advance(site, slice)
slice = lookupSlice(site)
}
emitIntermediate(site, slice, 1)
}

combine(site, slice, countList) {
hitCount = 0
for count in countList {
hitCount += count
}
# Send the message to the downstream node
emitIntermediate(site, slice, hitCount)
}

# Called when reducer receive full slice from all mappers
reduce(site, slice, countList) {
hitCount = 0
for count in countList {
hitCount += count
}
sv = SliceValue.new
sv.hitCount = hitCount
return sv
}

# Called at each jumping window boundary
init(slice) {
rangeValue = RangeValue.new
rangeValue.hitCount = 0
return rangeValue
}

# Called after each reduce()
merge(rangeValue, slice, sliceValue) {
rangeValue.hitCount += sliceValue.hitCount
}

# Called when a slice fall out the sliding window
unmerge(rangeValue, slice, sliceValue) {
rangeValue.hitCount -= sliceValue.hitCount
}

Friday, October 15, 2010

Scalable System Design Patterns

Looking back after 2.5 years since my previous post on scalable system design techniques, I've observed an emergence of a set of commonly used design patterns. Here is my attempt to capture and share them.

Load Balancer

In this model, there is a dispatcher that determines which worker instance will handle the request based on different policies. The application should best be "stateless" so any worker instance can handle the request.

This pattern is deployed in almost every medium to large web site setup.



Scatter and Gather

In this model, the dispatcher multicast the request to all workers of the pool. Each worker will compute a local result and send it back to the dispatcher, who will consolidate them into a single response and then send back to the client.

This pattern is used in Search engines like Yahoo, Google to handle user's keyword search request ... etc.



Result Cache

In this model, the dispatcher will first lookup if the request has been made before and try to find the previous result to return, in order to save the actual execution.

This pattern is commonly used in large enterprise application. Memcached is a very commonly deployed cache server.



Shared Space

This model also known as "Blackboard"; all workers monitors information from the shared space and contributes partial knowledge back to the blackboard. The information is continuously enriched until a solution is reached.

This pattern is used in JavaSpace and also commercial product GigaSpace.



Pipe and Filter

This model is also known as "Data Flow Programming"; all workers connected by pipes where data is flow across.

This pattern is a very common EAI pattern.



Map Reduce

The model is targeting batch jobs where disk I/O is the major bottleneck. It use a distributed file system so that disk I/O can be done in parallel.

This pattern is used in many of Google's internal application, as well as implemented in open source Hadoop parallel processing framework. I also find this pattern can be used in many many application design scenarios.



Bulk Synchronous Parellel

This model is based on lock-step execution across all workers, coordinated by a master. Each worker repeat the following steps until the exit condition is reached, when there is no more active workers.
  1. Each worker read data from input queue
  2. Each worker perform local processing based on the read data
  3. Each worker push local result along its direct connection
This pattern has been used in Google's Pregel graph processing model as well as the Apache Hama project.



Execution Orchestrator

This model is based on an intelligent scheduler / orchestrator to schedule ready-to-run tasks (based on a dependency graph) across a clusters of dumb workers.

This pattern is used in Microsoft's Dryad project



Although I tried to cover the whole set of commonly used design pattern for building large scale system, I am sure I have missed some other important ones. Please drop me a comment and feedback.

Also, there is a whole set of scalability patterns around data tier that I haven't covered here. This include some very basic patterns underlying NOSQL. And it worths to take a deep look at some leading implementations.

Wednesday, October 6, 2010

BigTable Model with Cassandra and HBase

Recently in a number of "scalability discussion meeting", I've seen the following pattern coming up repeatedly ...
  1. To make your app scalable, you try to make your app layer “stateless”.
  2. OK, so you move the "state" out from your application layer out to a shared DB, or shared data layer.
  3. Now, how do we make the data tier scalable, by definition, we cannot make the data tier stateless.
  4. OK, now lets think about how to "partition" your data and spread them across multiple machines in such a way that workload is balanced.
  5. Now there are more boxes and what if some of them crashes.
  6. OK, we should replicate the data across machines.
  7. Now, how do we keep these data in sync ...
  8. And then Cloud computing gets into the picture (as it always does). Now we are not just having a pool of machines but also the pool size can grow and shrink according to workload fluctuation (you don't want to pay for something idle, right ?).
  9. Now we need to figure out as we add more machines into the pool or remove machine from the pool, how we should "redistribute" the data.
This is an area where NOSQL shines. In the last 18 months, NOSQL has become one of the hottest topic in the software industry. It has been introduced as a solution to large scale data storage problem at the range of Terabytes or Petabytes. Dozens of NOSQL products has come to the market, but two leaders HBase and Cassandra seems to stand out from the rest in terms of their adoption.

Given an increasing demand of explaining these 2 products recently, I decide to write a post on this.

Not to repeat the basic theory of NOSQL here, for a foundation of distributed system theory underlying the NOSQL design, please refer to my earlier blog

Both Hbase and Cassandra are based on Google BigTable model, here lets introduce some key characteristic underlying Bigtable first.

Fundamentally Distributed
BigTable is built from the ground up on a "highly distributed", "share nothing" architecture. Data is supposed to store in large number of unreliable, commodity server boxes by "partitioning" and "replication". Data partitioning means the data are partitioned by its key and stored in different servers. Replication means the same data element is replicated multiple times at different servers.


Column Oriented
Unlike traditional RDBMS implementation where each "row" is stored contiguous on disk, BigTable, on the other hand, store each column contiguously on disk. The underlying assumption is that in most cases not all columns are needed for data access, column oriented layout allows more records sitting in a disk block and hence can reduce the disk I/O.

Column oriented layout is also very effective to store very sparse data (many cells have NULL value) as well as multi-value cell. The following diagram illustrate the difference between a Row-oriented layout and a Column-oriented layout


Variable number of Columns
In RDBMS, each row must have a fixed set of columns defined by the table schema, and therefore it is not easy to support columns with multi-value attributes. The BigTable model introduces the "Column Family" concept such that a row has a fixed number of "column family" but within the "column family", a row can have a variable number of columns that can be different in each row.


In the Bigtable model, the basic data storage unit is a cell, (addressed by a particular row and column). Bigtable allow multiple timestamp version of data within a cell. In other words, user can address a data element by the rowid, column name and the timestamp. At the configuration level, Bigtable allows the user to specify how many versions can be stored within each cell either by count (how many) or by freshness (how old).

At the physical level, BigTable store each column family contiguously on disk (imagine one file per column family), and physically sort the order of data by rowid, column name and timestamp. After that, the sorted data will be compressed so that a disk block size can store more data. On the other hand, since data within a column family usually has a similar pattern, data compression can be very effective.

Note: Although not shown in this example, rowid of different column families can be completely different types. For example, in the above example, I can have another column family "UserIdx" whose rowid is a string (user's name) and it has columns whose columnKey is the u1, u2 (ie: the row id of the User Column family) and columnValue is null (ie: not used). This is a common technique to build index at the application level.

Sequential write
BigTable model is highly optimized for write operation (insert/update/delete) with sequential write (no disk seek is needed). Basically, write happens by first appending a transaction entry to a log file (hence the disk write I/O is sequential with no disk seek), followed by writing the data into an in-memory Memtable . In case of the machine crashes and all in-memory state is lost, the recovery step will bring the Memtable up to date by replaying the updates in the log file.

All the latest update therefore will be stored at the Memtable, which will grow until reaching a size threshold, then it will flushed the Memtable to the disk as an SSTable (sorted by the String key). Over a period of time there will be multiple SSTables on the disk that store the data.

Merged read
Whenever a read request is received, the system will first lookup the Memtable by its row key to see if it contains the data. If not, it will look at the on-disk SSTable to see if the row-key is there. We call this the "merged read" as the system need to look at multiple places for the data. To speed up the detection, SSTable has a companion Bloom filter such that it can rapidly detect the absence of the row-key. In other words, only when the bloom filter returns positive will the system be doing a detail lookup within the SSTable.

Periodic Data Compaction
As you can imagine, it can be quite inefficient for the read operation when there are too many SSTables scattering around. Therefore, the system periodically merge the SSTable. Notice that since each of the SSTable is individually sorted by key, a simple "merge sort" is sufficient to merge multiple SSTable into one. The merge mechanism is based on a logarithm property where two SSTable of the same size will be merge into a single SSTable will doubling the size. Therefore the number of SSTable is proportion to O(logN) where N is the number of rows.



After looking at the common part, lets look at their difference of Hbase and Cassandra.

HBase
Based on the BigTable, HBase uses the Hadoop Filesystem (HDFS) as its data storage engine. The advantage of this approach is then HBase doesn't need to worry about data replication, data consistency and resiliency because HDFS has handled it already. Of course, the downside is that it is also constrained by the characteristics of HDFS, which is not optimized for random read access. In addition, there will be an extra network latency between the DB server to the File server (which is the data node of Hadoop).


In the HBase architecture, data is stored in a farm of Region Servers. The "key-to-server" mapping is needed to locate the corresponding server and this mapping is stored as a "Table" similar to other user data table.

Before a client do any DB operation, it needs to first locate the corresponding region server.
  1. The client contacts a predefined Master server who replies the endpoint of a region server that holds a "Root Region" table.
  2. The client contacts the region server who replies the endpoint of a second region server who holds a "Meta Region" table, which contains a mapping from "user table" to "region server".
  3. The client contacts this second region server, passing along the user table name. This second region server will lookup its meta region and reply an endpoint of a third region server who holds a "User Region", which contains a mapping from "key range" to "region server"
  4. The client contacts this third region server, passing along the row key that it wants to lookup. This third region server will lookup its user region and reply the endpoint of a fourth region server who holds the data that the client is looking for.
  5. Client will cache the result along this process so subsequent request doesn't need to go through this multi-step process again to resolve the corresponding endpoint.
In Hbase, the in-memory data storage (what we refer to as "Memtable" in above paragraph) is implemented in Memcache. The on-disk data storage (what we refer to as "SSTable" in above paragraph) is implemented as a HDFS file residing in Hadoop data node server. The Log file is also stored as an HDFS file. (I feel storing a transaction log file remotely will hurt performance)

Also in the HBase architecture, there is a special machine playing the "role of master" who monitors and coordinates the activities of all region servers (the heavy-duty worker node). To the best of my knowledge, the master node is the single point of failure at this moment.

For a more detail architecture description, Lars George has a very good explanation in the log file implementation as well as the data storage architecture of Hbase.

Cassandra
Also based on the BigTable model, Cassandra use the DHT (distributed hash table) model to partition its data, based on the paper described in the Amazon Dynamo model.

Consistent Hashing via O(1) DHT
Each machine (node) is associated with a particular id that is distributed in a keyspace (e.g. 128 bit). All the data element is also associated with a key (in the same key space). The server owns all the data whose key lies between its id and the preceding server's id.

Data is also replicated across multiple servers. Cassandra offers multiple replication schema including storing the replicas in neighbor servers (whose id succeed the server owning the data), or a rack-aware strategy by storing the replicas in a physical location. The simple partition strategy is as follows ...


Tunable Consistency Level
Unlike Hbase, Cassandra allows you to choose the consistency level that is suitable to your application, so you can gain more scalability if willing to tradeoff some data consistency.

For example, it allows you to choose how many ACK to receive from different replicas before considering a WRITE to be successful. Similarly, you can choose how many replica's response to be received in the case of READ before return the result to the client.

By choosing the appropriate number for W and R response, you can choose the level of consistency you like. For example, to achieve Strict Consistency, we just need to pick W, R such that W + R > N. This including the possibility of (W = one and R = all), (R = one and W = all), (W = quorum and R = quorum). Of course, if you don't need strict consistency, you can even choose a smaller value for W and R and gain a bigger availability. Regardless of what consistency level you choose, the data will be eventual consistent by the "hinted handoff", "read repair" and "anti-entropy sync" mechanism described below.

Hinted Handoff
The client performs a write by send the request to any Cassandra node which will act as the proxy to the client. This proxy node will located N corresponding nodes that holds the data replicas and forward the write request to all of them. In case any node is failed, it will pick a random node as a handoff node and write the request with a hint telling it to forward the write request back to the failed node after it recovers. The handoff node will then periodically check for the recovery of the failed node and forward the write to it. Therefore, the original node will eventually receive all the write request.

Conflict Resolution
Since write can reach different replica, the corresponding timestamp of the data is used to resolve conflict, in other words, the latest timestamp wins and push the earlier timestamps into an earlier version (they are not lost)

Read Repair
When the client performs a "read", the proxy node will issue N reads but only wait for R copies of responses and return the one with the latest version. In case some nodes respond with an older version, the proxy node will send the latest version to them asynchronously, hence these left-behind node will still eventually catch up with the latest version.

Anti-Entropy data sync
To ensure the data is still in sync even there is no READ and WRITE occurs to the data, replica nodes periodically gossip with each other to figure out if anyone out of sync. For each key range of data, each member in the replica group compute a Merkel tree (a hash encoding tree where the difference can be located quickly) and send it to other neighbors. By comparing the received Merkel tree with its own tree, each member can quickly determine which data portion is out of sync. If so, it will send the diff to the left-behind members.

Anti-entropy is the "catch-all" way to guarantee eventual consistency, but is also pretty expensive and therefore is not done frequently. By combining the data sync with read repair and hinted handoff, we can keep the replicas pretty up-to-date.

BigTable trade offs
To retain the scalability features of BigTable, some of the basic features of what RDBMS has provided is missing in the BigTable model. Here we highlight the rough edges of Bigtable.

1) Primitive transaction support
Transaction protection is only guaranteed within a single row. In other words, you cannot start a atomic transaction to modify multiple rows.

2) Primitive isolation support
While you are reading a row, other people may have modified the same row and update it before you. Your view is not current anymore but your later update can easily wipe off other people's change.

There are many techniques how concurrent update can be isolated, including pessimistic approach like locking or optimistic approach by using vector clock to be the version stamp. But to the best of my understanding, there is no robust test-and-set operation in the BigTable model (this is some getLock mechanism in Hbase which I haven't looked into), my impression is that there is no easy way to check there is no concurrent update happen in between.

Because of this limitation, I think BigTable model is more suitable for those applications where concurrent update to the same row is very rare, or some inconsistency is tolerable at the application level. Fortunately, there are still a lot of applications falling into this bucket.

3) No indexes
Notice that data within BigTable are all physically sorted; by rowid, column name and timestamp. There is no index from the column value to its containing rowid.

This model is quite different from RDBMS where you typically define a table and worry about defining the index later. There is no such "index" concept in BigTable and you need to carefully plan out the physical sorting order of your data layout.

Lacking index turns out to be quite inconvenient and many people using Bigtable ends up building their own index at the application level. This usually results in having a highly denormalized data model with lots of column family who store links to other tables. Any update to the base data need to carefully update these other column family as well. From a performance angle, this is actually better than maintaining index in RDBMS because Bigtable is optimized from writes. However, since it is now the application logic to maintain the index, this can be a source of application bugs.

4) No referential integrity enforcement
As mentioned above, since you are building artificial index at the application level, you need to maintain the integrity of your index as well. This includes update your index when the base data is inserted, modified or deleted. This kind of handling logic is traditionally residing at the RDBMS level, but since BigTable has no such referential integrity concept, this responsibility is now landed on your application logic.

5) Lack of surrounding tools
As NOSQL or BigTable is very new, the tools surrounding it is definitely not comparable to the RDBMS world at this moment, such tools includes report generation, BI, data warehouse ... etc.

I observe the general trend that most NOSQL products are moving towards the direction to provide an ODBC / JDBC interface to integrate with existing tool markets easier. But at this moment, to the best of my understanding, such interface is not wide spread yet.


Design Patterns for Bigtable model
Due to the very different model of Bigtable, the data model design methodology is also quite different from traditional RDBMS schema design. Here is a sequence of steps that are pretty common ...

1) Identify all your query scenarios
Since there is no index concept, you have to plan out carefully how your data is physically sorted. Therefore it is important to find all your query use cases first.

2) Define your "entity table" and its corresponding column families
For an entity table, it is pretty common to have one column family storing all the entity attributes, and multiple column families to store the links to other entities. (e.g. A "UserTable" may contain a column family "baseInfo" to store all attributes of the user, a column family "friend" to store the links to another user, a column family "company" to store links to another CompanyTable)

3) Define your "index table"
The "index table" is what your application build to support reverse lookup. The "key" is typically base on the search criteria you have identified in your query scenario. It is not uncommon that each query may have its own specific index table.

4) Make sure your application logic updates the index correctly
Since the index table has to be maintained by application logic, you need to check to make sure it is done correctly. In many cases, this can be quite a source of bugs.

It is important to realize that NOSQL is not advocating a replacement of RDBMS which has been proven in many lines of application. The NOSQL should be considered a complementary technologies for some niche area where RDBMS is not covering well.

5) Design your update to be idempotent
Max's post has a good articulation on this. The basic idea is that due to the eventual consistency model based on "read/repair" and "quorum update". It is possible that a failed update is in fact successful. Here is how this can happen.
  1. Client issue a quorum update.
  2. The server distributed the update to all replica servers, but unfortunately doesn't get more than half to respond successfully. So it returns a failure to the client.
  3. Nevertheless, the update has been received by some minority replicas (in other words, they don't rollback even the update is not successful).
  4. Later, if the client read one of this minority, it will get this update (even it has failed). Even more, since this update has a later version, it will read-repair the other copies (ie: further propagate the failed update).
Therefore, the usual recovery is that user should retry the operation when it fails. And the application logic need to deal with potentially duplicated updates. One way is to find some way to detect duplications and ignore them once detected.

Sunday, August 29, 2010

Designing algorithms for Map Reduce

Since the emerging of Hadoop implementation, I have been trying to morph existing algorithms from various areas into the map/reduce model. The result is pretty encouraging and I've found Map/Reduce is applicable in a wide spectrum of application scenarios.

So I want to write down my findings but then found the scope is too broad and also I haven't spent enough time to explore different problem domains. Finally, I realize that there is no way for me to completely cover what Map/Reduce can do in all areas, so I just dump out what I know at this moment over the long weekend when I have an extra day.

Notice that Map/Reduce is good for "data parallelism", which is different from "task parallelism". Here is a description about their difference and a general parallel processing design methodology.

I'll cover the abstract Map/Reduce processing model below. For a detail description of the implementation of Hadoop framework, please refer to my earlier blog here.


Abstract Processing Model
There are no formal definition of the Map/reduce model. Basic on the Hadoop implementation, we can think of it as a "distributed merge-sort engine". The general processing flow is as follows.
  • Input data is "split" into multiple mapper process which executes in parallel
  • The result of the mapper is partitioned by key and locally sorted
  • Result of mapper of the same key will land on the same reducer and consolidated there
  • Merge sorted happens at the reducer so all keys arriving the same reducer is sorted

Within the processing flow, user defined functions can be plugged-in to the framework.
  • map(key1, value1) -> emit(key2, value2)
  • reduce(key2, value2_list) -> emit(key2, aggregated_value2)
  • combine(key2, value2_list) -> emit(key2, combined_value2)
  • partition(key2) return reducerNo
Design the algorithm for map/reduce is about how to morph your problem into a distributed sorting problem and fit your algorithm into the user defined functions of above.

To analyze the complexity of the algorithm, we need to understand the processing cost, especially the cost of network communication in such a highly distributed system.

Lets first consider the communication between Input data split and Mapper. To minimize this overhead, we need to run the mapper logic at the data split (without moving the data). How well we do this depends on how the input data is stored and whether we can run the mapper code there. For HDFS and Cassandra, we can the mapper at the storage node and the scheduler algorithm of JobTracker will assign the mapper to the data split that it collocates with and hence significantly reduce the data movement. Other data store such as Amazon S3 doesn't allow execution of mapper logic at the storage node and therefore incur more data traffic.

The communication between Mapper and Reducer cannot be collocated because it depends on the emit key. The only mechanism available is the combine() function which can perform a local consolidation and hence can reduce the data sent to the reducer.

Finally the communication between the reducer and the output data store depends on the store's implementation. For HDFS, the data is triply replicated and hence the cost of writing can be high. Cassandra (a NOSQL data store) allows configurable latency with various degree of data consistency trade-off. Fortunately, in most case the volume of result data after a Map/Reduce processing is not high.

Now, we see how to fit various different kinds of algorithms into the Map/Reduce model ...


Map-Only
"Embarrassing parallel" problems are those that the same processing is applied in each data element in a pretty independent way, in other words, there is no need to consolidate or aggregate individual results.

These kinds of problem can be expressed as a Map-only job (by specifying the number of reducers to zero). In this case, Mapper's emitted result will directly go to the output format.

Some examples of map-only examples are ...
  • Distributed grep
  • Document format conversion
  • ETL
  • Input data sampling

Sorting
As we described above, Hadoop is fundamentally a distributed sorting engine, so using it for sorting is a natural fit.

For example, we can use an Identity function for both map() and reduce(), then the output is equivalent to sorting the input data. Notice that we are using a single reducer here. So the merge is still sequential although the sorting is done at the mapper in parallel.

We can perform the merge in parallel by using multiple reducers. In this case, output of each reducer are sorted. We may need to do a final merge on all the reducer's output. Another way is to use a customized partition() function such that the keys are partitioned by range. In this case, each reducer is sorting a particular range and the final result is just to concatenate the each reducer's sorted result.
partition(key) {
  range = (KEY_MAX - KEY_MIN) / NUM_OF_REDUCERS
  reducer_no = (key - KEY_MIN) / range
  return reducer_no
}


Inverted Indexes
The map reduce model is originated from Google which has a lot of scenarios of building large scale inverted index. Building an inverted index is about parsing different documents to build a word -> document index for keyword search.

In fact, inverted index is pretty general and can be applied in many scenarios. To build an inverted index, we can feed the mapper each document (or lines within a document). The Mapper will parse the words in the document to emit [word, doc] pairs along with other metadata such as where in the document this word occurs ... etc. The reducer can simply be an identity function that just dump out the list, or it can perform some statistic aggregation per word.

In a more general form of Inverted index, there is a "container" and "element" concept. The Map and Reduce function will be organized in the following patterns.
map(key, container) {
  for each element in container {
      element_meta =
           extract_metadata(element, container)
      emit(element, [container_id, element_meta])
  }
}

reduce(element, container_ids) {
  element_stat =
       compute_stat(container_ids)
  emit(element, [element_stat, container_ids])
}

In Text index, we are not just counting the actual frequency of the terms but also adjust its weighting based on its frequency distribution so common words will have less significance when they appears in the document. The final value after normalization is called TF-IDF (term frequency times inverse document frequency) and can be computed using Map Reduce as well.


Simple Statistics ComputationComputing max, min, count is very straightforward since this operation is commutative and associative. Each mapper will perform the local computation and send the result to a single reducer to do the final computation.

Combine function is typically used to reduce the network traffic. Notice that the input to the combine function must look the same as the input to the reducer function and the output of the combine function must look the same as the output of the map function. There is also no guarantee that the combiner function will be invoked at all.

class Mapper {
  buffer

  map(key, number) {
      buffer.append(number)
      if (buffer.is_full) {
          max = compute_max(buffer)
          emit(1, max)
      }
  }
}


class Reducer {
  reduce(key, list_of_local_max) {
      global_max = 0
      for local_max in list_of_local_max {
          if local_max > global_max {
              global_max = local_max
          }
      }        emit(1, global_max)
  }
}


class Combiner {
  combine(key, list_of_local_max) {
      local_max = maximum(list_of_local_max)
      emit(1, local_max)
  }
}
Computing avg is done in a similar way except that instead of computing the local avg, we compute the local sum and local count. The reducer will do the final sum divided by the final count to come up with the final avg.

Computing a histogram is pretty common in statistics and can give a quick idea about the data distribution. A typical approach is to divide the number into different intervals. The mapper will compute the count per interval, and emit that per interval and the reducer will compute the sum of that interval.
class Mapper {
  interval_start = [0, 20, 40, 60, 80]

  map(key, number) {
      i = 0;
      while (i < NO_OF_INTERVALS) {
          if (number < interval_start[i]) {
              emit(i, 1)
              break
          }
      }
  }
}


class Reducer {
  reduce(interval, counts) {
      total_counts = 0
      for each count in counts {
          total_counts += count
      }
      emit(interval, total_counts)
  }
}


class Combiner {
  combine(interval, occurrence) {
      emit(interval, occurrence.size)
  }
}
Notice that a non-uniform distribution of values across intervals may cause an unbalanced workload among reducers and hence undermine the degree of parallelism. We'll address this in the later part of this post.


In-Mapper Combine
Jimmy Lin, in his excellent book, talks about a technique call "in-mapper combine" which regains control at the application level when the combine takes place. The general idea is to maintain a HashMap to buffer the intermediate result and has a separate logic to determine when to actually emit the data from the buffer. The general code structure is as follows ...
class Mapper {
  buffer

  init() {
      buffer = HashMap.new
  }

  map(key, data) {
      elements = process(data)
      for each element {
          ....
          check_and_put(buffer, k2, v2)
      }
  }

  check_and_put(buffer, k2, v2) {
      if buffer.full {
          for each k2 in buffer.keys {
              emit(k2, buffer[k2])
          }
      }
  }

  close() {        for each k2 in buffer.keys {
          emit(k2, buffer[k2])
      }    }
}

SQL Model
The SQL model can be used to extract data from the data source. It contains a number of primitives.

Projection / Filter
This logic is typically implemented in the Mapper
  • result = SELECT c1, c2, c3, c4 FROM source WHERE conditions
Aggregation / Group by / Having
This logic is typically implemented in the Reducer
  • SELECT sum(c3) as s1, avg(c4) as s2 ... FROM result GROUP BY c1, c2 HAVING conditions
The above example can be realized by the following map/reduce job
class Mapper {
  map(k, rec) {
      select_fields =
          [rec.c1, rec.c2, rec.c3, rec.c4]
      group_fields =
          [rec.c1, rec.c2]
      if (filter_condition == true) {
          emit(group_fields, select_fields)
      }
  }
}

class Reducer {
  reduce(group_fields, list_of_rec) {
      s1 = 0
      s2 = 0
      for each rec in list_of_rec {
          s1 += rec.c3
          s2 += rec.c4
      }
      s2 = s2 / rec.size
      if (having_condition == true) {
          emit(group_fields, [s1, s2])
      }
  }
}

Data Joins
Joining 2 data set is a very common operation in Relational Data Model and has been very mature in RDBMS implementation. The common join mechanism in a centralized DB architecture is as follows
  1. Nested loop join -- This is the most basic and naive mechanism and is organized as two loops. The outer loop reads from data set1, the inner loop scan through the whole data set2 and compare with the records just read from data set1.
  2. Indexed join -- An index (e.g. B-Tree index) is built for one of the data sets (say data set2 which is the smaller one). The join will scan through data set1 and lookup the index to find the matched records of data set2.
  3. Merge join -- Pre-sort both data sets so they are arranged physically in increasing order. The join is realized by just merging the two data sets. a) Locate the first record in both data set1 & set2, which is their corresponding minimum key b) In the one with a smaller minimum key (say data set1), keep scanning until finding the next key which is bigger than the minimum key of the other data set (ie. data set2), call this the next minimum key of data set1. c) Switch position and repeat the whole thing until one of the data set is exhausted.
  4. Hash / Partition join -- Partition the data set1 and data set2 into smaller size and apply other join algorithm in a smaller data set size. A linear scan with a hash() function is typically performed to partition the data sets such that data in set1 and data in set2 with the same key will land on the same partition.
  5. Semi join -- This is mainly used to join two sets of data that is stored at different locations and the goal is to reduce the amount of data transfer such that only the full records appears in the final joint result will be send through. a) Data set2 will send its key set to machine holding Data set1. b) Machine holding Data set1 will do a join and send back the records in Data set1 that matches one of the send-over keys. c) The machine holding data set2 will do a final join to the data send back.
In the map reduce environment, it has the corresponding joins.

General reducer-side join
This is the most basic one, records from data set1 and set2 with the same key will land on the same reducer, which will then do a cartesian product. The downside of this model is that the reducer need to have enough memory to hold all records of each key.
map(k1, rec) {
  emit(rec.key, [rec.type, rec])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  list_of_typeB = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else {
          list_of_typeB.append(rec)
      }
  }

  # Compute the catesian product
  products = []
  for recA in list_of_typeA {
      for recB in list_of_typeB {
          emit(k2, [recA, recB])
      }
  }
}

Optimized reducer-side join
You can "secondary sort" the data type for each key by defining a customized partition function. In this model, you arrange the data type (which has less records per key to arrive first) and you only need to store these types.
map(k1, rec) {
  emit([rec.key, rec.type], rec])
}

partition(key_pair) {
  super.partition(key_pair[0])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else { # receive records of typeA
          for recA in list_of_typeA {
              emit(k2, [recA, rec])
          }
      }
  }
}

While being very flexible, the downside of Reducer side join is that all data need to be transfer from the mapper to the reducer and then result write to HDFS. Map-side join explore some special arrangement of the input file such that the join is being perform at the mapper. The advantage of doing in the mapper is that we can exploit the collocation of the Map reduce framework such that the mapper will be allocated an input split in its local machine, hence reduce the data transfer from the disk to the mapper. After the map-side join, the result is written directly to the output HDFS files and hence eliminate the data transfer between the mapper and the reducer.

Map-side partition join
In this model, it requires the 2 data sets to be partitioned into 2 sets of partition files (same number of partitions for each set). The size of the partition is such that it can fit into the memory of the Mapper machine. We also need to configure the Map/Reduce job such that there is no split in the partition file, in other words, the whole partition is assigned to a mapper task.

The mapper will detect the partition of the input file and then read the corresponding partition file of the other data set into an in-memory hashtable. After that, the mapper will lookup the Hashtable to do the join.
class Mapper {
  map = Hashtable.new

  init() {
      partition = detect_input_filename()
      map = load("hdfs://dataset2/" + partition)
  }

  map(k1, rec1) {
      rec2 = map[rec1.key]
      if (rec2 != nil) {
          emit(rec1.key, [rec1, rec2])
      }
  }
}

Map-side partition merge join
In additional, if the partition file is also sorted, then the mapper can use a merge join, which has an even smaller memory footprint.
class Mapper {
  rec2_key = nil
  next_rec2 = nil
  list_of_rec2 = []
  file = nil

  init() {
      partition = detect_input_filename()
      file = open("hdfs://dataset2/" + partition, "r")
      next_rec2 = file.read()
      fill_rec2_list()
  }

  # Fill up the list of rec2 list which has the same key
  fill_rec2_list() {
      rec2_key = next_rec2.key
      list_of_rec2.append(next_rec2)
      next_rec2 = file.read
      while(next_rec2.key == key) {
          list_of_rec2.append(next_rec2)
      }
  }

  map(k1, rec1) {
      while (rec1.key > rec2_key) {
          fill_rec2_list()
      }
        while (rec1.key == rec2.key) {
          for rec2 in list_of_rec2 {
              emit(rec1.key, [rec1, rec2])
          }
      }
    }
}

Memcache join
The model is very straightforward, the second data set is loaded into a distributed hash table (like memcache) which has effectively unlimited size. The mapper will receive input split from the first data set and then lookup the memcache for the corresponding record of the other data set.

There are also some other more sophisticated join mechanism such as semi-join described in this paper.

Graph Algorithms
Many problems can be modeled as a graph of Node and Edges. In the Search engine environment, computing the rank of a document using Page Rank or Hits can be model as a sequence of iterations of Map/Reduce jobs.

In the past, I have been blog a number of very basic graph algorithms in map reduce including doing topological sort, finding shortest path, minimum spanning tree etc. and also how to recommend people connection using Map/Reduce.

Due to the fact that graph traversal is inherently sequential, I am not sure Map/Reduce is the best parallel processing model for graph processing. Another problem is that due to the "stateless nature" of map() and reduce() functions, the whole graph need to be transferred between mapper and reducer which incur significant communication costs. Jimmy Lin has described a clever technique called Shimmy which exploit using a special partitioning function which let the reducer to retain the ownership of nodes across map/reduce jobs. I have described this technique as well as a general model of Map/Reduce graph processing in a previous blog.

I think a parallel programming model specific for Graph processing will perform much better. Google's Pregel model is a good example of that.


Machine Learning
Many of the machine learning algorithm involve multiple iterations of parallel processing that fits very well into Map/Reduce model.

For example, we can use map reduce to calculate the statistics for probabilistic methods such as naive Bayes.

A simple example of computing K-Means cluster can also be done in the following way.
  • Input: A set of points, with k initial centrods
  • Output: K final centroids
Iterate until no more change of membership
  1. For each point, assign it to be the member of closest centroid
  2. Re-compute the centroid from the assigned point members


For a complete list of Machine learning algorithms and how they can be implemented using the Map/Reduce model, here is a very good paper.


Matrix arithmetic
A lot of real-life relationships can be represented as a Matrix. One example is the vector space model of Information Retrieval where the column represents docs and the row represents terms. Another example is the social network graph where the column as well as the row representing people and a binary value of each cell to represent a "friend" relationship. In this case, M + M.M represents all the people that I can reach within 2 degree.

Processing for dense matrix is very easy to parallelized. But since the sequential version is O(N^3), it is not that interesting for Matrix with large size (millions range in rows and columns).

A lot of real-world graph problem can be represented as sparse matrix. So my interests is to focus more in the processing of sparse matrix. I don't have much to share at this moment but I hope this is something I will blog about in future.

Saturday, August 28, 2010

The Limitations of SPARQL

Recently, I have been looking at RDF model and try to compare that with the property graph model that I mention in a previous post. I also look at the SPARQL query model. While I think it is a very powerful query language based by variable bindings, I also observe a couple of limitations that it doesn't handle well.

Note that I haven't used SPARQL in very simple examples and don't claim to be expert in this area. I am hoping my post here can invite other SPARQL experts to share their experience.

Here are the limitations that I have seen.
Support of Negation

Because of the “Open World” assumption, SPARQL doesn’t support “negation” well, this means expressing "negation" in SPARQL is not easy.

  • Find all persons who is Bob’s friends but doesn’t know Java
  • Find all persons who know Bob but doesn't know Alice
Support of Path Expression

In SPARQL, expressing a variable length path is not easy.

  • Find all posts written by Bob’s direct and indirect friends (everyone reachable from Bob)
Predicates cannot have Properties
This may be a RDF limitation that SPARQL inherits. Since RDF represents everything in Triples. It is easy to implement properties of a Node using extra Triples, but it is very difficult to implement properties in Edges.

In SPARQL, there is no way to attaching a property to a “predicate”.

  • Bob knows Peter for 5 years
RDF inference Rule

Inference rules are build around RDFS and OWL which is focusing mainly on type and set relationships and is implemented using a Rule: (conditions => derived triple) expression. But it is not easy to express a derived triples whose object’s value is an expression of existing triples.

  • Family income is the sum of all individual member’s income
Support of Fuzzy Matches with Ranked results
SPARQL is based on a boolean query model which is designed for exact match. Express a fuzzy match with ranked result is very difficult.

  • Find the top 20 posts that is “similar” to this post ranked by degree of similarity (lets say similarity is measured by the number of common tags that the 2 posts share)

I am also very interested to see if there is any large scale deployment of RDF graph in real-life scenarios. I am not aware of any popular social network sites are using RDF to store the social graph or social activities. I guess this may be due to scalability of the RDF implementation today. I may be wrong though.

Wednesday, August 4, 2010

Map/Reduce to recommend people connection

Once common feature in Social Network site is to recommend people connection. e.g. "People you may know" from Linkedin. The basic idea is very simple; if person A and person B doesn't know each other but they have a lot of common friends, then the system should recommend person B to person A and vice versa.

From a graph theory perspective, for each person who is 2-degree reachable from person A, we count how many distinct paths (with 2 connecting edges) exist between this person and person A. Rank this list in terms the number of paths and show the top 10 persons that person A should connect with.

We should how we can use Map/Reduce to compute this top-10 connection list for every person. The problem can be stated as: For every person X, we determine a list of person X1, X2 ... X10 which is the top 10 persons that person X has common friends with.

The social network graph is generally very sparse. Here we assume the input records is an adjacency list sorted by name.
"ricky" => ["jay", "peter", "phyllis"]
"peter" => ["dave", "jack", "ricky", "susan"]
We use two rounds of Map/Reduce job to compute the top-10 list

First Round MR Job
The purpose of this MR job is to compute the number of distinct path between all pairs of people who is 2 degree separated from each other.
  • In Map(), we do a cartesian product for all pairs of friends (since these friends may be connected in 2-dgrees). We also need to eliminate the pairs if they already have a direct connection. Therefore, the The Map() function should also emit pairs of direct connected persons. We need to order the key space such that all keys with the same pair of people with go to the same reducer. On the other hand, we need the pair of direct connection come before the pairs of 2 degree of separations.
  • In Reduce(), we know all the key pairs reaching the same reducer will be sorted. So the direct connect pair will come before the 2-degree pairs. So the reducer just need to check if the first pair is a direct connected one and if so skip the rest.
Input record ...  person -> connection_list
e.g. "ricky" => ["jay", "john", "mitch", "peter"]
also the connection list is sorted by alphabetical order

def map(person, connection_list)
  # Compute a cartesian product using nested loops
  for each friend1 in connection_list
     # Eliminate all 2-degree pairs if they already
     # have a one-degree connection
     emit([person, friend1, 0])
     for each friend2 > friend1 in connection_list
         emit([friend1, friend2, 1],  1)

def partition(key)
  #use the first two elements of the key to choose a reducer
  return super.partition([key[0], key[1]])

def reduce(person_pair, frequency_list)
  # Check if this is a new pair
  if @current_pair != [person_pair[0], person_pair[1]]
      @current_pair = [person_pair[0], person_pair[1]]
      # Skip all subsequent pairs if these two person
      # already know each other
      @skip = true if person_pair[2] == 0

  if !skip
      path_count = 0
      for each count in frequency_list
          path_count += count
      emit(person_pair, path_count)

Output record ... person_pair => path_count
e.g. ["jay", "john"] => 5


Second Round MR Job

The purpose of this MR job is to rank the connections for every person by the number of distinct path between them.
  • In Map(), we rearrange the input records so it will be sorted before reaching the reducer
  • In Reduce(), all the connections from the person is sorted, we just need to aggregate the top 10 to a list and then write the list out.
Input record = Output record of round 1

def map(person_pair, path_count)
  emit([person_pair[0], path_count], person_pair[1])

def partition(key)
  #use the first element of the key to choose a reducer
  return super.partition(key[0])

def reduce(connection_count_pair, candidate_list)
  # Check if this is a new person
  if @current_person != connection_count_pair[0]
      emit(@current_person, @top_ten)
      @top_ten = []
      @current_person = connection_count_pair[0]

  #Pick the top ten candidates to connect with
  if @top_ten.size < 10
      for each candidate in candidate_list
          @top_ten.append([candidate, connection_count_pair[1]])
          break if @pick_count > 10

Output record ... person -> candidate_count_list
e.g.  "ricky" => [["jay", 5],  ["peter", 3] ...]

Tuesday, July 20, 2010

Graph Processing in Map Reduce

In my previous post about Google's Pregel model, a general pattern of parallel graph processing can be expressed as multiple iterations of processing until a termination condition is reached. Within each iteration, same processing happens at a set of nodes (ie: context nodes).

Each context node perform a sequence of steps independently (hence achieving parallelism)
  1. Aggregate all incoming messages received from its direct inward arcs during the last iteration
  2. With this aggregated message, perform some local computation (ie: the node and its direct outward arcs' local state)
  3. Pass the result of local computation along all outward arcs to its direct neighbors
This processing pattern can be implemented using Map/Reduce model, using a MR job for each iteration. The sequence is a little different from above. Typically a mapper will perform (2) and (3) where it emits the message using its neighbor's node id as key. Reducer will be responsible to perform (1).

Issue of using Map/Reduce

However, due to the functional programming nature of Map() and Reduce(), M/R does not automatically retain "state" between jobs. To retain the graph across iterations, the mapper need to explicitly pass along the corresponding portion of the graph to the reducer, in additional to the messages itself. Similarly, the reducer need to handle a different type of data passed along.

map(id, node) {
  emit(id, node)
  partial_result = local_compute()
  for each neighbor in node.outE.inV {
      emit(neighbor.id, partial_result)
  }
}

reduce(id, list_of_msg) {
  node = null
  result = 0

  for each msg in list_of_msg {
      if type_of(msg) == Node
          node = msg
      else
          result = aggregate(result, msg)
      end
  }

  node.value = result
  emit(id, node)
}

This downside of this approach is a substantial amount of I/O processing and bandwidth is consumed to just passing the graph itself around.

Google's Pregel model provides an alternative message distribution model so that state can be retained at the processing node across iterations.

The Schimmy Trick

In a recent research paper, Jimmy Lin and Michael Schatz use a clever partition() algorithm in Map /Reduce which can achieve "stickiness" of graph distribution as well as maintaining a sorted-order of node id on disk.

The whole graph is broken down into multiple files and stored in HDFS. Each file contains multiple records and each record describe a node and its corresponding adjacency list.

id -> [nodeProps, [[arcProps, toNodeId], [arcProps, toNodeId] ...]

In addition, the records are physically sorted within the file by their node id.

There will be as many reducers as the number of above files and so each Reducer task is assigned with one of this file. On the other hand, the partition() function assign all nodes within the file to land on its associated reducer.

Mapper does the same thing before, except the first line in the method is removed as it no longer need to emit the graph.

Reducer will receive all the message emitted from the mapper, which is sorted by the Map/Reduce framework by the key (which happens to be the node id). On the other hand, the reducer can open the corresponding file in HDFS, which also maintain a sorted list of nodes based on their ids. The reducer can just read the HDFS file sequentially on each reduce() call and confident that all preceding nodes in the file has already received their corresponding messages.

reduce(id, list_of_msg) {
   nodeInFile = readFromFile()

   # Emit preceding nodes that receives no message
   while(nodeInFile.id < id)
       emit(nodeInFile.id, nodeInFile)
   end

   result = 0

   for each msg in list_of_msg {
       result = aggregate(result, msg)
   }

   nodeInFile.value = result
   emit(id, nodeInFile)
}

Although the Schimmy trick provides an improvement over the classical way of map/reduce, it only eliminates the communication between the mapper and the reducer. At each iteration, the mapper still needs to read the whole graph from HDFS to the mapper node and the reducer still need to write the whole graph back to HDFS, which maintains a 3-way replication for each file.

Hadoop provides some co-location mechanism for the mapper and try to assign files that is sitting at the same machine to the mapper. However, this co-location mechanism is not available for the reducer and so reducer still need to write the graph back over the network.

Pregel Advantage

Since Pregel model retain worker state (the same worker is responsible for the same set of nodes) across iteration, the graph can be loaded in memory once and reuse across iterations. This will reduce I/O overhead as there is no need to read and write to disk at each iteration. For fault resilience, there will be a periodic check point where every worker write their in-memory state to disk.

Also, Pregel (with its stateful characteristic), only send local computed result (but not the graph structure) over the network, which implies the minimal bandwidth consumption.

Of course, Pregel is very new and relative immature as compared to Map/Reduce.

Monday, July 12, 2010

Google Pregel Graph Processing

A lot of real life problems can be expressed in terms of entities related to each other and best captured using graphical models. Well defined graph theory can be applied to processing the graph and return interesting results. The general processing patterns can be categorized into the following ...
  1. Capture (e.g. When John is connected to Peter in a social network, a link is created between two Person nodes)
  2. Query (e.g. Find out all of John's friends of friends whose age is less than 30 and is married)
  3. Mining (e.g. Find out the most influential person in Silicon Valley)

Distributed and Parallel Graph Processing

Although using a Graph to represent a relationship network is not new, the size of network has been dramatically increase in the past decade such that storing the whole graph in one place is impossible. Therefore, the graph need to be broken down into multiple partitions and stored in different places. Traditional graph algorithm that assume the whole graph can be resided in memory becomes invalid. We need to redesign the algorithm such that it can work in a distributed environment. On the other hand, by breaking the graph into different partitions, we can manipulate the graph in parallel to speed up the processing.

Property Graph Model

The paper “Constructions from Dots and Lines” by Marko A. Rodriguez and Peter Neubauer illustrate the idea very well. Basically, a graph contains nodes and arcs.

A node has a "type" which defines a set of properties (name/value pairs) that the node can be associated with.

An arc defines a directed relationship between nodes, and hence contains the fromNode, toNode as well as a set of properties defined by the "type" of the arc.



General Parallel Graph Processing

Most of the graph processing algorithm can be expressed in terms of a combination of "traversal" and "transformation".

Parallel Graph Traversal

In the case of "traversal", it can be expressed as a path which contains a sequence of segments. Each segment contains a traversal from a node to an arc, followed by a traversal from an arc to a node. In Marko and Peter's model, a Node (Vertex) contains a collection of "inE" and another collection of "outE". On the other hand, an Arc (Edge) contains one "inV", one "outV". So to expressed a "Friend-of-a-friend" relationship over a social network, we can use the following

./outE[@type='friend']/inV/outE[@type='friend']/inV

Loops can also be expressed in the path, to expressed all persons that is reachable from this person, we can use the following

.(/outE[@type='friend']/inV)*[@cycle='infinite']

On the implementation side, a traversal can be processed in the following way
  1. Start with a set of "context nodes", which can be defined by a list of node ids, or a search criteria (in this case, the search result determines the starting context nodes)
  2. Repeat until all segments in the path are exhausted. Perform a walk from all context nodes in parallel. Evaluate all outward arcs (ie: outE) with conditions (ie: @type='friend'). The nodes that this arc points to (ie: inV) will become the context node of next round
  3. Return the final context nodes
Such traversal path can also be used to expressed inference (or derived) relationships, which doesn't have a physical arc stored in the graph model.

Parallel Graph Transformation

The main goal of Graph transformation is to modify the graph. This include modifying the properties of existing nodes and arcs, creating new arcs / nodes and removing existing arcs / nodes. The modification logic is provided by a user-defined function, which will be applied to all active nodes.

The Graph transformation process can be implemented in the following steps
  1. Start with a set of "active nodes", which can be defined by a lost of node ids, or a search criteria (in this case, the search result determines the starting context nodes)
  2. Repeat until there is no more active nodes. Execute the user-defined transformation which modifies the properties of the context nodes and outward arcs. It can also remove outwards arcs or create new arcs that point to existing or new nodes (in other words, the graph connectivity can be modified). It can also send message to other nodes (the message will be picked up in the next round) as well as receive message sent from other nodes in the previous round.
  3. Return the transformed graph, or a traversal can be performed to return a subset of the transformed graph.
Google's Pregel

Pregel can be thought as a generalized parallel graph transformation framework. In this model, the most basic (atomic) unit is a "node" that contains its properties, outward arcs (and its properties) as well as the node id (just the id) that the outward arc points to. The node also has a logical inbox to receive all messages sent to it.


The whole graph is broken down into multiple "partitions", each contains a large number of nodes. Partition is a unit of execution and typically has an execution thread associated with it. A "worker" machine can host multiple "partitions".


The execution model is based on BSP (Bulk Synchronous Processing) model. In this model, there are multiple processing units proceeding in parallel in a sequence of "supersteps". Within each "superstep", each processing units first receive all messages delivered to them from the preceding "superstep", and then manipulate their local data and may queue up the message that it intends to send to other processing units. This happens asynchronously and simultaneously among all processing units. The queued up message will be delivered to the destined processing units but won't be seen until the next "superstep". When all the processing unit finishes the message delivery (hence the synchronization point), the next superstep can be started, and the cycle repeats until the termination condition has been reached.

Notice that depends on the graph algorithms, the assignment of nodes to a partition may have an overall performance impact. Pregel provides a default assignment where partition = nodeId % N but user can overwrite this assignment algorithm if they want. In general, it is a good idea to put close-neighbor nodes into the same partition so that message between these nodes doesn't need to flow into the network and hence reduce communication overhead. Of course, this also means traversing the neighboring nodes all happen within the same machine and hinder parallelism. This usually is not a problem when the context nodes are very diverse. In my experience of parallel graph processing, coarse-grain parallelism is preferred over fine-grain parallelism as it reduces communication overhead.

The complete picture of execution can be implemented as follows:


The basic processing unit is a "thread" associated with each partition, running inside a worker. Each worker receive messages from previous "superstep" from its "inQ" and dispatch the message to the corresponding partition that the destination node is residing. After that, a user defined "compute()" function is invoked on each node of the partition. Notice that there is a single thread per partition so nodes within a partition are executed sequentially and the order of execution is undeterministic.

The "master" is playing a central role to coordinate the execute of supersteps in sequence. It signals the beginning of a new superstep to all workers after knowing all of them has completed the previous one. It also pings each worker to know their processing status and periodically issue "checkpoint" command to all workers who will then save its partition to a persistent graph store. Pregel doesn't define or mandate the graph storage model so any persistent mechanism should work well. There is a "load" phase at the beginning where each partition starts empty and read a slice of the graph storage. For each node read from the storage, a "partition()" function will be invoked and load the node in the current partition if the function returns the same node, otherwise the node is queue to another partition who the node is assigned to.

Fault resilience is achieved by having the checkpoint mechanism where each worker is instructed to save its in-memory graph partition to the graph storage periodically (at the beginning of a superstep). If the worker is detected to be dead (not responding to the "ping" message from the master), the master will instruct the surviving workers to take up the partitions of the failed worker. The whole processing will be reverted back to the previous checkpoint and proceed again from there (even the healthy worker need to redo the previous processing). The Pregel paper mention a potential optimization to just re-execute the processing of the failed partitions from the previous checkpoint by replaying the previous received message, of course this requires keeping a log of all received messages between nodes at every super steps since previous checkpoint. This optimization, however, rely on the algorithm to be deterministic (in other words, same input execute at a later time will achieve the same output).

Further optimization is available in Pregel to reduce the network bandwidth usage. Messages destined to the same node can be combined using a user-defined "combine()" function, which is required to be associative and commutative. This is similar to the same combine() method in Google Map/Reduce model.

In addition, each node can also emit an "aggregate value" at the end of "compute()". Worker will invoke an user-defined "aggregate()" function that aggregate all node's aggregate value into a partition level aggregate value and all the way to the master. The final aggregated value will be made available to all nodes in the next superstep. Just aggregate value can be used to calculate summary statistic of each node as well as coordinating the progress of each processing units.

I think the Pregel model is general enough for a large portion of classical graph algorithm. I'll cover how we map these traditional algorithms in Pregel in subsequent postings.

Reference

http://www.slideshare.net/slidarko/graph-windycitydb2010