怎么全是bug(哭 2024-01-12 10:45 采纳率: 0%
浏览 6

mapreduce编程(无输出)

求大家帮忙看一下,不知道为什么输出为空
我的csv文件格式是:

img

CsvBean

package com.position.clean;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
public class CsvBean implements Writable{
 
    private String date;
    private String month;
    //private String week;
    private String weather;
    private String wind;
    private String winddirection;//风向
    private int max;
    private int min;
    private int windforce;    //风力

 
    public CsvBean() {
 
    }
 
    public CsvBean(String date, String month, String weather, String wind, String winddirection, int max, int min, int windforce) {
 
        this.date = date;
        this.month = month;
        //this.week = week;
        this.weather = weather;
        this.max = max;    
        this.min = min;
        this.weather = weather;
        this.wind = wind;
        this.winddirection = winddirection;
        this.windforce = windforce;

    }
 
    public String getDate() {
        return date;
    }
    public void setDate(String date) {
        this.date = date;
    }
    public String getMonth() {
        return month;
    }
    public void setMonth(String Month) {
        this.month = Month;
    }
//    public String getWeek() {
//        return week;
//    }
//    public void setWeek(String week) {
//        this.week = week;
//    }
    public int getMax() {
        return max;
    }
    public void setMax(int max) {
        this.max = max;
    }
    public int getMin() {
        return min;
    }
    public void setMin(int min) {
        this.min = min;
    }
    public String getWeather() {
        return weather;
    }
    public void setWeather(String weather) {
        this.weather = weather;
    }
    public String getWind() {
        return wind;
    }
    public void setWind(String wind) {
        this.wind = wind;
    }
    public String getWinddirection() {
        return winddirection;
    }
    public void setWinddirection(String winddirection) {
        this.winddirection = winddirection;
    }
    public int getWindforce() {
        return windforce;
    }
    public void setWindforce(int windforce) {
        this.windforce = windforce;
    }
 
    @Override
    public String toString() {
        return date + "," + month + "," + max + "," + min+ "," + weather + "," + wind + "," + winddirection+ "," + windforce;
    }
 
    public void write(DataOutput dataOutput) throws IOException {
 
        dataOutput.writeUTF(date);
        dataOutput.writeUTF(month);
//        dataOutput.writeUTF(week);
        dataOutput.writeInt(max);
        dataOutput.writeInt(min);
        dataOutput.writeUTF(weather);
        dataOutput.writeUTF(wind);
        dataOutput.writeUTF(winddirection);
        dataOutput.writeInt(windforce);
    }
 
    public void readFields(DataInput dataInput) throws IOException {
 
        this.date = dataInput.readUTF();
        this.month=dataInput.readUTF();
 //       this.week = dataInput.readUTF();
        this.max=dataInput.readInt();    
        this.min=dataInput.readInt();
        this.weather=dataInput.readUTF();
        this.wind=dataInput.readUTF();
        this.winddirection=dataInput.readUTF();
        this.windforce=dataInput.readInt();
    }
}

CsvSplitMapper

package com.position.clean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.util.Date;
import java.io.IOException;
 
public class CsvSplitMapper extends Mapper<LongWritable, Text,Text,CsvBean> {
 
    private Text outK = new Text();
    private CsvBean outV = new CsvBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 获取一行转为String
        String line = value.toString();

        // 2 按照逗号分割
        String[] csvComments = line.split(",");

        // 3 获取需要的值
        String date = csvComments[1];
        //String week = csvComments[2];
        int max = extractTemperature(csvComments[3]);
        int min = extractTemperature(csvComments[4]);
        String weather = csvComments[5];
        String wind = csvComments[6];
        //分割wind字段
        String[] windComponents = extractWindInfo(wind);
        String winddirection = windComponents[0];
        int windforce = extractWindForce(windComponents[1]);

        // 4 封装到对象
        outV.setDate(date);
        //outV.setWeek(week);
        //outK.setMonth(month);
        outK.set(getMonthFromDate(date));//月份
        outV.setMax(max);
        outV.setMin(min);
        outV.setWeather(weather);
        outV.setWind(wind);
        outV.setWinddirection(winddirection);
        outV.setWindforce(windforce);

        // 5 写出
        context.write(outK, outV);
    }
    private String getMonthFromDate(String date) {
        //日期格式为 "yyyy/MM/dd"
        String[] parts = date.split("/");
        return parts[1];
    }
    private int extractTemperature(String temperature) {
        // 提取数字部分并转换为整数
        return Integer.parseInt(temperature.replaceAll("[^0-9-]", ""));
    }
    private String[] extractWindInfo(String windInfo) {
        // 分割风向和风力信息
        //return windInfo.split("\\s+");
        return windInfo.split("\t");
    }
    private int extractWindForce(String windForce) {
        // 提取风力数字部分并转换为整数
        return Integer.parseInt(windForce.replaceAll("[^0-9-]", ""));
    }

}

