| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class WordCount {
- public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- /* map function */
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String line = value.toString();
- String str = new String("");
- for (int i = 0; i < line.length(); i++) {
- if (Character.isLetter(line.charAt(i)) || str.length() > 0 && line.charAt(i) == '\'') {
- str += Character.toLowerCase(line.charAt(i)); // extend word
- } else {
- for (; str.length() > 0 && str.charAt(str.length() - 1) == '\'';) {
- str = str.substring(0, str.length() - 1); // remove trailing quotation mark
- }
- if (str.length() > 0) {
- word.set(str);
- context.write(word, one); // output pair
- str = "";
- }
- }
- }
- for (; str.length() > 0 && str.charAt(str.length() - 1) == '\'';) {
- str = str.substring(0, str.length() - 1); // remove trailing quotation mark
- }
- if (str.length() > 0) {
- word.set(str);
- context.write(word, one); // output pair
- }
- }
- }
- public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- private HashMap<String, Integer> record = new HashMap<String, Integer>(); // store count in map, used for final sort
- private Text word = new Text();
- private IntWritable result = new IntWritable();
- /* reduce function */
- public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- int sum = 0;
- for (IntWritable val : values) {
- sum += val.get();
- }
- record.put(key.toString(), record.getOrDefault(key.toString(), 0) + sum); // update map, don't output here
- }
- /* cleanup function executed after reducing */
- public void cleanup(Context context) throws IOException, InterruptedException {
- ArrayList<HashMap.Entry<String, Integer>> list = new ArrayList<HashMap.Entry<String, Integer>>(record.entrySet()); // get answer from map
- Collections.sort(list, new Comparator<HashMap.Entry<String, Integer>>() {
- public int compare(HashMap.Entry<String, Integer> o1, HashMap.Entry<String, Integer> o2) { // compare function
- if (o1.getValue().equals(o2.getValue())) {
- return o1.getKey().compareTo(o2.getKey());
- }
- return o1.getValue() < o2.getValue() ? 1 : -1;
- }
- });
- for (HashMap.Entry<String, Integer> itr : list) {
- word.set(itr.getKey());
- result.set(itr.getValue());
- context.write(word, result); // output here
- }
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.set("mapreduce.output.textoutputformat.separator", " ");
- Job job = Job.getInstance(conf, "word count");
- job.setJarByClass(WordCount.class);
- job.setNumReduceTasks(5); // number of reducers
- job.setMapperClass(TokenizerMapper.class); // map class
- job.setCombinerClass(IntSumReducer.class); // combine class
- job.setReducerClass(IntSumReducer.class); // reduce class
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileOutputFormat.setOutputPath(job, new Path(args[0])); // output directory
- for (int i = 1; i < args.length; i++) {
- FileInputFormat.addInputPath(job, new Path(args[i])); // input files
- }
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
|