2 qq 806913882 qq_806913882 于 2017.01.09 17:38 提问

hadoop中reducer中使用treemap,新的value会覆盖所有旧的value

在编写topN类型的mapreduer程序时,有一种方法是,使用treemap的排序功能,把要排序的字段保存为key,需要输出的字段保存为value,最后根据是取最大还是取最小来保留treemap的最前方元素还是最后方元素。但是我在编写这个程序时,把treemap的value设置成我自定义的一个类,然后,就发生了,每次put元素时,都会把treemap中所有key的所有value设置成新put进来元素的value值,十分蛋疼。
自定义的类student 继承 Writable类,只用一个Text的name属性,和一个int的age属性。student的代码在最后。

mapreducer的代码如下,

map就是输出类型为,key是随机生成的0-9之间的数字,studet的name是从输入的value,age是随机的,这个属性没用。

reducer的输出类型为,key就是map的key,Text是treemap的toString(),也就是执行一次reducer,treemap的内容。

treemap的key就是reducer传过来的key(即LongWritable的long),value就是传过来的student。按照key的大小,取key最大的三个。因为treemap已经是升序排列好了,所以其实只要去treemap的后三个就行了。

具体问题的说明,在输出文件的下面。

    public static class MapImpl extends
            Mapper<LongWritable, Text, LongWritable, student> {

        private Random ra = new Random(System.currentTimeMillis());
        private LongWritable k = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            int d = ra.nextInt(10);
            student s = new student();
            s.setName(value);
            s.setAge(ra.nextInt(100));

            k.set(d);
            context.write(k, s);
        }
    }

    public static class ReducerImpl extends
            Reducer<LongWritable, student, LongWritable, Text> {
        private int n = 0;

        private TreeMap<Long, ArrayList<student>> tmap = new TreeMap<Long, ArrayList<student>>();

        private static NullWritable k = NullWritable.get();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            n = Integer.valueOf(context.getConfiguration().get("n", "5"));
        }

        @Override
        protected void reduce(LongWritable arg0, Iterable<student> arg1,
                Context arg2) throws IOException, InterruptedException {

            Long k = arg0.get();
            for (student text : arg1) {

                ArrayList<student> arrayList = tmap.get(k);
                if (arrayList == null) {
                    arrayList = new ArrayList<student>();
                }
                arrayList.add(text);
                tmap.put(k, arrayList);
            }

            while (tmap.size() > n) {
                Long firstKey = tmap.firstKey();
                tmap.remove(firstKey);
            }
            arg2.write(arg0, new Text(tmap.toString()));
        }
    }

    public static void main(String[] args) {

        try {
            Configuration conf = new Configuration();

            conf.set("n", "3");

            String[] argss = new GenericOptionsParser(conf, args)
                    .getRemainingArgs();

            if (argss.length != 2) {
                System.out.println("parameter error");
                System.exit(1);
            }

            Job job = Job.getInstance(conf);

            job.setJarByClass(test02.class);


            job.setMapperClass(MapImpl.class);
            job.setReducerClass(ReducerImpl.class);

            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(student.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);


            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            Path inpath = new Path(argss[0]);
            Path outpath = new Path(argss[1]);

            HadoopUtils.PathCheck(conf, inpath, outpath);
            FileInputFormat.addInputPath(job, inpath);

            FileOutputFormat.setOutputPath(job, outpath);

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

输入文件如下

a
b
c
d
e
f
g
h
i

输出文件如下

0   {0=[d   55]}
2   {0=[b   6], 2=[b    6, b    6]}
3   {0=[f   86], 2=[f   86, f   86], 3=[f   86]}
4   {2=[h   37, h   37], 3=[h   37], 4=[h   37]}
5   {3=[a   98], 4=[a   98], 5=[a   98, a   98]}
6   {4=[i   18], 5=[i   18, i   18], 6=[i   18]}
8   {5=[g   55, g   55], 6=[g   55], 8=[g   55]}

这样可以很清楚的看到,本来第一次reducer之后,key为0的键值还是[d 55],但是第二次reducer之后,key为0的键值被[b 6]给覆盖了,而[b 6]就是新进来的key为2的键值。而且,key为2的键值[b 6]竟然有两个。因为输出文件每行的字母都不相同,所以产生的也不应该有重复的呀,很费解。

student 类 继承 Writable类,只用一个Text的name属性,和一个int的age属性。

public class student implements Writable {
    Text name = new Text();
    int age = 0;

    public Text getName() {
        return name;
    }

    public void setName(Text name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return name + "\t" + age;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.name.write(out);
        out.writeInt(this.age);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name.readFields(in);
        this.age = in.readInt();
    }

}
Csdn user default icon
上传中...
上传图片
插入图片
准确详细的回答,更有利于被提问者采纳,从而获得C币。复制、灌水、广告等回答会被删除,是时候展现真正的技术了!
其他相关推荐
众所周知,Hadoop框架使用Mapper将数据处理成一个个的key/value键值对,在网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。这其中假如我们有10亿
 众所周知,Hadoop框架使用Mapper将数据处理成一个个的key/value键值对,在网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。这其中假如我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输(网络带宽严重被占降低程序效率),所有数据都经过reduce处理,造成Reducer的巨大压力,从而大大降低程序的性能。     
TreeMap按照value进行排序
TreeMap底层是根据红黑树的数据结构构建的,默认是根据key的自然排序来组织(比如integer的大小,String的字典排序)。所以,TreeMap只能根据key来排序,是不能根据value来排序的(否则key来排序根本就不能形成TreeMap)。 今天有个需求,就是要根据treeMap中的value排序。所以网上看了一下,大致的思路是把TreeMap的EntrySet转换成lis
关于TreeMap中如何输出排序好第一个key及value
在做实验的时候,遇到了需要从treemap中得到第一个 key和value。我们都知道,treemap会根据键值进行排序(字典序),那怎么得到第一个value呢?通过查看treemap的方法文件介绍,我写了以下测试程序package test;import java.util.ArrayList;import java.util.Collections;import java.util.Linke...
Hadoop-2.4.1学习之Mapper和Reducer
Hadoop-2.4.1中MapReduce作业的Mapper和Reducer综述
TreeMap集合中的两种取出元素方式
TreeMap集合中取出元素的两种方式: 1.直接获取该TreeMap集合中的关系: entrySet():Map集合中的方法,返回值类型是该集合中的各个关系;返回值类型是:Set类型的Map.EntrySet类型;然后在通过Set集合中特有的元素取出方式:将集合中的各个元素迭代取出; 例子: import java.util.*; class MapDemo{ pulbic stat
Reducer类——hadoop
1、Reducer类 , 由Map过程输出的一组键值对【(k2;v2)】将被进行合并处理,将同样主键下的不同value合并到一个列表【v2】中,因此Reduce的输入为(k2;【v2】)。Reducer对传入的中间结果列表数据进行某种整理或进一步的处理,并产生最终的某种形式的结果输出【(k3;v3)】 一个示例 Reducer类的基本定义 public static class IntSumR
关于使用TreeMap按照value进行排序的解决方案
今天在写Dijkstra's Algorithm用heap实现的过程中,遇到类似于使用TreeMap按照value进行排序的情况,即每个node对应一个当前最短路,将这些node按照最短路的距离排序,但是同时需要做到随时找到该点并作必要的操作。由于TreeMap是按键值排序并且不能有重复,所以产生了问题,以下提出两个解决办法: 使用TreeMap按照value进行排序 具体的实现过程可
使用比较器对Treemap按照value进行排序
使用比较器对Treemap按照value进行排序(value值只有是string类型时才适用)方式一public class MapSortDemo { public static void main(String[] args) { Map<String, String> map = new TreeMap<String, String>(); map.put("
对TreeMap按照value进行排序
public class Testing {          public static void main(String[] args) {              HashMap map = new HashMap();           ValueComparator bvc =  new ValueComparator(map);           TreeMapLon
mapreduce中的context类
在Mapper中的map、以及Reducer中的reduce都有一个Context的类型  1 public void map(Object key, Textvalue, Context context)  2                 throwsOException,InterruptedException{  3    StringTokenizer itr = new Str