C_jc17
C_jc17
2018-12-22 00:03

Hadoop运行WordCount程序有输入,不生成输出的文件

  • java
  • maven
  • jar

图片说明

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

public class WordCountMapReduce {

public static void main(String[] args) throws Exception{

    Configuration configuration = new Configuration();

    String[] otherArgs = (new GenericOptionsParser(configuration, args)).getRemainingArgs();

    if (otherArgs.length < 2){

        System.out.println("Usage:wordcount<in>[<in>···]<out>");
        System.exit(2);

    }

    */

/**
* 设置环境参数
//

    Job job = Job.getInstance(configuration, "wordcount");

    */

/**
* 设置整个程序的类名
//

    job.setJarByClass(WordCountMapReduce.class);

    */

/**
* 添加Mapper类
//

    job.setMapperClass(WordCountMapReduce.WordCountMapper.class);

    */

/**
* ?
//

    //job.setCombinerClass(WordCountMapReduce.WordCountReducer.class);

    */

/**
* 添加Reducer类
//

    job.setReducerClass(WordCountMapReduce.WordCountReducer.class);

    */

/**
* 设置输出类型
//

    job.setOutputKeyClass(Text.class);

    */

/**
* 设置输出类型
//

    job.setOutputValueClass(IntWritable.class);

    for (int i = 0;i < otherArgs.length - 1;++i){

        */

/**
* 设置输入文件
//

        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

    }

    */

/**
* 设置输出文件
//

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    System.exit(job.waitForCompletion(true)?0:1);

}

//map程序
public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    */

/*
* map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法
* map task在调用map方法时,传递的参数:
* 一行的起始偏移量LongWritable作为key
* 一行的文本内容Text作为value
//

    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public WordCountMapper() {
    }

    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        StringTokenizer stringTokenizer = new StringTokenizer(value.toString());

        while (stringTokenizer.hasMoreTokens()) {

            this.word.set(stringTokenizer.nextToken());
            context.write(this.word, one);

        }

    }
}

//reduce程序
*/

/*
* KEYIN:对应mapper阶段输出的key类型
* VALUEIN:对应mapper阶段输出的value类型
* KEYOUT:reduce处理完之后输出的结果kv对中key的类型
* VALUEOUT:reduce处理完之后输出的结果kv对中value的类型
//

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    */

/*
* reduce方法提供给reduce task进程来调用
*
* reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组
* 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法
* 比如:
* hello组会调用一次reduce方法进行处理,tom组也会调用一次reduce方法进行处理
* 调用时传递的参数:
* key:一组kv中的key
* values:一组kv中所有value的迭代器
//

    private IntWritable intWritable = new IntWritable();

    public WordCountReducer(){
    }

    public void intWritable(Text key, Iterable<IntWritable>values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException{

        int sum = 0;
        IntWritable val;

        for (Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()){

            val = (IntWritable)i$.next();

        }

        this.intWritable.set(sum);
        context.write(key, this.intWritable);

    }


}

}

  • 点赞
  • 回答
  • 收藏
  • 复制链接分享

0条回答