m0_37786032
Baymaxy
2017-03-06 11:35

MapReduce中reduce函数不执行

  • 解决方案
  • mapreduce

准备自己写一个代码熟悉一下mapreduce,但是写好之后发现reduce函数不执行,运行程序也没有报错,逛了很多论坛都没有解决方案,因为是初步接触mapreduce,所以对mapreduce的编程不太了解,希望各位大大帮我看下代码有没有问题。
代码如下:
Mapper:

 package Utils;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class BayMapper extends Mapper<Object, Text, Cell, Text> {
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
        StringTokenizer itr = new StringTokenizer(value.toString());
        Cell[][] cells = new Cell[ClusterConfig.cellRow][ClusterConfig.cellColumn];
        int cellx = 0;
        int celly = 0;
        for(int i = 0;i<ClusterConfig.cellRow;i++)
            for(int j = 0;j<ClusterConfig.cellColumn;j++){
                cells[i][j] = new Cell();
            }

        while(itr.hasMoreTokens()){
            String outValue = new String(itr.nextToken());
            System.out.println(outValue);
            String[] list = outValue.split(",");

            //list.length = 2;
            for(int i = 0;i<list.length;i++){
                double x;
                double y;
                x = Double.valueOf(list[0]);
                y = Double.valueOf(list[1]);

                cellx = (int) Math.ceil((x - ClusterConfig.xmin)
                        / ClusterConfig.intervalx);
                celly = (int) Math.ceil((y - ClusterConfig.ymin)
                        / ClusterConfig.intervaly);
                //cells[cellx][celly].addnumberPoints();                //传入该格子中点的个数
            }
            context.write(cells[cellx][celly],new Text(outValue));
        }
    }
}

Reducer:

 package Utils;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class BayReducer extends Reducer<Cell, Text, Cell, IntWritable> {

    @Override
    protected void reduce(Cell key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
        int count = 0;
        Iterator<Text> iterator = values.iterator();
        while(iterator.hasNext()){
            count ++;
        }
        if(count >= 20){
            context.write(key,new IntWritable(count));
        }
    }
}

Driver:

 package Cluster;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import Utils.BayMapper;
import Utils.BayReducer;
import Utils.Cell;

public class ClusterDriver {

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9000");
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Cluster <in> <out>");
            System.exit(2);
        }

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "Baymax Cluster");
        job.setJarByClass(ClusterDriver.class);

        job.setMapperClass(BayMapper.class);
        job.setReducerClass(BayReducer.class);

        job.setOutputKeyClass(Cell.class);
        job.setOutputValueClass(IntWritable.class);

        Path in = new Path(otherArgs[0]);
        Path out = new Path(otherArgs[1]);

        FileInputFormat.addInputPath(job, in);// 设置输入路径
        FileOutputFormat.setOutputPath(job, out);// 设置输出路径

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

  • 点赞
  • 回答
  • 收藏
  • 复制链接分享

1条回答

为你推荐