最近编写 MapReduce 程序时遇到了问题。在 step4 中,reduce 可以同时收到从 map 传来的 A 和 B 两组数据;但 step5 中的 reduce 却无法同时收到 A、B 两组数据,出现了有 A 没 B、有 B 没 A 的现象,即 A 和 B 无法出现在同一次 reduce 调用的 for 循环中。
step5 的代码几乎是从 step4 复制过来的,很奇怪它们的执行过程为什么不一样。
step4
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.net.telnet.EchoOptionHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
// Step 4: multiply the item co-occurrence matrix by the user preference matrix.
// The mapper keys BOTH inputs by itemID, so each reduce call receives one item's
// co-occurrence row ("A:" records) together with that item's ratings ("B:" records).
public class Step4 {

    /** Field separator for input lines and record payloads: a tab or a comma. */
    private static final Pattern DELIMITER = Pattern.compile("[\t,]");

    /**
     * Configures and runs the step4 job, deleting any previous output directory first.
     *
     * @param con Hadoop configuration used for both the file system and the job
     * @param map job paths; reads "Step4Input1", "Step4Input2" and "Step4Output"
     * @return true if the job completed successfully; false on failure or exception
     */
    public static boolean run(Configuration con, Map<String, String> map) {
        try {
            FileSystem fs = FileSystem.get(con);
            // Pass the caller's configuration: the no-arg Job.getInstance() builds a fresh
            // Configuration, so settings on `con` would not reach the job even though `fs`
            // above was created from them.
            Job job = Job.getInstance(con, "step4");
            job.setJarByClass(App.class);
            job.setMapperClass(Step4_Mapper.class);
            job.setReducerClass(Step4_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // The reducer emits Text/Text; declare it rather than relying on the defaults.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job,
                    new Path[] {
                        new Path(map.get("Step4Input1")),
                        new Path(map.get("Step4Input2"))
                    });
            Path outpath = new Path(map.get("Step4Output"));
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            // Job-driver boundary: report the problem and signal failure to the caller.
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Tags every record with its source matrix ("A:" co-occurrence, "B:" preference) and
     * keys it by itemID. The source is identified by the parent directory name of the
     * input file ("step3" or "step2").
     */
    static class Step4_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Parent directory name of the file backing the current input split.
        private String flag;

        // Runs once per map task (one input split), before any map() call.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();
            System.out.print(flag + "*************************");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = DELIMITER.split(value.toString());
            if (flag.equals("step3")) {
                // Co-occurrence line, e.g. "i2:i3\t1" -> key "i2", value "A:i3,1"
                String[] v1 = tokens[0].split(":");
                String itemID1 = v1[0];
                String itemID2 = v1[1];
                String num = tokens[1];
                context.write(new Text(itemID1), new Text("A:" + itemID2 + "," + num));
            } else if (flag.equals("step2")) {
                // Preference line, e.g. "u2\ti1:2,i3:4" -> ("i1", "B:u2,2"), ("i3", "B:u2,4")
                String userID = tokens[0];
                for (int i = 1; i < tokens.length; i++) {
                    String[] vector = tokens[i].split(":");
                    String itemID = vector[0];
                    String pref = vector[1];
                    context.write(new Text(itemID), new Text("B:" + userID + "," + pref));
                }
            }
        }
    }

