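When writing a top-N style MapReduce program, one approach is to rely on TreeMap's sorting: store the field to sort by as the key and the fields to output as the value, then keep either the first or the last entries of the TreeMap, depending on whether you want the smallest or the largest. When I wrote such a program, however, I used a custom class of my own as the TreeMap value, and then every put overwrote every value under every key in the TreeMap with the value of the newly put element, which was very frustrating.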
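The TreeMap approach described above can be sketched in plain Java, without Hadoop. This is only an illustration under assumptions: the class name `TopNSketch`, the method `topN`, and the `keepLargest` flag are made up for this example and do not appear in the job below.

```java
import java.util.Map;
import java.util.TreeMap;

class TopNSketch {
    /**
     * Returns the n entries with the largest keys, or with the smallest
     * keys when keepLargest is false. Hypothetical helper for illustration.
     */
    static TreeMap<Long, String> topN(Map<Long, String> input, int n, boolean keepLargest) {
        TreeMap<Long, String> top = new TreeMap<>();
        for (Map.Entry<Long, String> e : input.entrySet()) {
            top.put(e.getKey(), e.getValue());
            if (top.size() > n) {
                // TreeMap orders keys ascending: the first entry holds the
                // smallest key and the last entry holds the largest key.
                if (keepLargest) {
                    top.pollFirstEntry();
                } else {
                    top.pollLastEntry();
                }
            }
        }
        return top;
    }

    public static void main(String[] args) {
        Map<Long, String> scores = new TreeMap<>();
        scores.put(5L, "a");
        scores.put(2L, "b");
        scores.put(8L, "g");
        scores.put(0L, "d");
        scores.put(6L, "i");
        System.out.println(topN(scores, 3, true));   // {5=a, 6=i, 8=g}
        System.out.println(topN(scores, 3, false));  // {0=d, 2=b, 5=a}
    }
}
```

Trimming after each put keeps the map at no more than n entries, so memory stays bounded no matter how many keys pass through.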
The custom class student implements Writable and has just a Text name field and an int age field. The code for student is at the end.
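The MapReduce code is as follows.
The map output key is a randomly generated number between 0 and 9. The student's name is taken from the input value, and age is random; that field is not used for anything.
The reducer's output key is the map key, and its Text value is the TreeMap's toString(), that is, the contents of the TreeMap after that reduce call.
The TreeMap key is the key passed to the reducer (the long inside the LongWritable), and the value is the list of student objects passed in for that key. The entries with the three largest keys are kept. Because TreeMap already keeps its keys in ascending order, it is enough to keep its last three entries.
The problem itself is described below the output file.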
// Mapper: emits (random key in 0..9, student built from the input line).
public static class MapImpl extends
        Mapper<LongWritable, Text, LongWritable, student> {
    private Random ra = new Random(System.currentTimeMillis());
    private LongWritable k = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        int d = ra.nextInt(10); // random key in [0, 10)
        student s = new student();
        s.setName(value);
        s.setAge(ra.nextInt(100)); // random age, not used afterwards
        k.set(d);
        context.write(k, s);
    }
}
// Reducer: keeps the n largest keys in a TreeMap and writes the map's
// contents after every reduce call.
public static class ReducerImpl extends
        Reducer<LongWritable, student, LongWritable, Text> {
    private int n = 0;
    // Lives for the whole reduce task, so it accumulates across keys.
    private TreeMap<Long, ArrayList<student>> tmap = new TreeMap<Long, ArrayList<student>>();

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        super.setup(context);
        n = context.getConfiguration().getInt("n", 5);
    }

    @Override
    protected void reduce(LongWritable key, Iterable<student> values,
            Context context) throws IOException, InterruptedException {
        Long k = key.get();
        for (student s : values) {
            ArrayList<student> arrayList = tmap.get(k);
            if (arrayList == null) {
                arrayList = new ArrayList<student>();
            }
            // Stores the reference handed out by the iterator; no copy is made.
            arrayList.add(s);
            tmap.put(k, arrayList);
        }
        // Drop the smallest keys until only n remain.
        while (tmap.size() > n) {
            Long firstKey = tmap.firstKey();
            tmap.remove(firstKey);
        }
        context.write(key, new Text(tmap.toString()));
    }
}
public static void main(String[] args) {
    try {
        Configuration conf = new Configuration();
        conf.set("n", "3"); // keep the three largest keys
        String[] argss = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        // Expect exactly two arguments: input path and output path.
        if (argss.length != 2) {
            System.out.println("parameter error");
            System.exit(1);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(test02.class);
        job.setMapperClass(MapImpl.class);
        job.setReducerClass(ReducerImpl.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(student.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path inpath = new Path(argss[0]);
        Path outpath = new Path(argss[1]);
        HadoopUtils.PathCheck(conf, inpath, outpath);
        FileInputFormat.addInputPath(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
The input file is as follows:
a
b
c
d
e
f
g
h
i
The output file is as follows:
0 {0=[d 55]}
2 {0=[b 6], 2=[b 6, b 6]}
3 {0=[f 86], 2=[f 86, f 86], 3=[f 86]}
4 {2=[h 37, h 37], 3=[h 37], 4=[h 37]}
5 {3=[a 98], 4=[a 98], 5=[a 98, a 98]}
6 {4=[i 18], 5=[i 18, i 18], 6=[i 18]}
8 {5=[g 55, g 55], 6=[g 55], 8=[g 55]}
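This shows the problem clearly: after the first reduce call the value for key 0 is still [d 55], but after the second reduce call it has been overwritten with [b 6], which is the newly arrived value for key 2. What is more, key 2 holds [b 6] twice. Every line of the input file has a different letter, so no duplicates should be produced. This is very puzzling.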
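One explanation consistent with this output: the Hadoop Reducer documentation states that the framework reuses the key and value objects passed into reduce, so every student added to the lists would be the same instance, refilled in place for each record. The plain-Java sketch below reproduces the symptom under that assumption. `ReuseSketch`, `Record`, and `simulate` are hypothetical stand-ins, and the record "c 40" is made up, since the output does not show which other line received key 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class ReuseSketch {
    /** Hypothetical mutable record standing in for the student Writable. */
    static class Record {
        String name;
        int age;

        void set(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + " " + age;
        }
    }

    /** Replays two reduce calls and returns the map's toString() after each. */
    static List<String> simulate() {
        TreeMap<Long, List<Record>> tmap = new TreeMap<>();
        List<String> snapshots = new ArrayList<>();
        Record shared = new Record(); // one instance, refilled for every value

        // First reduce call: key 0 receives "d 55".
        shared.set("d", 55);
        tmap.computeIfAbsent(0L, key -> new ArrayList<>()).add(shared);
        snapshots.add(tmap.toString());

        // Second reduce call: key 2 receives two values ("c 40" is made up).
        shared.set("c", 40);
        tmap.computeIfAbsent(2L, key -> new ArrayList<>()).add(shared);
        shared.set("b", 6);
        tmap.computeIfAbsent(2L, key -> new ArrayList<>()).add(shared);
        snapshots.add(tmap.toString());
        return snapshots;
    }

    public static void main(String[] args) {
        for (String s : simulate()) {
            System.out.println(s);
        }
        // {0=[d 55]}
        // {0=[b 6], 2=[b 6, b 6]}
    }
}
```

Both list entries under key 2, and the older entry under key 0, point at the same object, so they all print whatever was written into it last. That matches the first two lines of the output file.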
The student class implements Writable and has just a Text name field and an int age field.
public class student implements Writable {
    Text name = new Text();
    int age = 0;

    public Text getName() {
        return name;
    }

    // Stores the given Text reference; the contents are not copied.
    public void setName(Text name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return name + "\t" + age;
    }

    // Serialization order: name first, then age.
    @Override
    public void write(DataOutput out) throws IOException {
        this.name.write(out);
        out.writeInt(this.age);
    }

    // Must read the fields in the same order write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name.readFields(in);
        this.age = in.readInt();
    }
}
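If the cause is that the reducer keeps references to a value object the framework reuses, one possible remedy is to store a copy of each student instead of the instance handed out by the iterator. Because name is a mutable Text, the copy needs its own name object, not a shared reference. The sketch below shows the idea in plain Java under that assumption: `CopySketch`, its nested `Student`, and `keepCopies` are hypothetical, and a StringBuilder plays the role of Text. In a real job, Hadoop's `WritableUtils.clone(value, conf)` is another way to obtain such a copy.

```java
import java.util.ArrayList;
import java.util.List;

class CopySketch {
    /** Hypothetical stand-in for student; StringBuilder mimics the mutable Text. */
    static class Student {
        final StringBuilder name = new StringBuilder();
        int age;

        Student() {
        }

        /** Copy constructor: copies the characters, not the reference. */
        Student(Student other) {
            this.name.append(other.name);
            this.age = other.age;
        }

        /** Overwrites this object in place, as readFields() would. */
        void set(String newName, int newAge) {
            name.setLength(0);
            name.append(newName);
            age = newAge;
        }

        @Override
        public String toString() {
            return name + " " + age;
        }
    }

    /** Refills one shared Student per record but stores an independent copy. */
    static List<Student> keepCopies(String[] names, int[] ages) {
        Student shared = new Student();
        List<Student> kept = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            shared.set(names[i], ages[i]);
            kept.add(new Student(shared));
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Student> kept = keepCopies(new String[] {"c", "b"}, new int[] {40, 6});
        System.out.println(kept); // [c 40, b 6]
    }
}
```

Applied to the reducer, this would mean adding a copy of each value to the list, at the cost of one extra object per record.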