Hadoop mapreduce传值问题

最近mapreduce编写遇到了问题。在step4中,reduce可以同时收到从map中传来的A和B两组数据。但是在step5中的reudce却无法同时收到A、B两组数据,出现了有A没B,有B没A的现象,即A和B无法在同一次循环中出现。

step5,我几乎是从step4复制过来的,很奇怪他们的执行步骤为什么不一样。

step4

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.commons.net.telnet.EchoOptionHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;

//同现矩阵和用户偏好矩阵相乘
public class Step4 {
    public static boolean run(Configuration con, Map<String, String>map) {
        try {
            FileSystem fs = FileSystem.get(con);
            Job job = Job.getInstance();
            job.setJobName("step4");
            job.setJarByClass(App.class);
            job.setMapperClass(Step4_Mapper.class);
            job.setReducerClass(Step4_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, 
                    new Path[] { 
                            new Path(map.get("Step4Input1")),
                            new Path(map.get("Step4Input2"))
                            });
            Path outpath = new Path(map.get("Step4Output"));
            if(fs.exists(outpath)){
                fs.delete(outpath,true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            return f;
        }catch(Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class Step4_Mapper extends Mapper<LongWritable, Text, Text, Text>{
        private String flag;

        //每次map时都会先判断一次
        @Override
        protected void setup(Context context )throws IOException,InterruptedException{
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();

            System.out.print(flag+ "*************************");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());

            //物品共现矩阵
            if(flag.equals("step3")) {
                //  i2:i3 1
                // i2:i2 2

                String[] v1 = tokens[0].split(":");
                String itemID1 = v1[0];
                String itemID2 = v1[1];
                String num = tokens[1];

                Text k = new Text(itemID1);
                Text v = new Text("A:"+itemID2+","+num); //A:i2,1

                context.write(k,v);

            }else if(flag.equals("step2")) {//用户评价矩阵
                // u2 i1:2,i3:4
                String userID = tokens[0];
                for(int i=1;i<tokens.length;i++) {
                    String[] vector = tokens[i].split(":");
                    String itemID = vector[0]; //物品ID
                    String pref = vector[1];//评分

                    Text k = new Text(itemID);
                    Text v = new Text("B:"+userID+","+pref);

                    context.write(k, v);

                }
            }
        }
    }


    static class Step4_Reducer extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text>values, Context context) throws IOException,InterruptedException{
            //A为同现矩阵,B为用户偏好矩阵  
            //某一个物品k,针对它和其他所有物品的同现次数v,都在mapA集合中
        //  Text k = new Text(itemID1);
        //Text v = new Text("A:"+itemID2+","+num); //A:i2,1
        //  context.write(k,v);

            //和该物品(key中的itemID)同现的其他物品的同现集合
            //其他物品ID为map的key,同现数字为值
            Map<String, Integer> mapA = new HashMap<String,Integer>();

            //该物品(key中的itemID),所有用户的推荐权重分数
            Map<String, Integer>mapB = new HashMap<String,Integer>();

            for(Text line:values) {
                String val = line.toString();
                if(val.startsWith("A:")) {
                    String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                    try {
                        mapA.put(kv[0], Integer.parseInt(kv[1]));
                    }catch(Exception e) {
                        e.printStackTrace();
                    }
                }else if(val.startsWith("B:")) {
                    String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                    try {
                        mapB.put(kv[0], Integer.parseInt(kv[1]));
                    }catch(Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            double result = 0;
            Iterator<String>iter = mapA.keySet().iterator();
            while(iter.hasNext()) {
                String mapk = iter.next(); //itemID

                int num =mapA.get(mapk).intValue();  // 获取同现值
                Iterator<String>iterb = mapB.keySet().iterator();
                while(iterb.hasNext()) {
                    String mapkb = iterb.next();
                    int pref = mapB.get(mapkb).intValue();
                    result = num*pref;

                    Text k = new Text(mapkb);
                    Text v = new Text(mapk+ "," + result);
                    context.write(k, v);
                }
            }
        }
    }
}

step5


import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.commons.net.telnet.EchoOptionHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;




//获得结果矩阵
public class Step5 {
    public static boolean run(Configuration con, Map<String, String>map) {
        try {
            FileSystem fs = FileSystem.get(con);
            Job job = Job.getInstance();
            job.setJobName("step5");
            job.setJarByClass(App.class);
            job.setMapperClass(Step5_Mapper.class);
            job.setReducerClass(Step5_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, 
                    new Path[] { 
                            new Path(map.get("Step5Input1")),
                            new Path(map.get("Step5Input2"))
                            });
            Path outpath = new Path(map.get("Step5Output"));
            if(fs.exists(outpath)){
                fs.delete(outpath,true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            return f;
        }catch(Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text>{
        private String flag;
        //每次map时都会先判断一次
        @Override
        protected void setup(Context context )throws IOException,InterruptedException{
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();
            System.out.print(flag+ "*************************");
        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());

            if(flag.equals("step4")) {
                    //  i2:i3 1
                    // i2:i2 2
                Text k = new Text(tokens[0]);
                Text v = new Text("A:"+tokens[1]+","+tokens[2]);
                context.write(k, v);
            }else if(flag.equals("step2")) {//用户评价矩阵
                    // u2 i1:2,i3:4
                    String userID = tokens[0];
                    for(int i=1;i<tokens.length;i++) {
                        String[] vector = tokens[i].split(":");
                        String itemID = vector[0]; //物品ID
                        String pref = vector[1];//评分

                        Text k = new Text(itemID);
                        Text v = new Text("B:"+userID+","+pref);

                        context.write(k, v);
                    }
        }
    }
    }
        //本reduce 负责累加结果
        static class Step5_Reducer extends Reducer<Text, Text, Text, Text>{
            protected void reduce(Text key, Iterable<Text>values, Context context) throws IOException,InterruptedException{
                //其他物品ID为map的key,同现数字为值
                Map<String, Double> mapA = new HashMap<String,Double>();

                //该物品(key中的itemID),所有用户的推荐权重分数
                Map<String, Integer>mapB = new HashMap<String,Integer>();

                for(Text line : values) {
                    String val = line.toString();
                    if(val.startsWith("A:")) {
                        String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                        String tokens = kv[1];
                        String itemID = kv[0];//物品id
                        Double score = Double.parseDouble(tokens); //相乘结果

                        //相加计算
                        if(mapA.containsKey(itemID)) {
                            mapA.put(itemID, mapA.get(itemID)+score);
                        }else {
                            mapA.put(itemID, score);
                        }

                    }else if(val.startsWith("B:")) {
                        String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                        try {
                            mapB.put(kv[0], Integer.parseInt(kv[1]));
                        }catch(Exception e) {
                            e.printStackTrace();
                        }
                    }
                }


                Iterator<String> iter = mapA.keySet().iterator();
                while(iter.hasNext()) {
                    String itemID = iter.next();
                    double score = mapA.get(itemID);
                    Text v = new Text(itemID+","+score);
                    Iterator<String>iterb = mapB.keySet().iterator();
                    while(iterb.hasNext()) {
                        String mapkb = iterb.next();
                        Text k = new Text(mapkb);

                        if(k.equals(key)) {
                            continue;
                        }else {
                            context.write(key, v);
                        }
                    }
                }
            }
        }

}


step4和step5配置
图片说明

step4,在for循环中同时出现A和B
step4,在for循环中同时出现A和B

step5中,A和B无法出现在同一次循环
有A没B,此时mapB是无法点击开的
直接跳出了for循环进入下面的while循环,此时没有mapB,while无法正常进行
跳出了for循环

进行了多次step5后,输出完所有mapA之后,在下一次step5才进入mapB,此时轮到mapA是空的,而只有mapB
mapA是空的,只有mapB

1个回答

step4输出的是 UserId\tItemId,Score,也就是Step5的Map的step4数据KEY是UserId,map的step2的数据KEY是itemID,肯定没办法走到同一个循环。

cj2573718_11
cj2573718_11 谢谢,按照您的回答我已经解决了问题,可以的话能再帮我看看我的另一个问题么
2 年多之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问
相关内容推荐