啊水水水啊 2023-01-25 21:29 采纳率: 65.2%
浏览 35
已结题

关于#mapreduce#代码的问题,如何解决?

mapreduce代码报错:(后面就是源码,有没有兄弟帮忙调试调试,这个报错,网上相关资料也没找到,解决了会自动付费打赏)
目的:想以date为key,计算出评论数均值avg_comment、价格均值、书籍数量这三个值

img


JDBCDriver类:

package cn.bigdata.classTop500Big;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.junit.Test;

import java.io.IOException;

public class JDBCDriver {
    @Test
    public void test() throws IOException, InterruptedException, ClassNotFoundException {
        // 1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2.设置jar路径
        job.setJarByClass(JDBCDriver.class);
        // 3.数据输入类型为数据库输入
        job.setInputFormatClass(DBInputFormat.class); //read
        // 4.设置数据库配置并且连接
        String driverClass = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://localhost:3306/book?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8";
        String userName = "root";
        String passWord = "123456";

        DBConfiguration.configureDB(job.getConfiguration(), driverClass, url,
                userName, passWord);

        // 5.设置数据输入内容-sql查询数据作为输入数据  classtop500
        DBInputFormat.setInput(job, MyDBWritable.class,
                "select id,book_id,type,name,image,comment,score,author,press,date,price from classtop500",
                "select count(*) from classtop500");
        // 6.设置输出的表 classTop500CleanPrice
        DBOutputFormat.setOutput(job,"classTop500BigScreenClean","date","book_number","avg_comment","avg_price");
//        DBOutputFormat.setOutput(job,"fruitsCleanPrice","fruits_name","fruits_min_price",
//                "fruits_max_price","fruits_avg_price");
        // 7.关联mapper和reducer
        job.setMapperClass(JDBCMapper.class);
        job.setReducerClass(JDBCReduce.class);

        // 8.设置map输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 9.设置最终输出的kv类型
        job.setOutputKeyClass(MyDBWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // 10.提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}


JDBCMapper 类:

package cn.bigdata.classTop500Big;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Date;

public class JDBCMapper extends Mapper<LongWritable, MyDBWritable, Text, Text> {
    private Text text = new Text();
    private Text outWritable = new Text();
//    private MyDBWritable price = new MyDBWritable();
    @Override
    protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
        String date = value.getDate(); // 读取每一行的年龄作为K
        String comment = value.getAvg_comment();
        float price = value.getPrice();
        String price1 = comment + "\t" + price;
        text.set(date);
        outWritable.set(price1);
        context.write(text, outWritable);
    }
}


JDBCReduce 类:

package cn.bigdata.classTop500Big;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Date;

public class JDBCReduce extends Reducer<Text, Text, MyDBWritable, NullWritable> {
//    Double max = 0.0;
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        MyDBWritable myDBName = new MyDBWritable();
        Double avg_price = new Double(0.0);
        Double avg_comment = new Double(0.0);
        int bookNum=0;
//        text = values.iterator().next();
//        MyDBWritable myDBM_price = new MyDBWritable();
        myDBName.setDate(key.toString());
        Double comment1= new Double(0.0);
        Double price1= new Double(0.0);
        int num=1;
        for (Text value : values){
            String[] str = value.toString().split("\t");
            Double comment2 = Double.parseDouble(str[0]);
            Double price2 = Double.parseDouble(str[1]);
            if(comment2 == 0.0 || price2 == 0.0){
                continue;
            }
            price1=price1+price2;
            comment1=comment1+comment2;
            avg_price=price1/num;
            avg_comment=comment1/num;
            bookNum=num;
            num++;
        }
        myDBName.setBookNumber(String.valueOf(bookNum));
        String avg_comment1 =String.format("%.2f",avg_comment);
        myDBName.setAvg_comment(avg_comment1);
        String avg_price1 =String.format("%.2f",avg_price);
        myDBName.setAvg_price(avg_price1);
        context.write(myDBName, NullWritable.get());
    }
}

MyDBWritable 类:

package cn.bigdata.classTop500Big;


import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;

// 1.实现 DBWritable, Writable
public class MyDBWritable implements DBWritable, Writable {
    // 数据库的写入字段
    private int id;
    private int book_id;
    private String type;
    private String name;
    private String image;
    private long comment;
    private float score;
    private String  author;
    private String press;
    private String date;
    private float price;

