I have a three-node cluster. The program is written on Windows and submits the job to YARN on the cluster, where it fails with the exception below. Running the same job on Linux with hadoop jar works fine, and WordCount and other small programs ran without any problem before, so I suspect the cause is in this reduce task. Could someone more experienced please point me in the right direction? Many thanks.
2015-12-04 15:33:43,100 INFO [main] client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at hadoop01/10.5.110.250:8032
2015-12-04 15:33:43,458 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2015-12-04 15:33:43,478 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(259)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2015-12-04 15:33:43,525 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(280)) - Total input paths to process : 1
2015-12-04 15:33:43,573 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:1
2015-12-04 15:33:43,655 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1449213919153_0002
2015-12-04 15:33:43,744 INFO [main] mapred.YARNRunner (YARNRunner.java:createApplicationSubmissionContext(369)) - Job jar is not present. Not adding any jar to the list of resources.
2015-12-04 15:33:43,778 INFO [main] impl.YarnClientImpl (YarnClientImpl.java:submitApplication(204)) - Submitted application application_1449213919153_0002
2015-12-04 15:33:43,807 INFO [main] mapreduce.Job (Job.java:submit(1289)) - The url to track the job: http://hadoop01:8088/proxy/application_1449213919153_0002/
2015-12-04 15:33:43,808 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1334)) - Running job: job_1449213919153_0002
2015-12-04 15:33:46,823 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_1449213919153_0002 running in uber mode : false
2015-12-04 15:33:46,825 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 0% reduce 0%
2015-12-04 15:33:46,833 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_1449213919153_0002 failed with state FAILED due to: Application application_1449213919153_0002 failed 2 times due to AM Container for appattempt_1449213919153_0002_000002 exited with exitCode: -1000 due to: File file:/tmp/hadoop-yarn/staging/lixiwei/.staging/job_1449213919153_0002/job.splitmetainfo does not exist
.Failing this attempt.. Failing the application.
2015-12-04 15:33:46,861 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 0
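For reference, the two WARN lines above ("Hadoop command-line option parsing not performed..." and "No job jar file set...") suggest wrapping the driver in the Tool interface and pointing the client at a built jar. Below is only a minimal sketch of what those warnings recommend, not my current code; the jar path is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowSumAreaDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() carries whatever -D options ToolRunner parsed from the command line
        Job job = Job.getInstance(getConf());
        // Tell the submitter which jar holds the user classes (placeholder path)
        job.setJar("flowsumarea.jar");
        // ... mapper/reducer/partitioner/output setup as in FlowSumArea.main() below ...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new FlowSumAreaDriver(), args));
    }
}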
The program is as follows:
public class FlowSumArea
{
    public static class FlowSumAreaMapper
            extends Mapper<LongWritable, Text, Text, FlowBean>
    {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            String phoneNo = fields[1];
            long upFlow = Long.parseLong(fields[7]);
            long downFlow = Long.parseLong(fields[8]);
            context.write(new Text(phoneNo),
                    new FlowBean(phoneNo, upFlow, downFlow));
        }
    }
    public static class FlowSumAreaReducer
            extends Reducer<Text, FlowBean, Text, FlowBean>
    {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException
        {
            long upFlowCounter = 0;
            long downFlowCounter = 0;
            for (FlowBean bean : values)
            {
                upFlowCounter += bean.getUpFlow();
                downFlowCounter += bean.getDownFlow();
            }
            context.write(key, new FlowBean(key.toString(), upFlowCounter,
                    downFlowCounter));
        }
    }
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException
    {
        // 1. Load the configuration
        Configuration conf = new Configuration();
        // 2. Set up the job
        Job job = Job.getInstance();
        job.setJarByClass(FlowSumArea.class);
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);
        job.setPartitionerClass(AreaPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Set the number of reduce tasks; it should match the number of partitions
        job.setNumReduceTasks(6);
        // 3. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\51195\\Desktop\\flow\\flowarea\\srcdata"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\51195\\Desktop\\flow\\flowarea\\outputdata6"));
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
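FlowBean itself is not included in the post; from the way it is used above (a (phoneNo, upFlow, downFlow) constructor plus getUpFlow/getDownFlow), it has to be a Hadoop Writable. The following is only a minimal sketch of such a bean, with the field names and toString format assumed, not my actual class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
    private String phoneNo;
    private long upFlow;
    private long downFlow;

    // Writable requires a no-arg constructor for deserialization
    public FlowBean() {}

    public FlowBean(String phoneNo, long upFlow, long downFlow) {
        this.phoneNo = phoneNo;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    public long getUpFlow() { return upFlow; }
    public long getDownFlow() { return downFlow; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNo);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNo = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
    }

    @Override
    public String toString() {
        // What TextOutputFormat would write as the value column (format assumed)
        return upFlow + "\t" + downFlow;
    }
}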
This is the partitioner:
public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone-number prefix from the key and look it up in the area map;
        // unknown prefixes fall into the extra partition 5
        Integer areaCode = areaMap.get(key.toString().substring(0, 3));
        return areaCode == null ? 5 : areaCode;
    }
}
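For what it's worth, here is a throwaway check of how I expect the partitioner to behave; the phone numbers are made up, and it assumes AreaPartitioner and FlowBean are on the classpath:

import org.apache.hadoop.io.Text;

public class AreaPartitionerCheck {
    public static void main(String[] args) {
        AreaPartitioner<Text, FlowBean> p = new AreaPartitioner<>();
        // Prefix "135" is in the map, so this should print 0
        System.out.println(p.getPartition(new Text("13500001111"), null, 6));
        // Prefix "150" is not in the map, so this should print 5 (the fallback bucket)
        System.out.println(p.getPartition(new Text("15000002222"), null, 6));
    }
}

The six partitions (0 through 5) are why setNumReduceTasks(6) is used in the driver.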