Friday, December 27, 2013

Spark: Low latency, massively parallel processing framework

While Hadoop fits well in most batch processing workload, and is the primary choice of big data processing today, it is not optimized for other types of workload  due to its following limitation
  • Lack of iteration support
  • High latency due to persisting intermediate data onto disk
 For a more detail elaboration of the Hadoop limitation, refer to my previous post.

Nevertheless, the Map/Reduce processing paradigm is a proven mechanism for dealing with large scale data.  On the other hand, many of Hadoop's infrastructure piece such as HDFS, HBase has been mature over time.

In this blog post, we'll look at a different architecture called Spark, which has taken the strength of Hadoop and make improvement in a number of Hadoop's weakness, and provides a more efficient batch processing framework with a much lower latency (from the benchmark result, Spark (using RAM cache) claims to be 100x faster than Hadoop, and 10x faster than Hadoop when running on disk.  Although competing with Hadoop MapRed, Spark integrates well with other parts of Hadoop Ecosystem (such as HDFS, HBase ... etc.).  Spark has generated a lot of excitement in the big data community and represents a very promising parallel execution stack for big data analytics.

Berkeley Spark

Within the Spark cluster, there is a driver program where the application logic execution is started, with multiple workers which processing data in parallel.  Although this is not mandated, data is typically collocated with the worker and partitioned across the same set of machines within the cluster.  During the execution, the driver program will passed code/closure into the worker machine where processing of corresponding partition of data will be conducted.  The data will undergoing different steps of transformation while staying in the same partition as much as possible (to avoid data shuffling across machines).  At the end of the execution, actions will be executed at the worker and result will be returned to the driver program.

Underlying the cluster, there is an important Distributed Data Structure called RDD (Resilient Distributed Dataset), which is a logically centralized entity but physically partitioned across multiple machines inside a cluster based on some notion of key.  Controlling how different RDD are co-partitioned (with the same keys) across machines can reduce inter-machine data shuffling within a cluster.  Spark provides a "partition-by" operator which create a new RDD by redistributing the data in the original RDD across machines within the cluster.

RDD can optionally be cached in RAM and hence providing fast access.  Currently the granularity of caching is done at the RDD level, either the whole or none of the RDD is cached.  Cached is a hint but not a guarantee.  Spark will try to cache the RDD if sufficient memory is available in the cluster, based on LRU (Least Recent Use) eviction algorithm.

RDD provides an abstract data structure from which application logic can be expressed as a sequence of transformation processing, without worrying about the underlying distributed nature of the data.

Typically an application logic are expressed in terms of a sequence of TRANSFORMATION and ACTION.  "Transformation" specifies the processing dependency DAG among RDDs and "Action" specifies what the output will be (ie: the sink node of the DAG with no outgoing edge).  The scheduler will perform a topology sort to determine the execution sequence of the DAG, tracing all the way back to the source nodes, or node that represents a cached RDD.

Notice that dependencies in Spark come in two forms.  "Narrow dependency" means the all partitions of an RDD will be consumed by a single child RDD (but a child RDD is allowed to have multiple parent RDDs).  "Wide dependencies" (e.g. group-by-keys, reduce-by-keys, sort-by-keys) means a parent RDD will be splitted with elements goes to different children RDDs based on their keys.  Notice that RDD with narrow dependencies preserve the key partitioning between parent and child RDD.  Therefore RDD can be co-partitioned with the same keys (parent key range to be a subset of child key range) such that the processing (generating child RDD from parent RDD) can be done within a machine with no data shuffling across network.  On the other hand, RDD will wide dependencies involves data shuffling.

Narrow transformation (involves no data shuffling) includes the following operators
  • Map
  • FlatMap
  • Filter
  • Sample
Wide transformation (involves data shuffling) includes the following operators
  •  SortByKey
  • ReduceByKey
  • GroupByKey
  • CogroupByKey
  • Join
  • Cartesian
Action output the RDD to the external world and includes the following operators
  • Collect
  • Take(n)
  • Reduce
  • ForEach
  • Sample
  • Count
  • Save
The scheduler will examine the type of dependencies and group the narrow dependency RDD into a unit of processing called a stage.  Wide dependencies will span across consecutive stages within the execution and require the number of partition of the child RDD to be explicitly specified.

A typical execution sequence is as follows ...
  1. RDD is created originally from external data sources (e.g. HDFS, Local file ... etc)
  2. RDD undergoes a sequence of TRANSFORMATION (e.g. map, flatMap, filter, groupBy, join), each provide a different RDD that feed into the next transformation.
  3. Finally the last step is an ACTION (e.g. count, collect, save, take), which convert the last RDD into an output to external data sources
The above sequence of processing is called a lineage (outcome of the topological sort of the DAG).  Each RDD produced within the lineage is immutable.  In fact, unless if it is cached, it is used only once to feed the next transformation to produce the next RDD and finally produce some action output.

In a classical distributed system, fault resilience is achieved by replicating data across different machines together with a active monitoring system.  In case of any machine crashes, there is always another copy of data residing in a different machine from where recovery can take place.

Fault resiliency in Spark takes a different approach.  First of all, as a large scale compute cluster, Spark is not meant to be a large scale data cluster at all.  Spark makes two assumptions of its workload.
  • The processing time is finite (although the longer it takes, the cost of recovery after fault will be higher)
  • Data persistence is the responsibility of external data sources, which keeps the data stable within the duration of processing.
Spark has made a tradeoff decision that in case of any data lost during the execution, it will re-execute the previous steps to recover the lost data.  However, this doesn't mean everything done so far is discarded and we need to start from scratch at the beginning.  We just need to re-executed the corresponding partition in the parent RDD which is responsible for generating the lost partitions, in case of narrow dependencies, this resolved to the same machine.

Notice that the re-execution of lost partition is exactly the same as the lazy evaluation of the DAG, which starts from the leaf node of the DAG, tracing back the dependencies on what parent RDD is needed and then eventually track all the way to the source node.  Recomputing the lost partition is done is a similar way, but taking partition as an extra piece of information to determine which parent RDD partition is needed.

However, re-execution across wide dependencies can touch a lot of parent RDD across multiple machines and may cause re-execution of everything. To mitigate this, Spark persist the intermediate data output from a Map phase before it shuffle them to different machines executing the reduce phase.  In case of machine crash, the re-execution (from another surviving machine) just need to trace back to fetch the intermediate data from the corresponding partition of the mapper's persisted output.  Spark also provide a checkpoint API to explicitly persist intermediate RDD so re-execution (when crash) doesn't need to trace all the way back to the beginning.  In future, Spark will perform check-pointing automatically by figuring out a good balance between the latency of recovery and the overhead of check-pointing based on statistical result.

Spark provides a powerful processing framework for building low latency, massively parallel processing for big data analytics.  It supports API around the RDD abstraction with a set of operation for transformation and action for a number of popular programming language like Scala, Java and Python.

In future posts, I'll cover other technologies in the Spark stack including real-time analytics using streaming as well as machine learning frameworks.

Thursday, December 12, 2013

Escape local optimum trap

Optimization is a very common technique in computer science and machine learning to search for the best (or good enough) solution.  Optimization itself is a big topic and involves a wide range of mathematical techniques in different scenarios.

In this post, I will be focusing in local search, which is a very popular technique in searching for an optimal solution based on a series of greedy local moves.  The general setting of local search is as follows ...

1. Define an objective function
2. Pick an initial starting point
3. Repeat
     3.1 Find a neighborhood
     3.2 Locate a subset of neighbors that is a candidate move
     3.3 Select a candidate from the candidate set
     3.4 Move to the candidate

One requirement is that the optimal solution need to be reachable by a sequence of moves.  Usually this requires a proof that any arbitrary state is reachable by any arbitrary state through a sequence of moves.

Notice that there are many possible strategies for each steps in 3.1, 3.2, 3.3.  For example, one strategy can examine all members within the neighborhood, pick the best one (in terms of the objective function) and move to that.  Another strategy can randomly pick a member within the neighborhood, and move to the member if it is better than the current state.

Regardless of the strategies, the general theme is to move towards the members which is improving the objective function, hence the greedy nature of this algorithm.

One downside of this algorithm is that it is possible to be trapped in a local optimum, whose is the best candidate within its neighborhood, but not the best candidate from a global sense.

In the following, we'll explore a couple meta-heuristic techniques that can mitigate the local optimum trap.

Multiple rounds

We basically conduct k rounds of local search, each round getting a local optimum and then pick the best one out of these k local optimum.

Simulated Anealing

This strategy involves a dynamic combination of exploitation (better neighbor) and exploration (random walk to worse neighbor).  The algorithm works in the following way ...

1. Pick an initial starting point
2. Repeat until terminate condition
     2.1 Within neighborhood, pick a random member
     2.2 If neighbor is better than me
           move to the neighbor
           With chance exp(-(myObj - neighborObj)/Temp)
               move to the worse neighbor
     2.3 Temp = alpha * Temp

Tabu Search

This strategy maintains a list of previously visited states (called Tabu list) and make sure these states will not be re-visited in subsequent exploration.  The search will keep exploring the best move but skipping the previously visited nodes.  This way the algorithm will explore the path that hasn't be visited before.  The search also remember the best state obtained so far.

1. Initialization
     1.1 Pick an initial starting point S
     1.2 Initial an empty Tabu list
     1.3 Set the best state to S
     1.4 Put S into the Tabu list
2. Repeat until terminate condition
     2.1 Find a neighborhood
     2.2 Locate a smaller subset that is a candidate move
     2.3 Remove elements that is already in Tabu list
     2.4 Select the best candidate and move there
     2.5 Add the new state to the Tabu list
     2.6 If the new state is better that best state
          2.6.1 Set the best state to this state
     2.7 If the Tabu list is too large
          2.7.1 Trim Tabu list by removing old items