求大家帮忙看一下,不知道为什么输出为空
我的csv文件格式是:
CsvBean
package com.position.clean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CsvBean implements Writable{
private String date;
private String month;
//private String week;
private String weather;
private String wind;
private String winddirection;//风向
private int max;
private int min;
private int windforce; //风力
public CsvBean() {
}
public CsvBean(String date, String month, String weather, String wind, String winddirection, int max, int min, int windforce) {
this.date = date;
this.month = month;
//this.week = week;
this.weather = weather;
this.max = max;
this.min = min;
this.weather = weather;
this.wind = wind;
this.winddirection = winddirection;
this.windforce = windforce;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getMonth() {
return month;
}
public void setMonth(String Month) {
this.month = Month;
}
// public String getWeek() {
// return week;
// }
// public void setWeek(String week) {
// this.week = week;
// }
public int getMax() {
return max;
}
public void setMax(int max) {
this.max = max;
}
public int getMin() {
return min;
}
public void setMin(int min) {
this.min = min;
}
public String getWeather() {
return weather;
}
public void setWeather(String weather) {
this.weather = weather;
}
public String getWind() {
return wind;
}
public void setWind(String wind) {
this.wind = wind;
}
public String getWinddirection() {
return winddirection;
}
public void setWinddirection(String winddirection) {
this.winddirection = winddirection;
}
public int getWindforce() {
return windforce;
}
public void setWindforce(int windforce) {
this.windforce = windforce;
}
@Override
public String toString() {
return date + "," + month + "," + max + "," + min+ "," + weather + "," + wind + "," + winddirection+ "," + windforce;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(date);
dataOutput.writeUTF(month);
// dataOutput.writeUTF(week);
dataOutput.writeInt(max);
dataOutput.writeInt(min);
dataOutput.writeUTF(weather);
dataOutput.writeUTF(wind);
dataOutput.writeUTF(winddirection);
dataOutput.writeInt(windforce);
}
public void readFields(DataInput dataInput) throws IOException {
this.date = dataInput.readUTF();
this.month=dataInput.readUTF();
// this.week = dataInput.readUTF();
this.max=dataInput.readInt();
this.min=dataInput.readInt();
this.weather=dataInput.readUTF();
this.wind=dataInput.readUTF();
this.winddirection=dataInput.readUTF();
this.windforce=dataInput.readInt();
}
}
CsvSplitMapper
package com.position.clean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.util.Date;
import java.io.IOException;
public class CsvSplitMapper extends Mapper<LongWritable, Text,Text,CsvBean> {
private Text outK = new Text();
private CsvBean outV = new CsvBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行转为String
String line = value.toString();
// 2 按照逗号分割
String[] csvComments = line.split(",");
// 3 获取需要的值
String date = csvComments[1];
//String week = csvComments[2];
int max = extractTemperature(csvComments[3]);
int min = extractTemperature(csvComments[4]);
String weather = csvComments[5];
String wind = csvComments[6];
//分割wind字段
String[] windComponents = extractWindInfo(wind);
String winddirection = windComponents[0];
int windforce = extractWindForce(windComponents[1]);
// 4 封装到对象
outV.setDate(date);
//outV.setWeek(week);
//outK.setMonth(month);
outK.set(getMonthFromDate(date));//月份
outV.setMax(max);
outV.setMin(min);
outV.setWeather(weather);
outV.setWind(wind);
outV.setWinddirection(winddirection);
outV.setWindforce(windforce);
// 5 写出
context.write(outK, outV);
}
private String getMonthFromDate(String date) {
//日期格式为 "yyyy/MM/dd"
String[] parts = date.split("/");
return parts[1];
}
private int extractTemperature(String temperature) {
// 提取数字部分并转换为整数
return Integer.parseInt(temperature.replaceAll("[^0-9-]", ""));
}
private String[] extractWindInfo(String windInfo) {
// 分割风向和风力信息
//return windInfo.split("\\s+");
return windInfo.split("\t");
}
private int extractWindForce(String windForce) {
// 提取风力数字部分并转换为整数
return Integer.parseInt(windForce.replaceAll("[^0-9-]", ""));
}
}
CsvSplitReducer
package com.position.clean;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CsvSplitReducer extends Reducer<Text, CsvBean, Text, CsvBean> {
@Override
protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
int max = Integer.MIN_VALUE;
int min = Integer.MAX_VALUE;
CsvBean maxBean = null;
CsvBean minBean = null;
for (CsvBean val : values) {
if (val.getMax() > max) {
max = val.getMax();
maxBean = val;
//maxBean = new CsvBean(val.getDate(), val.getMonth(), val.getWeather(), val.getWind(), val.getWinddirection(), val.getMax(), val.getMin(), val.getWindforce());
}
if (val.getMin() < min) {
min = val.getMin();
minBean = val;
//minBean = new CsvBean(val.getDate(), val.getMonth(), val.getWeather(), val.getWind(), val.getWinddirection(), val.getMax(), val.getMin(), val.getWindforce());
}
}
if (maxBean != null) {
context.write(new Text("当月最高气温:" + max), maxBean);
}
if (minBean != null) {
context.write(new Text("当月最低气温:" + min), minBean);
}
}
}
CsvSplitDriver
package com.position.clean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
//运行这个右键->RUN AS->JAVA APPLICATION
public class CsvSplitDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 关联Driver类
job.setJarByClass(CsvSplitDriver.class);
//3 关联Mapper和Reducer类
job.setMapperClass(CsvSplitMapper.class);
job.setReducerClass(CsvSplitReducer.class);
//4 设置Map的输入输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CsvBean.class);
//5 设置最终的输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CsvBean.class);
//6 设置输入输出路径
FileInputFormat.setInputPaths(job,new Path("/mnt/hgfs/Share/weather.csv"));
FileOutputFormat.setOutputPath(job,new Path("/mnt/hgfs/Share/weather2"));
//7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
最终输出了weather2文件夹,但是里面的内容为空