Saturday, June 12, 2010

Strategy for software engineering position interview

From my experience in hiring and attending job interviews, employers are generally looking at 5 areas when hiring a software engineer ...
  1. Technical knowledge of a specific technology or product
  2. Experience in the business problem domain
  3. General technical and architecture sense
  4. Personality and working style
  5. IQ, problem solving skills
While 1 to 4 are very straightforward to answer, the most interesting and challenging part is item (5), because there is effectively no time to prepare. Depending on how well you manage pressure, your brain can go totally blank during the interview session.

I personally believe that a short interview doesn't necessarily reflect a person's ability to do the actual work. I have worked with many people who are high performers at work but poor interviewees. So don't lose confidence because you fail an interview; there is a factor of luck involved in this process.

Nevertheless, most employers are willing to accept a higher false negative rate, but the chance of a false positive must be very low. Because the pass rate of these IQ and algorithm questions is generally pretty low, they are a very effective means of filtering candidates. Therefore, being able to tackle these kinds of questions well is a critical success factor in interviewing.

Notice that there is no substitute for "good knowledge", "high IQ" and "the ability to speak/think in a high-pressure environment", but I've found some very useful techniques and strategies.

1. Rephrase the question slowly in your own words

This helps you make sure you fully understand the question and clarify any hidden assumptions you have made. Repeating the question "slowly" also gives you more time to think.

From the interviewer's perspective, he/she can see clearly the candidate's ability to digest a problem.

2. Construct a visual model of the problem

Use a whiteboard, or paper (if this is a phone interview), to diagram the problem as you perceive it. Our brains are better at understanding pictures than words, so having a diagram is very useful for coming up with solution ideas.

From the interviewer's perspective, he/she can see a clear picture of how you analyze the problem.

3. Use a special, simple case to guide you

Never try to tackle the general problem first. Start with a super-simple, special case and think about how you would solve it. This greatly reduces the number of things you need to consider and lets you focus on the core part of the problem.

4. Start with a very naive solution as a baseline

Tell the interviewer that you want to start with a very naive solution to establish a baseline. The naive solution can usually be constructed using a brute-force approach (try all combinations until you find a matching solution). After that, analyze the complexity of this naive solution as a baseline for future comparison.

5. Improve your solution

At this stage, you need to evolve and improve your solution. Here are some general techniques.
  • Divide and conquer: Decompose the problem into smaller ones and solve each sub-problem separately, then combine the solutions into a solution for the overall problem.
  • Reduce to well-known algorithm models: Try to model your problem in terms of a well-studied computer science model (e.g. Tree, Graph, Search, Sort) and then apply a well-known algorithm to solve it.
  • Recursive structure: Try to structure your problem in a recursive form. Find the solution for the base case and then expand the solution in a recursive manner.
  • Greedy method: Try to modify attributes of your current best solution to see if you can get a better one. Watch out for being trapped in a locally optimal solution.
  • Approximation: Instead of finding an exact solution, see if it is acceptable to find an approximate one, for example with a probabilistic approach (try 100 random combinations and pick the best outcome).

6. Keep talking while you think

Don't wait until you have fully figured out the answer. Keep talking while you are thinking about the solution so the interviewer understands how you analyze things; you are also showing how well you can express your thoughts. It is also easier for the interviewer to guide you or give you hints. And finally, you may impress the interviewer even if you cannot get to the exact answer.


7. Generalize your solution for the final answer

After you find a working solution for the simple case, extend the simple case and see how you would solve the extended one. See if you can find a general pattern for what the solution looks like. Once you find it, generalize your solution to the general problem.

From the interviewer's perspective, he/she can assess whether the candidate can think at different levels of abstraction and apply a solution to a broader scope of problems.


8. Remember the interview hasn't ended at the office building

You always have the chance to think through a question that you haven't answered satisfactorily after you walk out of the office. Submit a solution via email once you get back home (do it ASAP though), along with a thank-you note to the interviewer.

Sunday, May 2, 2010

The NOSQL Debate

I attended the Stanford InfoLab conference, where there were 2 panel discussions on Cloud computing: one on transaction processing and one on analytic processing.

The session turned out to be a debate between people from academia and open source developers. Both sides have their points ...

Background of RDBMS

RDBMS is based on a clear separation between application logic and data management. This loose coupling allows application and DB technologies to evolve independently.

This philosophy drives the DBMS architecture to be more general in order to support a wide range of applications. Over many years of evolution, it has acquired a well-defined data model and query model based on relational algebra. It also has a well-defined transaction processing model.

Applications, in turn, benefit from having a unified data model. They have more freedom to switch DB vendors without too many code changes.

For OLTP applications, the RDBMS model has proven to be highly successful in many large enterprises. The success of both Oracle and MySQL speaks to that.

For analytic applications, the RDBMS model is also widely used for implementing data warehousing based on a STAR schema model (composed of a Fact table and Dimension tables).

This model also puts DBAs in a very important position in the enterprise. They are equipped with sophisticated management tools and specialized knowledge to control the commonly shared DB schema.

Background of NOSQL

In the last 10 years, a few highly successful web 2.0 companies have built applications that have gone beyond what a centralized RDBMS can handle. Partitioning data across many servers and spreading the workload across them seems to be a reasonable first thing to try. Many RDBMS solutions provide a Master/Slave replication mechanism where one can spread the READ-ONLY workload across many servers, and it helps.

In a partitioned environment, applications need to be more tolerant of data integrity issues, especially when data is asynchronously replicated between servers. The famous CAP theorem from Eric Brewer captures the essence of the tradeoff for highly scalable applications: since they must have partition tolerance, they have to choose between "Consistency" and "Availability".

Fortunately, most of these web-scale applications have a high tolerance for data integrity issues, so they choose "availability" over "consistency". This is a major deviation from the transaction processing model of RDBMS, which typically weights "consistency" much higher than "availability".

On the other hand, the data structures used in these web-scale applications (e.g. user profile, preference, social graph ... etc) are much richer than the simple row/column/table model. Some of the operations involve navigation within a large social graph, which requires too many join operations in an RDBMS model.

The "higher tolerance of data integrity issues" as well as the need for "efficient support for rich data structures" challenge some of the very fundamental assumptions of RDBMS. Amazon has experimented with its large scale key/value store called Dynamo, and Google has its BigTable column-oriented storage model; both are designed from the ground up with a distributed architecture in mind.

Since then, many open source clones based on these two models have appeared. To represent this trend, Eric Evans from Rackspace coined the term "NOSQL". In my opinion this term does not accurately reflect the underlying technologies, but it nevertheless provides a marketing term for all non-RDBMS technologies to gather under, even those (e.g. CouchDB, Neo4j) that were not originally trying to tackle the scalability problem.

Today, NOSQL stands for an alternative, non-relational approach to databases.

For analytical applications, the NOSQL camp also takes a highly parallel, brute-force processing approach based on the Map/Reduce model.

The Debate

There is relatively little criticism of the data model aspects. Jun Rao from IBM Lab summarized the key difference between the two philosophies. The traditional RDBMS approach is to first figure out the right model, then provide an implementation and hope it is scalable. The modern NOSQL approach does the opposite: first figure out what a highly scalable architecture looks like, then layer a data model on top of it. Basically, people in both camps agree that we should use a data model that is optimized for the application's access patterns, which weakens the "one-size-fits-all" philosophy of RDBMS.

Most of the debate centers on the transaction processing model itself. Basically, RDBMS proponents think the NOSQL camp hasn't spent enough time understanding the theoretical foundation of the transaction processing model. The new "eventual consistency" model is not well-defined, and different implementations may differ significantly from each other. This means the burden of figuring out all this inconsistent behavior lands on application developers and makes their lives much harder. A DB whose behavior is hard to reason about can be very dangerous if the application makes wrong assumptions about the underlying data integrity guarantees.

While agreeing that application developers now have more to worry about, the NOSQL camp argues that this is actually a benefit, because it gives domain-specific optimization opportunities back to application developers, who are no longer constrained by a one-size-fits-all model. But they admit that making such optimization decisions requires a lot of experience and can be very error-prone and dangerous if not done by experts.

On the other hand, academia also notes that the movement to NOSQL may seem fashionable and cool to new technologists who may not have the expertise and skills. The community as a whole should articulate the pros and cons of NOSQL.

Notice that this is not the first time this debate has taken place. Stonebraker has written a very good article from the RDBMS side.

Tuesday, March 2, 2010

Two approaches on Multi-tenancy in Cloud

Continuing from my previous blog post on how multi-tenancy relates to cloud computing ...

My thinking has changed: I now think the Amazon approach (Hypervisor isolation) and the Salesforce approach (DB isolation) are both valid but attract different audiences.

First of all, increasing efficiency through sharing is a fundamental value proposition underlying all cloud computing initiatives. There is no debate that ...
  • We should "share resources" to increase utilization and hence improve efficiency
  • We should accommodate highly dynamic growth and shrink requirements rapidly and smoothly
  • We should "isolate" the tenants so there is no leakage of sensitive information
But at which layer should we facilitate that? At the Hypervisor level or the DB level?

Hypervisor level Isolation

A Hypervisor is a very low-level layer of software that maps the physical machine to a virtualized machine on which a regular OS runs. When the regular OS issues system calls to the VM, they are intercepted by the Hypervisor, which maps them to the underlying hardware. The Hypervisor also provides some traditional OS functions, such as scheduling, to determine which VM to run. A Hypervisor can be considered a very lean OS that sits very close to the bare hardware.

Depending on the specific implementation, the Hypervisor introduces an extra layer of indirection and hence incurs a certain % of overhead. If we need a VM with less capacity than a physical machine, the Hypervisor allows us to partition the hardware at a finer granularity and hence improve efficiency by having more tenants running on the same physical machine. For light-usage tenants, this gain in efficiency should offset the loss from the overhead.

Since the Hypervisor focuses on low-level system primitives, it provides the cleanest separation and hence lessens security concerns. In addition, by intercepting at the lowest layer, the Hypervisor retains the machine model that existing system/network admins are familiar with. Since the application is completely agnostic to the presence of the Hypervisor, this minimizes the changes required to move existing apps into the cloud and makes cloud adoption easier.

Of course, the downside is that virtualization introduces a certain % of overhead. And the tenant still needs to pay for the smallest VM even if none of its users is using it.

DB level Isolation

Here is another school of thought: if tenants are running the same kind of application, the only difference is the data each tenant stores. Why can't we just introduce an extra attribute "tenantId" in every table and then append a "where tenantId = $thisTenantId" to every query? In other words, add a hidden column and modify each submitted query.
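
To make the "hidden column" idea concrete, here is a minimal Python sketch using the standard sqlite3 module. The orders table, its columns and the tenant_query helper are all made up for illustration; a real provider would have to parse and rewrite arbitrary SQL rather than paste a filter string as done here.

```python
import sqlite3

def open_demo_db():
    """Create an in-memory table shared by all tenants (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (tenantId TEXT, item TEXT, qty INTEGER)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [("acme", "bolt", 10), ("acme", "nut", 5), ("globex", "gear", 7)],
    )
    return conn

def tenant_query(conn, tenant_id, where="1=1", params=()):
    """Run a SELECT on orders with the tenant filter always appended.

    `where` stands for the filter submitted by the tenant's application.
    It is wrapped in parentheses so that an OR inside it cannot bypass
    the tenantId clause added by the provider.
    """
    sql = (
        "SELECT item, qty FROM orders "
        f"WHERE ({where}) AND tenantId = ? ORDER BY item"
    )
    return conn.execute(sql, (*params, tenant_id)).fetchall()

conn = open_demo_db()
acme_rows = tenant_query(conn, "acme")                       # all of acme's rows
globex_rows = tenant_query(conn, "globex", "qty > ?", (5,))  # filtered rows
```

Every query goes through tenant_query, so a tenant never sees rows carrying another tenantId, which is exactly why the isolation is only as good as the rewriting step.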

In addition, the cloud provider usually needs to re-architect the underlying data layer and move to a distributed and partitioned DB. Some of the more sophisticated providers also need to invest in developing intelligent data placement algorithms based on workload patterns.

In this approach, the degree of isolation is only as good as the rewritten query. In my opinion, this doesn't seem to be hard, although it is less proven than the Hypervisor approach.

The advantage of DB level isolation is that there is no VM overhead and no minimum charge to the tenant.

However, we should compare these 2 approaches not just from a resource utilization / efficiency perspective, but from other perspectives as well, such as ...

Freedom of choice on technology stack

Hypervisor isolation gives its tenants maximum freedom over the underlying technology stack. Each tenant can choose the stack that best fits its application's needs and in-house IT skills. The tenant is also free to move to the latest technologies as they evolve.

This freedom of choice comes at a cost though. The tenant needs to hire system administrators to configure and maintain the technology stack.

In DB level isolation, the tenants live within a set of predefined data schemas and application flows. So their degree of freedom is limited to whatever set of parameters the cloud provider exposes. Also, the tenants' applications are "locked in" to the cloud provider's framework, and a tight coupling and dependency is created between the tenant and the cloud provider.

Of course, the advantage is that no administration of the technology stack is needed.

Reuse of Domain Specific Logic

Since it focuses on the lowest layer of resource sharing, Hypervisor isolation provides no reuse at the app logic level. Tenants need to build their own technology stack from the ground up and write their own application logic.

In the DB isolation approach, the cloud provider pre-defines a set of templates for DB schemas and application flow logic based on its domain expertise (it is important that the cloud provider is the recognized expert in that field). The tenant can leverage the cloud provider's domain expertise and focus purely on business operations.

Conclusion

I think each approach will attract a very different (and clearly disjoint) set of audiences.

Notice that DB-level isolation commoditizes everything and makes it very hard to create product feature differentiation. If I am a technology startup trying to develop a killer product, then my core value is my domain expertise. In this case, I won't go with DB-level isolation, which imposes too many constraints on me to distinguish my product from "anyone else". Hypervisor level isolation is much better because I can outsource the infrastructure layer and focus on my core value.

On the other hand, if I am operating a business but not building a product, then I would like to outsource all supporting functions, including my applications. In this case, I would pick the best app framework provided by the market leader and follow their best practices (and be very willing to live by their constraints). DB level isolation is more compelling in this case.

Monday, March 1, 2010

Search Engine Basics

I have received the question "how does search work?" a couple of times recently, so I will try to document the whole process here. This is intended to highlight the key concepts but not specific implementation details, which are much more complicated and sophisticated than what is described here.

A very basic search engine includes a number of processing phases.
  • Crawling: to discover the web pages on the internet
  • Indexing: to build an index to facilitate query processing
  • Query Processing: to extract the most relevant pages based on the user's query terms
  • Ranking: to order the results based on relevancy


Notice that each element in the above diagram reflects a logical functional unit, not its physical boundary. For example, the processing unit in each orange box is in fact executed across many machines in parallel. Similarly, each of the data store elements is physically spread across many machines based on key partitioning.


Vector Space Model

Here we use the "Vector Space Model" where each document is modeled as a multi-dimensional vector (each word represents a dimension). If we put all documents together, we form a matrix where the rows are documents and columns are words, and each cell contains the TF/IDF value of the word within the document.


To determine the similarity between 2 documents, we can take the dot product of the 2 document vectors, and the result represents the degree of similarity.
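
As a small illustration of the vector space model, the sketch below builds TF/IDF vectors for a toy corpus and compares documents with a dot product. The exact weighting (term frequency divided by document length, IDF as log(N / document frequency)) is just one common variant that I assume here; real engines use many refinements, and often normalize the vectors so that long documents are not favored.

```python
import math
from collections import Counter

def tfidf_vectors(docs):
    """Return one sparse vector {word: tf * idf} per document."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    # Document frequency: in how many documents does each word appear?
    df = Counter(word for tokens in tokenized for word in set(tokens))
    vectors = []
    for tokens in tokenized:
        tf = Counter(tokens)
        vectors.append({
            word: (count / len(tokens)) * math.log(n_docs / df[word])
            for word, count in tf.items()
        })
    return vectors

def dot(u, v):
    """Dot product of two sparse vectors stored as dicts."""
    return sum(weight * v.get(word, 0.0) for word, weight in u.items())

docs = ["apple banana apple", "banana cherry", "apple cherry cherry", "dog"]
vecs = tfidf_vectors(docs)
similar = dot(vecs[0], vecs[2])     # share the word "apple"
unrelated = dot(vecs[0], vecs[3])   # share no word at all
```

Documents that share no words get a similarity of zero, while shared words contribute in proportion to their TF/IDF weights in both documents.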


Crawler

The Crawler's job is to collect web pages on the internet. This is typically done by a farm of crawlers, which do the following.

Start from a set of seed URLs, then repeat the following ...
  1. Pick the URL that has the highest traversal priority.
  2. Download the page content from the URL to the content repository (which can be a distributed file system, or DHT), and update the entry in the doc index.
  3. Discover new URL links from the downloaded page. Add the link relationships into the link index and add these links to the traversal candidates.
  4. Prioritize the traversal candidates.
The content repository can be any distributed file system; here let's say it is a DHT.
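
The crawl loop above can be sketched in a few lines of Python. This is a single-process toy, not a crawler farm: the fetch and priority functions are hypothetical parameters, the "web" is an in-memory dict, the repository is a plain dict standing in for the DHT, and a candidate's priority is computed only once when it is enqueued.

```python
import heapq

def crawl(seeds, fetch, priority, max_pages=100):
    """Crawl from `seeds`; return (content repository, link index)."""
    repository = {}   # url -> page content (stands in for the DHT)
    link_index = []   # (from_url, to_url) pairs
    # heapq is a min-heap, so priorities are negated to pop the highest first.
    frontier = [(-priority(url), url) for url in seeds]
    heapq.heapify(frontier)
    queued = set(seeds)
    while frontier and len(repository) < max_pages:
        _, url = heapq.heappop(frontier)       # step 1: best candidate
        content, links = fetch(url)            # step 2: download the page
        repository[url] = content
        for link in links:                     # step 3: discover new links
            link_index.append((url, link))
            if link not in queued:             # never queue the same URL twice
                queued.add(link)
                heapq.heappush(frontier, (-priority(link), link))  # step 4
    return repository, link_index

# A tiny fake web: url -> (content, outgoing links).
FAKE_WEB = {
    "a": ("page a", ["b", "c"]),
    "b": ("page b", ["c"]),
    "c": ("page c", ["a"]),
}

def fake_fetch(url):
    return FAKE_WEB[url]

repo, links = crawl(["a"], fake_fetch, priority=lambda url: 1.0)
```

In a real deployment the priority function could be something like the page rank score mentioned below, and the frontier would itself be partitioned across the crawler farm.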

There are a number of considerations.
  • How do we make sure different crawlers are working on different sets of content (rather than crawling the same page twice)? When a crawler detects that overlap is happening (the URL already exists in the page repository with a fairly recent timestamp), it skips this URL and picks up the next best URL to crawl.
  • How does the crawler determine which candidate to crawl next? We can use a heuristic based on some utility function (e.g. we can pick the URL candidate that has the highest page rank score).
  • How frequently do we re-crawl? We can track the rate of change of the page to determine the frequency of crawling.

Indexer


The Indexer's job is to build the inverted index for the query processor to serve the online search requests.

First the indexer will build the "forward index".
  1. The indexer will parse the documents from the content repository into a token stream.
  2. Build up a "hit list" which describes each occurrence of the token within the document (e.g. position in the doc, font size, is it a title, anchor text ... etc).
  3. Apply various "filters" to the token stream (like stop word filters to remove words like "a", "the", or a stemming filter to normalize words "happy", "happily", "happier" into "happy").
  4. Compute the term frequency within the document.
From the forward index, the indexer will proceed to build the inverted (reverse) index, typically through a Map/Reduce mechanism. The result will be keyed by word and stored in a DHT.
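
Here is a minimal sketch of the forward index and its inversion. It runs in one process instead of Map/Reduce, keeps only token positions in the hit list, uses a tiny made-up stop word list and skips stemming, so treat it as an illustration of the data flow rather than a real indexer.

```python
from collections import defaultdict

STOP_WORDS = {"a", "the", "is"}   # tiny illustrative stop word list

def forward_index(docs):
    """doc_id -> {token: [positions]}, a very reduced "hit list"."""
    forward = {}
    for doc_id, text in docs.items():
        hits = defaultdict(list)
        for position, token in enumerate(text.lower().split()):
            if token not in STOP_WORDS:        # stop word filter
                hits[token].append(position)
        forward[doc_id] = dict(hits)
    return forward

def invert(forward):
    """token -> list of (doc_id, term_frequency), ordered by doc_id."""
    inverted = defaultdict(list)
    for doc_id in sorted(forward):
        for token, positions in forward[doc_id].items():
            inverted[token].append((doc_id, len(positions)))
    return dict(inverted)

docs = {1: "the cat sat", 2: "a cat is a cat", 3: "the dog sat"}
index = invert(forward_index(docs))
cat_postings = index["cat"]
```

Keeping each posting list ordered by document id costs nothing here and pays off later when the query processor intersects lists.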


Ranker


The Ranker's job is to compute the rank of a document, based on how many in-links point to the document as well as the rank of the referrers (hence a recursive definition). Two popular ranking algorithms are "Page Rank" and "HITS".
  • Page Rank Algorithm
Page Rank is a global ranking mechanism. It is precomputed upfront and is independent of the query.

  • HITS Algorithm
In HITS, every page plays a dual role: a "hub" role and an "authority" role, with one rank for each. The hub rank measures the quality of the page's outlinks. A good hub is one that points to many good authorities. The authority rank measures the quality of the page's content. A good authority is one that has many good hubs pointing to it.

Notice that HITS doesn't pre-compute the hub and authority scores. Instead it invokes a regular search engine (which only does TF/IDF matching but not ranking) to get a set of initial results (typically of a predefined fixed size) and then expands this result set by following the outlinks of the initial results. It also incorporates a fixed number of inlinks (by sampling the inlinks pointing into the initial result set) into the expanded result set. After this expansion, it runs an iterative algorithm to compute the authority ranks and hub ranks, and uses a combination of these 2 ranks to calculate the ultimate rank of each page; usually a high hub rank is weighted more than a high authority rank.

Notice that the HITS algorithm is performed at query time and not pre-computed upfront. The advantage of HITS is that it is sensitive to the query (as compared to PageRank, which is not). The disadvantage is that it performs ranking per query and is hence expensive.
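
The iterative part of HITS can be sketched as follows. The graph argument stands for the link structure of the expanded result set; the fixed iteration count and the L2 normalization are my own simplifying assumptions, and the expansion and final score combination steps are left out.

```python
import math

def hits(graph, iterations=50):
    """graph: {page: [pages it links to]}. Returns (hub, authority) dicts."""
    pages = set(graph) | {t for targets in graph.values() for t in targets}
    hub = {page: 1.0 for page in pages}
    auth = {page: 1.0 for page in pages}
    for _ in range(iterations):
        # Authority score: sum of the hub scores of the pages linking in.
        auth = {page: 0.0 for page in pages}
        for source, targets in graph.items():
            for target in targets:
                auth[target] += hub[source]
        # Hub score: sum of the authority scores of the pages linked to.
        hub = {page: sum(auth[t] for t in graph.get(page, [])) for page in pages}
        # Normalize both score sets so the values do not blow up.
        for scores in (auth, hub):
            norm = math.sqrt(sum(v * v for v in scores.values())) or 1.0
            for page in scores:
                scores[page] /= norm
    return hub, auth

# h1 and h2 act as hubs, a1 and a2 as authorities.
graph = {"h1": ["a1", "a2"], "h2": ["a1"], "a1": [], "a2": []}
hub, auth = hits(graph)
```

In this toy graph a1 ends up as the best authority because both hubs point to it, and h1 is the better hub because it points to both authorities.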


Query Processor

When a user inputs a search query (containing multiple words), the query is treated as a "query document". Its relevancy to each document is computed and combined with the rank of the document, and an ordered list of results is returned.

There are many ways to compute the relevancy. We can consider only the documents that contain all the terms specified in the query. In this model, we look up, for each term in the query, a list of document ids and then intersect these lists. If we order each document list by document id, the intersection can be computed pretty efficiently.
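
The reason sorted lists help is that two of them can be intersected in a single linear merge pass. A minimal sketch, assuming the index is a plain dict from term to a sorted list of document ids (the function names are mine):

```python
def intersect_sorted(a, b):
    """Intersect two lists of doc ids, both sorted in ascending order."""
    i = j = 0
    common = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            common.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1          # a[i] cannot be in b any more
        else:
            j += 1          # b[j] cannot be in a any more
    return common

def docs_with_all_terms(index, terms):
    """Return the ids of documents containing every query term."""
    postings = [index.get(term, []) for term in terms]
    if not postings:
        return []
    postings.sort(key=len)  # start from the shortest list
    result = postings[0]
    for other in postings[1:]:
        result = intersect_sorted(result, other)
    return result

index = {"cat": [1, 2, 5, 9], "sat": [1, 3, 5], "mat": [5, 9]}
matches = docs_with_all_terms(index, ["cat", "sat", "mat"])
```

Starting with the shortest list keeps the intermediate result small, since the intersection can never be longer than its smallest input.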

Alternatively, we can return the union (instead of the intersection) of all document lists and order the documents by a combination of the page rank and the TF/IDF score. Documents that have more terms in common with the query will have a higher TF/IDF score.

In some cases, an automatic query result feedback loop can be used to improve the relevancy.
  1. In the first round, the search engine performs a search (as described above) based on the user's query.
  2. Construct a second-round query by expanding the original query with additional terms found in the returned documents that ranked high in the first-round result.
  3. Perform the second-round query and return the result.

Outstanding Issues


Fighting spammers is a continuous battle for search engines. Because of the financial value of showing up on the first page of search results, many spammers try to manipulate their pages. An early approach was to modify a page to repeat certain terms many, many times (trying to increase the TF/IDF score). The evolution of Page Rank has mitigated this to some degree, because page rank is based on "out-of-page" information that is much harder for the site owner to manipulate.

But people use link farms to game the page rank algorithm. The idea is to trade links between different domains. There is active research in this area on how to catch these patterns and discount their ranks.

Wednesday, February 17, 2010

Spatial Index RTree

For location-based search, it is very common to search for objects based on their spatial location, e.g. find all restaurants within 5 miles of my current location, or find all schools within the zipcode 95110 ... etc.

Every spatial object can be represented by an object id, a minimum bounding rectangle (MBR), as well as other attributes. So the space can be represented by a collection of spatial objects. (Here we use 2 dimensions to illustrate the idea, but the concept can be extended to N dimensions.)

A query can be represented as another rectangle. The query is about locating the spatial objects whose MBR overlaps with the query rectangle.


RTree is a spatial indexing technique such that given a query rectangle, we can quickly locate the spatial object results.

The concept is similar to a BTree. We group spatial objects that are close to each other and form a tree whose intermediate nodes contain "close-by" objects. Since the MBR of a parent node contains the MBRs of all its children, objects are close together if their parent's MBR is minimized.


Search

Starting from the root, we examine each child's MBR to see if it overlaps with the query MBR. We skip the whole subtree if there is no overlap; otherwise, we recurse the search by drilling into each child.

Notice that unlike other tree algorithms, which only traverse down one path, our search here needs to traverse down multiple paths if overlaps happen. Therefore, we need to structure the tree to minimize overlapping as high in the tree as possible. This means we want to minimize the sum of MBR areas along each path (from the root to the leaf) as much as possible.
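
A minimal sketch of the search in Python. The Node class and the (min_x, min_y, max_x, max_y) tuple layout are my own illustrative choices, and rectangles that merely touch are counted as overlapping. The small tree is built by hand since insertion is covered separately.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Node:
    mbr: tuple                                     # (min_x, min_y, max_x, max_y)
    children: list = field(default_factory=list)   # child Nodes
    obj_id: Optional[str] = None                   # set only for spatial objects

def overlaps(a, b):
    """True if rectangles a and b intersect (touching edges count)."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]

def search(node, query):
    """Return the ids of all spatial objects whose MBR overlaps `query`."""
    if not overlaps(node.mbr, query):
        return []                  # prune the whole subtree
    if node.obj_id is not None:
        return [node.obj_id]
    found = []
    for child in node.children:    # possibly several paths to follow
        found.extend(search(child, query))
    return found

objects = [
    Node((1, 1, 2, 2), obj_id="r1"),
    Node((3, 3, 4, 4), obj_id="r2"),
    Node((10, 10, 11, 11), obj_id="r3"),
    Node((13, 13, 14, 14), obj_id="r4"),
]
leaf1 = Node((1, 1, 4, 4), children=objects[:2])
leaf2 = Node((10, 10, 14, 14), children=objects[2:])
root = Node((1, 1, 14, 14), children=[leaf1, leaf2])

hits_near_origin = search(root, (1.5, 1.5, 3.5, 3.5))
```

The query near the origin never descends into leaf2, which is the pruning that makes the index worthwhile.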

Insert

To insert a new spatial object, start from the root node and pick the child node whose MBR will be extended least if the new spatial object is added. Walk down this path until reaching a leaf node.

If the leaf node has space left, insert the object into the leaf node and update the MBR of the leaf node as well as all its parents. Otherwise, split the leaf node into two (create a new leaf node and move some of the content of the original leaf node to the new one), and then add the newly created leaf node to the parent of the original leaf node. If the parent has no space left, the parent will be split as well.

If the split goes all the way to the root, the original root will then be split and a new root is created.
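
The "extended least" rule used when walking down the tree can be sketched as below. Rectangles are (min_x, min_y, max_x, max_y) tuples, and breaking ties in favor of the child with the smaller area is an assumption on my part. Node splitting is not shown.

```python
def area(mbr):
    return (mbr[2] - mbr[0]) * (mbr[3] - mbr[1])

def union(a, b):
    """Smallest rectangle that contains both a and b."""
    return (min(a[0], b[0]), min(a[1], b[1]), max(a[2], b[2]), max(a[3], b[3]))

def enlargement(mbr, new_mbr):
    """How much the area of `mbr` grows if it must also cover `new_mbr`."""
    return area(union(mbr, new_mbr)) - area(mbr)

def choose_child(child_mbrs, new_mbr):
    """Index of the child to descend into when inserting `new_mbr`."""
    return min(
        range(len(child_mbrs)),
        key=lambda i: (enlargement(child_mbrs[i], new_mbr), area(child_mbrs[i])),
    )

children = [(0, 0, 4, 4), (10, 10, 14, 14)]
near_first = choose_child(children, (5, 5, 6, 6))          # grows child 0 less
inside_second = choose_child(children, (12, 12, 13, 13))   # no growth at all
```

After the object is placed, the MBR of every node on the chosen path is replaced by its union with the new object's MBR.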

Delete

Deleting a spatial node starts with a search for the leaf node that contains it. Remove the spatial node from the leaf node's content and update the leaf's MBR and its parents' MBRs all the way to the root. If the leaf node now has fewer than m entries (m being the minimum number of entries allowed per node), we need to condense the tree by marking the leaf node for deletion. We then remove the leaf node from its parent's content and update the parent's MBR. If the parent now has fewer than m entries, we mark the parent for deletion as well and remove it from its own parent. At this point, all the nodes marked for deletion are removed from the RTree.

Notice that the content of these deleted nodes is not all garbage, since they still have some children that are valid nodes (but were removed from the tree along with them). Now we need to reinsert all these valid nodes back into the tree.

Finally, if the root node contains only one child, we throw away the original root and its only child becomes the new root.

Update

Update happens when an existing spatial node changes its dimension. One way is to just change the spatial node's MBR without changing the RTree. A better (but more expensive) way is to delete the node, modify its MBR and then insert it back into the RTree.

Thursday, February 11, 2010

Cloud MapReduce Tricks

Cloud Map/Reduce, developed by Huan Liu and Dan Orban, offers some good lessons in how to design applications on top of a Cloud environment, which has its own set of characteristics and constraints.

Although it provides the same map/reduce programming model to the application developer, the underlying implementation architecture of Cloud MR is drastically different from Hadoop. For a description of Hadoop internals, here it is.

Built on top of a Cloud OS (which is Amazon AWS), Cloud MR enjoys its inherent scalability and resiliency, which greatly simplifies its architecture.
  1. Cloud MR doesn't need to design a central coordinator component (like the NameNode and JobTracker in the Hadoop environment). It simply stores the job progress status information in the distributed metadata store (SimpleDB).
  2. Cloud MR doesn't need to worry about scalability in the communication path and how data can be moved efficiently between nodes; all of this is taken care of by the underlying Cloud OS.
  3. Cloud MR doesn't need to worry about disk I/O issues, because all storage is effectively remote and is taken care of by the Cloud OS.
On the other hand, the Cloud OS has a set of constraints and limitations that the design of Cloud MR has to deal with.
  • Network latency and throughput : 20 - 100 ms for SQS access; SimpleDB domain write throughput is 30 - 40 items/sec
  • Eventual consistency : 2 simultaneous requests to dequeue from SQS can both get the same message. SQS sometimes reports empty when there are still messages in the queue.
Cloud MR uses a "parallel path" technique to overcome the network throughput issue. The basic idea is to read/write data across multiple network paths so that the effective throughput is the aggregate of the individual paths.

Cloud MR uses a "double check" technique to overcome the consistency issue. The writer writes status into multiple places and the reader reads from multiple places as well. If the reader reads inconsistent results from different places, the eventually consistent state hasn't arrived yet, so it needs to retry later. When the states read from different places agree with each other, eventual consistency has arrived and the state is valid.
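
The "double check" idea can be illustrated with a toy model in which each "place" is just a Python dict. This is only my reading of the description above, not code from Cloud MR; the function names and the job-1 key are made up.

```python
def write_status(places, key, value):
    """Writer: store the same status in every place."""
    for place in places:
        place[key] = value

def read_status(places, key):
    """Reader: return the value only if every place agrees on it.

    Returns None when the places disagree or the value is missing,
    which means the caller should retry later.
    """
    values = [place.get(key) for place in places]
    first = values[0]
    if first is not None and all(value == first for value in values):
        return first
    return None

places = [{}, {}]

# Simulate a write that has so far become visible in one place only.
places[0]["job-1"] = "map_done"
too_early = read_status(places, "job-1")   # places disagree, so None

# Once the write is visible everywhere, the reader accepts the state.
write_status(places, "job-1", "map_done")
settled = read_status(places, "job-1")
```

The reader trades some latency (it may have to retry) for never acting on a state that only part of the system has seen.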

The following describes the technical details of Cloud MR ...

Cloud MR Architecture


SimpleDB is used to store job status. The client submits jobs to SimpleDB; Map and Reduce workers update and extract job status from SimpleDB. The actual data of each job is stored in SQS (whose messages can also point to an object stored in S3).

So the job progresses in the following way.

Job Client Processing Cycle
  1. Store data in many S3 file objects
  2. Create a Mapper task request for each file split (each map task request contains a reference to the S3 object and the byte range).
  3. Create an input queue in SQS and enqueue each Mapper task request to it.
  4. Create a master reduce queue, a result output queue as well as multiple partition queues.
  5. Create one reducer task request for each partition queue. Each reducer task request contains a pointer to the partition queue.
  6. Enqueue the reducer task requests to the master reducer queue
  7. Create a job request that contains a mapper task count S as well as references to all the SQS queues created so far.
  8. Add the job request into SimpleDB
  9. Invoke AWS commands to start the EC2 instances for Mappers and Reducers, passing along queue and SimpleDB locations as "user data" to the EC2 instances.
  10. From this point onwards, poll SimpleDB for the job progress status
  11. When the job is complete, download the result from the output queue and S3
Mapper Processing Cycle
  1. Take a mapper task request from the SQS input queue
  2. Fetch the file split and parse out each record
  3. Invoke the user-defined map() function. For each intermediate key k1 it emits, compute (hash(k1) % no_of_partitions) and enqueue the intermediate record to the corresponding partition queue.
  4. When done with the mapper task request, write a commit record containing the worker id, the map request id and the number of records written to each partition (in other words, R[i][j] where i is the map request and j is the partition number).
  5. Remove the map task request from the SQS input queue
Due to the constraints of the eventual consistency model, a Mapper cannot stop processing just because it sees that the input queue is empty. Instead, it counts the commit records and checks that the number of unique map request ids equals the mapper task count S in the job request. When this happens, it enters the reduce phase.
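
The partitioning step and the "is the map phase finished?" check can be sketched in Python as follows. This is only an illustration: the commit record fields (worker_id, map_request_id, counts) and the use of zlib.crc32 as the hash function are assumptions of the sketch, not details from Cloud MR.

```python
import zlib

def partition_for(key, no_of_partitions):
    """Pick the partition queue for an intermediate key: hash(k1) % no_of_partitions.

    zlib.crc32 is used instead of the built-in hash() so that the result is
    the same in every worker process (an assumption of this sketch).
    """
    return zlib.crc32(key.encode("utf-8")) % no_of_partitions

def map_phase_complete(commit_records, mapper_task_count):
    """True once the commit records cover S distinct map request ids.

    Each commit record is a dict with hypothetical fields:
    worker_id, map_request_id, counts (records written per partition).
    """
    committed_ids = {record["map_request_id"] for record in commit_records}
    return len(committed_ids) == mapper_task_count
```

Counting distinct map request ids, rather than commit records, is what makes the check robust when two workers commit the same map request.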

It is possible that a Mapper worker crashes before it finishes a mapper task, in which case another mapper will re-process the map task request (after the SQS visibility timeout). It is also possible, due to the eventual consistency model, to have 2 mappers working on the same file split simultaneously. In both cases, duplicate records may end up in the partition queues.

To facilitate duplicate elimination, each intermediate record emitted by the mapper is tagged with [map request id, worker id, a unique number].

Reducer Processing Cycle
  1. Monitor SimpleDB and wait until commit records from all mappers have been seen.
  2. Dequeue a reducer task request from the master reducer queue
  3. Go to the corresponding partition queue, dequeue each intermediate record
  4. Invoke the user-defined reduce() function and write the reducer output to the output queue
  5. When done with the reducer task request, write a commit record in a similar way as the Mapper worker
  6. Remove the reduce task request from the master reducer queue
To eliminate duplicated intermediate messages in the partition queue, each Reducer first queries SimpleDB for all the commit records written by successful mapper workers. If different workers have worked on the same mapper request, there may be multiple commit records with the same mapper request id. In this case, the Reducer arbitrarily picks a winner, and all intermediate records tagged with that mapper request id but not with the winner's worker id are discarded.

Similar to the Mapper, Reducer j will not stop reading messages from its partition queue just because the queue appears empty; it keeps reading until the number of messages it has read reaches the sum of R[i][j] over all i.
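
The winner selection and the expected message count described in the last two paragraphs can be sketched together in Python. The record layouts and function names are hypothetical, and "first commit seen wins" is just one arbitrary way of picking the winner.

```python
def pick_winners(commit_records):
    """Choose one winning commit per map request id (here: the first one seen)."""
    winners = {}
    for record in commit_records:
        winners.setdefault(record["map_request_id"], record)
    return winners

def keep_winning_records(intermediate_records, winners):
    """Discard records whose worker id is not the winner for their map request id.

    Each intermediate record is a tuple:
    (map_request_id, worker_id, unique_number, payload).
    """
    return [
        rec for rec in intermediate_records
        if rec[0] in winners and winners[rec[0]]["worker_id"] == rec[1]
    ]

def expected_record_count(winners, partition):
    """Sum of R[i][j] over all map requests i, for partition j."""
    return sum(record["counts"][partition] for record in winners.values())
```

Reducer j would keep reading its partition queue until it holds expected_record_count(winners, j) records that survive keep_winning_records.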

Due to eventual consistency, it is possible that multiple reducers dequeue the same reducer task request from the master reducer queue and then take messages from the same partition queue. Since they are competing for the same partition queue, one of them will find the queue empty before it reaches the sum of R[i][j] over i. After a certain timeout period, that reducer writes a "suspect conflict" record (containing its worker id) to SimpleDB. If it finds that another reducer has written such a record, it knows that another reducer is working on the same partition. The conflict is resolved by worker id: a reducer keeps reading until it sees a conflict record with a lower id than its own, at which point it is the loser, drops its current processing and picks up another reducer task. All the records read by the loser come back to the queue after the timeout period and are picked up by the winner.

Network Latency and Throughput

One SimpleDB-specific implementation constraint is that its read and write throughput is very asymmetric: reads are very fast, while writes are slow. To mitigate this asymmetry, Cloud MR uses multiple domains in SimpleDB. When it writes to SimpleDB, it randomly picks one domain and writes to it. This way, the write workload is spread across multiple domains. When it reads from SimpleDB, it reads every domain and aggregates the results (since any given record lives in only one domain).
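
Here is a minimal Python sketch of the write-to-one, read-from-all idea. The class is an in-memory stand-in and does not use the real SimpleDB API; its name and methods are made up for illustration.

```python
import random

class MultiDomainStore:
    """In-memory stand-in for several SimpleDB domains (not the real SimpleDB API)."""

    def __init__(self, domain_count, rng=None):
        self.domains = [{} for _ in range(domain_count)]
        self.rng = rng or random.Random()

    def write(self, key, value):
        # Spread the write load: each record goes to one randomly chosen domain.
        self.rng.choice(self.domains)[key] = value

    def read_all(self):
        # Reads are cheap, so query every domain and merge the results.
        merged = {}
        for domain in self.domains:
            merged.update(domain)
        return merged
```

The sketch assumes each key is written only once, which matches the write-once nature of commit records.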

To overcome the latency issue of SQS, Cloud MR uses a buffering technique on the Mapper side to batch up intermediate messages destined for the same partition. A message buffer is 8 KB in size (the maximum size of an SQS message). When the buffer is full (or after some timeout period), a designated thread flushes the buffer by writing a composite message (which contains all the buffered intermediate records) to SQS.
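
The buffering idea can be sketched in Python like this. The class name, the newline separator and the send callback are assumptions of the sketch; the timeout-based flush and the dedicated flushing thread are left out, and each record is assumed to be smaller than the limit.

```python
class PartitionBuffer:
    """Batches intermediate records for one partition into composite messages."""

    def __init__(self, send, max_bytes=8 * 1024):
        self.send = send            # callable that enqueues one composite message
        self.max_bytes = max_bytes  # 8 KB, the message size limit cited above
        self.records = []
        self.size = 0               # size of the composite message built so far

    def add(self, record):
        encoded = record.encode("utf-8")
        separator = 1 if self.records else 0  # newline between records
        if self.size + separator + len(encoded) > self.max_bytes:
            self.flush()            # buffer is full: send what we have
            separator = 0
        self.records.append(encoded)
        self.size += separator + len(encoded)

    def flush(self):
        if self.records:
            self.send(b"\n".join(self.records))
            self.records, self.size = [], 0
```

A mapper would keep one such buffer per partition queue and call flush() on every buffer before writing its commit record.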


The reducer side works in a similar way: multiple read threads dequeue messages from the partition queue and put them in a read buffer, from which the Reducer reads the intermediate messages. Notice that 2 threads may read the same message from the partition queue (remember the eventual consistency scenario described above). To eliminate potentially duplicated messages, the reducer examines the unique number tagged on each message and discards any message it has seen before.
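
A minimal Python sketch of the reducer-side duplicate check follows. It assumes each message arrives as a (tag, payload) pair where the tag is the [map request id, worker id, unique number] triple mentioned earlier; that message shape is an assumption of the sketch.

```python
def deduplicate(messages):
    """Yield each message once, keyed by its (map_request_id, worker_id, unique_number) tag."""
    seen = set()
    for tag, payload in messages:
        if tag in seen:
            continue  # already delivered by another read thread
        seen.add(tag)
        yield tag, payload
```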

Difference with Hadoop

Map/Reduce developers familiar with the Hadoop implementation will find that Cloud MR behaves in a similar way, but there are a number of differences that I want to highlight here.
  • Reducer keys are not sorted: Unlike Hadoop, which guarantees that keys arriving at the same partition (or reducer) will be in sorted order, Cloud MR doesn't provide such a feature. Applications need to do their own sorting if they need the sort order.

For a detailed technical description of Cloud MR, as well as how it compares with Hadoop, read the original paper by Huan Liu and Dan Orban.

Thursday, February 4, 2010

NoSQL GraphDB

I received some constructive criticism regarding my previous blog post on NoSQL patterns: it covered only key/value stores and left out graph DBs.

The Property Graph Model

A property graph is a collection of Nodes and Directed Arcs. Each node represents an entity and has a unique id as well as a Node Type. The Node Type defines the set of metadata that the node has. Each arc represents a unidirectional relationship between two entities and has an Arc Type. The Arc Type defines the set of metadata that the arc has.
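
To make the model concrete, here is a small Python sketch of a property graph. The class and field names (node_type, arc_type, out_v, in_v) are my own choices for illustration and do not come from any particular graph database.

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    id: str
    node_type: str
    properties: dict = field(default_factory=dict)

@dataclass
class Arc:
    id: str
    arc_type: str
    out_v: str   # id of the node the arc leaves
    in_v: str    # id of the node the arc enters
    properties: dict = field(default_factory=dict)

@dataclass
class PropertyGraph:
    nodes: dict = field(default_factory=dict)
    arcs: list = field(default_factory=list)

    def add_node(self, node):
        self.nodes[node.id] = node

    def add_arc(self, arc):
        self.arcs.append(arc)

    def out_arcs(self, node_id, arc_type=None):
        """Arcs leaving node_id, optionally restricted to one Arc Type."""
        return [a for a in self.arcs
                if a.out_v == node_id and arc_type in (None, a.arc_type)]

    def in_arcs(self, node_id, arc_type=None):
        """Arcs entering node_id, optionally restricted to one Arc Type."""
        return [a for a in self.arcs
                if a.in_v == node_id and arc_type in (None, a.arc_type)]
```

Scanning the whole arc list on every lookup keeps the sketch short; a real store would index arcs by node.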


General Graph Processing

I found that many graph algorithms follow a general processing pattern. There are multiple rounds of (sequential) processing iterations. Within each iteration, there is a set of active nodes that perform local processing in parallel. The local processing can modify the node's properties, add or remove links to other nodes, and send messages across links. All message passing is done after the local processing.


This model is similar to the Google Pregel model.

Notice that this model maps well onto a parallel computing environment, where the processing of the set of active nodes can be spread across multiple processors (or multiple machines in a cluster).

Notice that all messages from all incoming links arrive before any link changes made during local processing. On the other hand, all messages to the outgoing links are sent after local processing, once the links have been changed. The term "local processing" means that a node cannot modify the properties of other nodes or other links.
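
The round-based pattern can be sketched in Python with a toy computation: every node ends up holding the largest value that can reach it. The function name and the choice of "propagate the maximum" are just for illustration; the loop over active nodes is sequential here but stands for work that could run in parallel.

```python
def run_supersteps(values, links, max_rounds=100):
    """Propagate the maximum value through the graph, one round at a time.

    values: {node: number}; links: {node: [neighbour, ...]} (directed).
    Returns (final values, number of rounds executed).
    """
    values = dict(values)
    inbox = {node: [] for node in values}
    active = set(values)
    rounds = 0
    while active and rounds < max_rounds:
        rounds += 1
        outbox = {node: [] for node in values}
        for node in active:  # conceptually runs in parallel
            # Local processing: only this node's own state is touched.
            best = max([values[node]] + inbox[node])
            changed = best > values[node]
            values[node] = best
            if changed or rounds == 1:
                for neighbour in links.get(node, []):
                    outbox[neighbour].append(best)
        # Message passing happens only after all local processing is done.
        inbox = outbox
        active = {node for node, msgs in inbox.items() if msgs}
    return values, rounds
```

A node becomes active in the next round only if it received a message, and the computation stops when no node is active.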

Because two nodes can simultaneously modify the link between them, the following conflicts can happen:
  • One node deletes the link while the other node modifies the link's properties.
  • Both nodes modify the properties of the link between them.
A conflict resolution mechanism needs to be attached to each Arc Type to determine how the conflict should be resolved, either determining who is the winner or how to merge the effects.

Neo4j provides a restricted, single-threaded graph traversal model
  • At each round, the set of active nodes is always a single node
  • The set of active nodes of the next round is determined by the traversal policy (breadth-first or depth-first), but is still a single node
  • It offers a callback function to determine whether a node should be included in the result set
Gremlin, on the other hand, provides an interactive graph traversal model where the user can step through each iteration. It uses an XPath-like syntax to express the navigation.
  • The node is expressed as Node(id, inE, outE, properties)
  • The arc is expressed as Arc(id, type, inV, outV, properties)
As an example, suppose the graph represents a relationship between 2 types of entities: Person writes Book. Given a person, her co-authors can be expressed with the following XPath-like expression
./outE[@type='write']/inV/inE[@type='write']/outV
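
To show what this expression walks through, here is the same navigation written as a plain Python sketch. It is not Gremlin code; arcs are represented as hypothetical (out_v, arc_type, in_v) tuples.

```python
def co_authors(arcs, person):
    """Follow ./outE[@type='write']/inV/inE[@type='write']/outV from `person`.

    `arcs` is a list of (out_v, arc_type, in_v) tuples.
    """
    # outE[@type='write']/inV: the books this person wrote
    books = {in_v for out_v, t, in_v in arcs if out_v == person and t == "write"}
    # inE[@type='write']/outV: everyone who wrote one of those books
    return {out_v for out_v, t, in_v in arcs if in_v in books and t == "write"}
```

Note that, as with the path expression itself, the result includes the starting person whenever she has written at least one book.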

Path Algebra

Marko Rodriguez has described a set of matrix operations that apply when the graph is represented as an adjacency matrix. Graph algorithms can then be described in algebraic form.

The Traverse operation can be expressed as matrix multiplication. If A is the adjacency matrix of the graph, then A.A represents connections by paths of length 2.

Similarly, the Merge operation can be expressed as matrix addition. For example, (A + A.A + A.A.A) represents connectivity within 3 degrees of reach.

A special case is when the graph represents a relationship between 2 types of entities. For example, if A represents an authoring relationship (a person writes a book), then A.(A.transpose()) represents the co-author relationship (a person co-authors with another person).
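
These three operations are easy to try out on tiny matrices. The sketch below uses plain Python lists so that it needs no extra library; the helper names and the example graphs are made up for illustration.

```python
def mat_mul(a, b):
    """Traverse: matrix multiplication."""
    return [[sum(a[i][k] * b[k][j] for k in range(len(b)))
             for j in range(len(b[0]))] for i in range(len(a))]

def mat_add(a, b):
    """Merge: element-wise matrix addition."""
    return [[x + y for x, y in zip(row_a, row_b)] for row_a, row_b in zip(a, b)]

def transpose(a):
    return [list(col) for col in zip(*a)]

# A chain 0 -> 1 -> 2 as an adjacency matrix.
A = [[0, 1, 0],
     [0, 0, 1],
     [0, 0, 0]]
two_hops = mat_mul(A, A)            # only entry [0][2] is 1: the path 0 -> 1 -> 2
within_two = mat_add(A, two_hops)   # reachable in 1 or 2 hops

# Authoring matrix: rows are persons (alice, bob, carol), columns are books (b1, b2).
W = [[1, 0],
     [1, 0],
     [0, 1]]
co_author = mat_mul(W, transpose(W))  # entry [i][j] counts books shared by i and j
```

In co_author, alice and bob share one book, while carol shares a book only with herself (the diagonal entry).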

Marko also introduces a set of Filter operations (not filter / clip filter / column filter / row filter / vertex filter).

Map Reduce

Depending on the traversal strategy inherent in a graph algorithm, algorithms with a higher degree of sequential dependency don't fit well into parallel computing. For example, graph algorithms of a breadth-first search nature fit the parallel computing paradigm better than those of a depth-first search nature. Likewise, performing a search from all nodes fits parallel computing better than performing a search from one particular start node.

There are different storage representations of a graph: incidence list, incidence matrix, adjacency list and adjacency matrix. For sparse graphs (which are the majority of cases), let's assume an adjacency list is used as the storage model in the subsequent discussion.

In other words, the graph is represented as a list of records, each record is [node, connected_nodes].

There are many graph algorithms and it is not my intent to give an exhaustive list. Below are the ones that I have used in my previous projects and that can be translated into Map/Reduce form.

Topological Sort is commonly used to sort out a work schedule based on dependency tree. It can be done as follows ...

# Topological Sort

# Input records
[node_id, prerequisite_ids]

# Output records
[node_id, prerequisite_ids, dependent_ids]

class BuildDependentsJob {
 # Invert the prerequisite lists: tell each prerequisite who depends on it
 map(node, prerequisite_ids) {
   for each prerequisite_id in prerequisite_ids {
     emit(prerequisite_id, node)
   }
 }

 reduce(node, dependent_ids) {
   emit(node, [node, node.prerequisite_ids, dependent_ids])
 }
}

class BuildReadyToRunJob {
 # A node is ready to run once it has no outstanding prerequisites
 map(node_id, node) {
   if ! done?(node) and node.prerequisite_ids.empty? {
     result_set.append(node)
     done(node)
     for each dependent_id in node.dependent_ids {
       emit(dependent_id, node)
     }
   }
 }

 reduce(node, done_prerequisite_ids) {
   remove_prerequisites(node, done_prerequisite_ids)
 }
}

# Topological Sort main program
main() {
 JobClient.submit(BuildDependentsJob.new)
 result_set = []   # appended to by BuildReadyToRunJob

 result_size_before_job = 0
 result_size_after_job = 1

 # Keep running rounds until a round adds nothing new
 while (result_size_before_job < result_size_after_job) {
   result_size_before_job = result_set.size
   JobClient.submit(BuildReadyToRunJob.new)
   result_size_after_job = result_set.size
 }

 return result_set
}
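
To see the rounds in action without a Map/Reduce cluster, here is a single-process Python sketch of the same idea. It is only a simulation: the function name is hypothetical, every node is assumed to appear as a key of the input dict, and the cycle check is an addition of this sketch.

```python
from collections import defaultdict

def topological_rounds(prerequisites):
    """prerequisites: {node: [prerequisite, ...]}. Returns a list of rounds.

    Each round holds the nodes whose prerequisites were all done in earlier rounds.
    """
    remaining = {node: set(pre) for node, pre in prerequisites.items()}
    # BuildDependentsJob: invert the prerequisite lists.
    dependents = defaultdict(set)
    for node, pre in remaining.items():
        for p in pre:
            dependents[p].add(node)
    done, rounds = set(), []
    while True:
        # BuildReadyToRunJob map: nodes not yet done with no prerequisites left.
        ready = sorted(n for n, pre in remaining.items() if n not in done and not pre)
        if not ready:
            break
        rounds.append(ready)
        done.update(ready)
        # reduce: strike the finished nodes from their dependents' lists.
        for node in ready:
            for dep in dependents[node]:
                remaining[dep].discard(node)
    if len(done) != len(remaining):
        raise ValueError("cycle detected: some nodes never became ready")
    return rounds
```

Flattening the rounds in order gives one valid topological order; nodes within the same round are independent of each other and could run in parallel.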

Minimum spanning tree is a pretty common problem; Prim's algorithm looks like the following.
# Minimum Spanning Tree (MST) using Prim's algorithm

Adjacency Matrix, W[i][j] represents weights
W[i][j] = infinity if node i, j is disconnected

MST has nodes in array N = [] and arcs A = []
E[i] = node in the skeleton at the other end of the
       minimum weighted edge connecting node i to the skeleton
D[i] = weight of that edge

Initially, pick a random node r into N[]
N = [r] and A = []
D[r] = 0; D[i] = W[i][r]; E[i] = r for all other nodes i

Repeat until N[] contains all nodes
 Pick node k outside N[] where D[k] is minimum
 Add node k to N; Add arc (E[k], k) to A
 for all node p outside N[] connected to node k
   if W[p][k] < D[p]
     D[p] = W[p][k]
     E[p] = k
   end
 end
end
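
For readers who want to run it, here is a minimal Python sketch of the same algorithm. It keeps the names W, D and E from the pseudocode; the function name, the default start node and the connectivity check are assumptions of the sketch.

```python
import math

def prim_mst(W, r=0):
    """W: symmetric weight matrix, math.inf where two nodes are not connected.

    Returns the tree arcs as (tree_node, new_node, weight) tuples.
    """
    n = len(W)
    in_tree = [False] * n    # membership of the set N
    D = [math.inf] * n       # D[i]: weight of the cheapest edge joining i to the tree
    E = [None] * n           # E[i]: tree node at the other end of that edge
    D[r] = 0
    arcs = []
    for _ in range(n):
        # Pick the node outside the tree with the smallest D.
        k = min((i for i in range(n) if not in_tree[i]), key=lambda i: D[i])
        if D[k] == math.inf:
            raise ValueError("graph is not connected")
        in_tree[k] = True
        if E[k] is not None:
            arcs.append((E[k], k, D[k]))
        # Update the cheapest connecting edge of every node still outside.
        for p in range(n):
            if not in_tree[p] and W[k][p] < D[p]:
                D[p] = W[k][p]
                E[p] = k
    return arcs
```

The linear scan for the minimum keeps the sketch close to the pseudocode; a priority queue would be the usual choice for large sparse graphs.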

The map/reduce form is worked out here for another popular algorithm that is very similar in structure: single source shortest path (SSSP). The Map/Reduce form of SSSP based on Dijkstra's algorithm is as follows ...
# Single Source Shortest Path (SSSP) based on Dijkstra

Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if node i, j is disconnected

SSSP has nodes in array N = []
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i

Initially, put the source node s into N[]
N = [s]
L[s] = 0; L[i] = W[s][i];
Path[i] = arc[s][i] for all nodes directly connected
from source.

Repeat until N[] contains all nodes
 Pick node k outside N[] where L[k] is minimum
 Add node k to N;
 for all node p connected from node k {
   if L[k] + W[k][p] < L[p] {
     L[p] = L[k] + W[k][p]
     Path[p] = Path[k].append(Arc[k][p])
   }
 }
end repeat


# Here is what the map/reduce pseudo code would look like

class FindMinimumJob {
 map(node_id, path_length) {
   if not N.contains(node_id) {
     emit(1, [path_length, node_id])
   }
 }

 # A single reducer sees every candidate and picks the closest one
 reduce(k, v) {
   min_length, min_node = minimum(v)
   N.add(min_node)
   for each node in min_node.connected_nodes {
     emit(node, min_node)
   }
 }
}

class UpdateMinPathJob {
 map(node, min_node) {
   if L[min_node] + W[min_node][node] < L[node] {
     L[node] = L[min_node] + W[min_node][node]
     Path[node] =
       Path[min_node].append(arc(min_node, node))
   }
 }
}

# Single Source Shortest Path main program
main() {
 init()
 # V is the set of all nodes
 while (not N.contains_all(V)) {
   JobClient.submit(FindMinimumJob.new)
   JobClient.submit(UpdateMinPathJob.new)
 }

 return Path
}
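
Here is a single-process Python sketch of Dijkstra's algorithm for comparison. It uses a heap to play the role of FindMinimumJob and an adjacency dict instead of the matrix W; both choices, and the function name, are assumptions of the sketch. Paths are stored as node sequences rather than arc lists.

```python
import heapq

def dijkstra(graph, source):
    """graph: {node: {neighbour: weight}} with non-negative weights.

    Returns (L, Path): shortest length and node sequence for each reachable node.
    """
    L = {source: 0}
    Path = {source: [source]}
    done = set()                 # the set N in the pseudocode
    heap = [(0, source)]
    while heap:
        length, k = heapq.heappop(heap)   # closest node not yet finished
        if k in done:
            continue                      # stale heap entry
        done.add(k)
        for p, w in graph.get(k, {}).items():   # relax the arcs leaving k
            if length + w < L.get(p, float("inf")):
                L[p] = length + w
                Path[p] = Path[k] + [p]
                heapq.heappush(heap, (L[p], p))
    return L, Path
```

Only one node is finished per iteration, which is why this algorithm needs one pair of Map/Reduce jobs per node and parallelizes poorly.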

The same SSSP problem can also be solved using breadth-first search. The intuition is to grow a frontier from the source at each iteration and update the shortest distance from the source.

# Single Source Shortest Path (SSSP) using BFS

Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if node i, j is disconnected

Frontier nodes in array F
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i

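Initially,
F = [s]
L[s] = 0; L[i] = infinity for every other node i
Path[s] = [] (the empty path)
# Nodes directly connected from the source are picked up
# in the first round, when the frontier grows out of s.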

# input is all nodes in the frontier F
# output is frontier of next round FF

class GrowFrontierJob {
 map(node) {
   for each to_node in node.connected_nodes {
     emit(to_node, [node, L[node] + W[node][to_node]])
   }
 }

 reduce(node, from_list) {
   for each from in from_list {
     from_node = from[0]
     length_via_from_node = from[1]
     if (length_via_from_node < L[node]) {
       L[node] = length_via_from_node
       Path[node] =
         Path[from_node].append(arc(from_node, node))
       FF.add(node)
     }
   }
 }
}

# Single Source Shortest Path BFS main program
main() {
 init()
 while (F is non-empty) {
   JobClient.set_input(F)
   JobClient.submit(GrowFrontierJob.new)
   copy FF to F
   clear FF
 }

 return Path
}
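
The frontier-growing version can be simulated in a single Python process as follows. This is a sketch under a few assumptions: the graph is an adjacency dict with non-negative weights, unknown distances are treated as infinity, and paths are kept as node sequences. The function name is made up.

```python
def sssp_frontier(graph, source):
    """graph: {node: {neighbour: weight}} with non-negative weights.

    Returns (L, Path) for every node reachable from source.
    """
    L = {source: 0}
    Path = {source: [source]}
    frontier = {source}
    while frontier:
        # map: every frontier node proposes a length (and its path) to each neighbour.
        proposals = {}
        for node in frontier:
            for to_node, w in graph.get(node, {}).items():
                proposals.setdefault(to_node, []).append((L[node] + w, Path[node]))
        # reduce: keep the best proposal per node if it improves on L.
        next_frontier = set()
        for node, from_list in proposals.items():
            length, from_path = min(from_list, key=lambda item: item[0])
            if length < L.get(node, float("inf")):
                L[node] = length
                Path[node] = from_path + [node]
                next_frontier.add(node)
        frontier = next_frontier
    return L, Path
```

Unlike the Dijkstra version, a node can re-enter the frontier when a shorter path to it is found later, so some work is repeated in exchange for processing the whole frontier in parallel each round.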