    /**
     * For one item (the key), multiplies each co-occurrence count by each user's rating of
     * that item and emits (userID, "coOccurringItemID,product").
     */
    static class Step4_Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // co-occurring itemID -> co-occurrence count with `key`
            Map<String, Integer> mapA = new HashMap<String, Integer>();
            // userID -> that user's rating of `key`
            Map<String, Integer> mapB = new HashMap<String, Integer>();
            for (Text line : values) {
                String val = line.toString();
                if (val.startsWith("A:")) {
                    putParsed(mapA, val);
                } else if (val.startsWith("B:")) {
                    putParsed(mapB, val);
                }
            }
            for (Map.Entry<String, Integer> a : mapA.entrySet()) {
                int num = a.getValue();
                for (Map.Entry<String, Integer> b : mapB.entrySet()) {
                    int pref = b.getValue();
                    // Widen before multiplying so large counts cannot overflow int.
                    double result = (double) num * pref;
                    context.write(new Text(b.getKey()), new Text(a.getKey() + "," + result));
                }
            }
        }

        /** Parses a tagged record "X:id,number" into target (id -> number); skips malformed records. */
        private static void putParsed(Map<String, Integer> target, String val) {
            String[] kv = DELIMITER.split(val.substring(2));
            if (kv.length < 2) {
                System.err.println("step4: skipping malformed record: " + val);
                return;
            }
            try {
                target.put(kv[0], Integer.parseInt(kv[1]));
            } catch (NumberFormatException e) {
                System.err.println("step4: skipping record with non-integer value: " + val);
            }
        }
    }
}
step5
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.net.telnet.EchoOptionHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
//获得结果矩阵
public class Step5 {
public static boolean run(Configuration con, Map<String, String>map) {
try {
FileSystem fs = FileSystem.get(con);
Job job = Job.getInstance();
job.setJobName("step5");
job.setJarByClass(App.class);
job.setMapperClass(Step5_Mapper.class);
job.setReducerClass(Step5_Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,
new Path[] {
new Path(map.get("Step5Input1")),
new Path(map.get("Step5Input2"))
});
Path outpath = new Path(map.get("Step5Output"));
if(fs.exists(outpath)){
fs.delete(outpath,true);
}
FileOutputFormat.setOutputPath(job, outpath);
boolean f = job.waitForCompletion(true);
return f;
}catch(Exception e) {
e.printStackTrace();
}
return false;
}
static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text>{
private String flag;
//每次map时都会先判断一次
@Override
protected void setup(Context context )throws IOException,InterruptedException{
FileSplit split = (FileSplit) context.getInputSplit();
flag = split.getPath().getParent().getName();
System.out.print(flag+ "*************************");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] tokens = Pattern.compile("[\t,]").split(value.toString());
if(flag.equals("step4")) {
// i2:i3 1
// i2:i2 2
Text k = new Text(tokens[0]);
Text v = new Text("A:"+tokens[1]+","+tokens[2]);
context.write(k, v);
}else if(flag.equals("step2")) {//用户评价矩阵
// u2 i1:2,i3:4
String userID = tokens[0];
for(int i=1;i<tokens.length;i++) {
String[] vector = tokens[i].split(":");
String itemID = vector[0]; //物品ID
String pref = vector[1];//评分
Text k = new Text(itemID);
Text v = new Text("B:"+userID+","+pref);
context.write(k, v);
}
}
}
}
//本reduce 负责累加结果
static class Step5_Reducer extends Reducer<Text, Text, Text, Text>{
protected void reduce(Text key, Iterable<Text>values, Context context) throws IOException,InterruptedException{
//其他物品ID为map的key,同现数字为值
Map<String, Double> mapA = new HashMap<String,Double>();
//该物品(key中的itemID),所有用户的推荐权重分数
Map<String, Integer>mapB = new HashMap<String,Integer>();
for(Text line : values) {
String val = line.toString();
if(val.startsWith("A:")) {
String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
String tokens = kv[1];
String itemID = kv[0];//物品id
Double score = Double.parseDouble(tokens); //相乘结果
//相加计算
if(mapA.containsKey(itemID)) {
mapA.put(itemID, mapA.get(itemID)+score);
}else {
mapA.put(itemID, score);
}
}else if(val.startsWith("B:")) {
String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
try {
mapB.put(kv[0], Integer.parseInt(kv[1]));
}catch(Exception e) {
e.printStackTrace();
}
}
}
Iterator<String> iter = mapA.keySet().iterator();
while(iter.hasNext()) {
String itemID = iter.next();
double score = mapA.get(itemID);
Text v = new Text(itemID+","+score);
Iterator<String>iterb = mapB.keySet().iterator();
while(iterb.hasNext()) {
String mapkb = iterb.next();
Text k = new Text(mapkb);
if(k.equals(key)) {
continue;
}else {
context.write(key, v);
}
}
}
}
}
}
step4和step5配置
step4,在for循环中同时出现A和B
step5中,A和B无法出现在同一次循环
for 循环结束后直接进入下面的 while 循环,此时 mapB 为空,内层 while 循环无法正常执行。
step5 的 reduce 被调用了多次:在只含 mapA 数据的调用全部处理完之后,后续的调用才收到 mapB 的数据,而此时 mapA 反而是空的,只有 mapB。