// WordCount.java (4.6 KB)

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.Comparator;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. public class WordCount {
  16. public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  17. private final static IntWritable one = new IntWritable(1);
  18. private Text word = new Text();
  19. /* map function */
  20. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  21. String line = value.toString();
  22. String str = new String("");
  23. for (int i = 0; i < line.length(); i++) {
  24. if (Character.isLetter(line.charAt(i)) || str.length() > 0 && line.charAt(i) == '\'') {
  25. str += Character.toLowerCase(line.charAt(i)); // extend word
  26. } else {
  27. for (; str.length() > 0 && str.charAt(str.length() - 1) == '\'';) {
  28. str = str.substring(0, str.length() - 1); // remove trailing quotation mark
  29. }
  30. if (str.length() > 0) {
  31. word.set(str);
  32. context.write(word, one); // output pair
  33. str = "";
  34. }
  35. }
  36. }
  37. for (; str.length() > 0 && str.charAt(str.length() - 1) == '\'';) {
  38. str = str.substring(0, str.length() - 1); // remove trailing quotation mark
  39. }
  40. if (str.length() > 0) {
  41. word.set(str);
  42. context.write(word, one); // output pair
  43. }
  44. }
  45. }
  46. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  47. private HashMap<String, Integer> record = new HashMap<String, Integer>(); // store count in map, used for final sort
  48. private Text word = new Text();
  49. private IntWritable result = new IntWritable();
  50. /* reduce function */
  51. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  52. int sum = 0;
  53. for (IntWritable val : values) {
  54. sum += val.get();
  55. }
  56. record.put(key.toString(), record.getOrDefault(key.toString(), 0) + sum); // update map, don't output here
  57. }
  58. /* cleanup function executed after reducing */
  59. public void cleanup(Context context) throws IOException, InterruptedException {
  60. ArrayList<HashMap.Entry<String, Integer>> list = new ArrayList<HashMap.Entry<String, Integer>>(record.entrySet()); // get answer from map
  61. Collections.sort(list, new Comparator<HashMap.Entry<String, Integer>>() {
  62. public int compare(HashMap.Entry<String, Integer> o1, HashMap.Entry<String, Integer> o2) { // compare function
  63. if (o1.getValue().equals(o2.getValue())) {
  64. return o1.getKey().compareTo(o2.getKey());
  65. }
  66. return o1.getValue() < o2.getValue() ? 1 : -1;
  67. }
  68. });
  69. for (HashMap.Entry<String, Integer> itr : list) {
  70. word.set(itr.getKey());
  71. result.set(itr.getValue());
  72. context.write(word, result); // output here
  73. }
  74. }
  75. }
  76. public static void main(String[] args) throws Exception {
  77. Configuration conf = new Configuration();
  78. conf.set("mapreduce.output.textoutputformat.separator", " ");
  79. Job job = Job.getInstance(conf, "word count");
  80. job.setJarByClass(WordCount.class);
  81. job.setNumReduceTasks(5); // number of reducers
  82. job.setMapperClass(TokenizerMapper.class); // map class
  83. job.setCombinerClass(IntSumReducer.class); // combine class
  84. job.setReducerClass(IntSumReducer.class); // reduce class
  85. job.setOutputKeyClass(Text.class);
  86. job.setOutputValueClass(IntWritable.class);
  87. FileOutputFormat.setOutputPath(job, new Path(args[0])); // output directory
  88. for (int i = 1; i < args.length; i++) {
  89. FileInputFormat.addInputPath(job, new Path(args[i])); // input files
  90. }
  91. System.exit(job.waitForCompletion(true) ? 0 : 1);
  92. }
  93. }