Monday, July 12, 2010

Google Pregel Graph Processing

Many real-life problems can be expressed in terms of entities related to each other and are best captured using graph models. Well-established graph theory can then be applied to process the graph and return interesting results. The general processing patterns fall into the following categories:
  1. Capture (e.g. When John is connected to Peter in a social network, a link is created between two Person nodes)
  2. Query (e.g. Find all of John's friends of friends who are under 30 and married)
  3. Mining (e.g. Find out the most influential person in Silicon Valley)

Distributed and Parallel Graph Processing

Although using a graph to represent a relationship network is not new, the size of these networks has increased so dramatically in the past decade that storing the whole graph in one place is often impossible. Therefore, the graph needs to be broken down into multiple partitions and stored in different places. Traditional graph algorithms that assume the whole graph resides in memory are no longer valid, so we need to redesign them to work in a distributed environment. On the other hand, by breaking the graph into different partitions, we can manipulate the graph in parallel to speed up the processing.

Property Graph Model

The paper “Constructions from Dots and Lines” by Marko A. Rodriguez and Peter Neubauer illustrates the idea very well. Basically, a graph contains nodes and arcs.

A node has a "type" which defines a set of properties (name/value pairs) that the node can be associated with.

An arc defines a directed relationship between nodes, and hence contains the fromNode, toNode as well as a set of properties defined by the "type" of the arc.
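
To make the model concrete, here is a minimal sketch of a property graph in Python. The class names, field names and the connect() helper are my own choices for illustration and are not taken from the paper.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


# eq=False keeps identity comparison, which avoids recursing through
# the node -> arc -> node references when two objects are compared.
@dataclass(eq=False)
class Node:
    node_id: str
    type: str                                  # e.g. "Person"
    properties: Dict[str, Any] = field(default_factory=dict)
    out_arcs: List["Arc"] = field(default_factory=list)  # arcs leaving this node
    in_arcs: List["Arc"] = field(default_factory=list)   # arcs arriving at this node


@dataclass(eq=False)
class Arc:
    type: str                                  # e.g. "friend"
    from_node: Node
    to_node: Node
    properties: Dict[str, Any] = field(default_factory=dict)


def connect(from_node, to_node, arc_type, **properties):
    """Create a directed arc and register it on both end nodes."""
    arc = Arc(arc_type, from_node, to_node, dict(properties))
    from_node.out_arcs.append(arc)
    to_node.in_arcs.append(arc)
    return arc


john = Node("john", "Person", {"name": "John", "age": 28})
peter = Node("peter", "Person", {"name": "Peter", "age": 35})
friendship = connect(john, peter, "friend", since=2009)
```

Keeping both the outward and inward arc lists on each node costs some memory, but it lets a traversal step in either direction without scanning all arcs.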



General Parallel Graph Processing

Most graph processing algorithms can be expressed as a combination of "traversal" and "transformation".

Parallel Graph Traversal

A "traversal" can be expressed as a path that contains a sequence of segments. Each segment contains a traversal from a node to an arc, followed by a traversal from an arc to a node. In Marko and Peter's model, a Node (Vertex) contains a collection of "inE" and another collection of "outE". An Arc (Edge), on the other hand, contains one "inV" and one "outV". So to express a "friend-of-a-friend" relationship over a social network, we can use the following

./outE[@type='friend']/inV/outE[@type='friend']/inV

Loops can also be expressed in the path. To express all persons that are reachable from this person, we can use the following

.(/outE[@type='friend']/inV)*[@cycle='infinite']

On the implementation side, a traversal can be processed in the following way
  1. Start with a set of "context nodes", which can be defined by a list of node ids or by a search criterion (in this case, the search result determines the starting context nodes)
  2. Repeat until all segments in the path are exhausted. Perform a walk from all context nodes in parallel. Evaluate all outward arcs (i.e. outE) against the conditions (i.e. @type='friend'). The nodes that the matching arcs point to (i.e. inV) become the context nodes of the next round
  3. Return the final context nodes
Such a traversal path can also be used to express inferred (or derived) relationships, which don't have a physical arc stored in the graph model.
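
The traversal steps above can be sketched in a few lines of Python. This is only an illustration under my own assumptions: arcs live in a plain dictionary keyed by (node, arc type), the walk is sequential rather than parallel, and the names add_arc, traverse and reachable are hypothetical.

```python
# Hypothetical arc store: (from_node, arc_type) -> list of to_nodes.
out_arcs = {}


def add_arc(from_node, arc_type, to_node):
    out_arcs.setdefault((from_node, arc_type), []).append(to_node)


def traverse(context_nodes, path):
    """Walk one segment per arc type in `path`; each round replaces the context nodes."""
    context = set(context_nodes)
    for arc_type in path:
        next_context = set()
        for node in context:  # a real system would walk these nodes in parallel
            next_context.update(out_arcs.get((node, arc_type), []))
        context = next_context
    return context


def reachable(context_nodes, arc_type):
    """Repeat a single segment until no new node shows up (the looping path)."""
    seen = set()
    frontier = set(context_nodes)
    while frontier:
        next_frontier = set()
        for node in frontier:
            next_frontier.update(out_arcs.get((node, arc_type), []))
        frontier = next_frontier - seen  # only keep nodes not visited before
        seen |= frontier
    return seen


add_arc("john", "friend", "peter")
add_arc("peter", "friend", "mary")
add_arc("mary", "friend", "john")
add_arc("mary", "friend", "sue")
add_arc("john", "colleague", "bob")

friends_of_friends = traverse({"john"}, ["friend", "friend"])
everyone = reachable({"john"}, "friend")
```

Note that reachable() stops on its own even though the friend arcs form a cycle, because nodes that were already seen never re-enter the frontier.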

Parallel Graph Transformation

The main goal of graph transformation is to modify the graph. This includes modifying the properties of existing nodes and arcs, creating new arcs / nodes and removing existing arcs / nodes. The modification logic is provided by a user-defined function, which is applied to all active nodes.

The Graph transformation process can be implemented in the following steps
  1. Start with a set of "active nodes", which can be defined by a list of node ids or by a search criterion (in this case, the search result determines the starting active nodes)
  2. Repeat until there are no more active nodes. Execute the user-defined transformation, which modifies the properties of the active nodes and their outward arcs. It can also remove outward arcs or create new arcs that point to existing or new nodes (in other words, the graph connectivity can be modified). It can also send messages to other nodes (the messages will be picked up in the next round) as well as receive messages sent from other nodes in the previous round.
  3. Return the transformed graph, or perform a traversal to return a subset of the transformed graph.

Google's Pregel

Pregel can be thought of as a generalized parallel graph transformation framework. In this model, the most basic (atomic) unit is a "node", which contains its properties, its outward arcs (and their properties), and the id (just the id) of the node that each outward arc points to. The node also has a logical inbox that receives all messages sent to it.


The whole graph is broken down into multiple "partitions", each containing a large number of nodes. A partition is a unit of execution and typically has an execution thread associated with it. A "worker" machine can host multiple partitions.


The execution model is based on the BSP (Bulk Synchronous Parallel) model. In this model, multiple processing units proceed in parallel through a sequence of "supersteps". Within each superstep, each processing unit first receives all messages delivered to it from the preceding superstep, then manipulates its local data and may queue up messages that it intends to send to other processing units. This happens asynchronously and simultaneously among all processing units. The queued messages are delivered to their destination processing units but won't be seen until the next superstep. When all processing units have finished their message delivery (hence the synchronization point), the next superstep can start, and the cycle repeats until the termination condition is reached.
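
The superstep cycle can be simulated sequentially in Python. The sketch below is not Pregel's API: run_supersteps, the compute signature and the termination rule (stop when no message is in flight) are my own assumptions, and the example computation simply propagates the largest value through the graph.

```python
from collections import defaultdict


def run_supersteps(values, out_arcs, compute, max_supersteps=50):
    """Sequentially simulate supersteps over a node-centric graph.

    values:   node id -> value; every message destination must be a key here
    out_arcs: node id -> list of destination node ids
    compute:  (value, messages, neighbors, superstep) -> (new_value, [(dest, message), ...])
    """
    values = dict(values)            # work on a copy, leave the input untouched
    inbox = defaultdict(list)
    active = set(values)             # every node is active in the first superstep
    superstep = 0
    while active and superstep < max_supersteps:
        outbox = defaultdict(list)
        for node in active:          # a real system runs the partitions in parallel
            new_value, outgoing = compute(
                values[node], inbox[node], out_arcs.get(node, []), superstep)
            values[node] = new_value
            for dest, message in outgoing:
                outbox[dest].append(message)   # queued, not yet visible
        # Synchronization point: queued messages become visible only now.
        inbox = outbox
        active = set(outbox)         # assumption: only nodes with mail run next
        superstep += 1
    return values, superstep


def propagate_max(value, messages, neighbors, superstep):
    """Adopt the largest value seen and tell the neighbors when it changed."""
    new_value = max([value] + messages)
    if superstep == 0 or new_value > value:
        return new_value, [(dest, new_value) for dest in neighbors]
    return new_value, []


graph = {"a": ["b"], "b": ["a", "c"], "c": ["b"]}
final_values, supersteps = run_supersteps({"a": 3, "b": 6, "c": 1}, graph, propagate_max)
```

The important detail is the swap of inbox and outbox at the end of each round: a message sent during superstep S can only be read during superstep S+1, which is what makes the order of execution within a superstep irrelevant for this example.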

Notice that, depending on the graph algorithm, the assignment of nodes to partitions may affect overall performance. Pregel provides a default assignment where partition = hash(nodeId) % N, but users can override this assignment algorithm if they want. In general, it is a good idea to put close-neighbor nodes into the same partition so that messages between these nodes don't need to cross the network, which reduces communication overhead. Of course, this also means that traversal of neighboring nodes all happens within the same machine, which hinders parallelism. This is usually not a problem when the context nodes are very diverse. In my experience of parallel graph processing, coarse-grain parallelism is preferred over fine-grain parallelism as it reduces communication overhead.
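
As a rough illustration of the two assignment strategies, here is a sketch in Python. Both functions are hypothetical: I assume string node ids of the form "region:name", and CRC32 is just an arbitrary stable hash so that every worker computes the same assignment.

```python
import zlib


def default_partition(node_id, num_partitions):
    """Spread nodes evenly, ignoring which nodes are neighbors."""
    return zlib.crc32(str(node_id).encode("utf-8")) % num_partitions


def partition_by_region(node_id, num_partitions):
    """Custom assignment: nodes that share a region prefix land in the same partition."""
    region = str(node_id).split(":", 1)[0]
    return zlib.crc32(region.encode("utf-8")) % num_partitions


same_partition = (partition_by_region("sv:john", 4)
                  == partition_by_region("sv:peter", 4))
```

With the region-based function, messages between two people in the same region never leave the partition, at the price of a less even spread when some regions are much larger than others.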

The complete picture of execution can be implemented as follows:


The basic processing unit is a "thread" associated with each partition, running inside a worker. Each worker receives the messages from the previous "superstep" in its "inQ" and dispatches each message to the partition where the destination node resides. After that, a user-defined "compute()" function is invoked on each node of the partition. Notice that there is a single thread per partition, so nodes within a partition are executed sequentially, and the order of execution is nondeterministic.

The "master" plays a central role in coordinating the execution of supersteps in sequence. It signals the beginning of a new superstep to all workers once it knows that all of them have completed the previous one. It also pings each worker to learn its processing status and periodically issues a "checkpoint" command to all workers, each of which then saves its partitions to a persistent graph store. Pregel doesn't define or mandate the graph storage model, so any persistent mechanism should work. There is a "load" phase at the beginning, where each partition starts empty and reads a slice of the graph storage. For each node read from the storage, a "partition()" function is invoked; the node is loaded into the current partition if the function returns the current partition, otherwise the node is queued to the partition it is assigned to.

Fault resilience is achieved through the checkpoint mechanism, where each worker is instructed to save its in-memory graph partitions to the graph storage periodically (at the beginning of a superstep). If a worker is detected to be dead (not responding to the "ping" message from the master), the master instructs the surviving workers to take over the partitions of the failed worker. The whole processing is then reverted to the previous checkpoint and proceeds again from there (even the healthy workers need to redo the previous processing). The Pregel paper mentions a potential optimization that re-executes only the failed partitions from the previous checkpoint by replaying the previously received messages. Of course, this requires keeping a log of all messages received between nodes at every superstep since the previous checkpoint. This optimization, however, relies on the algorithm being deterministic (in other words, the same input executed at a later time produces the same output).
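
The basic rollback scheme (without the replay optimization) can be sketched as follows. This is a single-process stand-in under my own assumptions: a dictionary plays the role of the persistent graph store, the failure is injected at a chosen superstep, and run_with_checkpoints is a hypothetical name.

```python
import copy


def run_with_checkpoints(state, step, total_supersteps, checkpoint_every, fail_at=None):
    """Apply `step` once per superstep, rolling back to the last checkpoint on a failure.

    Returns the final state and the number of supersteps that had to be redone.
    """
    store = {}       # stands in for the persistent graph store
    superstep = 0
    redone = 0
    while superstep < total_supersteps:
        if superstep % checkpoint_every == 0:
            # Checkpoint at the beginning of the superstep.
            store[superstep] = copy.deepcopy(state)
        if superstep == fail_at:
            fail_at = None                      # the injected failure happens once
            restored = max(store)               # most recent checkpoint
            redone += superstep - restored
            superstep, state = restored, copy.deepcopy(store[restored])
            continue
        state = step(state, superstep)
        superstep += 1
    return state, redone


def add_superstep_number(state, superstep):
    """A deterministic toy computation: add the superstep number to every node value."""
    return {node: value + superstep for node, value in state.items()}


start = {"a": 0, "b": 10}
clean_run, _ = run_with_checkpoints(start, add_superstep_number, 7, 3)
recovered_run, redone = run_with_checkpoints(start, add_superstep_number, 7, 3, fail_at=5)
```

Because the toy computation is deterministic, the recovered run ends in the same state as the failure-free run; the only cost is the supersteps between the last checkpoint and the failure, which is why the checkpoint interval is a trade-off between I/O and rework.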

A further optimization is available in Pregel to reduce network bandwidth usage. Messages destined for the same node can be combined using a user-defined "combine()" function, which is required to be associative and commutative. This is similar to the combiner in Google's MapReduce model.
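
A minimal sketch of the idea, assuming messages are queued as (destination, value) pairs; combine_outgoing is a hypothetical helper and not part of Pregel's API.

```python
from collections import defaultdict
from functools import reduce
import operator


def combine_outgoing(messages, combine):
    """Collapse all queued messages for the same destination into one message.

    `combine` must be associative and commutative, because the order in which
    messages are grouped and merged is not defined.
    """
    by_dest = defaultdict(list)
    for dest, value in messages:
        by_dest[dest].append(value)
    return {dest: reduce(combine, values) for dest, values in by_dest.items()}


queued = [("b", 3), ("c", 6), ("b", 1), ("b", 4)]
largest = combine_outgoing(queued, max)           # one message per destination
totals = combine_outgoing(queued, operator.add)
```

Here four queued messages shrink to two before they hit the network. This only works when the receiving compute() cares about the combined result (a maximum, a sum) and not about the individual messages.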

In addition, each node can emit an "aggregate value" at the end of "compute()". The worker invokes a user-defined "aggregate()" function that aggregates all nodes' aggregate values into a partition-level aggregate value, which is then aggregated all the way up to the master. The final aggregated value is made available to all nodes in the next superstep. The aggregate value can be used to compute summary statistics across nodes as well as to coordinate the progress of the processing units.
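
The two-level aggregation can be sketched like this. The function names and the list-of-lists layout (one inner list of node values per partition) are assumptions made for the example.

```python
from functools import reduce


def aggregate_partition(node_values, aggregate):
    """Fold the values emitted by the nodes of one partition into a single value."""
    return reduce(aggregate, node_values)


def aggregate_global(partitions, aggregate):
    """Fold the partition-level values into the value the master hands back to all nodes."""
    partials = [aggregate_partition(values, aggregate)
                for values in partitions if values]   # skip partitions that emitted nothing
    return reduce(aggregate, partials)


emitted = [[3, 9, 2], [7], [4, 5]]     # values emitted by the nodes, grouped by partition
global_max = aggregate_global(emitted, max)
global_sum = aggregate_global(emitted, lambda a, b: a + b)
```

Only one value per partition travels to the master, so the cost of the aggregation grows with the number of partitions rather than with the number of nodes.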

I think the Pregel model is general enough for a large portion of classical graph algorithms. I'll cover how we map these traditional algorithms to Pregel in subsequent postings.

Reference

http://www.slideshare.net/slidarko/graph-windycitydb2010

3 comments:

Walter Bogorad said...

Thank you for the great post.
Is "master" a single point of failure? I think that in any case not much harm can be done if it is down, just the workers will not be able to proceed to the next superstep.
But what are protocols to select a new master? Also it looks like one implicit assumption is that the storage is shared and other workers can access stored state (checkpoint) of the failed worker. Is it right?

Ricky Ho said...

We can have multiple masters and use something like a Zookeeper to elect a leader master.

Yes, workers are stateless, while the partitions are stored in a DB server that other workers can access.

Rgds,
ricky

!-!aR$#!L said...

who handles synchronization part and how?