Monday, April 28, 2008

Parallelism with Map/Reduce

We explore the Map/Reduce approach to turning a sequential algorithm into a parallel one.

Map/Reduce Overview

Since the "reduce" operation needs to accumulate results for the whole job, and there is communication overhead in sending and collecting data, the Map/Reduce model is more suitable for long-running, batch-oriented jobs.

In the Map/Reduce model, "parallelism" is achieved via a "split/sort/merge/join" process and is described as follows.
  • A MapReduce job starts from a predefined set of input data (usually sitting in some directory of a distributed file system). A master daemon (which is a central co-ordinator) is started and gets the job configuration.
  • According to the job config, the master daemon starts multiple Mapper daemons as well as Reducer daemons on different machines. It then starts the input reader to read data from some DFS directory. The input reader chunks the data accordingly and sends each chunk to a "randomly" chosen Mapper. This is the "split" phase and begins the parallelism.
  • After getting the data chunks, the Mapper daemon runs a "user-supplied map function" and produces a collection of (key, value) pairs. Each item within this collection is sorted according to the key and then sent to the corresponding Reducer daemon. This is the "sort" phase.
  • All items with the same key arrive at the same Reducer daemon, which collects all the items of that key, invokes a "user-supplied reduce function", and produces a single entry (key, aggregatedValue) as a result. This is the "merge" phase.
  • The output of the Reducer daemons is collected by the output writer, which is effectively the "join" phase and ends the parallelism.
Here is a simple word-counting example ...
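
Below is a minimal, single-process Python sketch of how the split/sort/merge phases fit together for word counting. The helper names (map_fn, reduce_fn, word_count) are illustrative rather than part of any framework; a real Map/Reduce run would spread the chunks and key groups across daemons on different machines.

from collections import defaultdict

def map_fn(chunk):
    # user-supplied map function: emit a (word, 1) pair for every word
    return [(word, 1) for word in chunk.split()]

def reduce_fn(key, values):
    # user-supplied reduce function: aggregate all values of one key
    return (key, sum(values))

def word_count(chunks):
    # "split": each chunk would be sent to a different Mapper daemon
    pairs = [kv for chunk in chunks for kv in map_fn(chunk)]
    # "sort": group the pairs by key, as if routed to Reducer daemons
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # "merge": one reduce call per key produces (key, aggregatedValue)
    return [reduce_fn(key, values) for key, values in groups.items()]

print(word_count(["hello there", "hello again"]))
# => [('hello', 2), ('there', 1), ('again', 1)]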

Sunday, April 27, 2008

Bayesian Classifier

Classification Problem

Observing an instance x, determine its class (e.g. given an email, determine whether it is spam).


Solution Approach

Based on probability theory, the solution is the class[j] which has the maximum chance of producing x.

Also, x can be represented by a set of observed features (a set of predicates), i.e. x = a0 ^ a1 ^ a2 ... ^ ak

For each class[j], find the j which maximizes P(class[j] | x).

Also assume we have already gone through a learning stage where a lot of (x, class) pairs have been taught.

Note that X is a huge space, so it is unlikely that we have seen this particular x during training. Therefore, apply Bayes' theorem:

P(class[j] | x) = P(x | class[j]) * P(class[j]) / P(x)

Since P(x) is the same for all j, we can remove P(x).

Find j to maximize: P(a0 ^ a1 ... ^ ak | class[j]) * P(class[j])

P(class[j]) = no_of_training_instances_whose_class_equals_classJ / total_no_of_training_instances. (This is easy to find).

Now P(a0 ^ a1 ... ^ ak | class[j]) is very hard to find, because you probably have not seen this exact combination during training.

Let's say we have some domain knowledge and understand the dependency relationships between a0, a1 ..., so we can make some simplifying assumptions.


Naive Bayes

So if we know a0, a1 ... ak are independent of each other given class == class[j], then

P(a0 ^ a1 ... ^ ak | class[j]) is the same as P(a0 | class[j]) x P(a1 | class[j]) x .... x P(ak | class[j])

Now P(ak | class[j]) = no_of_instances_has_ak_and_classJ / no_of_instances_has_classJ (This is easy to find)


Spam Filtering Example

x is an Email. class[0] = spam, class[1] = non-spam

Let's break down the observed instance x into a vector of words.

  • x = ["Hello", "there", "welcome", .....]
  • a0 is position[0] == "Hello"
  • a1 is position[1] == "there"
  • a2 is position[2] == "welcome"

We assume P(position[k] == "Hello") is the same for all k (i.e. word probabilities do not depend on position), and that the occurrences of words are independent of each other given a particular class.

Therefore, we try to compare between ...

  • P(a0 ^ a1 ... ^ ak | spam) * P(spam)
  • P(a0 ^ a1 ... ^ ak | nonspam) * P(nonspam)

P(a0 ^ a1 ... ^ ak | spam) is the same as:

P(pos[0] == "Hello" | spam) x P(pos[1] == "there" | spam) x .... x P(ak | spam)

P(pos[0] == "Hello" | spam) = no_of_hello_in_spam_email / total_words_in_spam_email


Algorithm

class NaiveBayes {

  def initialize(word_dictionary) {
    @word_dictionary = word_dictionary
    @total_instances = 0
  }

  def learn(doc, class) {
    @total_instances += 1
    @class_count[class] += 1
    for each word in doc {
      @word_count_by_class[class][word] += 1
      @total_word[class] += 1
    }
  }

  def classify(doc) {
    max_prob = 0
    for each class in ["spam", "nonspam"] {
      # start from the prior P(class)
      prob[class] = @class_count[class] / @total_instances
      for each word in doc {
        # Laplace smoothing ("+1" and "+dictionary size") avoids a zero
        # probability for words not seen with this class during training
        prob[class] *= (@word_count_by_class[class][word] + 1) /
                       (@total_word[class] + @word_dictionary.length)
      }
      if max_prob < prob[class] {
        max_prob = prob[class]
        max_class = class
      }
    }
    return max_class
  }
}
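
For reference, here is a runnable Python rendering of the pseudo-code above (a sketch only; the toy word dictionary and training documents are made up for illustration, and the "+1" Laplace smoothing mirrors the pseudo-code):

from collections import defaultdict

class NaiveBayes:
    def __init__(self, word_dictionary):
        self.word_dictionary = word_dictionary
        self.total_instances = 0
        self.class_count = defaultdict(int)
        self.word_count = defaultdict(lambda: defaultdict(int))
        self.total_words = defaultdict(int)

    def learn(self, doc, klass):
        self.total_instances += 1
        self.class_count[klass] += 1
        for word in doc:
            self.word_count[klass][word] += 1
            self.total_words[klass] += 1

    def classify(self, doc):
        best_class, best_prob = None, -1.0
        for klass in self.class_count:
            # start from the prior P(class)
            prob = self.class_count[klass] / self.total_instances
            for word in doc:
                # Laplace-smoothed estimate of P(word | class)
                prob *= (self.word_count[klass][word] + 1) / \
                        (self.total_words[klass] + len(self.word_dictionary))
            if prob > best_prob:
                best_class, best_prob = klass, prob
        return best_class

nb = NaiveBayes({"free", "money", "hello", "meeting"})
nb.learn(["free", "money"], "spam")
nb.learn(["hello", "meeting"], "nonspam")
print(nb.classify(["free", "money"]))   # => spam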

Bayesian Network

Sometimes, assuming complete independence is too extreme. We need to relax this assumption by allowing some dependencies among a0, a1 ... ak.

We can draw a dependency graph (called a Bayesian network) between features. For example, if we know ak depends on a2, then the node a2 will have an arc pointing to ak.

P(a0 ^ a1 ... ^ ak | class[j]) = P(a0 | class[j]) x P(a1 | class[j]) x .... x P(ak | a2 ^ class[j])

Now P(ak | a2 ^ class[j]) = no_of_instances_has_ak_a2_and_classJ / no_of_instances_has_a2_and_classJ (this is harder to find than in Naive Bayes, but still much better than estimating the full joint).
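
As a rough illustration, here is a small Python sketch of this count-based estimate (the feature names and the tiny training set are hypothetical):

def cond_prob(instances, feature, parent, klass):
    # estimate P(feature | parent ^ class) by counting training instances
    both = sum(1 for x, c in instances
               if c == klass and x[feature] and x[parent])
    parent_only = sum(1 for x, c in instances
                      if c == klass and x[parent])
    return both / parent_only if parent_only else 0.0

# instances are (feature_dict, class_label) pairs
train = [({"a2": True,  "ak": True},  "spam"),
         ({"a2": True,  "ak": False}, "spam"),
         ({"a2": False, "ak": True},  "nonspam")]
print(cond_prob(train, "ak", "a2", "spam"))   # => 0.5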

Tuesday, April 15, 2008

Parallelizing Algorithms

The growth of single-CPU performance has been limited by physical factors such as clock rate, heat, and power. The current trend is moving to multi-core systems, i.e. multiple CPUs within a chip, multiple CPUs within a machine, or a cluster of machines connected to a high-speed network.

However, most traditional algorithms are developed in a sequential way (which is easier to design and analyze). Without redesigning them in a parallelized form, they are not ready to run on multiple CPUs. Recently, Google's Map/Reduce model has gained momentum and become the de facto approach to handling high-volume processing on large numbers of low-cost commodity machines. In the open-source community, Hadoop is a Java clone of Google's Map/Reduce model, and there are a couple of Ruby clones as well. Since then, parallelizing traditionally sequential algorithms to run on a multi-CPU network has been drawing a lot of attention in the software community.

Model

A sequential algorithm contains a number of "steps" ordered by the sequence of execution. Parallelizing such an algorithm means trying to run these steps "simultaneously" on multiple CPUs, and hopefully can speed up the whole process of execution.

Let's define T(p) to be the time it takes to execute the algorithm on p CPUs.
So, T(1) is the time it takes to execute on a single CPU.
Obviously, T(p) >= T(1) / p.

When T(p) == T(1) / p, we say it has linear speedup. Unfortunately, linear speedup is usually not possible when p increases beyond a certain number, due to "sequential dependency" and "coordination overhead".


Sequential Dependency
StepA and StepB cannot be executed simultaneously if there is a sequential dependency between them. Sequential dependency means one step cannot be started before the other step has completed, which happens if
  • StepB reads some data that StepA writes
  • StepA reads some data that StepB writes
  • StepA and StepB write to same data
Let T(infinity) be the execution time given an infinite number of CPUs. Due to sequential dependency, at some point throwing in more CPUs won't help. If we use a DAG to represent the dependencies, T(infinity) is the time it takes to execute the longest path within the DAG.

T(p) >= max(T(1)/p, T(infinity))


Coordination Overhead

Even when steps can be executed in parallel, there is certain processing overhead, such as
  • Data needs to be transferred to the corresponding CPU before processing can take place
  • Scheduling the CPUs for execution and keeping track of their workloads
  • Monitoring the completion of all parallel tasks before moving forward to the next step
We need to make sure the coordination overhead does not offset the gain from parallelizing the execution. That means we cannot break the steps down too finely; we need to control the granularity of the steps at the right level.


Design Goal

Given T(p) >= max(T(1)/p, T(infinity)), there is no benefit to increasing p beyond T(1)/T(infinity); this ratio is called the parallelism.

  • Let O-1(n) be the time complexity of the parallel algorithm when there is one CPU
  • Let O-infinity(n) be the time complexity of the parallel algorithm when there are infinitely many CPUs

  • Our goal is to design the parallel algorithm to maximize parallelism: O-1(n) / O-infinity(n).

    If we can do this, we can throw in more CPUs to help as n increases.

    Recall the master method:
    T(n) = a.T(n/b) + f(n)

    case 1:  if  f(n) << n ** log(a, base=b)
            T(n) = O(n ** log(a, base=b))

    case 2:  if  f(n) ~ (n ** log(a, base=b)) * (lg(n) ** k)
            T(n) = O((lg(n) ** (k+1)) * (n ** log(a, base=b)))

    case 3:  if  f(n) >> n ** log(a, base=b)
            T(n) = O(f(n))

    Let's walk through an example ... adding two arrays of size n.

    Sequential Algorithm:
    def sum(a, b)
     for i in 0 .. a.size-1
       c[i] = a[i] + b[i]
     return c
    
    This is of O(n) complexity


    Parallel Algorithm:
    def sum(a, b, start, end)
     if start == end
       c[start] = a[start] + b[start]
       return

     mid = start + (end - start) / 2

     spawn sum(a, b, start, mid)
     spawn sum(a, b, mid+1, end)
     sync   # wait for both spawned halves to complete

    For a single CPU, the algorithm will be ...

    T(n) = 2.T(n/2) + O(1)
    This is case 1, and so it is O(n)

    For infinite number of CPU, the algorithm will be ...
    T(n) = T(n/2) + O(1)
    This is case 2, k = 0, so it is O(lg(n))

    So the parallelism = O(n / lg(n))

    In other words, we can improve the performance from the sequential algorithm's O(n) to the parallel algorithm's O(n/p) by throwing in p CPUs, and the useful growth of p is limited by n/lg(n).
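
    As a rough, runnable illustration of the same idea, here is a Python sketch that splits the index range across p workers. The chunk size and thread pool here are illustrative; Python threads show the structure of the O(n/p) decomposition, though true CPU parallelism would need processes or a GIL-free runtime.

    from concurrent.futures import ThreadPoolExecutor

    def parallel_add(a, b, p=4):
        n = len(a)
        c = [0] * n

        def add_range(start, end):
            # one contiguous chunk of work assigned to one worker
            for i in range(start, end):
                c[i] = a[i] + b[i]

        chunk = max(1, (n + p - 1) // p)    # about ceil(n / p) items per worker
        with ThreadPoolExecutor(max_workers=p) as pool:
            for s in range(0, n, chunk):
                pool.submit(add_range, s, min(s + chunk, n))
        return c    # leaving the "with" block waits for all tasks to finish

    print(parallel_add([1, 2, 3, 4], [10, 20, 30, 40]))    # => [11, 22, 33, 44]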

    Friday, April 11, 2008

    REST design pattern

    Based on the same architectural pattern as the web, "REST" has a growing dominance in SOA (Service Oriented Architecture) implementations these days. In this article, we will discuss some basic design principles of REST.

    SOAP : The Remote Procedure Call Model

    Before REST became dominant, most SOA architectures were built around the WS* stack, which is fundamentally an RPC (Remote Procedure Call) model. Under this model, a "Service" is structured as a set of "Procedures" exposed by the system.

    For example, WSDL is used to define the procedure call syntax (such as the procedure name, the parameters and their structure). SOAP is used to define how to encode the procedure call into an XML string. And there are other WS* standards that define higher-level protocols, such as how to pass security credentials around, how to make transactional procedure calls, how to discover the service location, etc.

    Unfortunately, the WS* stack has become so complicated that it takes a steep learning curve before it can be used, and it is not achieving its original goal of interoperability (probably due to different interpretations of what the specs say).

    In the last two years, WS* technology development has slowed down and the momentum has shifted to another model: REST.

    REST: The Resource Oriented Model

    REST (REpresentational State Transfer) was introduced by Roy Fielding to capture the basic architectural pattern that makes the web so successful. Observing how web pages are organized and how they are linked to each other, REST is modeled around a large number of "Resources" which "link" to each other. As a significant difference from WS*, REST raises the importance of "Resources" and their "Linkage", and pushes down the importance of "Procedures".

    Unlike the WS* model, a "Service" in REST is organized as a large number of "Resources". Each resource has a URI that makes it globally identifiable. A resource is represented by some format of "Representation", which is typically extracted by an idempotent HTTP GET. The representation may embed other URIs which refer to other resources. This emulates an HTML link between web pages and provides a powerful way for the client to discover other services by traversing links. It also makes building an SOA search engine possible.

    On the other hand, REST downplays the "Procedure" aspect and defines a small number of "actions" based on existing HTTP methods. As discussed above, HTTP GET is used to get a representation of the resource. To modify a resource, REST uses HTTP PUT with the new representation embedded inside the HTTP body. To delete a resource, REST uses HTTP DELETE. To get metadata of a resource, REST uses HTTP HEAD. Notice that in all these cases, the HTTP body doesn't carry any information about the "Procedure". This is quite different from WS* SOAP, where the request is always made using HTTP POST.

    At first glance, it seems REST is quite limiting in terms of the number of procedures it can support. It turns out this is not the case: REST allows any "Procedure" (which has a side effect) to use HTTP POST. Effectively, REST categorizes operations by their nature and associates well-defined semantics with these categories (i.e. GET for read-only, PUT for update, DELETE for removal, all of which are idempotent), while providing an extension mechanism for application-specific operations (i.e. POST for application procedures, which may be non-idempotent).


    URI Naming Convention

    Since a resource is usually mapped to some state in the system, analyzing its lifecycle is an important step when designing how a resource is created and how its URI should be structured.

    Typically there are some eternal, singleton "Factory Resources" which create other resources. A factory resource typically represents the "type" of a resource and usually has a static, well-known URI, suffixed with a plural form of the resource type. Some examples are ...
    http://xyz.com/books
    http://xyz.com/users
    http://xyz.com/orders

    "Resource Instance", which are created by the "Factory Resource" usually represents an instance of that resource type. "Resource instances" typically have a limited life span. Their URI typically contains some unique identifier so that the corresponding instance of the resource can be located. Some examples are ...
    http://xyz.com/books/4545
    http://xyz.com/users/123
    http://xyz.com/orders/2008/04/10/1001

    If this object is a singleton object of that type, the id is not needed.
    http://www.xyz.com/library

    "Dependent Resource" are typically created and owned by an existing resource during part of its life cycle. Therefore "dependent resource" has an implicit life-cycle dependency on its owning parent. When a parent resource is deleted, all the dependent resource it owns will be deleted automatically. Dependent resource use an URI which has prefix of its parent resource URI. Some examples are ...
    http://xyz.com/books/4545/tableofcontent
    http://xyz.com/users/123/shopping_cart

    Creating Resource

    HTTP PUT can also be used to create the object if the caller has complete control over assigning the object id; the request body contains the representation of the object to be created.
    PUT /library/books/668102 HTTP/1.1
    Host: www.xyz.com
    Content-Type: application/xml
    Content-Length: nnn
    
    <book>
    <title>Restful design</title>
    <author>Ricky</author>
    </book>
    HTTP/1.1 201 Created

    If the caller has no control over the object id, an HTTP POST is made to the object's parent container, with the request body containing the representation of the object. The response should contain a reference to the URI of the created object.
    POST /library/books HTTP/1.1
    Host: www.xyz.com
    Content-Type: application/xml
    Content-Length: nnn
    
    <book>
    <title>Restful design</title>
    <author>Ricky</author>
    </book>
    HTTP/1.1 201 Created
    Location: /library/books/668102
    

    To create a resource instance of a particular resource type, make an HTTP POST to the Factory Resource URI. If the creation is successful, the response will contain a URI of the resource that has been created.

    To create a book ...
    POST /books HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <book>
    <title>...</title>
    <author>Ricky Ho</author>
    </book>
    HTTP/1.1 201 Created
    Content-Type: application/xml; charset=utf-8
    Location: /books/4545
    
    <ref>http://xyz.com/books/4545</ref>

    To create a dependent resource, make an HTTP POST (or PUT) to its owning resource's URI

    To upload the content of a book (using HTTP POST) ...
    POST  /books/4545  HTTP/1.1
    Host: xyz.com
    Content-Type: application/pdf
    Content-Length: nnnn
    
    {pdf data}
    HTTP/1.1 201 Created
    Content-Type: application/xml; charset=utf-8
    Location: /books/4545/tableofcontent
    
    <ref>http://xyz.com/books/4545/tableofcontent</ref>

    HTTP POST is typically used to create a resource when its URI is unknown to the client before its creation. However, if the URI is known to the client, then an idempotent HTTP PUT should be used with the URI of the resource to be created, as in the following example.

    To upload the content of a book (using HTTP PUT) ...
    PUT  /books/4545/tableofcontent  HTTP/1.1
    Host: xyz.com
    Content-Type: application/pdf
    Content-Length: nnnn
    
    {pdf data}
    HTTP/1.1 200 OK

    Finding Resources

    Make an HTTP GET to the factory resource URI, with the criteria passed in as query parameters.
    (Note that it is up to the factory resource to interpret the query parameters.)

    To search for books with a certain author ...
    GET /books?author=Ricky HTTP/1.1
    Host: xyz.com
    Accept: application/xml
    
    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <books>
    <book>
    <ref>http://xyz.com/books/4545</ref>
    <title>...</title>
    <author>Ricky</author>
    </book>
    <book>
    <ref>http://xyz.com/books/4546</ref>
    <title>...</title>
    <author>Ricky</author>
    </book>
    </books>

    Another school of thought is to embed the criteria in the URI path, such as ...
    http://xyz.com/books/author/Ricky

    I personally prefer the query parameter mechanism because it doesn't imply any order of the search criteria.


    Lookup a particular resource

    Make an HTTP GET to the resource object URI

    Lookup a particular book...
    GET /books/4545 HTTP/1.1
    Host: xyz.com
    Accept: application/xml
    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <book>
    <title>...</title>
    <author>Ricky Ho</author>
    </book>

    In case the resource has multiple representation formats, the client should specify the format it expects in the "Accept" header of its request.
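
    For example, to ask for the JSON representation of a book (assuming the service offers one) ...
    GET /books/4545 HTTP/1.1
    Host: xyz.com
    Accept: application/json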


    Lookup a dependent resource

    Make an HTTP GET to the dependent resource object URI

    Download the table of content of a particular book...
    GET /books/4545/tableofcontent HTTP/1.1
    Host: xyz.com
    Accept: application/pdf
    HTTP/1.1 200 OK
    Content-Type: application/pdf
    Content-Length: nnn
    
    {pdf data}
    

    Modify a resource

    Make an HTTP PUT to the resource object URI, passing in the new object representation in the HTTP body.

    Change the book title ...
    PUT /books/4545 HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <book>
    <title>Changed title</title>
    <author>Ricky Ho</author>
    </book>
    HTTP/1.1 200 OK
    

    Delete a resource

    Make an HTTP DELETE to the resource object URI

    Delete a book ...
    DELETE /books/4545 HTTP/1.1
    Host: xyz.com
    HTTP/1.1 200 OK
    

    Resource Reference

    In some cases, we do not want to create a new resource but to add a "reference" to an existing resource, e.g. when a book is added into a shopping cart, which is another resource.

    Add a book into the shopping cart ...
    POST  /users/123/shopping_cart  HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <add>
    <ref>http://xyz.com/books/4545</ref>
    </add>
    HTTP/1.1 200 OK

    Show all items of the shopping cart ...
    GET  /users/123/shopping_cart  HTTP/1.1
    Host: xyz.com
    Accept: application/xml
    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <shopping_cart>
    <ref>http://xyz.com/books/4545</ref>
    ...
    </shopping_cart>
    Note that the shopping cart resource contains "resource references" which act as links to other resources (the books). Such linkages create a resource web so that the client can discover and navigate across different resources.


    Remove a book from the shopping cart ...
    POST  /users/123/shopping_cart  HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <remove>
    <ref>http://xyz.com/books/4545</ref>
    </remove>
    HTTP/1.1 200 OK
    Note that we are using HTTP POST rather than HTTP DELETE to remove a resource reference. This is because we are removing a link, not the actual resource itself. In this case, the book still exists after it is taken out of the shopping cart.

    Note that when a book is deleted, all the shopping carts that refer to that book need to be fixed in an application-specific way. One way is to do lazy checking: in other words, wait until the shopping cart checks out, then check the book's existence and fix it at that point.

    Checkout the shopping cart ...
    POST  /orders  HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <ref>http://xyz.com/users/123/shopping_cart</ref>
    HTTP/1.1 201 Created
    Content-Type: application/xml; charset=utf-8
    Location: /orders/2008/04/10/1001
    
    <?xml version="1.0" ?>
    <ref>http://xyz.com/orders/2008/04/10/1001</ref>
    Note that here the checkout is implemented by creating another resource, "Order", which is used to keep track of the fulfillment of the purchase.

    Asynchronous Request

    In cases when the operation takes a long time to complete, an asynchronous mode should be used. In a polling approach, a transient transaction resource is returned immediately to the caller. The caller can then use a GET request to poll for the result of the operation.

    We can also use a notification approach. In this case, the caller passes along a callback URI when making the request, and the server will POST the result to the callback URI when it is done, as in the sketch below.
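
    For example, a print request might pass a callback URI in the request body (the <callback> element and the client URI here are illustrative, not part of any standard) ...
    POST  /books/123  HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <print>
    <printer>http://xyz.com/printers/abc</printer>
    <callback>http://client.xyz.com/print_done</callback>
    </print>
    HTTP/1.1 202 Accepted
    
    When the print job finishes, the server POSTs the result to the callback URI ...
    POST  /print_done  HTTP/1.1
    Host: client.xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <transaction>
    <type>PrintJob</type>
    <status>Completed</status>
    </transaction>
    HTTP/1.1 200 OK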

    In the polling approach, the basic idea is to immediately create a "Transaction Resource" to return to the client. While the actual processing happens asynchronously in the background, the client can poll the "Transaction Resource" at any time for the latest processing status.

    Let's look at an example of requesting the printing of a book, which may take a long time to complete.

    Print a book

    POST  /books/123  HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <print>http://xyz.com/printers/abc</print>
    HTTP/1.1 201 Created
    Content-Type: application/xml; charset=utf-8
    Location: /transactions/1234
    
    <?xml version="1.0" ?>
    <ref>http://xyz.com/transactions/1234</ref>
    Note that a response is returned immediately, containing the URI of a transaction resource, even before the print job has started. The client can poll the transaction resource to obtain the latest status of the print job.

    Check the status of the print job ...
    GET /transactions/1234 HTTP/1.1
    Host: xyz.com
    Accept: application/xml
    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <transaction>
    <type>PrintJob</type>
    <status>In Progress</status>
    </transaction>
    It is also possible to cancel the transaction if it is not already completed.

    Cancel the print job

    POST  /transactions/1234  HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn
    
    <?xml version="1.0" ?>
    <cancel/>
    HTTP/1.1 200 OK


    Conclusion
    The Resource Oriented Model that REST advocates provides a more natural fit for a web of services. Therefore, I suggest that SOA implementations should take the REST model as the default approach.