Example: the WordCount word-frequency program
Below is the WordCount word-frequency example that ships with Hadoop. Before running the program, make sure HDFS is already up and running.
```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      // Only read the pattern files when -skip registered them on the command line.
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          // Cached files are localized into the task's working directory,
          // so they can be opened by bare file name.
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      // Strip every skip pattern (punctuation, stop words) before tokenizing.
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // GenericOptionsParser consumes generic options such as -D key=value.
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if (remainingArgs.length != 2 && remainingArgs.length != 4) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i = 0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        // Ship the pattern file to every task via the distributed cache.
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
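The walkthrough below runs the job from a jar named wc.jar. As a rough sketch, one way to compile and package it, assuming JAVA_HOME points to a JDK, the commands are run from the Hadoop installation directory, and the source is saved as WordCount2.java:

```
# Make the JDK compiler visible to the hadoop launcher (assumes a full JDK).
$ export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
# Compile against the Hadoop classpath, then package the generated classes.
$ bin/hadoop com.sun.tools.javac.Main WordCount2.java
$ jar cf wc.jar WordCount2*.class
```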
The sample input files to be processed are as follows:
```
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
```
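If these files are not in HDFS yet, they can be created locally and uploaded first. A minimal sketch, where the local file names file01 and file02 are only for illustration:

```
# Create the two sample files locally.
$ echo 'Hello World, Bye World!' > file01
$ echo 'Hello Hadoop, Goodbye to hadoop.' > file02
# Create the input directory in HDFS and upload both files.
$ bin/hadoop fs -mkdir -p /user/joe/wordcount/input
$ bin/hadoop fs -put file01 file02 /user/joe/wordcount/input
```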
Run the program:
```
$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output
```
The output is as follows:
```
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
```
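Note that FileOutputFormat will not write into an output directory that already exists, so before re-running the job against the same output path, remove the previous results, for example:

```
# Delete the previous run's output so the next run can reuse the same path.
$ bin/hadoop fs -rm -r /user/joe/wordcount/output
```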
The word-filtering rules are supplied through the DistributedCache:
```
$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to
```
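The patterns are ordinary Java regular expressions, one per line, which the mapper removes from each input line with replaceAll. If the file is not in HDFS yet, it can be created locally and uploaded, for example (the local name patterns.txt is an assumption):

```
# Write the pattern file locally; the quoted heredoc keeps the backslashes literal.
$ cat > patterns.txt <<'EOF'
\.
\,
\!
to
EOF
# Upload it to HDFS so the job can register it with -skip.
$ bin/hadoop fs -put patterns.txt /user/joe/wordcount
```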
Run it again, this time with more options:
```
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
```
The output is as follows:
```
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
```
Run it again, this time with case sensitivity turned off:
```
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
```
The output is as follows:
```
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
```
