package com.ll.mysqlreddata;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MysqlReadMapper extends Mapper<LongWritable,MysqlWritable,LongWritable,Text> {
@Override
protected void map(LongWritable key, MysqlWritable value, Context context) throws IOException, InterruptedException {
//DBInputFormat supplies the record index as the key and one MysqlWritable per row as the value; emit the row as text
context.write(key,new Text(value.toString()));
}
}
package com.ll.mysqlreddata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MysqlReadDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final Configuration conf=new Configuration();
//com.mysql.jdbc.Driver is the Connector/J 5.x class name; Connector/J 8+ uses com.mysql.cj.jdbc.Driver
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/test","root","123456");
final Job job= Job.getInstance(conf,MysqlReadDriver.class.getSimpleName());
job.setJarByClass(MysqlReadDriver.class);
job.setMapperClass(MysqlReadMapper.class);
job.setReducerClass(MysqlReadReduce.class);
//the mapper emits (LongWritable, Text), so the map output classes must match
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
//the second query must return the number of records the first query produces;
//since the input query groups by goodsType, count the distinct types rather than all rows
DBInputFormat.setInput(job,MysqlWritable.class,"SELECT goodsType,AVG(goodsprice),AVG(goodsCount) FROM goods_table GROUP BY goodsType","SELECT COUNT(DISTINCT goodsType) FROM goods_table");
job.setInputFormatClass(DBInputFormat.class);
//the output directory must not already exist, or job submission will fail
Path output=new Path("./18one");
FileOutputFormat.setOutputPath(job,output);
// job.setPartitionerClass(GoodsPartitioner.class);
//as a rule, each partition is handled by exactly one ReduceTask
// job.setNumReduceTasks(2);//keep this in line with the expected number of partitions
boolean flag=job.waitForCompletion(true);
System.exit( flag ? 0 : 1 );
}
}
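DBInputFormat also has a table-oriented setInput overload in which Hadoop builds the SELECT and COUNT queries itself, which is handy when no aggregation is needed. A minimal sketch, assuming the column names used in the queries above; this is an illustration, not part of the original job:

//alternative wiring (hypothetical): let DBInputFormat generate the queries from table metadata
DBInputFormat.setInput(job, MysqlWritable.class,
        "goods_table",                             //table name
        null,                                      //WHERE conditions (none)
        "goodsType",                               //ORDER BY column
        "goodsType", "goodsprice", "goodsCount");  //columns to select

Whichever overload is used, the MySQL Connector/J JAR must be visible to both the submitting client and the map tasks; one common approach is to place it under $HADOOP_HOME/share/hadoop/common/lib/ on every node.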
package com.ll.mysqlreddata;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MysqlReadReduce extends Reducer<LongWritable,Text,LongWritable,Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//the inherited default reduce() is an identity pass-through
super.reduce(key, values, context);
}
}
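Calling super.reduce here is equivalent to writing the pass-through loop by hand; this is what the inherited implementation does:

//equivalent hand-written body for reduce():
for (Text value : values) {
    context.write(key, value); //forward each row unchanged
}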
package com.ll.mysqlreddata;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class MysqlWritable implements Writable,DBWritable {
private String goodsType;
private double avg_Price;
private double avg_Count;
//note: goodPrice is not part of write()/readFields() below, so it is never serialized with the record
private double goodPrice;
public double getGoodPrice() {
return goodPrice;
}
public void setGoodPrice(double goodPrice) {
this.goodPrice = goodPrice;
}
public String getGoodsType() {
return goodsType;
}
public void setGoodsType(String goodsType) {
this.goodsType = goodsType;
}
public double getAvg_Price() {
return avg_Price;
}
public void setAvg_Price(double avg_Price) {
this.avg_Price = avg_Price;
}
public double getAvg_Count() {
return avg_Count;
}
public void setAvg_Count(double avg_Count) {
this.avg_Count = avg_Count;
}
@Override
public String toString() {
return "类型:" + goodsType+
"商品的平均价格" +avg_Price + "平均数量 "+avg_Count;
}
public MysqlWritable() {
}
//required by DBWritable; only invoked when writing back to the database via DBOutputFormat
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setString(1,this.goodsType);
preparedStatement.setDouble(2,this.avg_Price);
preparedStatement.setDouble(3,this.avg_Count);
}
@Override
public void readFields(ResultSet resultSet) throws SQLException {
//pull each column of the current row from the JDBC result set;
//indexes follow the column order of the input query: 1=goodsType, 2=AVG(goodsprice), 3=AVG(goodsCount)
this.goodsType=resultSet.getString(1);
this.avg_Price=resultSet.getDouble(2);
this.avg_Count=resultSet.getDouble(3);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
//Hadoop wire format: the field order here must match readFields(DataInput) below
dataOutput.writeUTF(goodsType);
dataOutput.writeDouble(getAvg_Price());
dataOutput.writeDouble(getAvg_Count());
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.goodsType=dataInput.readUTF();
this.avg_Price=dataInput.readDouble();
this.avg_Count=dataInput.readDouble();
}
}
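To sanity-check the Writable plumbing without a cluster, one can round-trip a record through an in-memory stream. A throwaway sketch; the class name and sample values are made up for illustration:

package com.ll.mysqlreddata;
import java.io.*;
public class MysqlWritableCheck {
    public static void main(String[] args) throws IOException {
        MysqlWritable out = new MysqlWritable();
        out.setGoodsType("fruit"); //hypothetical sample values
        out.setAvg_Price(3.5);
        out.setAvg_Count(12.0);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buf)); //serialize via write(DataOutput)
        MysqlWritable in = new MysqlWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(in); //should print the same type/price/count
    }
}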