Tuesday, June 10, 2008

Exploring Erlang with Map/Reduce

Under the category of "Concurrent Oriented Programming", Erlang has got some good attention recently due to some declared success from Facebook engineers of using Erlang in large scale applications. Tempted to figure out the underlying ingredients of Erlang, I decided to spent some time to learn the language.


Multi-threading Problem

Multiple threads of execution is a common programming model in modern languages because it enable a more efficient use of computing resources provided by multi-core and multi-machine architecture. One of question to be answered though, is how these parallel threads of execution interact and work co-operative to solve the application problem.

There are basically two models for communication between concurrent executions. One is based on a "Shared Memory" model which one thread of execution write the information into a shared place where other threads will read from. Java's thread model is based on such a "shared memory" semantics. The typical problem of this model is that concurrent update requires very sophisticated protection scheme, otherwise uncoordinated access can result in inconsistent data.

Unfortunately, this protection scheme is very hard to analyze once there are multiple threads start to interact in combinatorial explosion number of different ways. Hard to debug deadlock problem are frequently pop up. To reduce the complexity, using a coarse grain locking model is usually recommended but this may reduce the concurrency.

Erlang has picked the other model based on "message passing". In this model, any information that needs to be shared will be "copied" into a message and send to other executions. In this model, each thread of execution has its state "completely local" (not viewable by other thread of executions). Their local state is updated when they learn what is going on in other threads by receiving their messages. This model mirrors how people in real life interact with each other.


Erlang Sequential Processing

Coming from an object oriented imperative programming background, there are a couple of things I need to unlearn/learn in Erlang.

Erlang is a functional programming language and have no OO concepts. Erlang code is structured as "function" at a basic unit, grouped under a "module". Each "function" takes a number of inputs parameters and produce an output value. Like many functional programming language, Erlang encourage the use of "pure function" which is "side-effect-free" and "deterministic". "Side-effect-free" means there is no state changes within the execution of the function. "Deterministic" means the same output will always be produced from the same input.

Erlang has a very different concept in variable assignment in that all variables in Erlang is immutable. In other words, every variable can only be assigned once and from then onwards can never be changed. So I cannot do X = X + 1, and I have to use a new variable and assigned it with the changed value, e.g. Y = X + 1. This "immutability" characteristic simplify debugging a lot because I don't need to worry about how the value of X is changed at different point of execution (it simply won't change).

Another uncommon thing about Erlang is that there is no "while loop" construct in the language. To achieve the looping effect, you need to code the function in a recursive way, basically putting a terminal clause to check for the exit condition, as well as carefully structure the logic in a tail recursion fashion. Otherwise, you may run out of memory in case the stack grow too much. Tail recursion function means the function either returns a value (but not an expression) or a recursive function call. Erlang is smart enough to do tail recursion across multiple functions, such as if funcA calls funcB, which calls funcC, which call funcA. Tail recursion is especially important in writing server daemon which typically make a self recursive call after process a request.


Erlang Parallel Processing

The execution thread in Erlang is called a "Process". Don't be confused with OS-level processes, Erlang process is extremely light-weight, much lighter than Java threads. A process is created by a spawn(Node, Module, Function, Arguments) function call and it terminates when that function is return.

Erlang processes communicate with each other by passing messages. Process ids are used by the sender to specify the recipient addresses. The send call happens asynchronously and returns immediately. The receiving process will make a synchronous receive call and specify a number of matching patterns. Arriving messages that match the pattern will be delivered to the receiving process, otherwise it will stay in the queue forever. Therefore, it is good practices to have a match all pattern to clean up garbage message. The receive call also accepts a timeout parameter so that it will return if no matched messages happen within the timeout period.

Error handling in Erlang is also quite different from other programming languages. Although Erlang provides a try/catch model, it is not the preferred approach. Instead of catching the error and handle it within the local process, the process should simply die and let another process to take care of what should be done after its crash. Erlang have the concept of having processes "linked" to each other and monitor the life status among themselves. In a default setting, a dying process will propagate an exit signal to all the processes it links to (links are bi-directional). So there is a chaining effect that when one process die, the whole chain of processes will die. However, a process can redefine its behavior after receiving the exit signal. Instead of "dying", a process can choose to handle the error (perhaps by restarting the dead process).


Other Erlang Features
Pattern matching is a common programming construct in many places of Erlang, namely "Function calls", "Variable assignment", "Case statements" and "Receive messages". It takes some time to get used to this style. After that I feel this construct to be very powerful.

Another cool feature that Erlang provides is the code hot swap. By specifying the module name when making the function call, a running Erlang process can execute the latest code without restarting itself. This is a powerful features for code evolution because you don't need to shutdown the VM when deploying new code.

Since the function itself can be passed as a message to a remote process, execute code remotely is extremely easy in Erlang. The problem of installation, deployment is pretty much non-existent in Erlang

Map/Reduce using Erlang

