Problem and background
I implemented a custom WritableComparable as the map output key, and the job fails at runtime.
Relevant code (pasted as text, no screenshots)
StudentScore
```java
package mr.score;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class StudentScore implements WritableComparable<StudentScore> {
    private String name;
    private int sum;
    private double avg;

    public StudentScore() {
        super();
    }

    public StudentScore(String name, int sum, double avg) {
        this.name = name;
        this.sum = sum;
        this.avg = avg;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAvg(double avg) {
        this.avg = avg;
    }

    public int getSum() {
        return sum;
    }

    public String getName() {
        return name;
    }

    public double getAvg() {
        return avg;
    }

    @Override
    public String toString() {
        return this.name + "\t" + this.sum + "\t" + this.avg;
    }

    @Override
    public int compareTo(StudentScore o) {
        // Sort by total score, descending
        int tmp = o.getSum() - this.getSum();
        // If totals are equal, fall back to the average score
        // (Double.compare avoids the lossy (int) cast on the difference)
        if (tmp == 0) {
            return Double.compare(o.getAvg(), this.getAvg());
        }
        return tmp;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in the same order write() emits them
        this.name = in.readUTF();
        this.sum = in.readInt();
        this.avg = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(sum);
        out.writeDouble(avg);
    }
}
```
Mapper
```java
package mr.score;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StudentMapper extends Mapper<LongWritable, Text, StudentScore, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each line on \t
        String[] split = value.toString().split("\t");
        // Pull out the three subject scores
        int chinese = Integer.parseInt(split[3]);
        int math = Integer.parseInt(split[4]);
        int english = Integer.parseInt(split[5]);
        int sum = chinese + math + english;
        double avg = sum / 3.0;
        StudentScore p = new StudentScore(split[2], sum, avg);
        Text ovalue = new Text();
        ovalue.set(split[0] + "\t" + split[1]);
        context.write(p, ovalue);
    }
}
```
Reducer
```java
package mr.score;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class StudentReducer extends Reducer<StudentScore, Text, Text, StudentScore> {
    @Override
    protected void reduce(StudentScore key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Swap key and value: emit student id/class as the key, score record as the value
        for (Text n : values) {
            context.write(n, key);
        }
    }
}
```
Main
```java
package mr.score;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "StudentScore");
        job.setJarByClass(Main.class);
        job.setMapperClass(StudentMapper.class);
        job.setReducerClass(StudentReducer.class);
        job.setMapOutputKeyClass(StudentScore.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        String inputfile = "hdfs://localhost:9000/user/hadoop/input/score.txt";
        FileInputFormat.addInputPath(job, new Path(inputfile));
        String outputfile = "hdfs://localhost:9000/user/hadoop/output/out6.txt";
        FileOutputFormat.setOutputPath(job, new Path(outputfile));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Run output and error log
2022-04-22 14:18:19,282 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1285)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2022-04-22 14:18:19,285 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(79)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2022-04-22 14:18:19,393 WARN [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(64)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2022-04-22 14:18:19,417 WARN [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(171)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2022-04-22 14:18:19,434 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(289)) - Total input files to process : 1
2022-04-22 14:18:19,482 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(200)) - number of splits:1
2022-04-22 14:18:19,592 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(289)) - Submitting tokens for job: job_local441479903_0001
2022-04-22 14:18:19,715 INFO [main] mapreduce.Job (Job.java:submit(1345)) - The url to track the job: http://localhost:8080/
2022-04-22 14:18:19,715 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1390)) - Running job: job_local441479903_0001
2022-04-22 14:18:19,716 INFO [Thread-3] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(498)) - OutputCommitter set in config null
2022-04-22 14:18:19,719 INFO [Thread-3] output.FileOutputCommitter (FileOutputCommitter.java:<init>(123)) - File Output Committer Algorithm version is 1
2022-04-22 14:18:19,719 INFO [Thread-3] output.FileOutputCommitter (FileOutputCommitter.java:<init>(138)) - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-04-22 14:18:19,720 INFO [Thread-3] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(516)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2022-04-22 14:18:19,780 INFO [Thread-3] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(475)) - Waiting for map tasks
2022-04-22 14:18:19,781 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(251)) - Starting task: attempt_local441479903_0001_m_000000_0
2022-04-22 14:18:19,797 INFO [LocalJobRunner Map Task Executor #0] output.FileOutputCommitter (FileOutputCommitter.java:<init>(123)) - File Output Committer Algorithm version is 1
2022-04-22 14:18:19,798 INFO [LocalJobRunner Map Task Executor #0] output.FileOutputCommitter (FileOutputCommitter.java:<init>(138)) - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-04-22 14:18:19,804 INFO [LocalJobRunner Map Task Executor #0] util.ProcfsBasedProcessTree (ProcfsBasedProcessTree.java:isAvailable(168)) - ProcfsBasedProcessTree currently is supported only on Linux.
2022-04-22 14:18:19,856 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(619)) - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@54300bc5
2022-04-22 14:18:19,859 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(756)) - Processing split: hdfs://localhost:9000/user/hadoop/input/score.txt:0+7933
2022-04-22 14:18:19,917 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1205)) - (EQUATOR) 0 kvi 26214396(104857584)
2022-04-22 14:18:19,917 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(998)) - mapreduce.task.io.sort.mb: 100
2022-04-22 14:18:19,917 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(999)) - soft limit at 83886080
2022-04-22 14:18:19,917 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(1000)) - bufstart = 0; bufvoid = 104857600
2022-04-22 14:18:19,917 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(1001)) - kvstart = 26214396; length = 6553600
2022-04-22 14:18:19,920 WARN [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(411)) - Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException: Cannot invoke "org.apache.hadoop.io.serializer.Serializer.open(java.io.OutputStream)" because "this.valSerializer" is null
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:270)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
2022-04-22 14:18:19,922 INFO [Thread-3] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(483)) - map task executor complete.
2022-04-22 14:18:20,001 WARN [Thread-3] mapred.LocalJobRunner (LocalJobRunner.java:run(587)) - job_local441479903_0001
java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :Cannot invoke "org.apache.hadoop.io.serializer.Serializer.open(java.io.OutputStream)" because "this.valSerializer" is null
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:489)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:549)
Caused by: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :Cannot invoke "org.apache.hadoop.io.serializer.Serializer.open(java.io.OutputStream)" because "this.valSerializer" is null
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:270)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.hadoop.io.serializer.Serializer.open(java.io.OutputStream)" because "this.valSerializer" is null
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
... 10 more
2022-04-22 14:18:20,717 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1411)) - Job job_local441479903_0001 running in uber mode : false
2022-04-22 14:18:20,722 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1418)) - map 0% reduce 0%
2022-04-22 14:18:20,724 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1431)) - Job job_local441479903_0001 failed with state FAILED due to: NA
2022-04-22 14:18:20,728 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1436)) - Counters: 0
What I have tried
I searched many related questions, but none of them matched my problem.
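One check I could run locally: write() and readFields() must agree on field order, since Hadoop just streams the raw bytes. The sketch below mimics StudentScore's wire format (writeUTF, writeInt, writeDouble) with only java.io, no cluster needed; the class name `WireFormatCheck` and the sample values are made up for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trips the same byte layout StudentScore.write() produces,
// then reads it back in the order readFields() uses.
public class WireFormatCheck {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        // Same order as write(): name, sum, avg
        out.writeUTF("Alice");
        out.writeInt(270);
        out.writeDouble(90.0);

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        // Same order as readFields()
        String name = in.readUTF();
        int sum = in.readInt();
        double avg = in.readDouble();
        System.out.println(name + "\t" + sum + "\t" + avg);
        // prints: Alice	270	90.0
    }
}
```

The round trip comes back intact, so the key's serialization order itself looks consistent.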
Desired result
The job runs successfully.