mapreduce代码报错:(后面就是源码,有没有兄弟帮忙调试调试,这个报错,网上相关资料也没找到,解决了会自动付费打赏)
目的:想以date为key,计算出评论数均值avg_comment、价格均值、书籍数量这三个值
JDBCDriver类:
package cn.bigdata.classTop500Big;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.junit.Test;
import java.io.IOException;
public class JDBCDriver {
@Test
public void test() throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置jar路径
job.setJarByClass(JDBCDriver.class);
// 3.数据输入类型为数据库输入
job.setInputFormatClass(DBInputFormat.class); //read
// 4.设置数据库配置并且连接
String driverClass = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/book?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8";
String userName = "root";
String passWord = "123456";
DBConfiguration.configureDB(job.getConfiguration(), driverClass, url,
userName, passWord);
// 5.设置数据输入内容-sql查询数据作为输入数据 classtop500
DBInputFormat.setInput(job, MyDBWritable.class,
"select id,book_id,type,name,image,comment,score,author,press,date,price from classtop500",
"select count(*) from classtop500");
// 6.设置输出的表 classTop500CleanPrice
DBOutputFormat.setOutput(job,"classTop500BigScreenClean","date","book_number","avg_comment","avg_price");
// DBOutputFormat.setOutput(job,"fruitsCleanPrice","fruits_name","fruits_min_price",
// "fruits_max_price","fruits_avg_price");
// 7.关联mapper和reducer
job.setMapperClass(JDBCMapper.class);
job.setReducerClass(JDBCReduce.class);
// 8.设置map输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 9.设置最终输出的kv类型
job.setOutputKeyClass(MyDBWritable.class);
job.setOutputValueClass(NullWritable.class);
// 10.提交job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
JDBCMapper 类:
package cn.bigdata.classTop500Big;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Date;
public class JDBCMapper extends Mapper<LongWritable, MyDBWritable, Text, Text> {
private Text text = new Text();
private Text outWritable = new Text();
// private MyDBWritable price = new MyDBWritable();
@Override
protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
String date = value.getDate(); // 读取每一行的年龄作为K
String comment = value.getAvg_comment();
float price = value.getPrice();
String price1 = comment + "\t" + price;
text.set(date);
outWritable.set(price1);
context.write(text, outWritable);
}
}
JDBCReduce 类:
package cn.bigdata.classTop500Big;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Date;
public class JDBCReduce extends Reducer<Text, Text, MyDBWritable, NullWritable> {
// Double max = 0.0;
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
MyDBWritable myDBName = new MyDBWritable();
Double avg_price = new Double(0.0);
Double avg_comment = new Double(0.0);
int bookNum=0;
// text = values.iterator().next();
// MyDBWritable myDBM_price = new MyDBWritable();
myDBName.setDate(key.toString());
Double comment1= new Double(0.0);
Double price1= new Double(0.0);
int num=1;
for (Text value : values){
String[] str = value.toString().split("\t");
Double comment2 = Double.parseDouble(str[0]);
Double price2 = Double.parseDouble(str[1]);
if(comment2 == 0.0 || price2 == 0.0){
continue;
}
price1=price1+price2;
comment1=comment1+comment2;
avg_price=price1/num;
avg_comment=comment1/num;
bookNum=num;
num++;
}
myDBName.setBookNumber(String.valueOf(bookNum));
String avg_comment1 =String.format("%.2f",avg_comment);
myDBName.setAvg_comment(avg_comment1);
String avg_price1 =String.format("%.2f",avg_price);
myDBName.setAvg_price(avg_price1);
context.write(myDBName, NullWritable.get());
}
}
MyDBWritable 类:
package cn.bigdata.classTop500Big;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
// 1.实现 DBWritable, Writable
public class MyDBWritable implements DBWritable, Writable {
// 数据库的写入字段
private int id;
private int book_id;
private String type;
private String name;
private String image;
private long comment;
private float score;
private String author;
private String press;
private String date;
private float price;
private String bookNumber;
private String avg_price;
private String avg_comment;
// 写出字段
// 2.反序列化所需要的空参构造
public MyDBWritable() {
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getBook_id() {
return book_id;
}
public void setBook_id(int book_id) {
this.book_id = book_id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getImage() {
return image;
}
public void setImage(String image) {
this.image = image;
}
public long getComment() {
return comment;
}
public void setComment(long comment) {
this.comment = comment;
}
public float getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public String getPress() {
return press;
}
public void setPress(String press) {
this.press = press;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
public String getAvg_price() {
return avg_price;
}
public void setAvg_price(String avg_price) {
this.avg_price = avg_price;
}
public void setScore(float score) {
this.score = score;
}
public String getBookNumber() {
return bookNumber;
}
public void setBookNumber(String bookNumber) {
this.bookNumber = bookNumber;
}
public String getAvg_comment() {
return avg_comment;
}
public void setAvg_comment(String avg_comment) {
this.avg_comment = avg_comment;
}
// 3.序列化
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(id);
dataOutput.writeInt(book_id);
dataOutput.writeUTF(type);
dataOutput.writeUTF(name);
dataOutput.writeUTF(image);
dataOutput.writeLong(comment);
dataOutput.writeFloat(score);
dataOutput.writeUTF(author);
dataOutput.writeUTF(press);
dataOutput.writeUTF(date);
dataOutput.writeDouble(price);
dataOutput.writeUTF(bookNumber);
dataOutput.writeUTF(avg_price);
dataOutput.writeUTF(avg_comment);
}
// 4.反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id=dataInput.readInt();
this.book_id = dataInput.readInt();
this.type=dataInput.readUTF();
this.name = dataInput.readUTF();
this.image = dataInput.readUTF();
this.score = dataInput.readFloat();
this.author=dataInput.readUTF();
this.press=dataInput.readUTF();
this.date=dataInput.readUTF();
this.price=dataInput.readFloat();
// this.min_price=dataInput.readUTF();
// this.avg_price=dataInput.readUTF();
// this.max_price=dataInput.readUTF();
}
// 5.从DB读取
@Override
public void readFields(ResultSet resultSet) throws SQLException {
id=resultSet.getInt(1);
book_id = resultSet.getInt(2); // 1,2,3对应列的坐标,从1开始
type=resultSet.getString(3);
name = resultSet.getString(4);
image = resultSet.getString(5);
comment = resultSet.getLong(6);
score = resultSet.getFloat(7);
author = resultSet.getString(8);
press = resultSet.getString(9);
date = resultSet.getString(10);
price = resultSet.getFloat(11);
// min_price=resultSet.getString(11);
// avg_price=resultSet.getString(12);
// max_price=resultSet.getString(13);
}
// 6.写入数据库
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setString(1, date);
preparedStatement.setString(2, bookNumber);
preparedStatement.setString(3, avg_comment);
preparedStatement.setString(4, avg_price);
}
}
mysql表结构: