he___H · 2022-07-09 16:18 · resolved

Hadoop data-analysis job is missing its output

Symptoms and background

The Hadoop job only reads input and produces no output. The job counters show, for example:

File Input Format Counters
    Bytes Read=26721

Relevant code

GraduateMapper

package com.mystudy.hadoopPro;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class GraduateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String file_name;

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Remember which input file this split came from; the file name
        // (sq/ly/cd/xc/zz) selects the city-specific branch below.
        FileSplit fs = (FileSplit) context.getInputSplit();
        file_name = fs.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String[] info = value.toString().split(",");
        if (file_name.contains("sq")) {
            if (info.length > 5 && info[5].isEmpty()) {
                context.write(new Text("商丘 未就业"), new IntWritable(1));
            } else {
                context.write(new Text("商丘 就业"), new IntWritable(1));
            }
            if (info[5].contains("学") && info[6] == "是") {
                context.write(new Text("商丘 成功"), new IntWritable(1));
            } else {
                context.write(new Text("商丘 考研"), new IntWritable(1));
            }
        } else if (file_name.contains("ly")) {
            if (info[6].contains("岗")) {
                context.write(new Text("洛阳 就业"), new IntWritable(1));
            } else {
                context.write(new Text("洛阳 未就业"), new IntWritable(1));
            }
            if (info.length > 6 && info[6].length() != 0 && info[6].contains("考研上岸")) {
                context.write(new Text("洛阳 成功"), new IntWritable(1));
            } else {
                context.write(new Text("洛阳 考研"), new IntWritable(1));
            }
        } else if (file_name.contains("cd")) {
            if (info[4].isEmpty()) {
                context.write(new Text("承德 未就业"), new IntWritable(1));
            } else {
                context.write(new Text("承德 就业"), new IntWritable(1));
            }
        } else if (file_name.contains("xc")) {
            if (info[6] == "在岗" || info[6] == "实习") {
                context.write(new Text("许昌 就业"), new IntWritable(1));
            } else {
                context.write(new Text("许昌 未就业"), new IntWritable(1));
            }
            if (info[6].contains("录取") || info[7].contains("录取")) {
                context.write(new Text("许昌 成功"), new IntWritable(1));
            } else {
                context.write(new Text("许昌 考研"), new IntWritable(1));
            }
        } else if (file_name.contains("zz")) {
            if (info[5].contains("大学") && info[5].isEmpty()) {
                context.write(new Text("郑州 未就业"), new IntWritable(1));
            } else {
                context.write(new Text("郑州 就业"), new IntWritable(1));
            }
            if (info[6] == "是" && info[5].contains("大学")) {
                context.write(new Text("郑州 成功"), new IntWritable(1));
            } else {
                context.write(new Text("郑州 考研"), new IntWritable(1));
            }
        }
    }
}
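One pitfall in the mapper above: in Java, == on Strings compares object references, not contents, so conditions like info[6]=="是" are effectively always false for values split out of an input line (likewise, info[5].contains("大学") && info[5].isEmpty() can never both hold). A minimal illustration of the String comparison:

public class StringEqDemo {
    public static void main(String[] args) {
        String parsed = new String("是");        // simulates a field split out of a Text line
        System.out.println(parsed == "是");      // false: different String objects
        System.out.println(parsed.equals("是")); // true: compares contents
    }
}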

GraduateReducer

package com.mystudy.hadoopPro;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GraduateReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    // Totals per "<city> <category>" key, filled in by reduce() and read back in cleanup().
    Map<String, Integer> map = new HashMap<String, Integer>();
    int count_cd = 0;
    int count_xc = 0;

    @Override
    protected void reduce(Text arg0, Iterable<IntWritable> arg1,
            Reducer<Text, IntWritable, Text, DoubleWritable>.Context arg2)
            throws IOException, InterruptedException {
        // Sum the 1s the mapper emitted for this key.
        if (arg0.toString().equals("承德")) {
            int total1 = 0;
            for (IntWritable v : arg1) {
                total1 += v.get();
                count_cd++;
            }
            map.put(arg0.toString(), total1);
        } else if (arg0.toString().equals("许昌")) {
            int total2 = 0;
            for (IntWritable v : arg1) {
                total2 += v.get();
                count_xc++;
            }
            map.put(arg0.toString(), total2);
        } else if (arg0.toString().contains("商丘")) {
            int total3 = 0;
            for (IntWritable v : arg1) {
                total3 += v.get();
            }
            map.put(arg0.toString(), total3);
        } else if (arg0.toString().contains("洛阳")) {
            int total4 = 0;
            for (IntWritable v : arg1) {
                total4 += v.get();
            }
            map.put(arg0.toString(), total4);
        }
    }

    @Override
    protected void cleanup(Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
            throws IOException, InterruptedException {
        int total_sq1 = map.get("商丘 未就业") + map.get("商丘 就业");
        int total_sq2 = map.get("商丘 成功") + map.get("商丘 考研");
        double rate_sq1 = new BigDecimal((float) map.get("商丘 就业") / total_sq1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
        double rate_sq2 = new BigDecimal((float) map.get("商丘 成功") / total_sq2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
        context.write(new Text("商丘的升学率是"), new DoubleWritable(rate_sq1));

        int total_xc1 = map.get("许昌 未就业") + map.get("许昌 就业");
        int total_xc2 = map.get("许昌 成功") + map.get("许昌 考研");
        double rate_xc1 = new BigDecimal((float) map.get("许昌 就业") / total_xc1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
        double rate_xc2 = new BigDecimal((float) map.get("许昌 成功") / total_xc2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

        int total_ly1 = map.get("洛阳 未就业") + map.get("洛阳 就业");
        int total_ly2 = map.get("洛阳 成功") + map.get("洛阳 考研");
        double rate_ly1 = new BigDecimal((float) map.get("洛阳 就业") / total_ly1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
        double rate_ly2 = new BigDecimal((float) map.get("洛阳 成功") / total_ly2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

        int total_cd1 = map.get("承德 未就业") + map.get("承德 就业");
        //int total_cd2 = map.get("承德 成功") + map.get("承德 考研");
        double rate_cd1 = new BigDecimal((float) map.get("承德 就业") / total_cd1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
        //double rate_cd2 = new BigDecimal((float) map.get("承德 成功") / total_cd2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

        int total_zz1 = map.get("郑州 未就业") + map.get("郑州 就业");
        int total_zz2 = map.get("郑州 成功") + map.get("郑州 考研");
        double rate_zz1 = new BigDecimal((float) map.get("郑州 就业") / total_zz1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
        double rate_zz2 = new BigDecimal((float) map.get("郑州 成功") / total_zz2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

//        context.write(new Text("商丘的就业率是"), new DoubleWritable(rate_sq1));
//        context.write(new Text("商丘的升学率是"), new DoubleWritable(rate_sq2));
//        context.write(new Text("许昌的就业率是"), new DoubleWritable(rate_xc1));
//        context.write(new Text("许昌的升学率是"), new DoubleWritable(rate_xc2));
//        context.write(new Text("洛阳的就业率是"), new DoubleWritable(rate_ly1));
//        context.write(new Text("洛阳的升学率是"), new DoubleWritable(rate_ly2));
//        context.write(new Text("承德的就业率是"), new DoubleWritable(rate_cd1));
//        //context.write(new Text("承德的升学率是"), new DoubleWritable(rate_cd2));
//        context.write(new Text("郑州的就业率是"), new DoubleWritable(rate_zz1));
//        context.write(new Text("郑州的升学率是"), new DoubleWritable(rate_zz2));

        context.write(new Text("河南的就业率是"), new DoubleWritable((rate_sq1 + rate_xc1 + rate_ly1 + rate_cd1 + rate_zz1) / 5));
        context.write(new Text("河南的升学率是"), new DoubleWritable((rate_sq2 + rate_xc2 + rate_ly2 + rate_zz2) / 4));
    }
}
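A likely reason the job ends with no output at all: the mapper emits keys such as "承德 就业", so arg0.toString().equals("承德") in reduce() never matches, those totals are never stored, and there is no 郑州 branch at all; cleanup() then calls map.get(...) for a missing key, and unboxing the null result throws a NullPointerException that fails the reduce task. A minimal sketch of the failure mode and a null-safe lookup, assuming the same key names:

import java.util.HashMap;
import java.util.Map;

public class NullSafeLookupDemo {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("承德 就业", 12); // "承德 未就业" was never stored, as in the failing reduce()
        // int bad = map.get("承德 未就业") + map.get("承德 就业"); // NullPointerException on unboxing
        int total = map.getOrDefault("承德 未就业", 0) + map.getOrDefault("承德 就业", 0);
        System.out.println(total); // 12
    }
}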

GraduateMain


package com.mystudy.hadoopPro;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
public class GraduateMain {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        String OUTPATH = "hdfs://localhost:8020/out";
        String INPUTPATH = "hdfs://localhost:8020/001/input";
        Path path = new Path(OUTPATH);
        FileSystem fs = path.getFileSystem(conf);
        if(fs.exists(path)) {
            fs.delete(path,true);
        }
        
        Job job = Job.getInstance(conf);
        job.setJarByClass(GraduateMain.class);
        
        job.setMapperClass(GraduateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setReducerClass(GraduateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        
        FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
        FileOutputFormat.setOutputPath(job,  new Path(OUTPATH));
        
        job.waitForCompletion(true);
    }
}
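The driver ignores the boolean returned by waitForCompletion, so even a failed job exits with status 0 and looks successful to the shell. A common pattern (a sketch, not part of the original code):

// at the end of main(), instead of the bare job.waitForCompletion(true):
boolean ok = job.waitForCompletion(true);
System.exit(ok ? 0 : 1); // non-zero exit status when the job fails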
Runtime output and error messages
My approach and what I have tried
The result I want to achieve

1 answer

  • he___H 2022-07-10 09:50

    Problem solved. It's not convenient to share the data source; this is the lab exercise on the 201.42 virtual machine.
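    The key changes from the code in the question: String comparisons with == are replaced by .equals(...); info.length guards are added before indexing into the split line; the reducer matches keys with contains(...) instead of equals(...), since the keys carry a category suffix such as " 就业"; and the missing 郑州 branch is added in reduce().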
    Mapper
    package com.mystudy.hadoopPro;

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class GraduateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        String file_name;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            FileSplit fs = (FileSplit) context.getInputSplit();
            file_name = fs.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] info = value.toString().split(",");
            if (file_name.contains("sq")) {
                if (info.length > 5 && info[5].isEmpty()) {
                    context.write(new Text("商丘 未就业"), new IntWritable(1));
                } else {
                    context.write(new Text("商丘 就业"), new IntWritable(1));
                }
                if (info.length > 6 && info[6].length() != 0 && info[6].equals("是")) {
                    context.write(new Text("商丘 成功"), new IntWritable(1));
                } else {
                    context.write(new Text("商丘 考研"), new IntWritable(1));
                }
            } else if (file_name.contains("ly")) {
                if (info.length > 6 && info[6].length() != 0 && info[6].contains("考研上岸")) {
                    context.write(new Text("洛阳 成功"), new IntWritable(1));
                } else {
                    context.write(new Text("洛阳 考研"), new IntWritable(1));
                }
                if (info.length > 6 && info[6].length() != 0 && info[6].contains("岗")) {
                    context.write(new Text("洛阳 未就业"), new IntWritable(1));
                } else {
                    context.write(new Text("洛阳 就业"), new IntWritable(1));
                }
            } else if (file_name.contains("cd")) {
                if (info.length > 8) {
                    context.write(new Text("承德 就业"), new IntWritable(1));
                } else {
                    context.write(new Text("承德 未就业"), new IntWritable(1));
                }
            } else if (file_name.contains("xc")) {
                if (info[6].equals("未就业")) {
                    context.write(new Text("许昌 未就业"), new IntWritable(1));
                } else {
                    context.write(new Text("许昌 就业"), new IntWritable(1));
                }
                if (info[6].contains("录取") || info[7].contains("录取")) {
                    context.write(new Text("许昌 成功"), new IntWritable(1));
                } else {
                    context.write(new Text("许昌 考研"), new IntWritable(1));
                }
            } else if (file_name.contains("zz")) {
                if (info.length > 6 && (info[5].contains("大学") || info[5].length() == 0)) {
                    context.write(new Text("郑州 未就业"), new IntWritable(1));
                } else {
                    context.write(new Text("郑州 就业"), new IntWritable(1));
                }
                if (info.length > 6 && info[6].length() != 0 && info[6].equals("是")) {
                    context.write(new Text("郑州 成功"), new IntWritable(1));
                } else {
                    context.write(new Text("郑州 考研"), new IntWritable(1));
                }
            }
        }
    }
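    Most field accesses above are now guarded with info.length checks, but a few (info[6] and info[7] in the "xc" branch) are still unguarded and can throw ArrayIndexOutOfBoundsException on short rows. A hypothetical helper, not part of the original code, that makes every access safe:

    // Hypothetical helper: returns "" when the row has fewer columns than
    // expected, so equals()/contains() checks need no repeated length guards.
    static String field(String[] info, int i) {
        return i < info.length ? info[i] : "";
    }

    // usage: if (field(info, 6).equals("未就业")) { ... }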
    

    Reducer
    package com.mystudy.hadoopPro;

    import java.io.IOException;
    import java.math.BigDecimal;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class GraduateReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        Map<String, Integer> map = new HashMap<String, Integer>();
        int count_cd = 0;
        int count_xc = 0;

        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Reducer<Text, IntWritable, Text, DoubleWritable>.Context arg2)
                throws IOException, InterruptedException {
            if (arg0.toString().contains("承德")) {
                int total1 = 0;
                for (IntWritable v : arg1) {
                    total1 += v.get();
                    count_cd++;
                }
                map.put(arg0.toString(), total1);
            } else if (arg0.toString().contains("许昌")) {
                int total2 = 0;
                for (IntWritable v : arg1) {
                    total2 += v.get();
                    count_xc++;
                }
                map.put(arg0.toString(), total2);
            } else if (arg0.toString().contains("商丘")) {
                int total3 = 0;
                for (IntWritable v : arg1) {
                    total3 += v.get();
                }
                map.put(arg0.toString(), total3);
            } else if (arg0.toString().contains("洛阳")) {
                int total4 = 0;
                for (IntWritable v : arg1) {
                    total4 += v.get();
                }
                map.put(arg0.toString(), total4);
            } else if (arg0.toString().contains("郑州")) {
                int total5 = 0;
                for (IntWritable v : arg1) {
                    total5 += v.get();
                }
                map.put(arg0.toString(), total5);
            }
        }

        @Override
        protected void cleanup(Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            int total_xc1 = map.get("许昌 就业") + map.get("许昌 未就业");
            int total_xc2 = map.get("许昌 成功") + map.get("许昌 考研");
            double rate_xc1 = new BigDecimal((float) map.get("许昌 未就业") / total_xc1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            double rate_xc2 = new BigDecimal((float) map.get("许昌 成功") / total_xc2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

            int total_cd1 = map.get("承德 未就业") + map.get("承德 就业");
            double rate_cd1 = new BigDecimal((float) map.get("承德 就业") / total_cd1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

            int total_sq1 = map.get("商丘 未就业") + map.get("商丘 就业");
            int total_sq2 = map.get("商丘 成功") + map.get("商丘 考研");
            double rate_sq1 = new BigDecimal((float) map.get("商丘 就业") / total_sq1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            double rate_sq2 = new BigDecimal((float) map.get("商丘 成功") / total_sq2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            //context.write(new Text("商丘的升学率是"), new DoubleWritable(rate_sq1));

            int total_ly1 = map.get("洛阳 未就业") + map.get("洛阳 就业");
            int total_ly2 = map.get("洛阳 成功") + map.get("洛阳 考研");
            double rate_ly1 = new BigDecimal((float) map.get("洛阳 就业") / total_ly1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            double rate_ly2 = new BigDecimal((float) map.get("洛阳 成功") / total_ly2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

            int total_zz1 = map.get("郑州 未就业") + map.get("郑州 就业");
            int total_zz2 = map.get("郑州 成功") + map.get("郑州 考研");
            double rate_zz1 = new BigDecimal((float) map.get("郑州 就业") / total_zz1).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            double rate_zz2 = new BigDecimal((float) map.get("郑州 成功") / total_zz2).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();

            //context.write(new Text("许昌的升学率是"), new DoubleWritable(rate_xc2));
            //context.write(new Text("郑州的就业率是"), new DoubleWritable(rate_zz1));
            //context.write(new Text("郑州的升学率是"), new DoubleWritable(rate_zz2));
            //context.write(new Text("承德的就业率是"), new DoubleWritable(rate_cd1));
            //context.write(new Text("洛阳的就业率是"), new DoubleWritable(rate_ly1));
            //context.write(new Text("商丘的就业率是"), new DoubleWritable(rate_sq1));
            //context.write(new Text("洛阳的升学率是"), new DoubleWritable(rate_ly2));
            //context.write(new Text("许昌的就业率是"), new DoubleWritable(rate_xc1));

            context.write(new Text("河南的升学率是"), new DoubleWritable((rate_sq2 + rate_xc2 + rate_ly2 + rate_zz2) / 4));
            context.write(new Text("河南的就业率是"), new DoubleWritable((rate_sq1 + rate_xc1 + rate_ly1 + rate_cd1 + rate_zz1) / 5));
        }
    }
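    A side note: BigDecimal.ROUND_HALF_UP has been deprecated since Java 9; the RoundingMode overload behaves identically. A self-contained sketch with purely illustrative counts:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class RoundingDemo {
        public static void main(String[] args) {
            int employed = 7, total = 9; // illustrative counts only
            double rate = new BigDecimal((double) employed / total)
                    .setScale(2, RoundingMode.HALF_UP) // same behavior as the deprecated ROUND_HALF_UP
                    .doubleValue();
            System.out.println(rate); // 0.78
        }
    }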

    This answer was accepted by the asker as the best answer.


Question timeline

  • Question closed by the system on Jul 18
  • Answer accepted on Jul 10
  • Question created on Jul 9
