Tuesday, July 20, 2010

Graph Processing in Map Reduce

In my previous post about Google's Pregel model, I described a general pattern of parallel graph processing: multiple iterations of processing that repeat until a termination condition is reached. Within each iteration, the same processing happens at a set of nodes (i.e., context nodes).

Each context node performs the following sequence of steps independently (hence achieving parallelism):
  1. Aggregate all incoming messages received over its direct inward arcs during the last iteration
  2. With this aggregated message, perform some local computation (i.e., update the local state of the node and its direct outward arcs)
  3. Pass the result of the local computation along all outward arcs to its direct neighbors
This processing pattern can be implemented in the Map/Reduce model, using one MR job per iteration. The sequence is a little different from the one above: typically, the mapper performs (2) and (3), emitting each message with the neighbor's node id as the key, and the reducer performs (1).

The Issue with Using Map/Reduce

However, due to the functional programming nature of Map() and Reduce(), M/R does not automatically retain "state" between jobs. To retain the graph across iterations, the mapper needs to explicitly pass the corresponding portion of the graph along to the reducer, in addition to the messages themselves. Similarly, the reducer needs to handle two different types of data: graph structure and messages.

map(id, node) {
  emit(id, node)    # pass the graph structure along to the reducer
  partial_result = local_compute(node)
  for each neighbor in node.outE.inV {
      emit(neighbor.id, partial_result)
  }
}

reduce(id, list_of_msg) {
  node = null
  result = 0

  for each msg in list_of_msg {
      if type_of(msg) == Node {
          node = msg    # the graph structure passed along by the mapper
      } else {
          result = aggregate(result, msg)
      }
  }

  node.value = result
  emit(id, node)
}
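
To make the pattern concrete, here is a minimal single-process Python sketch of one such iteration. It is not Hadoop code: run_iteration() is a hypothetical stand-in for the framework's shuffle, the dictionary layout of a node is my own assumption, and I assume local_compute() splits a node's value evenly among its out-neighbors while aggregate() is a plain sum.

```python
from collections import defaultdict


def map_fn(node_id, node):
    # Pass the graph structure along, tagged so the reducer can tell it
    # apart from ordinary messages.
    yield node_id, ("node", node)
    if node["out"]:
        # Assumed local computation: split the value among out-neighbors.
        share = node["value"] / len(node["out"])
        for neighbor_id in node["out"]:
            yield neighbor_id, ("msg", share)


def reduce_fn(node_id, values):
    node, total = None, 0.0
    for kind, payload in values:
        if kind == "node":
            node = payload          # the graph record for this node
        else:
            total += payload        # assumed aggregate(): a plain sum
    return node_id, {"value": total, "out": node["out"]}


def run_iteration(graph):
    # Hypothetical stand-in for the shuffle: group mapper output by key.
    shuffled = defaultdict(list)
    for node_id, node in graph.items():
        for key, value in map_fn(node_id, node):
            shuffled[key].append(value)
    return dict(reduce_fn(key, shuffled[key]) for key in sorted(shuffled))


graph = {
    1: {"value": 1.0, "out": [2, 3]},
    2: {"value": 1.0, "out": [3]},
    3: {"value": 1.0, "out": [1]},
}
next_graph = run_iteration(graph)
```

Note that every node record travels through the shuffle once per iteration, even though only the value field changes.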

The downside of this approach is that a substantial amount of I/O and bandwidth is consumed just passing the graph itself around.

Google's Pregel model provides an alternative message distribution model so that state can be retained at the processing node across iterations.

The Schimmy Trick

In a recent research paper, Jimmy Lin and Michael Schatz use a clever partition() algorithm in Map/Reduce that achieves "stickiness" of the graph distribution while also maintaining a sorted order of node ids on disk.

The whole graph is broken down into multiple files stored in HDFS. Each file contains multiple records, and each record describes a node and its corresponding adjacency list.

id -> [nodeProps, [[arcProps, toNodeId], [arcProps, toNodeId], ...]]

In addition, the records are physically sorted within the file by their node id.

There are as many reducers as there are files, so each reducer task is assigned exactly one of these files. The partition() function, in turn, assigns all nodes within a file (that is, all messages keyed by their node ids) to the reducer associated with that file.
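
As a rough illustration of this layout, the sketch below splits a graph into sorted partitions in plain Python. The modulo rule in partition() is my own assumption for illustration (any deterministic function would do, as long as the files and the job use the same one), and split_graph() is a hypothetical helper, not part of Hadoop.

```python
def partition(node_id, num_partitions):
    # Assumed rule: the same function decides which file a node is stored in
    # and which reducer receives the messages addressed to it.
    return node_id % num_partitions


def split_graph(graph, num_partitions):
    # Build one list of records per partition; visiting ids in sorted order
    # keeps each partition sorted by node id, as the files on disk would be.
    parts = [[] for _ in range(num_partitions)]
    for node_id in sorted(graph):
        parts[partition(node_id, num_partitions)].append((node_id, graph[node_id]))
    return parts


graph = {node_id: {"value": 1.0, "out": []} for node_id in range(7)}
parts = split_graph(graph, 3)
```

With this rule, a message keyed by node 4 goes to reducer 1, which is also the reducer that owns the file containing node 4.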

The mapper does the same thing as before, except that the first line of the method, emit(id, node), is removed, since it no longer needs to emit the graph.

The reducer receives all the messages emitted by the mappers, sorted by the Map/Reduce framework by key (which happens to be the node id). The reducer can also open its corresponding file in HDFS, which likewise holds the nodes sorted by id. The reducer can then read the HDFS file sequentially on each reduce() call, confident that all preceding nodes in the file have already received their corresponding messages.

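reduce(id, list_of_msg) {
   nodeInFile = readFromFile()

   # Emit preceding nodes that received no message
   while (nodeInFile.id < id) {
       emit(nodeInFile.id, nodeInFile)
       nodeInFile = readFromFile()
   }

   result = 0

   for each msg in list_of_msg {
       result = aggregate(result, msg)
   }

   nodeInFile.value = result
   emit(id, nodeInFile)
}
# Nodes that follow the last key in the file must still be emitted
# when the reducer finishes (not shown here)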
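
The same merge can be sketched as a runnable Python generator. This is my own simplification, not the code from the paper: schimmy_reduce() is a hypothetical name, the partition file is modeled as an in-memory list of (id, node) pairs, aggregate() is assumed to be a sum, and the final loop handles nodes that come after the last message key.

```python
def schimmy_reduce(partition_records, grouped_messages):
    """Merge-join sorted node records with sorted message groups.

    partition_records: iterable of (node_id, node), sorted by node_id
    grouped_messages:  iterable of (node_id, [msg, ...]), sorted by node_id
    """
    records = iter(partition_records)
    current = next(records, None)
    for msg_id, msgs in grouped_messages:
        # Emit preceding nodes that received no message, unchanged.
        while current is not None and current[0] < msg_id:
            yield current
            current = next(records, None)
        if current is None or current[0] != msg_id:
            raise KeyError(f"node {msg_id} is missing from this partition")
        node_id, node = current
        # Assumed aggregate(): sum the incoming messages.
        yield node_id, {**node, "value": sum(msgs)}
        current = next(records, None)
    # Flush the nodes that come after the last message key.
    while current is not None:
        yield current
        current = next(records, None)


records = [
    (1, {"value": 9.0, "out": [2]}),
    (2, {"value": 1.0, "out": [3]}),
    (3, {"value": 1.0, "out": [4]}),
    (4, {"value": 7.0, "out": []}),
]
messages = [(2, [0.5, 0.25]), (3, [1.0])]
output = list(schimmy_reduce(records, messages))
```

Because both inputs are sorted by node id, the reducer only ever holds one node record in memory at a time.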

Although the Schimmy trick improves on the classical way of using map/reduce, it only eliminates the transfer of the graph structure between the mapper and the reducer. At each iteration, the mapper still needs to read the whole graph from HDFS, and the reducer still needs to write the whole graph back to HDFS, which by default maintains 3-way replication for each file.

Hadoop provides a co-location mechanism for the mapper: it tries to assign each mapper files that sit on the same machine. However, this co-location mechanism is not available for the reducer, so the reducer still needs to write the graph back over the network.

Pregel Advantage

Since the Pregel model retains worker state across iterations (the same worker is responsible for the same set of nodes), the graph can be loaded into memory once and reused across iterations. This reduces I/O overhead, as there is no need to read from and write to disk at each iteration. For fault resilience, there is a periodic checkpoint at which every worker writes its in-memory state to disk.

Also, thanks to its stateful nature, Pregel sends only the locally computed results (not the graph structure) over the network, which keeps bandwidth consumption to a minimum.
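
A toy single-process simulation can illustrate the difference. This is not the Pregel API, which I have not seen: Worker, run(), the modulo partition() rule, and the even-split computation are all my own assumptions, and the "checkpoint" is just an in-memory copy standing in for a write to disk.

```python
import copy
from collections import defaultdict


def partition(node_id, num_workers):
    # Assumed rule: a node always lives on the same worker.
    return node_id % num_workers


class Worker:
    def __init__(self):
        self.nodes = {}     # graph portion, kept in memory across supersteps
        self.saved = None   # last checkpoint (stand-in for a disk write)

    def superstep(self, inbox):
        outbox = []
        for node_id, node in self.nodes.items():
            if node_id in inbox:
                node["value"] = sum(inbox[node_id])   # aggregate messages
            if node["out"]:
                share = node["value"] / len(node["out"])
                outbox.extend((dst, share) for dst in node["out"])
        return outbox       # only messages leave the worker

    def checkpoint(self):
        self.saved = copy.deepcopy(self.nodes)


def run(graph, num_workers, supersteps, checkpoint_every=2):
    workers = [Worker() for _ in range(num_workers)]
    for node_id, node in graph.items():               # load the graph once
        workers[partition(node_id, num_workers)].nodes[node_id] = copy.deepcopy(node)
    inboxes = [defaultdict(list) for _ in workers]
    for step in range(supersteps):
        next_inboxes = [defaultdict(list) for _ in workers]
        for worker, inbox in zip(workers, inboxes):
            for dst, msg in worker.superstep(inbox):
                next_inboxes[partition(dst, num_workers)][dst].append(msg)
        inboxes = next_inboxes
        if (step + 1) % checkpoint_every == 0:
            for worker in workers:
                worker.checkpoint()
    return workers


graph = {
    1: {"value": 1.0, "out": [2, 3]},
    2: {"value": 1.0, "out": [3]},
    3: {"value": 1.0, "out": [1]},
}
workers = run(graph, num_workers=2, supersteps=2)
values = {nid: n["value"] for w in workers for nid, n in w.nodes.items()}
```

In this sketch the adjacency lists never leave the worker that owns them; only (destination, value) pairs cross worker boundaries, and the full state is copied out only at checkpoints.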

Of course, Pregel is very new and relatively immature compared to Map/Reduce.

1 comment:

Ali said...

Thanks, I finally understand how/why Pregel can work better on graphs than MapReduce.

Just one more question: if the entire graph is too big to fit in memory, and on each iteration only a small portion of the graph participates and changes, is it better to continue using MapReduce?