---
title: Introduction to MapReduce
tags:
  - Course Project
  - Java
  - Go
categories:
  - - Coding
    - Back End Development
date: 2021-07-03 12:34:00
---

# Introduction

There is an enormous amount of data in today's world, and processing it on a single computer becomes unacceptably slow as its scale soars. A classic example is the word count problem: given very large text data, find the number of occurrences of each word. Using multiple computers can speed up the process, but it also complicates the programming. MapReduce helps us handle large data on a cluster of computers quickly and easily. In essence, MapReduce is both an idea and a model of data parallelism. In this article, we are going to implement a serial program and a MapReduce parallel program for the word count problem, and compare their performance.

# Algorithm Specification

## Brief Introduction to MapReduce

The MapReduce model consists of two main stages: the map stage and the reduce stage. The map stage accepts input records and transforms them into intermediate records. Let's take the word count problem as an example. Assume that we are given the following text:

```
apple is red
orange is orange
banana is yellow
```

Before the map stage, the input text is divided into several input splits, each of which is assigned to a mapper.

```
("apple is red")     -> Mapper 1
("orange is orange") -> Mapper 2
("banana is yellow") -> Mapper 3
```

In the map stage, we split the data into `(key, value)` pairs. In this example, `key` represents a word and `value` represents one occurrence of that word.

```
Mapper 1 -> ("apple", 1), ("is", 1), ("red", 1)
Mapper 2 -> ("orange", 1), ("is", 1), ("orange", 1)
Mapper 3 -> ("banana", 1), ("is", 1), ("yellow", 1)
```

Between the map stage and the reduce stage, there is a shuffle (or partition) process. The pairs with the same key are merged together, forming pairs like `(key, <list of values>)`, and are sent to the same reducer.
```
("apple", <1>), ("orange", <1, 1>)  -> Reducer 1
("is", <1, 1, 1>), ("banana", <1>)  -> Reducer 2
("red", <1>), ("yellow", <1>)       -> Reducer 3
```

In the reduce stage, we reduce each set of intermediate values that share a key to a smaller set of values. In this example, we simply compute the sum of the values in the set for each key.

```
Reducer 1 -> ("apple", 1), ("orange", 2)
Reducer 2 -> ("is", 3), ("banana", 1)
Reducer 3 -> ("red", 1), ("yellow", 1)
```

After the reduce stage, we merge the results generated by the reducers.

```
("apple", 1)
("orange", 2)
("is", 3)
("banana", 1)
("red", 1)
("yellow", 1)
```

And with that the task is finished: we have found the number of occurrences of each word.

## Brief Introduction to Hadoop

Despite its simple principle, MapReduce isn't that easy to implement. There are lots of details to consider, such as how to transmit data between computers, how to split the input, how to manage memory, and when to write data to disk. Fortunately, the Apache Software Foundation provides an excellent implementation called Hadoop. We can simply focus on the map function and the reduce function and deploy Hadoop on a cluster of computers; the Hadoop framework will handle the rest. Specifically, Hadoop implements a file system called HDFS to store files and to read and write data during the MapReduce process.

Let's see how to use Hadoop to solve the word count problem. First, we need to build a cluster of computers with Hadoop installed. We use Docker because it is convenient to deploy several containers from the same image. Then, we designate one of the containers as the name node (which manages HDFS) and the other containers as data nodes (which store data). After that, we put the test data into HDFS and write our own map function and reduce function. Finally, we compile the program against the Hadoop library and run it with Hadoop to get the result.

To be more concrete, the map function should be defined in a subclass of `Mapper`.
We define class `TokenizerMapper` that extends `Mapper<Object, Text, Text, IntWritable>`. Here, the first two type parameters indicate the input pair type of the map function (`<Object, Text>`), and the last two indicate its output pair type (`<Text, IntWritable>`). We define a word as a string that contains only letters and apostrophes and neither starts nor ends with an apostrophe. So we can write our map function like this:

```java
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /* map function */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String str = "";
        for (int i = 0; i < line.length(); i++) {
            if (Character.isLetter(line.charAt(i)) || (str.length() > 0 && line.charAt(i) == '\'')) {
                str += Character.toLowerCase(line.charAt(i)); // extend the current word
            } else {
                while (str.length() > 0 && str.charAt(str.length() - 1) == '\'') {
                    str = str.substring(0, str.length() - 1); // remove trailing apostrophes
                }
                if (str.length() > 0) {
                    word.set(str);
                    context.write(word, one); // output pair
                    str = "";
                }
            }
        }
        while (str.length() > 0 && str.charAt(str.length() - 1) == '\'') {
            str = str.substring(0, str.length() - 1); // remove trailing apostrophes
        }
        if (str.length() > 0) {
            word.set(str);
            context.write(word, one); // output pair
        }
    }
}
```

Similarly, the reduce function should be defined in a subclass of `Reducer`. We define class `IntSumReducer` that extends `Reducer<Text, IntWritable, Text, IntWritable>`. Here, the first two type parameters indicate the input pair type of the reduce function (`<Text, IntWritable>`), which must match the output pair type of the map function, and the last two indicate its output pair type (`<Text, IntWritable>`). Note that by default, the output pairs are sorted by `key`, i.e., by word in lexicographical order. If we want to customize the order, we need to do extra work after reducing.
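Concretely, the ordering we want, by count descending with ties broken by word in lexicographical order, is independent of MapReduce itself. Here is a minimal Go sketch of just the comparison logic (the `wordCount` type and `sortCounts` helper are illustrative only, not part of Hadoop):

```go
package main

import (
	"fmt"
	"sort"
)

// wordCount is an illustrative pair type for a word and its count.
type wordCount struct {
	word  string
	count int
}

// sortCounts orders pairs by count descending, breaking ties
// by word in lexicographical order.
func sortCounts(list []wordCount) {
	sort.Slice(list, func(i, j int) bool {
		if list[i].count != list[j].count {
			return list[i].count > list[j].count
		}
		return list[i].word < list[j].word
	})
}

func main() {
	list := []wordCount{
		{"red", 1}, {"is", 3}, {"apple", 1}, {"orange", 2},
	}
	sortCounts(list)
	fmt.Println(list) // prints [{is 3} {orange 2} {apple 1} {red 1}]
}
```

In Hadoop, the only catch is that this sort has to happen after all the counting is done, which is why it cannot live inside the reduce function itself.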
We can store the pairs instead of outputting them in the reduce function, and then sort and output them in the cleanup function, which is executed after all the reduce calls of a reducer have finished. We can write our reduce function like this:

```java
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private HashMap<String, Integer> record = new HashMap<String, Integer>(); // store counts for the final sort
    private Text word = new Text();
    private IntWritable result = new IntWritable();

    /* reduce function */
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        record.put(key.toString(), record.getOrDefault(key.toString(), 0) + sum); // update the map, don't output here
    }

    /* cleanup function executed after reducing */
    public void cleanup(Context context) throws IOException, InterruptedException {
        ArrayList<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(record.entrySet()); // get the answer from the map
        Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { // count descending, then word ascending
                if (o1.getValue().equals(o2.getValue())) {
                    return o1.getKey().compareTo(o2.getKey());
                }
                return o1.getValue() < o2.getValue() ? 1 : -1;
            }
        });
        for (Map.Entry<String, Integer> itr : list) {
            word.set(itr.getKey());
            result.set(itr.getValue());
            context.write(word, result); // output here
        }
    }
}
```

## A Simple Implementation of MapReduce

If we just want to process relatively small data on a multicore computer in parallel, we can implement our own MapReduce framework. We choose Golang as our programming language because it makes high concurrency easy to achieve. More importantly, communication between threads is very convenient in Golang, using a special type called a channel. In Golang, we can start a new thread simply with `go funcName(args)`, and we can declare a channel of strings with `chanName chan string`. To send something via a channel, use `chanName <- something`.
To receive something from a channel, use `something := <-chanName`. Thus, we can communicate with a worker thread started with `go funcName(inChan, outChan)` by sending data via `inChan` and receiving results from `outChan`.

To implement the map stage, for simplicity, we just split the input data by file (that is, each file is assigned to a mapper to be processed). We can write the mapper like this:

```go
/* Framework-defined mapper */
func Mapper(config Config, filename chan string, shuffle chan Pair) {
	output := make(chan Pair, chanSize)
	merge := make(map[string][]int)
	for name := range filename {
		text, _ := os.ReadFile(name) // read file content
		go config.mapFunc(text, output)
		for pair := range output {
			if pair.key == "" { // map finished
				break
			}
			merge[pair.key] = append(merge[pair.key], pair.value) // merge in the map stage
		}
	}
	for key, list := range merge {
		shuffle <- config.combineFunc(key, list) // combine local results, send to the shuffle stage
	}
	shuffle <- Pair{"", 0} // mapper finished
}
```

Here, we merge and combine the local results in the map stage, because sending `(key, 3)` once instead of sending `(key, 1)` three times reduces data transmission and lightens the reducers' work.

To implement the shuffle process, we simply use a map to ensure that pairs with the same key are sent to the same reducer. We can write the shuffle process like this:

```go
merge := make(map[string][]int)
for pair := range shuffle { // receive from the map stage
	if pair.key == "" {
		workerCnt--
		if workerCnt == 0 { // all mappers finished
			break
		}
	} else {
		merge[pair.key] = append(merge[pair.key], pair.value) // merge in the shuffle stage
	}
}
```

To implement the reduce stage, we just call the reduce function. We can write the reduce stage like this:

```go
/* Framework-defined reducer */
func Reducer(config Config, keylist chan AggPair, collect chan Pair) {
	for aggPair := range keylist {
		collect <- config.reduceFunc(aggPair.key, aggPair.list) // send to the final stage
	}
	collect <- Pair{"", 0} // reducer finished
}
```

## Serial Algorithm

We are given a set of documents.
We can read them one by one and scan the content to find words, using a map (the data structure, not the map in MapReduce) to store the number of occurrences of each word. After processing all the documents, we have the answer; we just need to sort it in the specified order.

# Analysis and Comments

The main limitation is the hardware. For a single computer, more threads means more mappers or reducers can run simultaneously, which increases speed. For a cluster of computers, increasing the number of computers means more nodes, which also increases speed.

When we increase the number of documents, the serial program surely needs more time to get the result. For the parallel program, if we do not increase the number of mappers, it runs slower because each mapper takes more time to finish. Otherwise, the running time remains roughly constant, because the time for the map stage does not change (we have more mappers), and neither does the time for the reduce stage (the number of distinct words changes only slightly). In short, the larger the data we want to process, the more efficient the parallel program is compared to the serial program.
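As a concrete reference for this comparison, the serial algorithm from the previous section can be sketched in Go. This is a minimal illustration (the `countWords` helper is hypothetical, and file reading is omitted); it uses the same word definition as the Hadoop mapper: letters and apostrophes, with no leading or trailing apostrophe.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"
)

// countWords scans the text and counts occurrences of each word.
// A word consists of letters and apostrophes and neither starts
// nor ends with an apostrophe. (Hypothetical helper, for illustration.)
func countWords(text string) map[string]int {
	counts := make(map[string]int)
	var sb strings.Builder
	emit := func() {
		word := strings.TrimRight(sb.String(), "'") // remove trailing apostrophes
		if word != "" {
			counts[word]++
		}
		sb.Reset()
	}
	for _, r := range text {
		if unicode.IsLetter(r) || (sb.Len() > 0 && r == '\'') {
			sb.WriteRune(unicode.ToLower(r)) // extend the current word
		} else {
			emit()
		}
	}
	emit() // flush the last word
	return counts
}

func main() {
	counts := countWords("apple is red\norange is orange\nbanana is yellow")
	// sort words by count descending, then alphabetically
	words := make([]string, 0, len(counts))
	for w := range counts {
		words = append(words, w)
	}
	sort.Slice(words, func(i, j int) bool {
		if counts[words[i]] != counts[words[j]] {
			return counts[words[i]] > counts[words[j]]
		}
		return words[i] < words[j]
	})
	for _, w := range words {
		fmt.Println(w, counts[w])
	}
}
```

On the example text this prints `is 3`, `orange 2`, and then the remaining words once each, matching the MapReduce result shown earlier.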