WordCount.java

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
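
/*
 * WordCount: counts how often each word appears in the input files and
 * writes the words sorted by descending count (ties broken alphabetically).
 *
 * Per-word totals are accumulated in reduce(); the sort happens in the
 * reducer's cleanup(), so the output is globally sorted only when the job
 * runs with a single reduce task (the MapReduce default, mapreduce.job.reduces=1;
 * call job.setNumReduceTasks(1) to make that assumption explicit).
 */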
public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        /* map: split each line into lowercase words, where a word is a run of
           letters that may contain interior apostrophes (e.g. "don't") */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringBuilder str = new StringBuilder();
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (Character.isLetter(c) || (str.length() > 0 && c == '\'')) {
                    str.append(Character.toLowerCase(c)); // extend the current word
                } else if (str.length() > 0) {
                    word.set(str.toString());
                    context.write(word, one); // emit (word, 1)
                    str.setLength(0);
                }
            }
            if (str.length() > 0) { // flush a word that ends at the end of the line
                word.set(str.toString());
                context.write(word, one);
            }
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Counts are buffered in this map so cleanup() can emit them in sorted order.
        private final HashMap<String, Integer> record = new HashMap<String, Integer>();
        private final Text word = new Text();
        private final IntWritable result = new IntWritable();

        /* reduce: accumulate each word's total; writing is deferred to cleanup() */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            record.put(key.toString(), record.getOrDefault(key.toString(), 0) + sum);
        }

        /* cleanup: runs once after all reduce() calls; sort by count descending,
           then alphabetically, and write the final output */
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            ArrayList<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(record.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
                public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                    if (o1.getValue().equals(o2.getValue())) {
                        return o1.getKey().compareTo(o2.getKey()); // tie-break: alphabetical
                    }
                    return o1.getValue() < o2.getValue() ? 1 : -1; // higher counts first
                }
            });
            for (Map.Entry<String, Integer> itr : list) {
                word.set(itr.getKey());
                result.set(itr.getValue());
                context.write(word, result);
            }
        }
    }
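
    /* A note on scale: IntSumReducer buffers every distinct word in memory
       before sorting, which assumes the vocabulary fits in the reducer's heap.
       For very large key spaces, the usual alternative is a second MapReduce
       pass that uses the count as the key. */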
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // IntSumReducer also works as a combiner: partial counts are flushed
        // in cleanup(), so nothing is lost when it runs on map-side spills.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0])); // args[0]: output directory
        for (int i = 1; i < args.length; i++) {
            FileInputFormat.addInputPath(job, new Path(args[i])); // args[1..]: input files
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
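
/* Example invocation (hypothetical jar name and paths), assuming this class
 * is packaged into wordcount.jar. Note the argument order: the output
 * directory comes first (and must not already exist), followed by one or
 * more input paths:
 *
 *   hadoop jar wordcount.jar WordCount /user/me/out /user/me/in1 /user/me/in2
 */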