Thursday, February 4, 2010

NoSQL GraphDB

I received some constructive criticism of my previous blog post on NoSQL patterns: I covered only the key/value store and left out the Graph DB.

The Property Graph Model

A property graph is a collection of Nodes and Directed Arcs. Each node represents an entity and has a unique id as well as a Node Type. The Node Type defines the set of metadata that the node has. Each arc represents a unidirectional relationship between two entities and has an Arc Type. The Arc Type defines the set of metadata that the arc has.


General Graph Processing

I found that many graph algorithms follow a general processing pattern. There are multiple rounds of (sequential) processing iterations. Within each iteration, a set of active nodes performs local processing in parallel. The local processing can modify the node's properties, add or remove links to other nodes, and send messages across links. All message passing is done after the local processing.


This model is similar to the Google Pregel model.

Notice that this model maps well onto a parallel computing environment, where the processing of the set of active nodes can be spread across multiple processors (or multiple machines in a cluster).

Notice that all messages from incoming links arrive before any link changes made by the local processing. Conversely, messages are sent along outgoing links only after the local processing has changed the links. The term "local processing" means that a node cannot modify the properties of other nodes or of links that are not attached to it.
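To make the round structure concrete, here is a minimal single-process sketch in Python. It is not Pregel's API: the graph layout (a dict of nodes, each with an "out" set of neighbour ids), `run_supersteps` and the example `hop_distance` compute function are assumptions for illustration, and the sketch only covers property updates and message passing (no link changes). Messages produced in one round are delivered only at the start of the next round, and only the nodes that received a message are active in that round.

```python
from collections import defaultdict

def run_supersteps(graph, compute, initial_active, max_rounds=100):
    """Run rounds of local processing followed by message passing.

    graph: {node_id: node_dict}; compute(node_id, node, messages) updates the
    node in place and returns a list of (target_id, message) pairs.
    Returns the number of rounds executed.
    """
    inbox = defaultdict(list)
    active = set(initial_active)
    rounds = 0
    while active and rounds < max_rounds:
        outbox = []
        # Local processing: each active node sees only its own state and the
        # messages delivered at the end of the previous round.
        for node_id in sorted(active):
            outbox.extend(compute(node_id, graph[node_id], inbox.pop(node_id, [])))
        # Message passing happens only after all local processing is done.
        inbox = defaultdict(list)
        for target, message in outbox:
            inbox[target].append(message)
        active = set(inbox)   # nodes that received messages run next round
        rounds += 1
    return rounds

def hop_distance(node_id, node, messages):
    """Example compute(): hop count from the initially active (source) node."""
    candidate = min(messages) if messages else 0   # the source has no messages
    if node.get("dist") is None or candidate < node["dist"]:
        node["dist"] = candidate
        return [(neighbour, candidate + 1) for neighbour in node["out"]]
    return []
```

The loop over active nodes runs sequentially here; in a cluster each active node's `compute` call could run on a different machine, because it touches only its own node and its inbox.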

Because two nodes can simultaneously modify the link between them, the following conflicts can happen:
  • One node deletes a link while the other node modifies the link's properties.
  • The nodes on both sides modify the properties of the link between them.
A conflict resolution mechanism needs to be attached to each Arc Type to determine how the conflict should be resolved, either by picking a winner or by merging the effects.
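One way to attach conflict resolution to each Arc Type is a registry of resolver functions keyed by arc type. The sketch below is an assumption, not taken from any particular product: the update format (`{"op": "delete"}` or `{"op": "set", "props": ..., "ts": ...}`), the arc type names and both policies are hypothetical. "follows" lets a delete win and otherwise keeps the later write; "interacts" merges the two sides by adding up their counter increments.

```python
# Hypothetical update format: {"op": "delete"} or
# {"op": "set", "props": {...}, "ts": logical_timestamp}.

def delete_wins_else_latest(a, b):
    """A delete beats any property change; otherwise the later write wins."""
    if a["op"] == "delete" or b["op"] == "delete":
        return {"op": "delete"}
    return a if a["ts"] >= b["ts"] else b

def merge_increments(a, b):
    """A delete still wins; otherwise add up both sides' counter increments."""
    if a["op"] == "delete" or b["op"] == "delete":
        return {"op": "delete"}
    merged = dict(a["props"])
    for key, value in b["props"].items():
        merged[key] = merged.get(key, 0) + value
    return {"op": "set", "props": merged, "ts": max(a["ts"], b["ts"])}

# One resolver per Arc Type (the arc type names are made up for the example).
RESOLVERS = {"follows": delete_wins_else_latest, "interacts": merge_increments}

def resolve(arc_type, update_a, update_b):
    """Combine the two conflicting updates made to one arc in the same round."""
    return RESOLVERS[arc_type](update_a, update_b)
```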

Neo4j provides a restricted, single-threaded graph traversal model:
  • At each round, the set of active nodes is always a single node.
  • The active node of the next round is determined by the traversal policy (breadth-first or depth-first), but it is still a single node.
  • It offers a callback function to determine whether a node should be included in the result set.
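The single-active-node traversal can be sketched as follows. This is not Neo4j's Traverser API: `traverse`, its `order` parameter and the `include(node, depth)` callback are hypothetical names standing in for the traversal policy and the result-set callback described above. A queue gives breadth-first order and a stack gives depth-first order; either way exactly one node is processed per step.

```python
from collections import deque

def traverse(adjacency, start, order="breadth", include=lambda node, depth: True):
    """Visit one node per step; return the visited nodes accepted by include()."""
    frontier = deque([(start, 0)])
    visited, result = set(), []
    while frontier:
        # Queue (popleft) = breadth-first, stack (pop) = depth-first.
        node, depth = frontier.popleft() if order == "breadth" else frontier.pop()
        if node in visited:
            continue
        visited.add(node)
        if include(node, depth):          # the result-set callback
            result.append(node)
        neighbours = adjacency.get(node, [])
        # Push in reverse for depth-first so neighbours are explored in listed order.
        for nbr in (neighbours if order == "breadth" else reversed(neighbours)):
            if nbr not in visited:
                frontier.append((nbr, depth + 1))
    return result
```

For example, `traverse(adj, "a", include=lambda n, d: d == 2)` keeps only the nodes two hops away from "a".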
Gremlin, on the other hand, provides an interactive graph traversal model where the user can step through each iteration. It uses an XPath-like syntax to express the navigation.
  • The node is expressed as Node(id, inE, outE, properties)
  • The arc is expressed as Arc(id, type, inV, outV, properties)
As an example, suppose the graph represents a relationship between two types of entities: Person writes Book. Then, given a person, her co-authors can be expressed with the following XPath-like expression:
./outE[@type='write']/inV/inE[@type='write']/outV
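Evaluating that path by hand over a list of arcs makes each step explicit. This is a plain Python sketch rather than Gremlin itself; `Arc` keeps only the id, type, inV and outV fields (properties omitted), and `out_e`, `in_e` and `co_authors` are hypothetical helpers. Note that the path also leads back to the starting person, so the sketch removes her from the result.

```python
from collections import namedtuple

# Arc(id, type, inV, outV) as above, properties omitted: the arc points
# from outV (the writer) to inV (the book).
Arc = namedtuple("Arc", ["id", "type", "inV", "outV"])

def out_e(arcs, nodes, arc_type):
    """outE[@type=...]: arcs of the given type leaving any of `nodes`."""
    return [a for a in arcs if a.outV in nodes and a.type == arc_type]

def in_e(arcs, nodes, arc_type):
    """inE[@type=...]: arcs of the given type entering any of `nodes`."""
    return [a for a in arcs if a.inV in nodes and a.type == arc_type]

def co_authors(arcs, person):
    books = {a.inV for a in out_e(arcs, {person}, "write")}    # ./outE[@type='write']/inV
    writers = {a.outV for a in in_e(arcs, books, "write")}     # /inE[@type='write']/outV
    return writers - {person}   # the path also leads back to the person herself

arcs = [Arc(1, "write", "book1", "alice"), Arc(2, "write", "book1", "bob"),
        Arc(3, "write", "book2", "alice"), Arc(4, "write", "book2", "carol"),
        Arc(5, "write", "book3", "dave")]
```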

Path Algebra

Marko Rodriguez has described a set of matrix operations for a graph represented as an adjacency matrix, so that graph algorithms can be described in algebraic form.

The traverse operation can be expressed as matrix multiplication: if A is the adjacency matrix of the graph, then A.A represents connections through paths of length 2.

Similarly, the merge operation can be expressed as matrix addition. For example, (A + A.A + A.A.A) represents connectivity within 3 degrees of reach.

In the special case where the graph represents a relationship between two types of entities, e.g. A represents an authoring relationship (a person writes a book), A.(A.transpose()) represents the co-author relationship (a person co-authors with another person).
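The path algebra can be tried out with plain list-of-lists matrices. The helpers below are illustrative, not Marko's notation or any library's API. Entries of A.A count the paths of length 2, so a non-zero entry in (A + A.A + A.A.A) means the target is reachable within 3 hops. In A.(A.transpose()) the off-diagonal entries count shared books and the diagonal counts each person's own books.

```python
def matmul(X, Y):
    """Matrix product X.Y: entry [i][j] sums X[i][k] * Y[k][j] over k."""
    return [[sum(X[i][k] * Y[k][j] for k in range(len(Y)))
             for j in range(len(Y[0]))] for i in range(len(X))]

def matadd(X, Y):
    """Element-wise sum: the merge operation."""
    return [[x + y for x, y in zip(row_x, row_y)] for row_x, row_y in zip(X, Y)]

def transpose(X):
    return [list(col) for col in zip(*X)]

# Directed chain 0 -> 1 -> 2 -> 3
A = [[0, 1, 0, 0],
     [0, 0, 1, 0],
     [0, 0, 0, 1],
     [0, 0, 0, 0]]
A2 = matmul(A, A)                                # paths of length 2
within_3 = matadd(matadd(A, A2), matmul(A2, A))  # A + A.A + A.A.A

# Authoring relationship. Rows: alice, bob, carol; columns: book1, book2
writes = [[1, 1],
          [1, 0],
          [0, 1]]
co_author = matmul(writes, transpose(writes))    # person x person
```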

Marko also introduces a set of filter operations (not filter, clip filter, column filter, row filter, vertex filter).

Map Reduce
Depending on the traversal strategy inherent in a graph algorithm, algorithms with a high degree of sequential dependency don't fit well into parallel computing. For example, graph algorithms with a breadth-first search nature fit the parallel computing paradigm better than those with a depth-first search nature. Similarly, performing a search from all nodes fits parallel computing better than performing a search from a particular start node.

There are different storage representations of a graph: incidence list, incidence matrix, adjacency list and adjacency matrix. For a sparse graph (the majority of cases), let's assume that an adjacency list is used as the storage model for the subsequent discussion.

In other words, the graph is represented as a list of records, each record is [node, connected_nodes].
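As an illustration of this record layout, the sketch below builds [node, connected_nodes] records from a plain edge list with one Map/Reduce pass. `map_reduce` is a hypothetical in-memory stand-in for a real job (map, group by key, reduce), not Hadoop's API. The mapper also emits each edge's target with no neighbour, so that nodes without outgoing arcs still get a record.

```python
from collections import defaultdict

def map_reduce(records, mapper, reducer):
    """In-memory stand-in for one job: map, group values by key, reduce."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)
    return [reducer(key, values) for key, values in sorted(groups.items())]

def edge_mapper(src, dst):
    yield src, dst      # dst is a neighbour of src
    yield dst, None     # make sure dst gets a record even with no out-arcs

def adjacency_reducer(node, values):
    return [node, sorted(v for v in values if v is not None)]

edges = [("a", "b"), ("a", "c"), ("b", "c")]
records = map_reduce(edges, edge_mapper, adjacency_reducer)
```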

There are many graph algorithms and it is not my intent to give an exhaustive list. Below are the ones I have used in my previous projects that can be translated into Map/Reduce form.

Topological sort is commonly used to work out a schedule based on a dependency tree. It can be done as follows ...

# Topological Sort

# Input records
[node_id, prerequisite_ids]

# Output records: [node_id, prerequisite_ids, dependent_ids]
class BuildDependentsJob {
 map(node, prerequisite_ids) {
   # pass the node's own record through so it is not lost in the shuffle
   emit(node, ["prerequisites", prerequisite_ids])
   for each prerequisite_id in prerequisite_ids {
     emit(prerequisite_id, ["dependent", node])
   }
 }

 reduce(node, tagged_values) {
   prerequisite_ids = values tagged "prerequisites" in tagged_values
   dependent_ids = values tagged "dependent" in tagged_values
   emit(node, [node, prerequisite_ids, dependent_ids])
 }
}

class BuildReadyToRunJob {
 map(node_id, node) {
   if ! done?(node) and node.prerequisite_ids.empty? {
     result_set.append(node)
     done(node)
     for each dependent_id in node.dependent_ids {
       emit(dependent_id, node_id)
     }
   }
 }

 reduce(node_id, done_prerequisite_ids) {
   remove_prerequisites(node_id, done_prerequisite_ids)
 }
}

# Topological Sort main program
main() {
 JobClient.submit(BuildDependentsJob.new)
 result_set = []

 result_size_before_job = 0
 result_size_after_job = 1

 # stop when a round adds no new node (all done, or only cycles remain)
 while (result_size_before_job < result_size_after_job) {
   result_size_before_job = result_set.size
   JobClient.submit(BuildReadyToRunJob.new)
   result_size_after_job = result_set.size
 }

 return result_set
}
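The two jobs and the driver loop can be simulated in a single process to check the logic. In this sketch, which is assumed rather than taken from a real framework, `run_job` plays the role of `JobClient.submit` (map, group by key, reduce), and the `done`, `result` and `remaining` collections stand in for the shared state the pseudocode reads and writes. Each round schedules every node whose prerequisites are all done; the loop stops when a round adds nothing, which also means nodes caught in a dependency cycle are never scheduled.

```python
from collections import defaultdict

def run_job(records, mapper, reducer):
    """In-memory stand-in for one Map/Reduce job."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)
    for key, values in groups.items():
        reducer(key, values)

def topological_sort(prerequisites):
    """prerequisites: {node_id: [prerequisite_ids]}, one entry per node."""
    # BuildDependentsJob: invert prerequisite lists into dependent lists.
    dependents = {node: [] for node in prerequisites}

    def dependents_map(node, prerequisite_ids):
        for prerequisite_id in prerequisite_ids:
            yield prerequisite_id, node

    def dependents_reduce(node, dependent_ids):
        dependents[node] = sorted(dependent_ids)

    run_job(prerequisites.items(), dependents_map, dependents_reduce)

    # BuildReadyToRunJob, repeated until a round adds no new node.
    remaining = {node: set(ids) for node, ids in prerequisites.items()}
    done, result = set(), []

    def ready_map(node, prerequisite_ids):
        if node not in done and not prerequisite_ids:
            result.append(node)
            done.add(node)
            for dependent_id in dependents[node]:
                yield dependent_id, node

    def ready_reduce(node, done_prerequisite_ids):
        remaining[node] -= set(done_prerequisite_ids)

    while True:
        size_before = len(result)
        run_job(sorted(remaining.items()), ready_map, ready_reduce)
        if len(result) == size_before:
            return result
```

Because prerequisites are removed only in the reduce phase, the nodes scheduled in one round never depend on each other, so each round's output is a batch of tasks that could run in parallel.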

Minimum spanning tree is a pretty common algorithm; Prim's algorithm looks like the following.
# Minimum Spanning Tree (MST) using Prim's algorithm

Adjacency Matrix, W[i][j] represents weights
W[i][j] = infinity if nodes i and j are not connected

MST has nodes in array N = [] and arcs A = []
E[i] = the node in N[] at the other end of the minimum-weight edge connecting node i to the tree
D[i] = weight of that edge

Initially, pick a random node r into N[]
N = [r] and A = []
D[r] = 0; D[i] = W[i][r]; E[i] = r;

Repeat until N[] contains all nodes
 Pick node k outside N[] where D[k] is minimum
 Add node k to N; Add arc (E[k], k) to A
 for all nodes p outside N[] connected to node k
   if W[p][k] < D[p]
     D[p] = W[p][k]
     E[p] = k
   end
 end
end
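Below is a direct Python transcription of the pseudocode, as a sketch for checking it on a small graph. It uses `math.inf` for missing edges, records each tree arc as the pair (E[k], k), and stops early if the remaining nodes are unreachable (a disconnected graph has no spanning tree).

```python
import math

INF = math.inf   # W[i][j] = INF when nodes i and j are not connected

def prim(W, r=0):
    """Return the MST arcs as (E[k], k) pairs, starting from node r."""
    n = len(W)
    in_n = [False] * n            # membership in N[]
    in_n[r] = True
    D = [W[i][r] for i in range(n)]
    D[r] = 0
    E = [r] * n                   # tree end of node i's cheapest edge
    A = []
    for _ in range(n - 1):
        # Pick node k outside N[] where D[k] is minimum
        k = min((i for i in range(n) if not in_n[i]), key=lambda i: D[i])
        if D[k] == INF:
            break                 # disconnected graph: no spanning tree
        in_n[k] = True
        A.append((E[k], k))
        for p in range(n):
            if not in_n[p] and W[p][k] < D[p]:
                D[p] = W[p][k]
                E[p] = k
    return A
```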

We won't show the Map/Reduce form of Prim's algorithm here because it is very similar to another popular algorithm, single source shortest path (SSSP). The Map/Reduce form of SSSP based on Dijkstra's algorithm is as follows ...
# Single Source Shortest Path (SSSP) based on Dijkstra

Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if there is no arc from node i to node j

SSSP has nodes in array N = []
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i

Initially, put the source node s into N[]
N = [s]
L[s] = 0; L[i] = W[s][i];
Path[i] = arc[s][i] for all nodes directly connected
from source.

Repeat until N[] contains all nodes
 Pick node k outside N[] where L[k] is minimum
 Add node k to N;
 for all nodes p outside N[] connected from node k {
   if L[k] + W[k][p] < L[p] {
     L[p] = L[k] + W[k][p]
     Path[p] = Path[k].append(Arc[k][p])
   }
 }
end repeat
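The same loop in Python, again as a sketch that mirrors the pseudocode: `L` holds the tentative lengths, `path` holds each path as a list of (from, to) arcs, and `math.inf` marks a missing arc. The linear scan for the minimum makes it O(V^2), which suits the adjacency-matrix form; a priority queue is the usual choice for sparse graphs.

```python
import math

INF = math.inf

def dijkstra(W, s):
    """W[i][j] is the arc weight from i to j (INF if no arc); returns (L, path)."""
    n = len(W)
    in_n = [False] * n          # membership in N[]
    in_n[s] = True
    L = [W[s][i] for i in range(n)]
    L[s] = 0
    path = [[(s, i)] if W[s][i] < INF else None for i in range(n)]
    path[s] = []
    for _ in range(n - 1):
        # Pick node k outside N[] where L[k] is minimum
        k = min((i for i in range(n) if not in_n[i]), key=lambda i: L[i])
        if L[k] == INF:
            break               # every node left is unreachable from s
        in_n[k] = True
        for p in range(n):
            if not in_n[p] and L[k] + W[k][p] < L[p]:
                L[p] = L[k] + W[k][p]
                path[p] = path[k] + [(k, p)]
    return L, path
```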


# Here is what the map/reduce pseudocode looks like

class FindMinimumJob {
 map(node_id, path_length) {
   if not N.contains(node_id) {
     # a single key sends every candidate to the same reducer
     emit(1, [path_length, node_id])
   }
 }

 reduce(k, v) {
   min_length, min_node = minimum(v)
   N.add(min_node)
   for each node in min_node.connected_nodes {
     emit(node, min_node)
   }
 }
}

class UpdateMinPathJob {
 map(node, min_node) {
   if not N.contains(node) and L[min_node] + W[min_node][node] < L[node] {
     L[node] = L[min_node] + W[min_node][node]
     Path[node] = Path[min_node].append(arc(min_node, node))
   }
 }
}

# Single Source Shortest Path main program
main() {
 init()
 while (not N.contains(V)) {
   JobClient.submit(FindMinimumJob.new)
   JobClient.submit(UpdateMinPathJob.new)
 }

 return Path
}
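The two jobs can be checked with a single-process simulation. Here `run_job` is a hypothetical in-memory stand-in for `JobClient.submit`, the graph is an adjacency list of weights (`{node: {neighbour: weight}}`) rather than the matrix W, and the dicts `L`, `Path` and the set `N` play the role of the shared store that the mappers and reducers read and write. Emitting every candidate under the single key 1 is what funnels them to one reducer, which picks the global minimum. Note that each round fixes only one node, so a graph with V nodes needs V - 1 rounds of two jobs each.

```python
import math
from collections import defaultdict

def run_job(records, mapper, reducer=None):
    """In-memory stand-in for one job; map-only when reducer is None."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)
    output = []
    if reducer is not None:
        for key, values in groups.items():
            output.extend(reducer(key, values))
    return output

def sssp_mapreduce(graph, s):
    """graph: {node: {neighbour: weight}}; returns (L, Path)."""
    L = {v: graph[s].get(v, math.inf) for v in graph}   # L[i] = W[s][i]
    L[s] = 0
    Path = {v: [(s, v)] for v in graph[s]}
    Path[s] = []
    N = {s}

    def find_min_map(node_id, path_length):
        if node_id not in N:
            yield 1, (path_length, node_id)   # one key -> one reducer sees all

    def find_min_reduce(_key, candidates):
        _, min_node = min(candidates)
        N.add(min_node)
        return [(node, min_node) for node in graph[min_node]]

    def update_min_path_map(node, min_node):
        if node not in N and L[min_node] + graph[min_node][node] < L[node]:
            L[node] = L[min_node] + graph[min_node][node]
            Path[node] = Path[min_node] + [(min_node, node)]
        return []   # map-only: results go straight to the shared L and Path

    while N != set(graph):
        messages = run_job(list(L.items()), find_min_map, find_min_reduce)
        run_job(messages, update_min_path_map)
    return L, Path
```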

The same SSSP problem can also be solved using breadth-first search. The intuition is to grow a frontier from the source at each iteration and update the shortest distance from the source.

# Single Source Shortest Path (SSSP) using BFS

Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if there is no arc from node i to node j

Frontier nodes in array F
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i

Initially,
F = [s]
L[s] = 0; L[i] = infinity for every other node i;
Path[s] = [] (the empty path)
# the first round then reaches every node directly connected from s

# input is all nodes in the frontier F
# output is frontier of next round FF

class GrowFrontierJob {
 map(node) {
   for each to_node in node.connected_nodes {
     emit(to_node, [node, L[node] + W[node][to_node]])
   }
 }

 reduce(node, from_list) {
   for each from in from_list {
     from_node = from[0]
     length_via_from_node = from[1]
      if (length_via_from_node < L[node]) {
       L[node] = length_via_from_node
       Path[node] =
         Path[from_node].append(arc(from_node, node))
       FF.add(node)
     }
   }
 }
}

# Single Source Shortest Path BFS main program
main() {
 init()
 while (F is non-empty) {
   JobClient.set_input(F)
   JobClient.submit(GrowFrontierJob.new)
   copy FF to F
   clear FF
 }

 return Path
}
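A single-process simulation of the frontier version follows, under the same assumptions as before (an adjacency list of weights, and in-memory dicts standing in for the shared L and Path). One deliberate deviation from the pseudocode: each proposal carries the sender's path along with its length, so the path stored for a node always matches its length even if the sender improves again in the same round. Unlike the Dijkstra form, a node may re-enter the frontier when a shorter path shows up later, so this is a label-correcting (Bellman-Ford style) relaxation.

```python
import math
from collections import defaultdict

def sssp_frontier(graph, s):
    """graph: {node: {neighbour: weight}}; returns (L, Path, rounds)."""
    L = {v: math.inf for v in graph}
    L[s] = 0
    Path = {s: []}
    F = {s}
    rounds = 0
    while F:
        # Map (GrowFrontierJob.map): each frontier node proposes a length,
        # plus the path that achieves it, to every neighbour.
        proposals = defaultdict(list)
        for node in F:
            for to_node, weight in graph[node].items():
                proposals[to_node].append((L[node] + weight, Path[node] + [(node, to_node)]))
        # Reduce: keep any improvement; improved nodes form the next frontier FF.
        FF = set()
        for node, from_list in proposals.items():
            for length, path in from_list:
                if length < L[node]:
                    L[node] = length
                    Path[node] = path
                    FF.add(node)
        F = FF
        rounds += 1
    return L, Path, rounds
```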

9 comments:

jernst said...

Good to see a detailed description that doesn't come from a vendor ;-) But let me provide some feedback on the part of your description of what makes a graph from my InfoGrid perspective:

In InfoGrid, each node (called MeshObject) may or may not have a type. It can have N types simultaneously (e.g. be a Customer and an Employee at the same time).

Arcs are always bidirectional, not unidirectional as you described it (I think that is true for all graph databases, not just InfoGrid.) In InfoGrid, each arc (called a Relationship) can have zero, one or more types, representing the different types of how the things relate. E.g. two MeshObjects A and B of type Employee may simultaneously participate in Relationships with the semantics A.ReportsTo.B, B.Loves.A and A.Despises.B.

Finally, what operations are available and/or desirable on a graph -- such as iteration -- highly depends on what one wants to accomplish with the graph. Logically they are an abstraction level above (the data in the database doesn't care). While useful, perhaps their discussion should be different from the discussion of a graph *database*.

Felix Halim said...

May I know, for BFS and SSSP, the largest number of vertices and edges in a graph that you have ever run them on?

From your pseudocode, I suspect that the attributes of the nodes are stored in a centralized database, not along with the records in the MapReduce input, right? I believe such a strategy will have a bottleneck at the database connection when the graph is very large (say millions of vertices and billions of edges).

Ricky Ho said...

Not big in my case, in the range of tens of thousands.

You are right that the intermediate results (between rounds) are stored in a separate data structure (not in the Map input). But it is not a central database; it is a distributed key/value store. I found this pattern quite common.

And I don't see the bottleneck issue you described.

Rgds,
Ricky

Felix Halim said...

So, attributes such as L[] and Path[] are stored "globally" in a distributed key/value store? If you are to access L[i] or Path[i] in a mapper or reducer, then you need to "connect" to the distributed key/value store, right?

This will be the bottleneck if the number of vertices is in the millions. In each MR round, there will be millions of queries to this "distributed" key/value store. Am I right?

Ricky Ho said...

Yes, there will be millions of queries to the distributed store but notice that these queries are all spread out. There isn't any hotspot here.

Of course, you can challenge whether there are still too many network round trips within each M/R round. This is a valid challenge, and I think it is a question of granularity in how you divide up the map tasks. You can certainly batch up the requests to the distributed key/value store.

Ricky Ho said...

To reduce the number of network queries, you can also use a distributed key/value store (instead of HDFS) for the input and output records to MapReduce.

Tobias said...

Nice article!

Some minor corrections regarding Neo4j:

In Neo4j there is no such thing as node types. There are arc types (relationship types in Neo4j terminology), but these are not data types, as you seem to have understood things. Neo4j's relationship types are navigational keys rather than data structure enforcements. As a user of Neo4j you utilize the relationship types to traverse the proper subset of the graph. Data types are not enforced by the storage layer of Neo4j, there are components that add such features, but the kernel does nothing to enforce what properties you associate with each node or relationship.

I don't know how Pregel works, since Google has not published any information about this. It does however seem to me that you have used their concept of nodes being units with behavior when forming your mental model of the field of graph databases. Pregel is a graph computational framework, not a database, thus using it as a base model will give a slightly skewed view. In Neo4j (and all other graph DBs I've encountered) nodes are simply data entities, with no behavior. The node will never issue a change in the graph; that is part of the application working on the graph. Neo4j's Traversers are vastly different from the Pregel model. Traversers in Neo4j are simple breadth-first or depth-first traversals of the graph based on a set of relationship types. Traversers also have means of pruning the traversal and returning a filtered set of traversed nodes. Traversers are single-threaded, but for real applications they are very useful in a lot of cases. For the cases that extend beyond the usefulness of these basic traversers there are other components in the Neo4j ecosystem that add more advanced means of traversing the graph, some of which are able to run in parallel. Gremlin is a great example of such a component, one that also spans more graph databases than Neo4j.

Finally I'd just like to say how much I like the work you're doing. I've read a number of your articles and think they do a good job at providing a basic overview of the way things work in the field of software at the moment. I also agree with jernst that it's great to see a vendor-independent article about graph databases. Keep up the good work!

Ricky Ho said...

Tobias,

Thanks for your note. Yes, I am fully aware that Neo4j doesn't have a type concept other than treating "type" as just a property name.

In my experience, I found that having a type/schema concept to constrain the set of metadata is a good idea. And in my particular case, I use the Type to define the type of processing within the node as well as the traversal strategy. Yes, I know this deviates from the Neo4j model.

Yes, I am trying to cover two independent concepts in one blog: Graph DB and Graph processing. I completely agree that they are conceptually orthogonal, but I argue that the end solution will be much more powerful if the Graph DB is designed with the type of Graph processing to be conducted in mind.

I attempted to describe a graph processing model that is generic enough to cover a broad set of graph algorithms. And I believe it covers the traversal model of Neo4j. Right?

Felix Halim said...

I don't know how fast the currently available distributed key/value stores are. Suppose you have a 10-node distributed key/value store; what is the maximum (rough) number of inserts/queries per second?

In my environment, with 25 nodes, one round of MR can process a graph with 225 million vertices and 10 billion edges in around 30 minutes.

So, for the distributed key/value store to be useful, it has to handle more than 125,000 inserts/updates/queries per second (225 million vertices in 1,800 seconds).

I don't have experience with distributed key/value stores. Is 125,000 queries per second achievable using 25 nodes? That means each node would have to handle 5,000 queries/second.