代码如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
public class WordCountMapReduce {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
String[] otherArgs = (new GenericOptionsParser(configuration, args)).getRemainingArgs();
if (otherArgs.length < 2){
System.out.println("Usage:wordcount<in>[<in>···]<out>");
System.exit(2);
}
*/
/**
* 设置环境参数
//
Job job = Job.getInstance(configuration, "wordcount");
*/
/**
* 设置整个程序的类名
//
job.setJarByClass(WordCountMapReduce.class);
*/
/**
* 添加Mapper类
//
job.setMapperClass(WordCountMapReduce.WordCountMapper.class);
*/
/**
* ?
//
//job.setCombinerClass(WordCountMapReduce.WordCountReducer.class);
*/
/**
* 添加Reducer类
//
job.setReducerClass(WordCountMapReduce.WordCountReducer.class);
*/
/**
* 设置输出类型
//
job.setOutputKeyClass(Text.class);
*/
/**
* 设置输出类型
//
job.setOutputValueClass(IntWritable.class);
for (int i = 0;i < otherArgs.length - 1;++i){
*/
/**
* 设置输入文件
//
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
*/
/**
* 设置输出文件
//
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}
//map程序
public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
*/
/*
* map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法
* map task在调用map方法时,传递的参数:
* 一行的起始偏移量LongWritable作为key
* 一行的文本内容Text作为value
//
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public WordCountMapper() {
}
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
while (stringTokenizer.hasMoreTokens()) {
this.word.set(stringTokenizer.nextToken());
context.write(this.word, one);
}
}
}
//reduce程序
*/
/*
* KEYIN:对应mapper阶段输出的key类型
* VALUEIN:对应mapper阶段输出的value类型
* KEYOUT:reduce处理完之后输出的结果kv对中key的类型
* VALUEOUT:reduce处理完之后输出的结果kv对中value的类型
//
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
*/
/*
* reduce方法提供给reduce task进程来调用
*
* reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组
* 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法
* 比如:
* hello组会调用一次reduce方法进行处理,tom组也会调用一次reduce方法进行处理
* 调用时传递的参数:
* key:一组kv中的key
* values:一组kv中所有value的迭代器
//
private IntWritable intWritable = new IntWritable();
public WordCountReducer(){
}
public void intWritable(Text key, Iterable<IntWritable>values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException{
int sum = 0;
IntWritable val;
for (Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()){
val = (IntWritable)i$.next();
}
this.intWritable.set(sum);
context.write(key, this.intWritable);
}
}
}