Problem and background
The Hadoop job reads its input but writes no output; the only counters reported are:
File Input Format Counters
Bytes Read=26721
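A Bytes Read figure with no corresponding output counters usually means a task died before anything was written. Two plain-Java pitfalls in the code below would produce exactly that: comparing string fields with == (always false for values split out of an input line, so records are silently misclassified), and unboxing a HashMap entry that was never inserted (the reducer matched keys with equals("承德") although the mapper emits keys like "承德 就业", and had no branch for "郑州" at all, so cleanup() hits a NullPointerException and the reduce task fails). A standalone demo of both pitfalls, not part of the job itself:

import java.util.HashMap;
import java.util.Map;

public class PitfallDemo {
    public static void main(String[] args) {
        // 1. == compares references, not contents; a field parsed out of
        //    an input line is a distinct String object.
        String field = new String("是");
        System.out.println(field == "是");      // false
        System.out.println(field.equals("是")); // true

        // 2. Unboxing a missing HashMap entry throws NullPointerException,
        //    which kills the reduce task and leaves the job with no output.
        Map<String, Integer> map = new HashMap<>();
        Integer missing = map.get("郑州 就业");  // null: key never inserted
        int total = missing + 1;                 // NullPointerException here
        System.out.println(total);               // never reached
    }
}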
Relevant code (pasted as text, not as screenshots)
GraduateMapper
package com.mystudy.hadoopPro;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class GraduateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private String fileName;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Each input file holds one city's records; remember which file
        // this split came from so map() can branch on it.
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] info = value.toString().split(",");
        // String contents must be compared with equals(), never ==, and every
        // index needs a length guard: a short row otherwise throws
        // ArrayIndexOutOfBoundsException and kills the map task.
        if (fileName.contains("sq")) { // 商丘 Shangqiu
            if (info.length <= 5 || info[5].isEmpty()) {
                context.write(new Text("商丘 未就业"), ONE);
            } else {
                context.write(new Text("商丘 就业"), ONE);
            }
            if (info.length > 6 && info[5].contains("学") && "是".equals(info[6])) {
                context.write(new Text("商丘 成功"), ONE);
            } else {
                context.write(new Text("商丘 考研"), ONE);
            }
        } else if (fileName.contains("ly")) { // 洛阳 Luoyang
            if (info.length > 6 && info[6].contains("岗")) {
                context.write(new Text("洛阳 就业"), ONE);
            } else {
                context.write(new Text("洛阳 未就业"), ONE);
            }
            if (info.length > 6 && info[6].contains("考研上岸")) {
                context.write(new Text("洛阳 成功"), ONE);
            } else {
                context.write(new Text("洛阳 考研"), ONE);
            }
        } else if (fileName.contains("cd")) { // 承德 Chengde
            if (info.length <= 4 || info[4].isEmpty()) {
                context.write(new Text("承德 未就业"), ONE);
            } else {
                context.write(new Text("承德 就业"), ONE);
            }
        } else if (fileName.contains("xc")) { // 许昌 Xuchang
            if (info.length > 6 && ("在岗".equals(info[6]) || "实习".equals(info[6]))) {
                context.write(new Text("许昌 就业"), ONE);
            } else {
                context.write(new Text("许昌 未就业"), ONE);
            }
            if ((info.length > 6 && info[6].contains("录取"))
                    || (info.length > 7 && info[7].contains("录取"))) {
                context.write(new Text("许昌 成功"), ONE);
            } else {
                context.write(new Text("许昌 考研"), ONE);
            }
        } else if (fileName.contains("zz")) { // 郑州 Zhengzhou
            // The original test info[5].contains("大学") && info[5].isEmpty()
            // can never be true; an empty field is taken to mean unemployed.
            if (info.length <= 5 || info[5].isEmpty()) {
                context.write(new Text("郑州 未就业"), ONE);
            } else {
                context.write(new Text("郑州 就业"), ONE);
            }
            if (info.length > 6 && "是".equals(info[6]) && info[5].contains("大学")) {
                context.write(new Text("郑州 成功"), ONE);
            } else {
                context.write(new Text("郑州 考研"), ONE);
            }
        }
    }
}
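The per-branch info.length guards above are easy to miss (the original indexed info[4] through info[7] unguarded in several places). A hypothetical helper, not part of the original code, would collapse those checks into one place:

// Hypothetical helper for GraduateMapper: returns "" when the CSV row is
// shorter than expected, so branches can test contents without index checks.
private static String field(String[] info, int i) {
    return (i < info.length && info[i] != null) ? info[i].trim() : "";
}

With it, a guard-plus-test pair like info.length > 6 && "是".equals(info[6]) shrinks to "是".equals(field(info, 6)).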
GraduateReducer
package com.mystudy.hadoopPro;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GraduateReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    // Collects per-key totals across all reduce() calls; cleanup() then
    // derives the rates. This relies on the default single reduce task.
    private final Map<String, Integer> map = new HashMap<>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // The mapper emits keys such as "承德 就业", so the original branches
        // testing equals("承德") / equals("许昌") never matched, and "郑州"
        // had no branch at all. Those totals were never stored, cleanup()
        // unboxed null from the map, and the NullPointerException killed the
        // reduce task, which is why the job showed input but no output.
        // Every key is summed the same way, so one accumulation is enough.
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        map.put(key.toString(), total);
    }

    // part/(part+complement) rounded to two decimals; missing categories
    // default to 0 and an empty total returns 0.0 instead of crashing.
    private double rate(String part, String complement) {
        int p = map.getOrDefault(part, 0);
        int total = p + map.getOrDefault(complement, 0);
        if (total == 0) {
            return 0.0;
        }
        return BigDecimal.valueOf((double) p / total)
                .setScale(2, RoundingMode.HALF_UP).doubleValue();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        double rateSq1 = rate("商丘 就业", "商丘 未就业");
        double rateSq2 = rate("商丘 成功", "商丘 考研");
        double rateXc1 = rate("许昌 就业", "许昌 未就业");
        double rateXc2 = rate("许昌 成功", "许昌 考研");
        double rateLy1 = rate("洛阳 就业", "洛阳 未就业");
        double rateLy2 = rate("洛阳 成功", "洛阳 考研");
        double rateCd1 = rate("承德 就业", "承德 未就业"); // 承德 has no 考研 data
        double rateZz1 = rate("郑州 就业", "郑州 未就业");
        double rateZz2 = rate("郑州 成功", "郑州 考研");

        // The original wrote rate_sq1 (the employment rate) under the label
        // 升学率; rateSq2 is the advancement rate that label describes.
        context.write(new Text("商丘的升学率是"), new DoubleWritable(rateSq2));
        context.write(new Text("河南的就业率是"),
                new DoubleWritable((rateSq1 + rateXc1 + rateLy1 + rateCd1 + rateZz1) / 5));
        context.write(new Text("河南的升学率是"),
                new DoubleWritable((rateSq2 + rateXc2 + rateLy2 + rateZz2) / 4));
    }
}
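One caveat on the provincial figures: (rateSq1 + rateXc1 + rateLy1 + rateCd1 + rateZz1) / 5 is the unweighted mean of the city rates, which equals the true provincial employment rate only if every city contributes the same number of graduates. If weighting matters, a hypothetical alternative (a sketch reusing the same map field, not what the job above computes) pools the raw counts first:

// Hypothetical weighted variant for cleanup(): pool raw counts across
// cities instead of averaging per-city rates.
private double weightedEmploymentRate() {
    int employed = 0, graduates = 0;
    for (String city : new String[] {"商丘", "许昌", "洛阳", "承德", "郑州"}) {
        int e = map.getOrDefault(city + " 就业", 0);
        employed += e;
        graduates += e + map.getOrDefault(city + " 未就业", 0);
    }
    return graduates == 0 ? 0.0 : (double) employed / graduates;
}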
GraduateMain
package com.mystudy.hadoopPro;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

public class GraduateMain {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        String INPUTPATH = "hdfs://localhost:8020/001/input";
        String OUTPATH = "hdfs://localhost:8020/out";

        // MapReduce refuses to start if the output directory already exists.
        Path path = new Path(OUTPATH);
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(GraduateMain.class);
        job.setMapperClass(GraduateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GraduateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPATH));

        // The original ignored the return value, so a failed reduce phase
        // (the NullPointerException above) looked like a silent no-output run.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
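To confirm the fix actually produced output, a quick standalone check of the output directory (a sketch assuming the same OUTPATH as above; the class name PrintOutput is made up):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path out = new Path("hdfs://localhost:8020/out"); // same OUTPATH as the job
        FileSystem fs = out.getFileSystem(conf);
        for (FileStatus st : fs.listStatus(out)) {
            if (!st.getPath().getName().startsWith("part-")) {
                continue; // skip _SUCCESS and other markers
            }
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(fs.open(st.getPath()), "UTF-8"))) {
                String line;
                while ((line = r.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
}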