Exception with a custom Hadoop partitioner and multiple ReduceTasks

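I have a three-node cluster. The program is written on Windows, and the Job is submitted from there to YARN on the cluster, where it fails with the exception below. Running it on Linux with hadoop jar works. WordCount and other small programs I ran earlier did not fail, so I suspect the cause is the ReduceTask setup. Could an expert please take a look? Many thanks.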

 2015-12-04 15:33:43,100 INFO  [main] client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at hadoop01/10.5.110.250:8032
2015-12-04 15:33:43,458 WARN  [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2015-12-04 15:33:43,478 WARN  [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(259)) - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2015-12-04 15:33:43,525 INFO  [main] input.FileInputFormat (FileInputFormat.java:listStatus(280)) - Total input paths to process : 1
2015-12-04 15:33:43,573 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:1
2015-12-04 15:33:43,655 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1449213919153_0002
2015-12-04 15:33:43,744 INFO  [main] mapred.YARNRunner (YARNRunner.java:createApplicationSubmissionContext(369)) - Job jar is not present. Not adding any jar to the list of resources.
2015-12-04 15:33:43,778 INFO  [main] impl.YarnClientImpl (YarnClientImpl.java:submitApplication(204)) - Submitted application application_1449213919153_0002
2015-12-04 15:33:43,807 INFO  [main] mapreduce.Job (Job.java:submit(1289)) - The url to track the job: http://hadoop01:8088/proxy/application_1449213919153_0002/
2015-12-04 15:33:43,808 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1334)) - Running job: job_1449213919153_0002
2015-12-04 15:33:46,823 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_1449213919153_0002 running in uber mode : false
2015-12-04 15:33:46,825 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) -  map 0% reduce 0%
2015-12-04 15:33:46,833 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_1449213919153_0002 failed with state FAILED due to: Application application_1449213919153_0002 failed 2 times due to AM Container for appattempt_1449213919153_0002_000002 exited with  exitCode: -1000 due to: File file:/tmp/hadoop-yarn/staging/lixiwei/.staging/job_1449213919153_0002/job.splitmetainfo does not exist
.Failing this attempt.. Failing the application.
2015-12-04 15:33:46,861 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 0

The program is as follows:

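import java.io.IOException;

// Assumption: the two-String split(...) call below matches commons-lang's StringUtils
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumArea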
{
    public static class FlowSumAreaMapper
            extends Mapper<LongWritable, Text, Text, FlowBean>
    {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                        throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            String phoneNo = fields[1];
            long upFlow = Long.parseLong(fields[7]);
            long downFlow = Long.parseLong(fields[8]);

            context.write(new Text(phoneNo),
                    new FlowBean(phoneNo, upFlow, downFlow));
        }
    }

    public static class FlowSumAreaReducer
            extends Reducer<Text, FlowBean, Text, FlowBean>
    {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                        throws IOException, InterruptedException
        {
            long upFlowCounter = 0;
            long downFlowCounter = 0;
            for (FlowBean bean : values)
            {
                upFlowCounter += bean.getUpFlow();
                downFlowCounter += bean.getDownFlow();
            }

            context.write(key, new FlowBean(key.toString(), upFlowCounter,
                    downFlowCounter));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException
    {
        // 1. Load the configuration
        Configuration conf = new Configuration();
        // 2. Set up the Job (pass conf in so the Job actually uses it)
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumArea.class);
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        job.setPartitionerClass(AreaPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the number of concurrent reduce tasks; it should match the number of partitions
        job.setNumReduceTasks(6);
        // 3. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\51195\\Desktop\\flow\\flowarea\\srcdata"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\51195\\Desktop\\flow\\flowarea\\outputdata6"));
//      FileInputFormat.setInputPaths(job, new Path(args[0]));
//      FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

This is the partitioner:

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String,Integer> areaMap = new HashMap<>();

    static{
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key and look up its three-digit prefix in the
        // area dictionary; each area gets its own partition number, unknown prefixes get 5
        Integer areaCode = areaMap.get(key.toString().substring(0, 3));
        return areaCode == null ? 5 : areaCode;
    }

}
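
The prefix lookup in the partitioner above can be checked without a cluster. Below is a minimal sketch in plain Java with no Hadoop classes; AreaPartitionDemo, partitionFor and the sample phone numbers are hypothetical names and values made up for illustration. It confirms that every returned index stays below the 6 reduce tasks configured in main. One deliberate difference from the posted code: keys shorter than three characters fall back to partition 5 here, whereas substring(0, 3) in the original would throw a StringIndexOutOfBoundsException.

```java
import java.util.HashMap;
import java.util.Map;

public class AreaPartitionDemo {
    // Same prefix-to-partition table as AreaPartitioner in the post.
    private static final Map<String, Integer> AREA_MAP = new HashMap<>();

    static {
        AREA_MAP.put("135", 0);
        AREA_MAP.put("136", 1);
        AREA_MAP.put("137", 2);
        AREA_MAP.put("138", 3);
        AREA_MAP.put("139", 4);
    }

    // Hypothetical stand-in for getPartition(): unknown prefixes go to partition 5.
    // Assumption added for this sketch: null or too-short keys also go to 5.
    public static int partitionFor(String phoneNo) {
        if (phoneNo == null || phoneNo.length() < 3) {
            return 5;
        }
        Integer code = AREA_MAP.get(phoneNo.substring(0, 3));
        return code == null ? 5 : code;
    }

    public static void main(String[] args) {
        int numReduceTasks = 6; // mirrors job.setNumReduceTasks(6)
        // Made-up sample keys, not taken from the real input data.
        String[] samples = {"13512345678", "13900000000", "18600000000", "13"};
        for (String phoneNo : samples) {
            int partition = partitionFor(phoneNo);
            // A partition index must lie in [0, numReduceTasks).
            if (partition < 0 || partition >= numReduceTasks) {
                throw new IllegalStateException(
                        "partition " + partition + " out of range for " + phoneNo);
            }
            System.out.println(phoneNo + " -> partition " + partition);
        }
    }
}
```

If the table ever grows beyond five prefixes, the same check shows at once whether the reduce task count has to be raised along with it.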

1 answer

I wonder whether the original poster ever got this resolved.