After learning the basic concepts, my next step is to search for a problem and get some hands on with the language. Based on a work-partition, aggregation, parallel processing model, Map/Reduce seems to have the characteristic model that aligns very nicely into Erlang's parallel processing model. So I pick my project to implement a simple Map/Reduce framework in Erlang.

Here is the Erlang implementation ...




First of all, I need some Helper functions

-module(mapreduce).
-export([reduce_task/2, map_task/2,
        test_reduce_task/0, test_map_reduce/0,
        repeat_exec/2]).

%%% Execute the function N times,
%%%   and put the result into a list
repeat_exec(N,Func) ->
 lists:map(Func, lists:seq(0, N-1)).
 

%%% Identify the reducer process by
%%%   using the hashcode of the key
find_reducer(Processes, Key) ->
 Index = erlang:phash(Key, length(Processes)),
 lists:nth(Index, Processes).

%%% Identify the mapper process by random
find_mapper(Processes) ->
 case random:uniform(length(Processes)) of
   0 ->
     find_mapper(Processes);
   N ->
     lists:nth(N, Processes)
 end.

%%% Collect result synchronously from
%%%   a reducer process
collect(Reduce_proc) ->
 Reduce_proc ! {collect, self()},
 receive
   {result, Result} ->
     Result
 end.


Main function
The MapReduce() function is the entry point of the system.
  1. It first starts all the R number of Reducer processes
  2. It starts all the M number of Mapper processes, passing them the R reducer processes ids
  3. For each line of input data, it randomly pick one of the M mapper processes and send the line to it
  4. Wait until the completion has finished
  5. Collect result from the R reducer processes
  6. Return the collected result
The corresponding Erlang code is as follows ...
%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func,
          Reduce_func, Acc0, List) ->

 %% Start all the reducer processes
 Reduce_processes =
   repeat_exec(R,
     fun(_) ->
       spawn(mapreduce, reduce_task,
             [Acc0, Reduce_func])
     end),

 io:format("Reduce processes ~w are started~n",
           [Reduce_processes]),

 %% Start all mapper processes
 Map_processes =
   repeat_exec(M,
     fun(_) ->
       spawn(mapreduce, map_task,
             [Reduce_processes, Map_func])
     end),

 io:format("Map processes ~w are started~n",
           [Map_processes]),

 %% Send the data to the mapper processes
 Extract_func =
   fun(N) ->
     Extracted_line = lists:nth(N+1, List),
     Map_proc = find_mapper(Map_processes),
     io:format("Send ~w to map process ~w~n",
               [Extracted_line, Map_proc]),
     Map_proc ! {map, Extracted_line}
   end,

 repeat_exec(length(List), Extract_func),

 timer:sleep(2000),

 %% Collect the result from all reducer processes
 io:format("Collect all data from reduce processes~n"),
 All_results =
   repeat_exec(length(Reduce_processes),
     fun(N) ->
       collect(lists:nth(N+1, Reduce_processes))
     end),
 lists:flatten(All_results).


Map Process

The Map processes, once started, will perform the following ...
  1. Receive the input line
  2. Execute the User provided Map function to turn into a list of key, value pairs
  3. For each key and value, select a reducer process and send the key, value to it
The corresponding Erlang code will be as follows ...

%%% The mapper process
map_task(Reduce_processes, MapFun) ->
 receive
   {map, Data} ->
     IntermediateResults = MapFun(Data),
     io:format("Map function produce: ~w~n",
               [IntermediateResults ]),
     lists:foreach(
       fun({K, V}) ->
         Reducer_proc =
           find_reducer(Reduce_processes, K),
         Reducer_proc ! {reduce, {K, V}}
       end, IntermediateResults),

     map_task(Reduce_processes, MapFun)
 end.


Reduce Process
On the other hand, the reducer processes will execute as follows ...
  1. Receive the key, value from the Mapper process
  2. Get the current accumulated value by the key. If no accumulated value is found, use the initial accumulated value
  3. Invoke the user provided reduce function to calculate the new accumulated value
  4. Store the new accumulated value under the key

The corresponding Erlang code will be as follows ...

%%% The reducer process
reduce_task(Acc0, ReduceFun) ->
 receive
   {reduce, {K, V}} ->
     Acc = case get(K) of
             undefined ->
               Acc0;
             Current_acc ->
               Current_acc
           end,
     put(K, ReduceFun(V, Acc)),
     reduce_task(Acc0, ReduceFun);
   {collect, PPid} ->
     PPid ! {result, get()},
     reduce_task(Acc0, ReduceFun)
 end.

Word Count Example
To test the Map/Reduce framework using a word count example ...

%%% Testing of Map reduce using word count
test_map_reduce() ->
 M_func = fun(Line) ->
            lists:map(
              fun(Word) ->
                {Word, 1}
              end, Line)
          end,

 R_func = fun(V1, Acc) ->
            Acc + V1
          end,

 map_reduce(3, 5, M_func, R_func, 0,
            [[this, is, a, boy],
             [this, is, a, girl],
             [this, is, lovely, boy]]).

This is the result when execute the test program.

Erlang (BEAM) emulator version 5.6.1 [smp:2] [async-threads:0]

