import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /* Map function: scan each line character by character, building
         * lowercase words. A word is a run of letters that may contain
         * embedded apostrophes (e.g. "don't"); trailing apostrophes are
         * stripped before the word is emitted. */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringBuilder str = new StringBuilder();
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (Character.isLetter(c) || (str.length() > 0 && c == '\'')) {
                    str.append(Character.toLowerCase(c)); // extend the current word
                } else {
                    emitWord(str, context); // delimiter reached: flush the word
                }
            }
            emitWord(str, context); // flush the last word on the line
        }

        /* Strip trailing apostrophes, emit the (word, 1) pair if anything
         * remains, and reset the buffer for the next word. */
        private void emitWord(StringBuilder str, Context context)
                throws IOException, InterruptedException {
            while (str.length() > 0 && str.charAt(str.length() - 1) == '\'') {
                str.setLength(str.length() - 1); // remove trailing apostrophe
            }
            if (str.length() > 0) {
                word.set(str.toString());
                context.write(word, one); // output (word, 1) pair
                str.setLength(0);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Counts are accumulated here instead of being written directly,
        // so that cleanup() can emit them sorted at the end.
        private HashMap<String, Integer> record = new HashMap<String, Integer>();
        private Text word = new Text();
        private IntWritable result = new IntWritable();

        /* Reduce function: sum the counts for one word and stash the total
         * in the map; output is deferred to cleanup(). */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            record.put(key.toString(), record.getOrDefault(key.toString(), 0) + sum);
        }

        /* Cleanup function, executed once after all reduce() calls: sort the
         * accumulated counts and emit them. Each of the five reducers sorts
         * only its own partition, so the output files are sorted individually,
         * not globally. */
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            ArrayList<Map.Entry<String, Integer>> list =
                    new ArrayList<Map.Entry<String, Integer>>(record.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
                @Override
                public int compare(Map.Entry<String, Integer> o1,
                                   Map.Entry<String, Integer> o2) {
                    if (o1.getValue().equals(o2.getValue())) {
                        return o1.getKey().compareTo(o2.getKey()); // tie-break alphabetically
                    }
                    return o1.getValue() < o2.getValue() ? 1 : -1; // higher counts first
                }
            });
            for (Map.Entry<String, Integer> itr : list) {
                word.set(itr.getKey());
                result.set(itr.getValue());
                context.write(word, result); // output here, in sorted order
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.textoutputformat.separator", " "); // key/value separator
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setNumReduceTasks(5); // number of reducers
        job.setMapperClass(TokenizerMapper.class); // map class
        // The reducer also serves as the combiner: its cleanup() emits the
        // accumulated partial sums, so map-side combining still produces
        // valid (word, count) pairs.
        job.setCombinerClass(IntSumReducer.class); // combine class
        job.setReducerClass(IntSumReducer.class); // reduce class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0])); // first argument: output directory
        for (int i = 1; i < args.length; i++) {
            FileInputFormat.addInputPath(job, new Path(args[i])); // remaining arguments: input files
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
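
/*
 * Example invocation (a sketch; the jar name and HDFS paths below are
 * hypothetical, not part of the program):
 *
 *   $ hadoop com.sun.tools.javac.Main WordCount.java
 *   $ jar cf wc.jar WordCount*.class
 *   $ hadoop jar wc.jar WordCount /user/me/output /user/me/input/a.txt /user/me/input/b.txt
 *
 * Note that, unlike the stock Hadoop WordCount example, the FIRST argument
 * is the output directory and all remaining arguments are input files. With
 * five reducers the result appears as part-r-00000 .. part-r-00004, each
 * sorted by descending count within its own partition.
 */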