    private String bookNumber;
    private String avg_price;
    private String avg_comment;

    // 写出字段
    // 2.反序列化所需要的空参构造
    public MyDBWritable() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getBook_id() {
        return book_id;
    }

    public void setBook_id(int book_id) {
        this.book_id = book_id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getImage() {
        return image;
    }

    public void setImage(String image) {
        this.image = image;
    }

    public long getComment() {
        return comment;
    }

    public void setComment(long comment) {
        this.comment = comment;
    }

    public float getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public String getPress() {
        return press;
    }

    public void setPress(String press) {
        this.press = press;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }


    public String getAvg_price() {
        return avg_price;
    }

    public void setAvg_price(String avg_price) {
        this.avg_price = avg_price;
    }

    public void setScore(float score) {
        this.score = score;
    }

    public String getBookNumber() {
        return bookNumber;
    }

    public void setBookNumber(String bookNumber) {
        this.bookNumber = bookNumber;
    }

    public String getAvg_comment() {
        return avg_comment;
    }

    public void setAvg_comment(String avg_comment) {
        this.avg_comment = avg_comment;
    }

    // 3.序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(id);
        dataOutput.writeInt(book_id);
        dataOutput.writeUTF(type);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(image);
        dataOutput.writeLong(comment);
        dataOutput.writeFloat(score);
        dataOutput.writeUTF(author);
        dataOutput.writeUTF(press);
        dataOutput.writeUTF(date);
        dataOutput.writeDouble(price);
        dataOutput.writeUTF(bookNumber);
        dataOutput.writeUTF(avg_price);
        dataOutput.writeUTF(avg_comment);
    }

    // 4.反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id=dataInput.readInt();
        this.book_id = dataInput.readInt();
        this.type=dataInput.readUTF();
        this.name = dataInput.readUTF();
        this.image = dataInput.readUTF();
        this.score = dataInput.readFloat();
       this.author=dataInput.readUTF();
       this.press=dataInput.readUTF();
       this.date=dataInput.readUTF();
       this.price=dataInput.readFloat();
//       this.min_price=dataInput.readUTF();
//       this.avg_price=dataInput.readUTF();
//       this.max_price=dataInput.readUTF();
    }

    // 5.从DB读取
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        id=resultSet.getInt(1);
        book_id = resultSet.getInt(2); // 1,2,3对应列的坐标,从1开始
        type=resultSet.getString(3);
        name = resultSet.getString(4);
        image = resultSet.getString(5);
        comment = resultSet.getLong(6);
        score = resultSet.getFloat(7);
        author = resultSet.getString(8);
        press = resultSet.getString(9);
        date = resultSet.getString(10);
        price = resultSet.getFloat(11);
//        min_price=resultSet.getString(11);
//        avg_price=resultSet.getString(12);
//        max_price=resultSet.getString(13);

    }

    // 6.写入数据库
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, date);
        preparedStatement.setString(2, bookNumber);
        preparedStatement.setString(3, avg_comment);
        preparedStatement.setString(4, avg_price);
    }

}


mysql表结构:

img

img

  • 写回答

2条回答 默认 最新

  • Halifax ‎ 2023-01-26 08:40
    关注

    JDBCMapper 里面的 map 方法,里面的 MyDbWritable 为空。

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 1月28日
  • 修改了问题 1月25日
  • 修改了问题 1月25日
  • 创建了问题 1月25日

悬赏问题

  • ¥100 set_link_state
  • ¥15 虚幻5 UE美术毛发渲染
  • ¥15 CVRP 图论 物流运输优化
  • ¥15 Tableau online 嵌入ppt失败
  • ¥100 支付宝网页转账系统不识别账号
  • ¥15 基于单片机的靶位控制系统
  • ¥15 真我手机蓝牙传输进度消息被关闭了,怎么打开?(关键词-消息通知)
  • ¥15 装 pytorch 的时候出了好多问题,遇到这种情况怎么处理?
  • ¥20 IOS游览器某宝手机网页版自动立即购买JavaScript脚本
  • ¥15 手机接入宽带网线,如何释放宽带全部速度