Wednesday, November 5, 2014

Common data science project flow

While working across multiple data science projects, I have observed a similar pattern in a group of strategic data science projects, to which a common methodology can be applied.  In this post, I want to sketch this methodology at a high level.

First of all, "data science" itself is a very generic term that means different things to different people.  Many of the projects I have been involved in target a very tactical and specific problem.  However, over the last few years more and more enterprises have started to realize the strategic value of data.  I have observed a growing number of strategic data science projects that start from a very broad scope and take a top-down approach to look at the overall business operation.  Along the way, the enterprise prioritizes the most critical areas within its operation cycle and builds sophisticated models to guide and automate the decision process.

Usually, my engagement starts as a data scientist / consultant with very little (or even no) domain knowledge.  Being unfamiliar with the domain is nothing to be proud of and often slows down my initial discussions.  Therefore, within a squeezed time period I need to quickly learn enough "basic domain knowledge" to keep the discussion running smoothly.  On the other hand, lacking a preconceived model enables me (or, you could say, forces me) to look at the problem with fresh eyes, so that I can trim off unnecessary legacy details and focus only on the essential elements that contribute to the core part of the data model.  It is also fun to go through the concept-blending process between a data scientist and a domain expert.  I force them to think in my way and they force me to think in their way.  This is by far the most effective way for me to learn any new concept.

Recently I had a discussion with a company that has a small but very sophisticated data science team that builds pricing and demand forecasting models for their product line.  I am by no means an expert in their domain.  But their problem (how to predict demand, and how to set price) is general enough across many industries.  Therefore, I will use this problem as an example to illustrate the major steps in the common pattern that I described above.

Problem Settings

Let's say a car manufacturer starts its quarterly planning process.  Here are some key decisions that need to be made by management.
  • How many cars should the company produce for next year?
  • What should the new price of the cars be?
First of all, we need to identify the ultimate "goal" of these decisions.  Such a goal is usually easy to find, as it is usually in the company's mission statement.

In this problem, the goal is to ...
maximize: "Profit_2015"

In general, I find it a good start to look at the problem from an "optimization" angle, from which we define our goal in terms of an objective function as well as a set of constraints.

Step 1:  Identify variables and define their dependency graph

Build the dependency graph between the different variables, starting from the objective function.  Separate the decision variables (which you control) from the environment variables (which you do not control).

As an illustrative example, we start from our objective function "Profit_2015" and define the dependency relationships below. Decision variables are highlighted in blue.

Profit_2015 = F(UnitSold_2015, UnitProduced_2015, Price_2015, Cost_2015)
UnitSold_2015 = G(Supply_2015, Demand_2015, Price_2015, CompetitorPrice_2015)
Demand_2015 = H(GDP_2014, PhoneSold_2014)
GDP_2015 = T(GDP_2014, GDP_2013, GDP_2012, GDP_2011 ...)
...

Identifying these variables and their potential dependencies typically draws on well-studied theory from academia, or on domain experts in the industry.  At this stage, we don't need to know the exact formula of the functions F/G/H.  We only need to capture the links between the variables.  It is also OK to include a link that shouldn't exist (ie: there is no relationship between the 2 variables in reality).  However, it is not good if we miss a link (ie: fail to capture a strong, existing dependency).

This round usually involves 4 to 5 half-day brainstorming sessions with the domain experts, facilitated by the data scientist/consultant who is familiar with the model building process.  Additional investigation and background study may be needed if no subject matter experts are available.  Starting from scratch, this round can take somewhere between a couple of weeks and a couple of months.

Step 2:  Define the dependency function

In this round, we want to pin down the relationships between the variables by specifying the formulas of F(), G(), H().


Well-Known Function
For relationships that are well-studied, we can use a known mathematical model.

For example, in the relationship
Profit_2015 = F(UnitSold_2015, UnitProduced_2015, Price_2015, Cost_2015)

We can use the following mathematical formula in a very straightforward manner
Profit = (UnitSold * Price) - (UnitProduced * Cost)


Semi-Known Function
However, some relationships are not as straightforward as that.  For relationships where we don't know the exact formula but can make a reasonable assumption about its shape, we can assume the relationship follows a family of models (e.g. Linear, Quadratic ... etc.), and then figure out the parameters that best fit the historical data.

For example, in the relationship
Demand_2015 = H(GDP_2014, PhoneSold_2014)

Let's assume the "demand" is a linear combination of "GDP" and "Phone sold", which seems to be a reasonable assumption.

For the linear model we assume
Demand = w0 + (w1 * GDP) + (w2 * PhoneSold)

Then we feed the historical training data into a linear regression model and figure out what the best-fitting values of w0, w1, w2 should be.
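
As a minimal sketch of this fitting step, the snippet below solves the least-squares normal equations in plain Python.  The history values and the helper names (solve, fit_linear) are made up for illustration; the synthetic demand is generated from known weights so the fit can be checked against them.  In practice a statistics library would do this job.

```python
def solve(a, b):
    """Solve the square system a.x = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [rhs] for row, rhs in zip(a, b)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for i in reversed(range(n)):
        tail = sum(m[i][j] * x[j] for j in range(i + 1, n))
        x[i] = (m[i][n] - tail) / m[i][i]
    return x


def fit_linear(rows, targets):
    """Least-squares weights [w0, w1, ...] for target = w0 + w1*x1 + w2*x2 + ..."""
    design = [[1.0] + list(row) for row in rows]  # leading 1.0 is the intercept term
    k = len(design[0])
    xtx = [[sum(r[i] * r[j] for r in design) for j in range(k)] for i in range(k)]
    xty = [sum(r[i] * y for r, y in zip(design, targets)) for i in range(k)]
    return solve(xtx, xty)


# Synthetic history of (GDP, PhoneSold) pairs; demand is generated from known weights
history = [(1.0, 10.0), (2.0, 12.0), (3.0, 9.0), (4.0, 15.0), (5.0, 11.0)]
demand = [5.0 + 2.0 * gdp + 0.5 * phones for gdp, phones in history]

w0, w1, w2 = fit_linear(history, demand)
print(round(w0, 6), round(w1, 6), round(w2, 6))
```

Because the synthetic data has no noise, the recovered weights match the ones used to generate it; with real data the fit would only be approximate.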


Time-Series Function
In some cases, a variable depends only on its own past values and not on other variables; here we can train a time series model to predict the variable from its own past values.  Typically, the model is decomposed into 3 components: Noise, Trend and Seasonality.  One popular approach is to use exponential smoothing techniques such as the Holt/Winters model.  Another popular approach is the ARIMA model, which combines "Auto-Regression" and "Moving-Average" components.

For example, in the relationship
GDP_2015 = T(GDP_2014, GDP_2013, GDP_2012, GDP_2011 ...)

We can use a time series model to learn the relationship between the historical data and its future value.
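
To make the exponential smoothing idea concrete, here is a small sketch of Holt's linear-trend method (level plus trend only; the seasonal component of Holt/Winters is left out to keep it short).  The function name, the smoothing parameters alpha and beta, and the GDP numbers are all assumptions chosen for illustration.

```python
def holt_forecast(series, alpha=0.5, beta=0.5, horizon=1):
    """Forecast `horizon` steps ahead using Holt's linear-trend exponential smoothing."""
    level = series[0]
    trend = series[1] - series[0]
    for value in series[1:]:
        previous_level = level
        # Blend the new observation with the previous one-step-ahead forecast
        level = alpha * value + (1 - alpha) * (previous_level + trend)
        # Blend the newly observed slope with the previous trend estimate
        trend = beta * (level - previous_level) + (1 - beta) * trend
    return level + horizon * trend


# Hypothetical yearly GDP index values (oldest first)
gdp_history = [100.0, 102.0, 104.0, 106.0]
gdp_next_year = holt_forecast(gdp_history)
print(gdp_next_year)
```

The smoothing parameters would normally be tuned against held-out history rather than fixed at 0.5.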


Completely Unknown Function
But if we cannot even assume the model family, we can consider using the "k nearest neighbor" approach to interpolate the output from its input.  We need to define the "distance function" between data points based on domain knowledge and also figure out what the optimal value of k should be.  In many cases, using a weighted average of the k nearest neighbors is a good interpolation.

For example, in the relationship
UnitSold_2015 = G(Supply_2015, Demand_2015, Price_2015, CompetitorPrice_2015)
It is unclear which model should be used to represent UnitSold as a function of Supply, Demand, Price and CompetitorPrice.  So we go with a nearest neighbor approach.

Based on the monthly sales of the past 3 years, we can use "Euclidean distance" (we can also consider scaling the data to a comparable range by subtracting its mean and dividing by its standard deviation) to find the closest 5 neighbors, and then use the weighted average to predict the units sold.
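
The nearest neighbor prediction above can be sketched as follows.  This is only an illustration: the six history rows are invented, the inverse-distance weighting is one common choice among several, and the helper names are hypothetical.

```python
import math
from statistics import mean, pstdev


def standardize(rows):
    """Scale each column to zero mean and unit standard deviation."""
    columns = list(zip(*rows))
    means = [mean(col) for col in columns]
    stds = [pstdev(col) or 1.0 for col in columns]  # guard against constant columns
    scaled = [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in rows]
    return scaled, means, stds


def knn_predict(rows, targets, query, k=5):
    """Inverse-distance weighted average of the k nearest neighbors (Euclidean)."""
    scaled, means, stds = standardize(rows)
    q = [(v - m) / s for v, m, s in zip(query, means, stds)]
    nearest = sorted((math.dist(row, q), t) for row, t in zip(scaled, targets))[:k]
    for distance, target in nearest:
        if distance == 0:  # exact match: return its recorded value
            return target
    weights = [1.0 / distance for distance, _ in nearest]
    return sum(w * t for w, (_, t) in zip(weights, nearest)) / sum(weights)


# Made-up monthly history: (Supply, Demand, Price, CompetitorPrice) -> UnitSold
history = [
    (100, 120, 30.0, 32.0),
    (100, 90, 32.0, 31.0),
    (120, 130, 29.0, 33.0),
    (80, 100, 31.0, 30.0),
    (110, 105, 30.5, 30.5),
    (90, 95, 33.0, 31.5),
]
units_sold = [100, 85, 120, 80, 104, 82]

predicted = knn_predict(history, units_sold, (105, 110, 30.0, 31.0), k=5)
print(predicted)
```

Standardizing first keeps a large-valued column such as Supply from dominating the distance over a small-valued one such as Price.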
 

Step 3: Optimization

At this point, we have the following defined
  • A goal defined by maximizing (or minimizing) an objective function
  • A set of variables (including the decision and environment variables)
  • A set of functions that define how these variables are inter-related.  Some of them are defined by a mathematical formula and some of them are defined as a black box (based on a predictive model)
Our goal is to figure out what values the decision variables (which we control) should be set to such that the objective function is optimized (maximized or minimized).

Determine the value of environment variables
For those environment variables that have no dependencies on other variables, we can acquire their values from external data sources.  For those environment variables that depend on other environment variables (but not on decision variables), we can estimate their values using the corresponding dependency function (of course, we need to estimate all the variables they depend on first).  For those environment variables that depend (directly or indirectly) on decision variables, leave them undefined.

Determine the best value of decision variables
Once we have formulated the dependency functions, we can employ different optimization methods depending on the form of these functions.  Here is how I choose the appropriate method based on the formulation of dependency functions.
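
As one simple possibility (an illustration, not a recommendation for every case): when some dependency functions are black boxes, a brute-force grid search over the decision variables always works, at the cost of many evaluations.  The sketch below assumes a hypothetical linear demand curve and a made-up unit cost, and reuses the profit formula from Step 2.

```python
from itertools import product

UNIT_COST = 10.0  # hypothetical cost per unit produced


def demand(price):
    """Hypothetical demand curve; in practice this would be a fitted predictive model."""
    return max(0.0, 1000.0 - 20.0 * price)


def profit(price, units_produced):
    """Profit = (UnitSold * Price) - (UnitProduced * Cost)."""
    units_sold = min(units_produced, demand(price))  # cannot sell more than produced
    return units_sold * price - units_produced * UNIT_COST


def grid_search(prices, quantities):
    """Evaluate every (price, quantity) pair and keep the most profitable one."""
    return max((profit(p, q), p, q) for p, q in product(prices, quantities))


best_profit, best_price, best_quantity = grid_search(
    prices=range(10, 51), quantities=range(0, 1001, 50)
)
print(best_profit, best_price, best_quantity)
```

Grid search scales poorly with the number of decision variables, so it is mainly useful as a baseline or when there are only a few decisions to make.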



Additional Challenges

To summarize, we have followed the process below
  1. Define an objective function, constraints, decision variables and environment variables
  2. Identify the relationship between different variables
  3. Collect or predict those environment variables
  4. Optimize those decision variables based on the objective functions
  5. Return the optimal value of decision variables as the answer
So far, our dependency graph is acyclic: our decisions do not feed back into the underlying environment variables.  Although this is reasonably true if the enterprise is an insignificant small player in the market, it is no longer true if the enterprise is one of the few major players.  For example, its pricing strategy may cause competitors to change their own pricing strategy as well.  How the competitors would react is less predictable, and historical data plays a less important role here.  At some point, human judgement will get involved to fill the gaps.



Sunday, August 17, 2014

Lambda Architecture Principles

"Lambda Architecture" (introduced by Nathan Marz) has gained a lot of traction recently.  Fundamentally, it is a set of design patterns for dealing with the Batch and Real time data processing workflows that fuel many organizations' business operations.  Although I don't see that any novel ideas have been introduced, it is the first time these principles have been outlined in such a clear and unambiguous manner.

In this post, I'd like to summarize the key principles of the Lambda architecture, focusing more on the underlying design principles and less on the choice of implementation technologies, where my preferences may differ from Nathan's.

One important distinction of the Lambda architecture is that it has a clear separation between the batch processing pipeline (ie: Batch Layer) and the real-time processing pipeline (ie: Real-time Layer).  Such separation provides a means to localize and isolate the complexity of handling data updates.  To handle real-time queries, the Lambda architecture provides a mechanism (ie: Serving Layer) to merge/combine data from the Batch Layer and Real-time Layer and return the latest information to the user.

Data Source Entry

At the very beginning, data flows in Lambda architecture as follows ...
  • Transaction data starts streaming in from the OLTP system during business operations.  Transaction data ingestion can take the form of records in OLTP systems, text lines in app log files, incoming API calls, or an event queue (e.g. Kafka)
  • This transaction data stream is replicated and fed into both the Batch Layer and Realtime Layer
Here is an overall architecture diagram for Lambda.



Batch Layer

For storing the ground truth, the "Master dataset" is the most fundamental DB, capturing every basic event that happens.  It stores data in the most "raw" form (and hence at the finest granularity), which can be used to compute any perspective at any given point in time.  As long as we can maintain the correctness of the master dataset, every data view derived from it will automatically be correct.

Given that maintaining the correctness of the master dataset is crucial, the master dataset is made "immutable" to avoid maintenance complexity.  Specifically, data can only be appended, while updates and deletes are disallowed.  Disallowing changes to existing data completely avoids the complexity of handling conflicting concurrent updates.

Here is a conceptual schema of how the master dataset can be structured.  The center green table represents the old, traditional way of storing data in an RDBMS.  The surrounding blue tables illustrate how the master dataset can be structured, with some key highlights
  • Data are partitioned by columns and stored in different tables.  Columns that are closely related can be stored in the same table
  • NULL values are not stored
  • Each data record is associated with a time stamp from which the record is valid




Notice that every piece of data is tagged with a time stamp at which the data is changed (or more precisely, a change record that represents the data modification is created).  The latest state of an object can be retrieved by extracting the version of the object with the largest time stamp.
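
The append-only, timestamped structure can be sketched in a few lines.  This is a toy in-memory illustration with hypothetical names (master_dataset, append_fact, latest_state), not a description of any particular storage engine.

```python
master_dataset = []  # append-only list of (entity, attribute, value, timestamp)


def append_fact(entity, attribute, value, timestamp):
    """Record a change; existing records are never updated or deleted."""
    master_dataset.append((entity, attribute, value, timestamp))


def latest_state(entity):
    """Rebuild the current state by keeping, per attribute, the newest record."""
    state, newest = {}, {}
    for ent, attribute, value, timestamp in master_dataset:
        if ent == entity and timestamp >= newest.get(attribute, float("-inf")):
            state[attribute] = value
            newest[attribute] = timestamp
    return state


append_fact("user-1", "name", "Alice", 1)
append_fact("user-1", "city", "Boston", 1)
append_fact("user-1", "city", "Seattle", 5)  # a move is a new record, not an update

print(latest_state("user-1"))
```

Since nothing is overwritten, the state as of any earlier time can be rebuilt the same way by ignoring records with a later time stamp.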

Although the master dataset stores data at the finest granularity and can therefore be used to compute the result of any query, it usually takes a long time to perform such computation if the processing starts from this raw form.  To speed up query processing, data in intermediate forms (called Batch Views) that align more closely with the queries are generated periodically.  These batch views (instead of the original master dataset) are used to serve the real-time query processing.

To generate these batch views, the "Batch Layer" uses a massively parallel, brute force approach to process the original master dataset.  Notice that since data in the master dataset is timestamped, the candidate data can be identified simply as the records whose time stamp is later than the last round of batch processing.  Although it is less efficient, the Lambda architecture advocates that at each round of batch view generation, the previous batch view should simply be discarded and the new batch view computed from the master dataset.  This simple-minded, compute-from-scratch approach has some good properties in stopping error propagation (since errors cannot accumulate), but the processing may not be optimized and may take longer to finish.  This can increase the "staleness" of the batch view.

Real time Layer

As discussed above, generating the batch view requires scanning a large volume of the master dataset, which takes a few hours.  The batch view will therefore be stale for at least the processing time duration (ie: between the start and end of the Batch processing).  But the maximum staleness can be up to the time period between the end of this Batch processing and the end of the next Batch processing (ie: the batch cycle).  The following diagram illustrates this staleness.


Even while the batch view is stale, business operates as usual and transaction data streams in continuously.  To answer user queries with the latest, up-to-date information, the business transaction records need to be captured and merged into the real-time view.  This is the responsibility of the Real-time Layer.  To bring the latency of the latest information close to zero, the merge has to be done incrementally, so that no batching delay is introduced.  This makes the real-time view update very different from the batch view update, which can tolerate high latency.  The end goal is that the latest information not captured in the Batch view is made available in the Realtime view.

The logic of the incremental merge on the Realtime view is application specific.  As a common use case, let's say we want to compute a set of summary statistics (e.g. mean, count, max, min, sum, standard deviation, percentile) of the transaction data since the last batch view update.  To compute the sum, we can simply add the new transaction value to the existing sum and then write the new sum back to the real-time view.  To compute the mean, we can multiply the existing count by the existing mean, add the new transaction value, and then divide by the existing count plus one.  To implement this logic, we need to READ data from the Realtime view, perform the merge and WRITE the data back to the Realtime view.  This requires the Realtime serving DB (which hosts the Realtime view) to support both random READ and WRITE.  Fortunately, since the realtime view only needs to store data for up to one batch cycle, its scale is limited to some degree.

Once the batch view update is completed, the real-time layer will discard the data in the real-time serving DB whose time stamp is earlier than the batch processing.  This not only limits the data volume of the Realtime serving DB, but also allows any data inconsistency (in the realtime view) to be cleaned up eventually.  This drastically reduces the need for a sophisticated multi-user, large scale DB.  Many DB systems support multi-user random read/write and can be used for this purpose.
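
The READ, merge, WRITE cycle for the sum and mean described above can be sketched as follows.  The real-time view is modelled as a plain dictionary and the function name is hypothetical; a real serving DB would also need to make this read-modify-write atomic.

```python
def merge_transaction(view, amount):
    """READ the current statistics, merge one new transaction, WRITE them back."""
    count = view.get("count", 0)
    mean = view.get("mean", 0.0)
    new_count = count + 1
    view["sum"] = view.get("sum", 0.0) + amount
    # (existing count * existing mean + new value) / (existing count + 1)
    view["mean"] = (count * mean + amount) / new_count
    view["count"] = new_count
    view["min"] = min(view.get("min", amount), amount)
    view["max"] = max(view.get("max", amount), amount)
    return view


realtime_view = {}
for transaction_amount in (10.0, 20.0, 30.0):
    merge_transaction(realtime_view, transaction_amount)

print(realtime_view)
```

Statistics such as percentiles cannot be merged this simply, since they depend on the whole distribution rather than on a few running totals.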

Serving Layer

The serving layer is responsible for hosting the batch view (in the batch serving database) as well as the real-time view (in the real-time serving database).  Because their access patterns are very different, the batch serving DB has quite different characteristics from the real-time serving DB.

As mentioned above, while it is required to support efficient random reads over a large data volume, the batch serving DB doesn't need to support random writes because data will only be bulk-loaded into it.  On the other hand, the real-time serving DB will be incrementally (and continuously) updated by the real-time layer, and therefore needs to support both random reads and random writes.

To keep the batch serving DB updated, the serving layer needs to periodically check the batch layer's progress to determine whether a later round of batch view generation has finished.  If so, it bulk loads the batch view into the batch serving DB.  Once the bulk load is complete, the batch serving DB contains the latest version of the batch view, and some data in the real-time view has expired and can therefore be deleted.  The serving layer orchestrates these processes.  This purge action is especially important for keeping the real-time serving DB small, which limits the complexity of handling real-time, concurrent read/write.

To process a real-time query, the serving layer splits the incoming query into 2 sub-queries, forwards them to the Batch serving DB and the Realtime serving DB, applies application-specific logic to combine/merge their results, and forms a single response to the query.  Since the data in the real-time view and batch view cover different time stamp ranges, the combine/merge is typically done by concatenating the results together.  In case of any conflict (same time stamp), the one from the Batch view will overwrite the one from the Realtime view.
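
A minimal sketch of this combine/merge step, assuming each sub-query returns a mapping from time stamp to value (the function name and the sample rows are hypothetical):

```python
def merge_views(batch_rows, realtime_rows):
    """Concatenate both result sets; on a time stamp conflict the batch row wins."""
    merged = dict(realtime_rows)  # start from the real-time results
    merged.update(batch_rows)     # batch results overwrite any conflicting entries
    return sorted(merged.items())


batch_result = {1: "a", 2: "b"}
realtime_result = {2: "x", 3: "c"}

response = merge_views(batch_result, realtime_result)
print(response)
```

In a real system the merge logic depends on the query; for aggregates such as counts, the two partial results would be added rather than concatenated.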

Final Thoughts

By separating different responsibilities into different layers, the Lambda architecture can leverage different optimization techniques specifically designed for different constraints.  For example, the Batch Layer focuses on large scale data processing using a simple, start-from-scratch approach without worrying about processing latency.  On the other hand, the Real-time Layer picks up where the Batch Layer leaves off and focuses on low-latency merging of the latest information, with no need to worry about large scale.  Finally, the Serving Layer is responsible for stitching together the Batch View and Realtime View to provide the final complete picture.

The clear demarcation of responsibility also enables different technology stacks to be used at each layer, which can therefore be tailored more closely to the organization's specific business needs.  Nevertheless, using very different mechanisms to update the Batch view (ie: start-from-scratch) and the Realtime view (ie: incremental merge) requires two different algorithm implementations and code bases to handle the same type of data.  This can increase the code maintenance effort and can be considered the price to pay for bridging the fundamental gap between the "scalability" and "low latency" needs.

Nathan's Lambda architecture also introduces a set of candidate technologies which he has developed and used in his past projects (e.g. Hadoop for storing Master dataset, Hadoop for generating Batch view, ElephantDB for batch serving DB, Cassandra for realtime serving DB, Storm for generating Realtime view).  The beauty of the Lambda architecture is that the choice of technologies is completely decoupled, so I intentionally do not describe any of their details in this post.  On the other hand, I have my own favorites, which are different and will be covered in my future posts.

Sunday, July 27, 2014

Incorporate domain knowledge into predictive model

As data scientists / consultants, in many cases we are called in to work with domain experts who have in-depth business knowledge of their industry setting.  The main objective is to help our clients validate and quantify the intuition behind existing domain knowledge based on empirical data, and remove any judgement bias.  In many cases, customers will also want to build a predictive model to automate their business decision making process.

To create a predictive model, feature engineering (defining the set of inputs) is a key part, if not the most important one.  In this post, I'd like to share my experience in how to come up with the initial set of features and how to evolve it as we learn more.

Firstly, we need to acknowledge two forces in this setting
  1. Domain experts tend to be narrowly focused (and potentially biased) towards their prior experience.  Their domain knowledge can usually be encoded in terms of "business rules", which tend to be simple and obvious (if a pattern is too complex and hidden, the human brain is not good at picking it up).
  2. Data scientists tend to be less biased and good at mining through a large set of signals to determine how relevant they are in an objective and quantitative manner.  Unfortunately, raw data rarely gives strong signals.  And lacking domain expertise, a data scientist alone will not even be able to come up with a good set of features (which usually requires derivation from combinations of raw data).  Notice that trying out all combinations is impractical because there are an infinite number of ways to combine raw data.  Also, when you have too many features in the input, the training data will not be enough, resulting in a model with high variance.
Maintaining a balance between these forces is a critical success factor in many data science projects.

The best project setting (in my opinion) is to let the data scientist take control of the whole exercise (as being less biased is an advantage) while being guided by input from domain experts.

Indicator Feature

This is a binary variable based on a very specific boolean condition (ie: true or false) that the domain expert believes to be highly indicative of the output.  For example, for predicting a stock, one indicator feature is whether the stock has dropped more than 15% in a day.
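
As a tiny illustration, the 15% drop rule could be encoded like this (the function name and the prices are made up):

```python
def dropped_more_than_15pct(previous_close, close):
    """Indicator feature: 1 if the price fell by more than 15% in a day, else 0."""
    daily_drop = (previous_close - close) / previous_close
    return 1 if daily_drop > 0.15 else 0


# Hypothetical (previous close, close) pairs for three trading days
days = [(100.0, 84.0), (100.0, 90.0), (50.0, 51.0)]
indicator_column = [dropped_more_than_15pct(prev, cur) for prev, cur in days]
print(indicator_column)
```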

Notice that indicator features can be added at any time, whenever a new boolean condition is discovered by the domain expert.  Indicator features don't need to be independent of each other, and in fact most of the time they are highly inter-correlated.

After fitting these indicator features into the predictive model, we can see how much influence each of these features exerts on the final prediction, and hence provide feedback to the domain experts about the strength of these signals.

Derived Feature

This is a numeric variable (ie: quantity) that the domain expert believes to be important for predicting the output.  The idea is the same as the indicator feature except that it is numeric in nature.

Expert Stacking

Here we build a predictive model whose input features are taken from each expert's prediction output.  For example, to predict a stock, our model takes 20 analysts' predictions as its input.

The strength of this approach is that it can incorporate domain expertise very easily because it treats the experts as black boxes (without needing to understand their logic).  The model we train will take into account the relative accuracy of each expert's predictions and adjust its weighting accordingly.  On the other hand, one weakness is the reliance on domain expertise at prediction time, which may or may not be available on an on-going basis.
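
One deliberately simple way to weight experts by their relative accuracy is inverse mean-squared-error weighting, sketched below.  This is a stand-in for a properly trained stacking model (which would typically fit a regression on the experts' outputs), and all names and numbers are hypothetical.

```python
def expert_weights(past_predictions, actuals):
    """Weight each expert by the inverse of their historical mean squared error."""
    n_experts = len(past_predictions[0])
    mse = [
        sum((row[j] - actual) ** 2 for row, actual in zip(past_predictions, actuals))
        / len(actuals)
        for j in range(n_experts)
    ]
    inverse = [1.0 / (error + 1e-9) for error in mse]  # small constant avoids 1/0
    total = sum(inverse)
    return [value / total for value in inverse]


def stacked_prediction(weights, predictions):
    """Combine the experts' current predictions using the learned weights."""
    return sum(w * p for w, p in zip(weights, predictions))


# Each row holds three experts' predictions for one past period
past_predictions = [(10.0, 12.0, 9.0), (11.0, 14.0, 11.5), (12.0, 15.0, 11.0)]
actuals = [10.2, 11.1, 12.1]

weights = expert_weights(past_predictions, actuals)
forecast = stacked_prediction(weights, (13.0, 16.0, 12.0))
print(weights, forecast)
```

The most accurate expert in the history ends up dominating the combined forecast, while consistently poor experts are almost ignored.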

Saturday, June 28, 2014

Interactive Data Visualization

Recently, the "interactive report" has become a hot topic in data visualization.  I believe it is becoming the next generation UI paradigm for KPI reports.

The interactive report sits somewhere in between the static report and BI tools …

Executive KPI report today

Today most executive reports are "static reports" prepared by financial experts, who pull data from various ERP systems on a regular basis, summarize the raw data in a highly condensed and simplified form, and then generate a static report for the execs.  When the exec gets the report, it is already in a summarized form that is customized based on his/her prior requirements.  There is no way to ask any question that the report does not already answer.  Of course, the exec can ask for a separate report, but that requires additional development time and effort from his/her staff, and the exec has to wait for the new report to be developed.

This is a suboptimal situation.  In order to survive or maintain leadership in today's highly competitive business environment, execs not only need a much broader perspective (from a wide variety of operational data) to make their decisions, but also have to make those decisions fast.  Static reports cannot fulfill this need.



Business Intelligence Tools


On the other hand, BI tools (such as Tableau) or OLAP tools can do very detailed analysis across a wide range of data sources.  However, using these tools to perform more detailed analysis (such as slice/dice/rollup/drilldown) typically requires specially trained data analysis skills.  In reality, very few execs use these tools directly.  What they do is ask their data analyst to prepare a static report for them using these BI tools.  The exec still gets a "static report", although it is produced with the BI tools.  Whenever they need to ask a different question, they need to go back to the data analyst and ask for a separate report.



There is a gap between the static report and BI tool.

Interactive Report

"Interactive Report" provides a new paradigm to fill this gap.  It has the following characteristics …
  • Like a static report, the "Interactive Report" is still based on "static data", which is a fixed set of data generated in a periodic batch fashion.
  • Unlike a static report, this pre-generated "static data" is much larger and wider, covering a broader scope of questions that the execs may ask.
  • Because the "static data" is large and wide, it is impossible to visualize all aspects in the report.  Therefore, only one perspective of the static data (based on the exec's pre-specified requirement) is shown in the report.
  • However, if the exec wants to ask a different question, he/she can switch to a different perspective of the same "static data".

By providing a much larger volume of static data, the "interactive report" gives execs a more dynamic data navigation experience for answering their ad hoc, unplanned questions.



There are many open source technologies (such as googleVis...) that support interactive data visualization, on which the "interactive report" can be built.  Many of them provide a programmatic interface to R, so data scientists without much Javascript experience can now produce highly interactive web pages.

Wednesday, March 12, 2014

Common Text Mining workflow

In this post, I want to summarize a common pattern that I have used in my previous text mining projects.

Text mining is different in that it uses vocabulary terms as key elements in feature engineering; otherwise it is quite similar to a statistical data mining project.  The key steps are as follows ...
  1. Determine the "object" that we are interested in analyzing.  In some cases, the text document itself is the object (e.g. an email).  In other cases, the text document provides information about the object (e.g. user comments about a product, tweets about a company)
  2. Determine the features of the object we are interested in, and create the corresponding feature vector of the object.
  3. Feed the data (each object and its corresponding set of features) to standard descriptive analytics and predictive analytics techniques.
The overall process of text mining can be described in the following flow ...



Extract docs

In this phase, we extract text documents from various types of external sources into a text index (for subsequent search) as well as a text corpus (for text mining).

The document source can be a public web site, an internal file system, or a SaaS offering.  Extracting documents typically involves one of the following ...
  • Perform a Google search or crawl a predefined list of web sites, then download the web pages from the list of URLs, parse the DOM to extract text data from its sub-elements, and eventually create one or multiple documents and store them in the text index as well as the text corpus.
  • Invoke the Twitter API to search for tweets (or monitor a particular topic stream of tweets), and store them in the text index and text corpus.
  • There is no limit on where the text data can be downloaded from.  In an intranet environment, this can mean downloading text documents from a shared drive.  On the other hand, on a compromised computer, the user's email or IM can also be downloaded by the virus agent.
  • If the text is in a different language, we may also invoke a machine translation service (e.g. Google Translate) to convert it to English.
Once the document is stored in the text index (e.g. Lucene index), it is available for search.  Also, once the document is stored in the text corpus, further text processing will be involved.

Transformation

After the document is stored in the Corpus, here are some typical transformations ...
  • If we want to extract information about some entities mentioned in the document, we need to conduct sentence segmentation and paragraph segmentation in order to provide some local context in which we can analyze the entity with respect to its relationship with other entities.
  • Attach Part-Of-Speech tags or Entity tags (person, place, company) to each word.
  • Apply standard text processing such as lower casing, removing punctuation, removing numbers, removing stopwords, and stemming.
  • Perform domain specific conversion, such as replacing dddd-dd-dd with a date placeholder token and (ddd)ddd-dddd with a phone number placeholder token, removing header and footer template text, and removing terms according to a domain-specific stop-word dictionary.
  • Optionally, normalize words to their synonyms using WordNet or a domain specific dictionary.
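
Several of the transformations above can be sketched with nothing more than regular expressions.  The placeholder tokens (DATETOKEN, PHONETOKEN) and the tiny stopword list are assumptions for illustration; tagging, stemming and synonym normalization are left to a proper NLP library.

```python
import re

STOPWORDS = {"the", "a", "an", "is", "at", "on", "of", "to", "and"}  # tiny sample list


def normalize(text):
    """Replace dates and phone numbers, lower-case, strip punctuation/digits/stopwords."""
    text = re.sub(r"\b\d{4}-\d{2}-\d{2}\b", " DATETOKEN ", text)       # dddd-dd-dd
    text = re.sub(r"\(\d{3}\)\s?\d{3}-\d{4}", " PHONETOKEN ", text)    # (ddd)ddd-dddd
    text = text.lower()
    text = re.sub(r"[^a-z\s]", " ", text)  # drop punctuation and remaining numbers
    return [token for token in text.split() if token not in STOPWORDS]


tokens = normalize("Call (555)123-4567 on 2014-03-12, the meeting is at 9!")
print(tokens)
```

The placeholders are substituted before punctuation and digits are stripped; otherwise the date and phone patterns would already be destroyed.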

Extract Features

For text mining, the "bag-of-words model" is commonly used as the feature set.  In this model, each document is represented as a word vector (a high dimensional vector whose magnitudes represent the importance of each word in the document).  Hence all documents within the corpus are represented as a giant document/term matrix.  The "term" can be generalized to a uni-gram, bi-gram, tri-gram or n-gram, while the cell value in the matrix represents how frequently the term appears in the document.  We can also use TF/IDF as the cell value to dampen the importance of terms that appear in many documents.  If we just want to represent whether the term appears in the document, we can binarize the cell value into 0 or 1.
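
Here is a small sketch of building the document/term matrix with uni-grams, together with a TF/IDF weighted and a binarized version.  It uses log(N / document frequency) as the IDF, which is only one of several common variants, and the three toy documents are made up.

```python
import math
from collections import Counter


def document_term_matrix(docs):
    """Return the sorted vocabulary and one row of raw term counts per document."""
    tokenized = [doc.lower().split() for doc in docs]
    vocab = sorted({token for tokens in tokenized for token in tokens})
    rows = []
    for tokens in tokenized:
        frequency = Counter(tokens)
        rows.append([frequency[term] for term in vocab])
    return vocab, rows


def tfidf(rows):
    """Weight each count by log(N / document frequency) of its term."""
    n_docs, n_terms = len(rows), len(rows[0])
    doc_freq = [sum(1 for row in rows if row[j] > 0) for j in range(n_terms)]
    idf = [math.log(n_docs / df) for df in doc_freq]
    return [[row[j] * idf[j] for j in range(n_terms)] for row in rows]


docs = ["the cat sat", "the dog sat down", "the pet cat"]
vocab, counts = document_term_matrix(docs)
weighted = tfidf(counts)
binary = [[1 if value else 0 for value in row] for row in counts]
print(vocab)
print(counts)
```

With this IDF variant, a term that appears in every document (here "the") gets a weight of zero, which is exactly the dampening effect described above.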

After this phase, the Corpus will turn into a large and sparse document term matrix.
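
As a rough illustration of the three cell-value choices, the sketch below builds a sparse document/term matrix as one dictionary per document.  The function names are mine, and the weighting used (raw count times log(N / document frequency)) is only one of several common TF/IDF variants.

```python
import math
from collections import Counter


def term_counts(docs):
    """docs: list of token lists. Returns one sparse row (a Counter) per document."""
    return [Counter(tokens) for tokens in docs]


def tf_idf(docs):
    """Weight each raw count by log(N / number of documents containing the term)."""
    rows = term_counts(docs)
    n_docs = len(rows)
    # Iterating over a Counter yields each distinct term once per document.
    doc_freq = Counter(term for row in rows for term in row)
    return [
        {term: count * math.log(n_docs / doc_freq[term]) for term, count in row.items()}
        for row in rows
    ]


def binarize(rows):
    """Keep only presence/absence: every stored term gets the value 1."""
    return [{term: 1 for term in row} for row in rows]
```

A term that appears in every document gets a weight of zero under this variant, which is the dampening effect described above taken to its extreme.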

Reduce Dimensions

Since each row in the document/term matrix represents a document as a high-dimensional vector (with each dimension representing the occurrence of one term), there are two reasons we want to reduce its dimension ...
  1. For efficiency, we want to reduce the memory footprint for storing the corpus.
  2. We want to transform the vector from the "term" space to a "topic" space, which allows documents on similar topics to sit close to each other even if they use different terms (e.g. documents using the words "pet" and "cat" are mapped to the same topic based on their co-occurrence).
SVD (Singular Value Decomposition) is a common matrix factorization technique for converting a "term" vector into a "concept" vector.  SVD can be used to approximate a large sparse M by N matrix as the product of three smaller dense matrices of size M*K, K*K and K*N, where K is much smaller than M and N.  Latent Semantic Indexing (LSI) is the application of SVD to the document/term matrix.
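
Written out, and assuming (as above) that rows are documents and columns are terms, the rank-K factorization looks like this.  The symbol names are my own choice.

```latex
A_{M \times N} \;\approx\; U_{M \times K} \, \Sigma_{K \times K} \, V^{\top}_{K \times N},
\qquad K \ll \min(M, N)
```

Row i of the product U Σ can then be used as the K-dimensional "concept" vector of document i, in place of its original N-dimensional term vector.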

Another popular technique, called topic modeling and based on LDA (Latent Dirichlet Allocation), is also commonly used to transform the document into a smaller set of topic dimensions.

Apply standard data mining

At this point, each document is represented as a topic vector.  We can also add more domain-specific features (for spam detection, for example, whether the document contains certain words or character patterns such as '$' or '!').  After that we can feed each vector into the regular machine learning process.
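
For the spam example, appending such hand-made features could look like the sketch below.  The function name and the choice of exactly these two features are assumptions for illustration.

```python
def add_spam_features(topic_vector, raw_text):
    """Append two hand-made features to a document's topic vector."""
    dollar_count = float(raw_text.count("$"))           # how many '$' characters
    has_exclamation = 1.0 if "!" in raw_text else 0.0   # presence flag for '!'
    return list(topic_vector) + [dollar_count, has_exclamation]
```

Note that these features are computed from the raw text, since the punctuation they rely on is removed during the earlier cleaning step.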

Tools and Library

I have used Python's NLTK as well as R's tm and topicmodels packages for performing the text mining work that I described above.  These libraries provide a good set of features for mining text documents.

Monday, March 3, 2014

Estimating statistics via Bootstrapping and Monte Carlo simulation

We want to estimate some "statistics" (e.g. average income, 95th percentile height, variance of weight ... etc.) from a population.

It would be too tedious to enumerate all members of the whole population.  For efficiency, we randomly pick a number of samples from the population and compute the statistic of the sample set to estimate the corresponding statistic of the population.  We understand that an estimate made this way (via random sampling) can deviate from the population.  Therefore, in addition to our estimated statistic, we also include a "standard error" (how far our estimate may deviate from the actual population statistic) or a "confidence interval" (a lower and upper bound which we are confident contains the true statistic).

The challenge is how to estimate the "standard error" or the "confidence interval".  A straightforward way is to repeat the sampling exercise many times, each time creating a different sample set from which we compute one estimate.  Then we look across the estimates from all the different sample sets to derive the standard error and confidence interval of the estimate.

But what if collecting data for a different sample set is expensive, or for any reason the population is no longer accessible after we collected our first sample set?  Bootstrapping provides a way to address this ...

Bootstrapping

Instead of creating additional sample sets from the population, we create additional sample sets by re-sampling data (with replacement) from the original sample set.  Each of the created sample sets will follow the data distribution of the original sample set, which in turn follows that of the population.

R provides a nice bootstrap library, the "boot" package, to do this.

> library(boot)
> # Generate a population
> population.weight <- rnorm(100000, 160, 60)
> # Let's say we care about the 90th percentile
> quantile(population.weight, 0.9)
     90% 
236.8105 
> # We create our first sample set of 500 samples
> sample_set1 <- sample(population.weight, 500)
> # Here is our sample statistic of ninety percentile
> quantile(sample_set1, 0.9)
     90% 
232.3641 
> # Notice that the sample statistic deviates from the population statistic
> # We want to estimate how big this deviation is by using bootstrapping
> # We need to define a function to compute the statistic
> ninety_percentile <- function(x, idx) {return(quantile(x[idx], 0.9))}
> # Bootstrapping will call this function many times with different idx
> boot_result <- boot(data=sample_set1, statistic=ninety_percentile, R=1000)
> boot_result

ORDINARY NONPARAMETRIC BOOTSTRAP


Call:
boot(data = sample_set1, statistic = ninety_percentile, R = 1000)


Bootstrap Statistics :
    original   bias    std. error
t1* 232.3641 2.379859     5.43342
> plot(boot_result)
> boot.ci(boot_result, type="bca")
BOOTSTRAP CONFIDENCE INTERVAL CALCULATIONS
Based on 1000 bootstrap replicates

CALL : 
boot.ci(boot.out = boot_result, type = "bca")

Intervals : 
Level       BCa          
95%   (227.2, 248.1 )  
Calculations and Intervals on Original Scale


Here is the visual output of the bootstrap plot

Bootstrapping is a powerful simulation technique for estimating any statistic in an empirical way.  It is also non-parametric because it doesn't assume any model or parameters and just uses the original sample set to estimate the statistic.
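
To show what happens underneath, here is a minimal sketch of the same resampling loop in plain Python.  It is not what the boot package does internally: the helper names are mine, the percentile rule is a simple index-based one rather than R's default quantile(), and the interval is a plain percentile interval rather than BCa.

```python
import random
import statistics


def percentile(values, q):
    """Simple index-based percentile; cruder than R's default quantile()."""
    ordered = sorted(values)
    index = min(len(ordered) - 1, int(q * len(ordered)))
    return ordered[index]


def bootstrap(sample, statistic, replicates=1000, seed=0):
    """Return (standard error, 95% percentile interval) of a statistic."""
    rng = random.Random(seed)
    n = len(sample)
    estimates = []
    for _ in range(replicates):
        resample = rng.choices(sample, k=n)   # draw n items with replacement
        estimates.append(statistic(resample))
    std_error = statistics.stdev(estimates)
    ordered = sorted(estimates)
    lower = ordered[int(0.025 * replicates)]
    upper = ordered[int(0.975 * replicates) - 1]
    return std_error, (lower, upper)
```

Calling bootstrap(sample, lambda xs: percentile(xs, 0.9)) on a list of 500 weights plays the role of the boot() and boot.ci() calls above, although the numbers will not match the R output exactly because of the different random draws and interval method.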

If we instead assume a certain distribution model and want to see the distribution of a certain statistic, Monte Carlo simulation provides a powerful way to do this.

Monte Carlo Simulation

The idea is pretty simple: based on a particular distribution function (defined by specific model parameters), we generate many sets of samples.  We compute the statistic of each sample set and see how the statistic is distributed across the different sample sets.

For example, given a normally distributed population, what is the probability distribution of the max value of 5 randomly chosen samples?

> sample_stats <- rep(0, 1000)
> for (i in 1:1000) {
+     sample_stats[i] <- max(rnorm(5))
+ }
> mean(sample_stats)
[1] 1.153008
> sd(sample_stats)
[1] 0.6584022
> par(mfrow=c(1,2))
> hist(sample_stats, breaks=30)
> qqnorm(sample_stats)
> qqline(sample_stats)


Here is the distribution of the "max(5)" statistic, which shows some right skewness.
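
This particular example happens to have a closed form we can check the simulation against: for n independent draws, P(max ≤ x) is the single-draw CDF raised to the power n.  The sketch below repeats the simulation in plain Python and compares the empirical CDF with that formula.  The function names, trial count and seed are my own choices.

```python
import math
import random


def normal_cdf(x):
    """CDF of the standard normal distribution."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def simulate_max(n_draws=5, trials=20000, seed=0):
    """Monte Carlo: the max of n_draws standard normal samples, repeated trials times."""
    rng = random.Random(seed)
    return [max(rng.gauss(0.0, 1.0) for _ in range(n_draws)) for _ in range(trials)]


def empirical_cdf(values, x):
    """Fraction of simulated values that are <= x."""
    return sum(v <= x for v in values) / len(values)


maxima = simulate_max()
simulated = empirical_cdf(maxima, 1.0)
analytic = normal_cdf(1.0) ** 5   # P(all five draws <= 1.0)
```

With enough trials the two numbers should agree closely, which is a useful sanity check before trusting a simulation on a statistic that has no analytic form.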

Bootstrapping and Monte Carlo simulation are powerful tools for estimating statistics in an empirical manner, especially when we don't have an analytic form of the solution.

Friday, December 27, 2013

Spark: Low latency, massively parallel processing framework

While Hadoop fits most batch processing workloads well, and is the primary choice for big data processing today, it is not optimized for other types of workload due to the following limitations
  • Lack of iteration support
  • High latency due to persisting intermediate data onto disk
For a more detailed elaboration of Hadoop's limitations, refer to my previous post.

Nevertheless, the Map/Reduce processing paradigm is a proven mechanism for dealing with large scale data.  In addition, many of Hadoop's infrastructure pieces, such as HDFS and HBase, have matured over time.

In this blog post, we'll look at a different architecture called Spark, which has taken the strengths of Hadoop and improved on a number of its weaknesses, providing a more efficient batch processing framework with much lower latency (according to its benchmark results, Spark claims to be 100x faster than Hadoop when using a RAM cache, and 10x faster when running on disk).  Although it competes with Hadoop MapRed, Spark integrates well with other parts of the Hadoop ecosystem (such as HDFS, HBase ... etc.).  Spark has generated a lot of excitement in the big data community and represents a very promising parallel execution stack for big data analytics.

Berkeley Spark

Within the Spark cluster, there is a driver program where execution of the application logic starts, along with multiple workers which process data in parallel.  Although this is not mandated, data is typically collocated with the workers and partitioned across the same set of machines within the cluster.  During execution, the driver program passes code/closures to the worker machines, where the corresponding partitions of data are processed.  The data undergoes different steps of transformation while staying in the same partition as much as possible (to avoid data shuffling across machines).  At the end of the execution, actions are executed at the workers and the results are returned to the driver program.


Underlying the cluster, there is an important distributed data structure called RDD (Resilient Distributed Dataset), which is a logically centralized entity but physically partitioned across multiple machines inside a cluster based on some notion of key.  Controlling how different RDDs are co-partitioned (with the same keys) across machines can reduce inter-machine data shuffling within a cluster.  Spark provides a "partition-by" operator which creates a new RDD by redistributing the data in the original RDD across machines within the cluster.
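
To see why co-partitioning helps, here is a toy single-process sketch in plain Python.  It is not the Spark API: the function names are hypothetical, and each "partition" is just a list standing in for the data held on one machine.

```python
import zlib


def partition_of(key, num_partitions):
    """Stable hash partitioner (crc32, since Python's hash() of str varies per process)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_by(records, num_partitions):
    """Redistribute (key, value) records into num_partitions buckets by key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_of(key, num_partitions)].append((key, value))
    return partitions


def local_join(left_parts, right_parts):
    """Join two co-partitioned datasets one partition pair at a time."""
    joined = []
    for left, right in zip(left_parts, right_parts):
        right_index = {}
        for key, value in right:
            right_index.setdefault(key, []).append(value)
        # Every matching key is guaranteed to be in the same partition pair.
        for key, value in left:
            for other in right_index.get(key, []):
                joined.append((key, (value, other)))
    return joined
```

Because both datasets were partitioned with the same function and the same number of partitions, partition i of one side only ever needs partition i of the other, which is what lets the join run without moving data between machines.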



An RDD can optionally be cached in RAM, which provides fast access.  Currently caching is done at the granularity of the RDD: either the whole RDD is cached or none of it is.  Caching is a hint but not a guarantee.  Spark will try to cache the RDD if sufficient memory is available in the cluster, evicting entries based on an LRU (Least Recently Used) algorithm.
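
The caching behaviour described here can be modelled with a few lines of plain Python.  This is a toy model and not Spark's implementation: the class name, the size units and the method names are all assumptions of mine.

```python
from collections import OrderedDict


class RddCache:
    """Toy whole-dataset cache with LRU eviction."""

    def __init__(self, capacity):
        self.capacity = capacity          # total size budget, in arbitrary units
        self.used = 0
        self.entries = OrderedDict()      # rdd_id -> (size, data), least recent first

    def put(self, rdd_id, data, size):
        """Try to cache a whole dataset; return False if it can never fit."""
        if size > self.capacity:
            return False                  # caching is a hint, not a guarantee
        if rdd_id in self.entries:
            self.used -= self.entries.pop(rdd_id)[0]
        while self.used + size > self.capacity:
            _, (evicted_size, _) = self.entries.popitem(last=False)
            self.used -= evicted_size
        self.entries[rdd_id] = (size, data)
        self.used += size
        return True

    def get(self, rdd_id):
        """Return cached data, or None so the caller knows to recompute it."""
        if rdd_id not in self.entries:
            return None
        self.entries.move_to_end(rdd_id)  # mark as most recently used
        return self.entries[rdd_id][1]
```

A miss simply returns None, mirroring the idea that a dataset which could not be kept in memory is recomputed rather than treated as an error.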

RDD provides an abstract data structure from which application logic can be expressed as a sequence of transformations, without worrying about the underlying distributed nature of the data.

Typically, application logic is expressed in terms of a sequence of TRANSFORMATIONs and ACTIONs.  "Transformation" specifies the processing dependency DAG among RDDs and "Action" specifies what the output will be (i.e. the sink node of the DAG with no outgoing edge).  The scheduler will perform a topological sort to determine the execution sequence of the DAG, tracing all the way back to the source nodes, or to nodes that represent a cached RDD.
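
The scheduling idea can be sketched with Python's standard graphlib module (Python 3.9+).  The RDD names and the dependency graph below are hypothetical, and this only models the ordering logic, not Spark's actual scheduler.

```python
from graphlib import TopologicalSorter

# Hypothetical DAG: each RDD name maps to the set of parent RDDs it depends on.
dependencies = {
    "lines": set(),                        # source
    "words": {"lines"},
    "pairs": {"words"},
    "counts": {"pairs"},
    "stop_words": set(),                   # second source
    "filtered": {"counts", "stop_words"},  # sink that the action reads from
}


def required_subgraph(dependencies, sink, cached=frozenset()):
    """Walk back from the sink, stopping at sources and at cached RDDs."""
    needed, stack = {}, [sink]
    while stack:
        node = stack.pop()
        if node in needed:
            continue
        # A cached RDD is treated like a source: its parents are not needed.
        parents = set() if node in cached else set(dependencies[node])
        needed[node] = parents
        stack.extend(parents)
    return needed


def execution_order(dependencies):
    """Return RDD names so that every parent appears before its children."""
    return list(TopologicalSorter(dependencies).static_order())
```

For example, execution_order(required_subgraph(dependencies, "filtered", cached={"counts"})) only visits "counts", "stop_words" and "filtered", because everything upstream of the cached RDD can be skipped.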


Notice that dependencies in Spark come in two forms.  "Narrow dependency" means that each partition of a parent RDD is consumed by a single partition of the child RDD (but a child RDD is allowed to have multiple parent RDDs).  "Wide dependency" (e.g. group-by-keys, reduce-by-keys, sort-by-keys) means that a parent partition is split, with its elements going to different child partitions based on their keys.  Notice that RDDs with narrow dependencies preserve the key partitioning between parent and child RDD.  Therefore RDDs can be co-partitioned with the same keys (parent key range being a subset of the child key range) such that the processing (generating the child RDD from the parent RDD) can be done within a machine with no data shuffling across the network.  On the other hand, RDDs with wide dependencies involve data shuffling.


Narrow transformations (involving no data shuffling) include the following operators
  • Map
  • FlatMap
  • Filter
  • Sample
Wide transformations (involving data shuffling) include the following operators
  • SortByKey
  • ReduceByKey
  • GroupByKey
  • CogroupByKey
  • Join
  • Cartesian
Actions output the RDD to the external world and include the following operators
  • Collect
  • Take(n)
  • Reduce
  • ForEach
  • Sample
  • Count
  • Save
The scheduler will examine the type of dependencies and group RDDs with narrow dependencies into a unit of processing called a stage.  Wide dependencies span consecutive stages within the execution and require the number of partitions of the child RDD to be explicitly specified.
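
As a simplified picture of stage grouping, the sketch below cuts a linear chain of operator names wherever a wide operator appears.  It is a toy model under my own assumptions: the real scheduler works on a DAG rather than a list, and the set of operator names is just taken from the lists above.

```python
# Operators treated as wide (shuffle) boundaries in this sketch.
WIDE = {"sortByKey", "reduceByKey", "groupByKey", "join"}


def split_into_stages(pipeline):
    """Group a linear chain of operator names into stages.

    A wide operator needs shuffled input, so it starts a new stage;
    consecutive narrow operators stay in the current stage.
    """
    stages = [[]]
    for op in pipeline:
        if op in WIDE and stages[-1]:
            stages.append([])
        stages[-1].append(op)
    return stages
```

For instance, the chain map, filter, reduceByKey, map, sortByKey is cut into three stages, with a shuffle between each pair of consecutive stages.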


A typical execution sequence is as follows ...
  1. An RDD is created originally from external data sources (e.g. HDFS, local file ... etc.)
  2. The RDD undergoes a sequence of TRANSFORMATIONs (e.g. map, flatMap, filter, groupBy, join), each producing a different RDD that feeds into the next transformation.
  3. Finally, the last step is an ACTION (e.g. count, collect, save, take), which converts the last RDD into an output to external data sources.
The above sequence of processing is called a lineage (the outcome of the topological sort of the DAG).  Each RDD produced within the lineage is immutable.  In fact, unless it is cached, it is used only once to feed the next transformation to produce the next RDD and finally produce some action output.
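
The create / transform / act sequence can be mimicked on a single machine with a small class.  ToyRDD below is a hypothetical stand-in of my own and not the Spark API; it only illustrates that transformations build new immutable objects lazily while actions trigger the actual work.

```python
class ToyRDD:
    """Single-machine stand-in for an RDD; not the Spark API."""

    def __init__(self, compute, parent=None, name="source"):
        self._compute = compute   # function returning an iterator of elements
        self.parent = parent
        self.name = name

    @classmethod
    def from_list(cls, items):
        return cls(lambda: iter(items))

    # Transformations: return a new ToyRDD and do no work yet.
    def map(self, fn):
        return ToyRDD(lambda: (fn(x) for x in self._compute()), self, "map")

    def filter(self, pred):
        return ToyRDD(lambda: (x for x in self._compute() if pred(x)), self, "filter")

    # Actions: trigger evaluation of the whole chain.
    def collect(self):
        return list(self._compute())

    def count(self):
        return sum(1 for _ in self._compute())

    def lineage(self):
        """Names of the steps from the source down to this RDD."""
        node, names = self, []
        while node is not None:
            names.append(node.name)
            node = node.parent
        return list(reversed(names))
```

Nothing is computed when map or filter is called; the functions only run once collect or count asks for a result, and each action re-runs the chain from the source because nothing is cached in this sketch.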

In a classical distributed system, fault resilience is achieved by replicating data across different machines together with an active monitoring system.  If any machine crashes, there is always another copy of the data residing on a different machine from which recovery can take place.

Fault resiliency in Spark takes a different approach.  First of all, as a large scale compute cluster, Spark is not meant to be a large scale data cluster at all.  Spark makes two assumptions about its workload.
  • The processing time is finite (although the longer it takes, the cost of recovery after fault will be higher)
  • Data persistence is the responsibility of external data sources, which keeps the data stable within the duration of processing.
Spark has made a tradeoff decision: in case of any data loss during execution, it will re-execute the previous steps to recover the lost data.  However, this doesn't mean that everything done so far is discarded and we need to start from scratch.  We just need to re-execute the corresponding partitions in the parent RDD that are responsible for generating the lost partitions; in the case of narrow dependencies, this resolves to the same machine.

Notice that the re-execution of a lost partition is exactly the same as the lazy evaluation of the DAG, which starts from the leaf node of the DAG, traces back the dependencies to find which parent RDDs are needed, and eventually tracks all the way to the source node.  Recomputing the lost partition is done in a similar way, but takes the partition as an extra piece of information to determine which parent RDD partition is needed.
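
Here is a toy sketch of that recovery path for narrow dependencies, in plain Python.  The class is hypothetical and not how Spark is implemented; "losing" a partition is simulated by deleting it from a dictionary.

```python
class PartitionedRDD:
    """Toy RDD whose partitions map one-to-one onto parent partitions (narrow dependency)."""

    def __init__(self, num_partitions, parent=None, fn=None, source=None):
        self.num_partitions = num_partitions
        self.parent = parent
        self.fn = fn              # per-element function applied to the parent partition
        self.source = source      # list of partitions, for source RDDs only
        self.materialized = {}    # partition index -> computed data

    def map(self, fn):
        return PartitionedRDD(self.num_partitions, parent=self, fn=fn)

    def compute(self, index):
        """Return one partition, recomputing it from the parent if it is missing."""
        if index in self.materialized:
            return self.materialized[index]
        if self.parent is None:
            data = list(self.source[index])   # re-read from stable external storage
        else:
            # Only the matching parent partition is needed, not the whole parent RDD.
            data = [self.fn(x) for x in self.parent.compute(index)]
        self.materialized[index] = data
        return data
```

Asking for a lost partition again walks back only along that partition's own ancestry, and it stops as soon as it reaches a parent partition that is still available.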

However, re-execution across wide dependencies can touch a lot of parent RDDs across multiple machines and may cause re-execution of everything.  To mitigate this, Spark persists the intermediate data output from a Map phase before it shuffles it to the different machines executing the reduce phase.  In case of a machine crash, the re-execution (from another surviving machine) just needs to trace back and fetch the intermediate data from the corresponding partition of the mapper's persisted output.  Spark also provides a checkpoint API to explicitly persist an intermediate RDD, so that re-execution (after a crash) doesn't need to trace all the way back to the beginning.  In future, Spark will perform check-pointing automatically by figuring out a good balance between the latency of recovery and the overhead of check-pointing based on statistical results.

Spark provides a powerful framework for building low latency, massively parallel processing for big data analytics.  It supports an API around the RDD abstraction, with a set of transformation and action operations, for a number of popular programming languages such as Scala, Java and Python.

In future posts, I'll cover other technologies in the Spark stack including real-time analytics using streaming as well as machine learning frameworks.