准备自己写一个代码熟悉一下mapreduce,但是写好之后发现reduce函数不执行,运行程序也没有报错,逛了很多论坛都没有解决方案,因为是初步接触mapreduce,所以对mapreduce的编程不太了解,希望各位大大帮我看下代码有没有问题。
代码如下:
Mapper:
package Utils;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class BayMapper extends Mapper<Object, Text, Cell, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
StringTokenizer itr = new StringTokenizer(value.toString());
Cell[][] cells = new Cell[ClusterConfig.cellRow][ClusterConfig.cellColumn];
int cellx = 0;
int celly = 0;
for(int i = 0;i<ClusterConfig.cellRow;i++)
for(int j = 0;j<ClusterConfig.cellColumn;j++){
cells[i][j] = new Cell();
}
while(itr.hasMoreTokens()){
String outValue = new String(itr.nextToken());
System.out.println(outValue);
String[] list = outValue.split(",");
//list.length = 2;
for(int i = 0;i<list.length;i++){
double x;
double y;
x = Double.valueOf(list[0]);
y = Double.valueOf(list[1]);
cellx = (int) Math.ceil((x - ClusterConfig.xmin)
/ ClusterConfig.intervalx);
celly = (int) Math.ceil((y - ClusterConfig.ymin)
/ ClusterConfig.intervaly);
//cells[cellx][celly].addnumberPoints(); //传入该格子中点的个数
}
context.write(cells[cellx][celly],new Text(outValue));
}
}
}
Reducer:
package Utils;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class BayReducer extends Reducer<Cell, Text, Cell, IntWritable> {
@Override
protected void reduce(Cell key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
int count = 0;
Iterator<Text> iterator = values.iterator();
while(iterator.hasNext()){
count ++;
}
if(count >= 20){
context.write(key,new IntWritable(count));
}
}
}
Driver:
package Cluster;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import Utils.BayMapper;
import Utils.BayReducer;
import Utils.Cell;
public class ClusterDriver {
/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "localhost:9000");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Data Cluster <in> <out>");
System.exit(2);
}
@SuppressWarnings("deprecation")
Job job = new Job(conf, "Baymax Cluster");
job.setJarByClass(ClusterDriver.class);
job.setMapperClass(BayMapper.class);
job.setReducerClass(BayReducer.class);
job.setOutputKeyClass(Cell.class);
job.setOutputValueClass(IntWritable.class);
Path in = new Path(otherArgs[0]);
Path out = new Path(otherArgs[1]);
FileInputFormat.addInputPath(job, in);// 设置输入路径
FileOutputFormat.setOutputPath(job, out);// 设置输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}