【 DBUserWritable class 】
package org.neworigin.com.Database;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class DBUserWritable implements DBWritable, WritableComparable<DBUserWritable> {
    private String name = "";
    private String sex = "";
    private int age = 0;
    private int num = 0;
    private String department = "";
    private String tables = "";

    @Override
    public String toString() {
        return "DBUserWritable [name=" + name + ", sex=" + sex + ", age=" + age + ", department=" + department + "]";
    }

    // Copy constructor: needed in the reducer, because Hadoop reuses one Writable
    // instance for every element of the values iterator.
    public DBUserWritable(DBUserWritable d) {
        this.name = d.getName();
        this.sex = d.getSex();
        this.age = d.getAge();
        this.num = d.getNum();
        this.department = d.getDepartment();
        this.tables = d.getTables();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public String getDepartment() {
        return department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    public String getTables() {
        return tables;
    }

    public void setTables(String tables) {
        this.tables = tables;
    }

    public DBUserWritable(String name, String sex, int age, int num, String department, String tables) {
        this.name = name;
        this.sex = sex;
        this.age = age;
        this.num = num;
        this.department = department;
        this.tables = tables;
    }

    public DBUserWritable() {
    }

    // Hadoop serialization, used for the intermediate (map -> reduce) data.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(sex);
        out.writeInt(age);
        out.writeInt(num);
        out.writeUTF(department);
        out.writeUTF(tables);
    }

    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        sex = in.readUTF();
        age = in.readInt();
        num = in.readInt();
        department = in.readUTF();
        tables = in.readUTF();
    }

    // Never exercised in this job (DBUserWritable is only a map output *value*;
    // the key is a Text), but ordering by the join key keeps the contract honest.
    public int compareTo(DBUserWritable o) {
        return Integer.compare(this.num, o.num);
    }

    // JDBC side: parameter order must match the column list passed to
    // DBOutputFormat.setOutput(job, "user_tables", "name", "sex", "age", "department").
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.getName());
        statement.setString(2, this.getSex());
        statement.setInt(3, this.getAge());
        statement.setString(4, this.getDepartment());
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        this.name = resultSet.getString(1);
        this.sex = resultSet.getString(2);
        this.age = resultSet.getInt(3);
        this.department = resultSet.getString(4);
    }
}
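A note on the two serialization paths above: write(DataOutput)/readFields(DataInput) handle the intermediate map-to-reduce traffic, while write(PreparedStatement) fills the parameterized INSERT that DBOutputFormat builds from the table and column names handed to setOutput. A minimal sketch that prints that statement (ShowQuery is a hypothetical helper, not part of the job; constructQuery is a public method of Hadoop's DBOutputFormat):

package org.neworigin.com.Database;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class ShowQuery {
    public static void main(String[] args) {
        // constructQuery builds the INSERT that write(PreparedStatement)
        // later fills: one '?' placeholder per configured column.
        String sql = new DBOutputFormat<DBUserWritable, NullWritable>()
                .constructQuery("user_tables",
                        new String[] { "name", "sex", "age", "department" });
        System.out.println(sql);
        // roughly: INSERT INTO user_tables (name,sex,age,department) VALUES (?,?,?,?)
    }
}

The printed SQL should contain exactly one placeholder per column, in the same order write(PreparedStatement) sets its parameters.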
【mapper】
package org.neworigin.com.Database;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class UserDBMapper extends Mapper<LongWritable, Text, Text, DBUserWritable> {
    DBUserWritable DBuser = new DBUserWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(" ");
        // table1 lines carry four fields: name sex age num
        if (values.length == 4) {
            DBuser.setName(values[0]);
            DBuser.setSex(values[1]);
            DBuser.setAge(Integer.parseInt(values[2]));
            DBuser.setNum(Integer.parseInt(values[3]));
            DBuser.setTables("t1");
            System.out.println("mapper---t1---------------" + DBuser);
            context.write(new Text(values[3]), DBuser); // key = num, the join key
        }
        // table2 lines carry two fields: num department
        if (values.length == 2) {
            DBuser.setNum(Integer.parseInt(values[0]));
            DBuser.setDepartment(values[1]);
            DBuser.setTables("t2");
            context.write(new Text(values[0]), DBuser); // key = num, the join key
            //System.out.println("mapper --t2"+"--"+values[0]+"----"+DBuser);
        }
    }
}
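The mapper tells the two sources apart purely by field count, so the inputs are assumed to be space-separated lines shaped like this (illustrative rows, not from the original post):

table1 (name sex age num):  tom male 26 1001
table2 (num department):    1001 IT

Both branches emit num as the Text key, which is what groups matching t1 and t2 records into a single reduce call.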
【reducer 】
package org.neworigin.com.Database;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class UserDBReducer extends Reducer<Text, DBUserWritable, NullWritable, DBUserWritable> {

    @Override
    protected void reduce(Text k2, Iterable<DBUserWritable> v2, Context context)
            throws IOException, InterruptedException {
        String department = "";
        List<DBUserWritable> list = new LinkedList<DBUserWritable>();
        for (DBUserWritable val : v2) {
            // Copy each value: Hadoop reuses one object across the iterator.
            list.add(new DBUserWritable(val));
            // System.out.println("[table]"+val.getTables()+"----key"+k2+"---"+val);
            if (val.getTables().equals("t2")) {
                department = val.getDepartment();
            }
        }
        // The key is num: every t1 record in this group gets the t2 department.
        for (DBUserWritable join : list) {
            System.out.println("[table]" + join.getTables() + "----key" + k2 + "---" + join);
            if (join.getTables().equals("t1")) {
                join.setDepartment(department);
                System.out.println("db-----" + join);
                context.write(NullWritable.get(), join);
            }
        }
    }
}
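The defensive copy in the first loop is load-bearing: Hadoop recycles a single DBUserWritable instance across the values iterator, so storing the reference directly would leave every list entry aliasing the last value read. A hypothetical stand-alone demo of the difference (ReuseDemo is not part of the job):

package org.neworigin.com.Database;

import java.util.LinkedList;
import java.util.List;

public class ReuseDemo {
    public static void main(String[] args) {
        DBUserWritable reused = new DBUserWritable();   // stands in for Hadoop's reused instance
        List<DBUserWritable> aliased = new LinkedList<DBUserWritable>();
        List<DBUserWritable> copied  = new LinkedList<DBUserWritable>();
        for (String n : new String[] { "tom", "jerry" }) {
            reused.setName(n);                          // Hadoop overwrites the same object
            aliased.add(reused);                        // BROKEN: stores the shared reference
            copied.add(new DBUserWritable(reused));     // OK: snapshot via the copy constructor
        }
        System.out.println(aliased.get(0).getName());   // prints "jerry" -- first value lost
        System.out.println(copied.get(0).getName());    // prints "tom"   -- as intended
    }
}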
【app】
package org.neworigin.com.Database;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class UserDBAPP {
    public static void main(String[] args) throws Exception {
        String INPUT_PATH = "file:///E:/BigData_eclipse_database/Database/data/table1";
        String INPUT_PATH1 = "file:///E:/BigData_eclipse_database/Database/data/table2";
        // String OUTPUT_PARH = "file:///E:/BigData_eclipse_database/Database/data/output";
        Configuration conf = new Configuration();
        // FileSystem fs = FileSystem.get(new URI(OUTPUT_PARH), conf);
        // if (fs.exists(new Path(OUTPUT_PARH))) {
        //     fs.delete(new Path(OUTPUT_PARH));
        // }
        Job job = new Job(conf, "mydb");
        // configure the database connection
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/hadoop", "root", "123456");
        FileInputFormat.addInputPaths(job, INPUT_PATH);
        FileInputFormat.addInputPaths(job, INPUT_PATH1);
        job.setMapperClass(UserDBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DBUserWritable.class);
        job.setReducerClass(UserDBReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(DBUserWritable.class);
        // FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PARH));
        // set the output table and its columns (not a filesystem path)
        DBOutputFormat.setOutput(job, "user_tables", "name", "sex", "age", "department");
        job.setOutputFormatClass(DBOutputFormat.class);
        boolean re = job.waitForCompletion(true);
        System.out.println(re);
    }
}
【 Error 】 P.S. This is a table join. Writing the result to the local filesystem works fine, but writing to the database fails with the trace below:
17/11/10 11:39:11 WARN output.FileOutputCommitter: Output Path is null in cleanupJob()
17/11/10 11:39:11 WARN mapred.LocalJobRunner: job_local1812680657_0001
java.lang.Exception: java.io.IOException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.io.IOException
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.getRecordWriter(DBOutputFormat.java:185)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.<init>(ReduceTask.java:541)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:614)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/11/10 11:39:12 INFO mapreduce.Job: Job job_local1812680657_0001 running in uber mode : false
17/11/10 11:39:12 INFO mapreduce.Job: map 100% reduce 0%
17/11/10 11:39:12 INFO mapreduce.Job: Job job_local1812680657_0001 failed with state FAILED due to: NA
17/11/10 11:39:12 INFO mapreduce.Job: Counters: 35
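【 Likely cause and fix 】
DBOutputFormat.getRecordWriter (the frame at DBOutputFormat.java:185) wraps any failure to open the JDBC connection in a bare IOException, which is why the trace carries no message. Two causes fit this code. First, the mysql-connector jar may simply be missing from the runtime classpath. Second, and specific to this driver class: DBConfiguration.configureDB(conf, ...) runs after new Job(conf, "mydb"), but the Job constructor takes a copy of the Configuration, so the driver class, URL, and credentials never reach the job that actually executes, and the record writer tries to open a connection with no driver configured. A sketch of the reordered driver, same settings as above (UserDBAPPFixed is a hypothetical name; Job.getInstance replaces the deprecated constructor):

package org.neworigin.com.Database;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class UserDBAPPFixed {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Must run BEFORE the Job is created: the Job constructor copies conf,
        // so settings applied to conf afterwards are invisible to the job.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost/hadoop", "root", "123456");
        Job job = Job.getInstance(conf, "mydb");
        FileInputFormat.addInputPaths(job, "file:///E:/BigData_eclipse_database/Database/data/table1");
        FileInputFormat.addInputPaths(job, "file:///E:/BigData_eclipse_database/Database/data/table2");
        job.setMapperClass(UserDBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DBUserWritable.class);
        job.setReducerClass(UserDBReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(DBUserWritable.class);
        DBOutputFormat.setOutput(job, "user_tables", "name", "sex", "age", "department");
        job.setOutputFormatClass(DBOutputFormat.class);
        System.out.println(job.waitForCompletion(true));
    }
}

Equivalently, keep the original statement order and call DBConfiguration.configureDB(job.getConfiguration(), ...) so the settings land in the job's own copy. If the job still fails the same way after reordering, check that the MySQL driver jar is on the task classpath.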