「已注销」 2023-04-20 16:12 采纳率: 25%
浏览 12

hadoop中mapreduce运行结果和逾期结果不一致

hadoop中编译运行mapreduce的jar包没有报错也能出现结果,为什么统计出来的结果未计数呢?最后一张是出来的结果

img

img

img

  • 写回答

1条回答 默认 最新

  • AllenGd 领域专家: 大数据技术领域 2023-04-20 16:37
    关注
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
    public class WordCount {
           /*   
            * 先经过mapper运算,然后才是reducer。    
            * 内部类:映射器 Mapper<Key_IN, Value_IN, Key_OUT, Value_OUT>    
            * 首先读取源文本    
            */
     
        public static class WcMap extends Mapper<Object,Text,Text,IntWritable>{
             //占位体,1,查到一个就占个坑
             private final static IntWritable one=new IntWritable(1);
             //文本
             private Text word=new Text();
             //每次调用map方法会传入split中一行数据。
             //key:该行数据所在文件中的位置下标,value:该行内容(数据),context:上下文对象,在整个wordcount运算周期内存活。
             //这里K、V像这样[K,V]
             //重写map方法,实现理想效果。WcMap的实例只有一个,但实例的这个map方法却一直在执行,直到读取结束
             @Override
             protected void map(Object key, Text value,
                      Mapper<Object, Text, Text, IntWritable>.Context context)
                      throws IOException, InterruptedException {
             //拆分字符串,返回单词集合。默认以空格和换行/回车拆分
                 StringTokenizer itr=new StringTokenizer(value.toString());
             /*补充:
               StringTokenizer是一个用来分隔String的应用类,相当于VB(Visual Basic是一种由微软公司开发的结构化的、模块化的、面向对象的、包含协助开发环境的事件驱动为机制的可视化程序设计语言)的split函数。
               StringTokenizer是字符串分隔解析类型,属于:Java.util包。
               1.StringTokenizer的构造函数
               StringTokenizer(String str):构造一个用来解析str的StringTokenizer对象。
               java默认的分隔符是“空格”、“制表符(‘\t’)”、“换行符(‘\n’)”、“回车符(‘\r’)”。
               StringTokenizer(String str,String delim):构造一个用来解析str的StringTokenizer对象, 并提供一个指定的分隔符。
               StringTokenizer(String str,String delim,boolean returnDelims):构造一个用来解析str的StringTokenizer对象,并提供一个指定的分隔符,同时,指定是否返回分隔符。
              */
             //遍历一行的全部单词
                 while(itr.hasMoreTokens()){
             //将文本转为临时Text变量
                      String curword=itr.nextToken();
                      word.set(curword);
             //将单词保存到上下文对象(单词,占位体),输出
                      context.write(word, one);
                 }
             }       
        }
     
       /************************************************************************
        *  在Mapper后,Reducer前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]  *
        *************************************************************************/
       /**
        * mapper结束后,执行现在的reducer。
        * 内部类:拆分器 Reducer<Key_IN, Value_IN, Key_OUT, Value_OUT>
        */
     
    public static class WcReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
    //计数器。个数统计
        private IntWritable times=new IntWritable();
          /**
            * 重写reduce方法,实现理想效果
            * WcReduce的实例也只有一个,但实例的这个reduce方法却一直在执行,直到完成统计
            * Key:单词。Values:value的集合,也就是[1,1,1,...]。context:上下文对象
            * 这里这里K、V像这样[K,V[1,1,1,...]]。每执行一次,key就是一个新单词,对应的values就是其全部占位体
           **/
     
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                 Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
     
             int sum=0;
            //累加value的元素,有多少个占为体1,即有多少个指定单词
             for(IntWritable i:values){
                 sum +=i.get();//对单词为key的计数统计。i是IntWritable类型不能直接加,所以i.get()就是把IntWritable类型变成整数int类型
             }
             times.set(sum);//每次set一下都会清空之前的值
            //终于将单词和总个数再次输出
             context.write(key, times);//输出到 hdfs:/output中到结果文件
        }
    }
     
        public static void main(String[] args) throws Exception {
            //HDFS配置
             Configuration conf=new Configuration();
            //作业(环境)
             Job job =Job.getInstance(conf);
             job.setJarByClass(WordCount.class);//执行作业的类
             job.setMapperClass(WcMap.class);//读取元数据,执行map运算的类
             /* Combiner
         * 通常,每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
         * combiner的输入输出类型必须和mapper的输出以及reducer的输入类型一致
            */
             //job.setCombinerClass(WcReduce.class);    // 统计数据,执行reducer的类
             job.setReducerClass(WcReduce.class);                  //统计数据,执行reducer的类
             job.setOutputKeyClass(Text.class);                         //设置好输出的key的类型,和context上下文对象write的参数类型一致。
             //job.setNumReduceTasks(1);                                 //设置reduce任务的个数
             job.setOutputValueClass(IntWritable.class);            // 设置输出的value类型
             FileInputFormat.addInputPath(job, new Path("hdfs://manager:8020/test/input/wc.txt"));// 元数据路径,(输入的文件或者目录)必须已存在
             FileOutputFormat.setOutputPath(job, new Path("hdfs://manager:8020/test/output/wc"));// 统计结果输出路径(输出的文件或者目录),程序自动创建
             System.exit(job.waitForCompletion(true)?0:1);// 等待提交作业到集群并完成,才结束程序。等待job完成,若系统运行成功, 则返回0 ,否则返回1
        }
    }
    
    评论

报告相同问题?

问题事件

  • 创建了问题 4月20日