CsvSplitReducer

package com.position.clean;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CsvSplitReducer extends Reducer<Text, CsvBean, Text, CsvBean> {
    @Override
    protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        CsvBean maxBean = null;
        CsvBean minBean = null;

        for (CsvBean val : values) {
            if (val.getMax() > max) {
                max = val.getMax();
                maxBean = val;
                //maxBean = new CsvBean(val.getDate(), val.getMonth(), val.getWeather(), val.getWind(), val.getWinddirection(), val.getMax(), val.getMin(), val.getWindforce());
            }

            if (val.getMin() < min) {
                min = val.getMin();
                minBean = val;
                //minBean = new CsvBean(val.getDate(), val.getMonth(), val.getWeather(), val.getWind(), val.getWinddirection(), val.getMax(), val.getMin(), val.getWindforce());
            }
        }

        if (maxBean != null) {
            context.write(new Text("当月最高气温:" + max), maxBean);
        }

        if (minBean != null) {
            context.write(new Text("当月最低气温:" + min), minBean);
        }
    }
}

CsvSplitDriver

package com.position.clean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
//运行这个右键->RUN AS->JAVA APPLICATION
public class CsvSplitDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 关联Driver类
        job.setJarByClass(CsvSplitDriver.class);

        //3 关联Mapper和Reducer类
        job.setMapperClass(CsvSplitMapper.class);
        job.setReducerClass(CsvSplitReducer.class);

        //4 设置Map的输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CsvBean.class);

        //5    设置最终的输入输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CsvBean.class);

        //6    设置输入输出路径
        FileInputFormat.setInputPaths(job,new Path("/mnt/hgfs/Share/weather.csv"));
        FileOutputFormat.setOutputPath(job,new Path("/mnt/hgfs/Share/weather2"));

        //7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

最终输出了weather2文件夹,但是里面的内容为空

img

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2024-01-12 12:22
    关注

    【相关推荐】



    • 请看👉 :Mapreduce对csv文件数据进行价格排序处理
    • 除此之外, 这篇博客: MapReduce并行处理csv文件,将船舶数据划分子轨迹中的 Map阶段 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:

      map阶段主要是过滤速度阈值,将速度小于3kn的数据点看作抛锚点过滤

      // Mapper
      public class SubTrajectorMapper extend Mapper<LongWritable, Text, Text, SubTrajectorBean>{
      	
      	// 输出key、value
      	private Text outK = new Text();
      	private SubTrajectorBean outV = new SubTrajectorBean();
      	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
      		
      		// 转为字符串
      		String[] comments = value.toString();
      		// 判断速度是否大于3Kn,大于输出,否则过滤
      		if(comments[5] > 3){
      			String MMSI = comments[1];
      			Double Lat_d = Double.parseDouble(comment[7]);
      			Double Lon_d = Double.parseDouble(comment[8]);
      			Double unixTime = Long.parseLong(comments[9]);
      			
      			// 封装bean对象
      			outV.setMMMSI(MMSI);
      			outV.setLat_d(Lat_d);
      			outV.setLon_d(Lon_d);
      			outV.setUnixTime(unixTime);
      			
      			// 封装Text对象
      			outK.set(MMSI);
      			
      			// 写出context
      			context.write(outK, outV);
      		}
      	}
      }
      

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 创建了问题 1月12日

悬赏问题

  • ¥15 有偿求苍穹外卖环境配置
  • ¥15 代码在keil5里变成了这样怎么办啊,文件图像也变了,
  • ¥20 Ue4.26打包win64bit报错,如何解决?(语言-c++)
  • ¥15 clousx6整点报时指令怎么写
  • ¥30 远程帮我安装软件及库文件
  • ¥15 关于#自动化#的问题:如何通过电脑控制多相机同步拍照或摄影(相机或者摄影模组数量大于60),并将所有采集的照片或视频以一定编码规则存放至规定电脑文件夹内
  • ¥20 深信服vpn-2050这台设备如何配置才能成功联网?
  • ¥15 Arduino的wifi连接,如何关闭低功耗模式?
  • ¥15 Android studio 无法定位adb是什么问题?
  • ¥15 C#连接不上服务器,