Eshell V5.6.1  (abort with ^G)
1> c (mapreduce).
{ok,mapreduce}
2>
2> mapreduce:test_map_reduce().
Reduce processes [<0.37.0>,<0.38.0>,<0.39.0>,<0.40.0>,<0.41.0>] are started
Map processes [<0.42.0>,<0.43.0>,<0.44.0>] are started
Send [this,is,a,boy] to map process <0.42.0>
Send [this,is,a,girl] to map process <0.43.0>
Map function produce: [{this,1},{is,1},{a,1},{boy,1}]
Send [this,is,lovely,boy] to map process <0.44.0>
Map function produce: [{this,1},{is,1},{a,1},{girl,1}]
Map function produce: [{this,1},{is,1},{lovely,1},{boy,1}]
Collect all data from reduce processes
[{is,3},{this,3},{boy,2},{girl,1},{a,2},{lovely,1}]
3>


The complete Erlang code is attached here ...

-module(mapreduce).
-export([reduce_task/2, map_task/2,
        test_reduce_task/0, test_map_reduce/0,
        repeat_exec/2]).

%%% Execute the function N times,
%%%   and put the result into a list
repeat_exec(N,Func) ->
 lists:map(Func, lists:seq(0, N-1)).
 

%%% Identify the reducer process by
%%%   using the hashcode of the key
find_reducer(Processes, Key) ->
 Index = erlang:phash(Key, length(Processes)),
 lists:nth(Index, Processes).

%%% Identify the mapper process by random
find_mapper(Processes) ->
 case random:uniform(length(Processes)) of
   0 ->
     find_mapper(Processes);
   N ->
     lists:nth(N, Processes)
 end.

%%% Collect result synchronously from
%%%   a reducer process
collect(Reduce_proc) ->
 Reduce_proc ! {collect, self()},
 receive
   {result, Result} ->
     Result
 end.


%%% The reducer process
reduce_task(Acc0, ReduceFun) ->
 receive
   {reduce, {K, V}} ->
     Acc = case get(K) of
             undefined ->
               Acc0;
             Current_acc ->
               Current_acc
           end,
     put(K, ReduceFun(V, Acc)),
     reduce_task(Acc0, ReduceFun);
   {collect, PPid} ->
     PPid ! {result, get()},
     reduce_task(Acc0, ReduceFun)
 end.

%%% The mapper process
map_task(Reduce_processes, MapFun) ->
 receive
   {map, Data} ->
     IntermediateResults = MapFun(Data),
     io:format("Map function produce: ~w~n",
               [IntermediateResults ]),
     lists:foreach(
       fun({K, V}) ->
         Reducer_proc =
           find_reducer(Reduce_processes, K),
         Reducer_proc ! {reduce, {K, V}}
       end, IntermediateResults),

     map_task(Reduce_processes, MapFun)
 end.


%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func,
          Reduce_func, Acc0, List) ->

 %% Start all the reducer processes
 Reduce_processes =
   repeat_exec(R,
     fun(_) ->
       spawn(mapreduce, reduce_task,
             [Acc0, Reduce_func])
     end),

 io:format("Reduce processes ~w are started~n",
           [Reduce_processes]),

 %% Start all mapper processes
 Map_processes =
   repeat_exec(M,
     fun(_) ->
       spawn(mapreduce, map_task,
             [Reduce_processes, Map_func])
     end),

 io:format("Map processes ~w are started~n",
           [Map_processes]),

 %% Send the data to the mapper processes
 Extract_func =
   fun(N) ->
     Extracted_line = lists:nth(N+1, List),
     Map_proc = find_mapper(Map_processes),
     io:format("Send ~w to map process ~w~n",
               [Extracted_line, Map_proc]),
     Map_proc ! {map, Extracted_line}
   end,

 repeat_exec(length(List), Extract_func),

 timer:sleep(2000),

 %% Collect the result from all reducer processes
 io:format("Collect all data from reduce processes~n"),
 All_results =
   repeat_exec(length(Reduce_processes),
     fun(N) ->
       collect(lists:nth(N+1, Reduce_processes))
     end),
 lists:flatten(All_results).

%%% Testing of Map reduce using word count
test_map_reduce() ->
 M_func = fun(Line) ->
            lists:map(
              fun(Word) ->
                {Word, 1}
              end, Line)
          end,

 R_func = fun(V1, Acc) ->
            Acc + V1
          end,

 map_reduce(3, 5, M_func, R_func, 0,
            [[this, is, a, boy],
             [this, is, a, girl],
             [this, is, lovely, boy]]).
  


Summary

From this exercise of implementing a simple Map/Reduce model using Erlang, I found that Erlang is very powerful in developing distributed systems.

2 comments:

kklis said...

There is a module for parallel list processing available at http://code.google.com/p/plists/

It also contains a map-reduce algorithm implementation.

Anonymous said...

If I compile your code it gives an error:

./mapreduce.erl:2: function test_reduce_task/0 undefined

Thus you should not export test_reduce_task/0 in your header.