@hhw · asked 2017-11-10 03:42 · acceptance rate: 0%
Viewed 1,613 times

MapReduce job writing to a database reports an error

【DBUserWritable class】

package org.neworigin.com.Database;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBUserWritable implements DBWritable, WritableComparable<DBUserWritable> {

private String name="";
private String sex="";
private int age=0;
private int num=0;
private String department="";
private String tables="";

@Override
public String toString() {
    return "DBUserWritable [name=" + name + ", sex=" + sex + ", age=" + age + ", department=" + department + "]";
}


// copy constructor: lets the reducer snapshot a value, since Hadoop reuses Writable instances
public DBUserWritable(DBUserWritable d){
    this.name=d.getName();
    this.sex=d.getSex();
    this.age=d.getAge();
    this.num=d.getNum();
    this.department=d.getDepartment();
    this.tables=d.getTables();
}
public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

public String getSex() {
    return sex;
}

public void setSex(String sex) {
    this.sex = sex;
}

public int getAge() {
    return age;
}

public void setAge(int age) {
    this.age = age;
}

public int getNum() {
    return num;
}

public void setNum(int num) {
    this.num = num;
}

public String getDepartment() {
    return department;
}

public void setDepartment(String department) {
    this.department = department;
}

public String getTables() {
    return tables;
}

public void setTables(String tables) {
    this.tables = tables;
}

public DBUserWritable(String name, String sex, int age, int num, String department, String tables) {
    super();
    this.name = name;
    this.sex = sex;
    this.age = age;
    this.num = num;
    this.department = department;
    this.tables = tables;
}

public DBUserWritable() {
    // no-arg constructor, required so the framework can instantiate the class via reflection
}

public void write(DataOutput out) throws IOException {
    // Writable serialization; field order must match readFields(DataInput) below
    out.writeUTF(name);
    out.writeUTF(sex);
    out.writeInt(age);
    out.writeInt(num);
    out.writeUTF(department);
    out.writeUTF(tables);
}

public void readFields(DataInput in) throws IOException {
    // Writable deserialization; must read fields in the exact order write(DataOutput) wrote them
    name = in.readUTF();
    sex=in.readUTF();
    age=in.readInt();
    num=in.readInt();
    department=in.readUTF();
    tables=in.readUTF();
}

public int compareTo(DBUserWritable o) {
    // instances are only used as map output values in this job, so no real ordering is needed
    return 0;
}

public void write(PreparedStatement statement) throws SQLException {
    // called by DBOutputFormat for each output record; the parameter indexes must match
    // the column list passed to DBOutputFormat.setOutput(...) in the driver
    statement.setString(1, this.getName());
    statement.setString(2, this.getSex());
    statement.setInt(3, this.getAge());
    statement.setString(4, this.getDepartment());
}

public void readFields(ResultSet resultSet) throws SQLException {
     // used only when reading from a database via DBInputFormat; not exercised by this job
     this.name=resultSet.getString(1);
     this.sex=resultSet.getString(2);
     this.age=resultSet.getInt(3);
     this.department=resultSet.getString(4);
}

}
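For context on why write(PreparedStatement) sets exactly four parameters: DBOutputFormat builds a parameterized INSERT statement from the table and column names given to DBOutputFormat.setOutput(...) in the driver, then calls write(PreparedStatement) once per reduce output record. Roughly (a sketch, not the literal string Hadoop produces), the statement for this job would be:

// sketch: the INSERT that DBOutputFormat generates for
// setOutput(job, "user_tables", "name", "sex", "age", "department");
// parameter indexes 1..4 in write(PreparedStatement) must line up with this column order
String sql = "INSERT INTO user_tables (name, sex, age, department) VALUES (?, ?, ?, ?)";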

【mapper】

package org.neworigin.com.Database;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UserDBMapper extends Mapper<LongWritable, Text, Text, DBUserWritable> {

    // one instance reused across map() calls; each context.write() serializes it immediately
    DBUserWritable DBuser = new DBUserWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(" ");

        // table1 record: name sex age num -> tagged "t1", keyed by num
        if (values.length == 4) {
            DBuser.setName(values[0]);
            DBuser.setSex(values[1]);
            DBuser.setAge(Integer.parseInt(values[2]));
            DBuser.setNum(Integer.parseInt(values[3]));
            DBuser.setTables("t1");
            System.out.println("mapper---t1---------------" + DBuser);
            context.write(new Text(values[3]), DBuser);
        }

        // table2 record: num department -> tagged "t2", keyed by num
        // (fields not set here keep their previous values, but the reducer
        //  only reads department from "t2" records, so this is harmless)
        if (values.length == 2) {
            DBuser.setNum(Integer.parseInt(values[0]));
            DBuser.setDepartment(values[1]);
            DBuser.setTables("t2");
            context.write(new Text(values[0]), DBuser);
            //System.out.println("mapper --t2"+"--"+values[0]+"----"+DBuser);
        }
    }
}
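To make the tagging concrete, here is a hypothetical pair of input lines (the actual data files are not shown in the question, so these values are made up for illustration) and what the mapper emits for each; both sides share the num key, so they meet in the same reduce() call:

// hypothetical table1 line (4 fields: name sex age num):
//   "tom male 20 1001"  -> context.write(new Text("1001"), value tagged "t1")
// hypothetical table2 line (2 fields: num department):
//   "1001 sales"        -> context.write(new Text("1001"), value tagged "t2")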

【reducer】

package org.neworigin.com.Database;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class UserDBReducer extends Reducer<Text, DBUserWritable, NullWritable, DBUserWritable> {
    // public DBUserWritable db = new DBUserWritable();

    @Override
    protected void reduce(Text k2, Iterable<DBUserWritable> v2,
            Reducer<Text, DBUserWritable, NullWritable, DBUserWritable>.Context context)
            throws IOException, InterruptedException {
        String Name = "";
        List<DBUserWritable> list = new LinkedList<DBUserWritable>();
        for (DBUserWritable val : v2) {
            list.add(new DBUserWritable(val)); // deep-copy: Hadoop reuses the value instance across iterations

            // System.out.println("[table]"+val.getTables()+"----key"+k2+"---"+val);
            if (val.getTables().equals("t2")) {
                Name = val.getDepartment();
            }
        }
        // the key is num: attach the t2 department to every t1 record sharing that key
        for (DBUserWritable join : list) {
            System.out.println("[table]" + join.getTables() + "----key" + k2 + "---" + join);
            if (join.getTables().equals("t1")) {
                join.setDepartment(Name);
                System.out.println("db-----" + join);
                context.write(NullWritable.get(), join);
            }
        }
    }
}
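A note on the new DBUserWritable(val) copy in the loop above: Hadoop reuses a single value object while iterating over v2, so storing the reference directly would leave every list element pointing at the last record. A minimal sketch of the pitfall:

// WRONG: all list elements end up referencing the one object Hadoop reuses
for (DBUserWritable val : v2) {
    list.add(val);
}
// RIGHT (what the reducer above does): snapshot each value first
for (DBUserWritable val : v2) {
    list.add(new DBUserWritable(val));
}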

【app】

package org.neworigin.com.Database;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserDBAPP {

public static void main(String[] args) throws Exception, URISyntaxException {
    String INPUT_PATH = "file:///E:/BigData_eclipse_database/Database/data/table1";
    String INPUT_PATH1 = "file:///E:/BigData_eclipse_database/Database/data/table2";

    // String OUTPUT_PARH="file:///E:/BigData_eclipse_database/Database/data/output";
    Configuration conf = new Configuration();
    // FileSystem fs=FileSystem.get(new URI(OUTPUT_PARH),conf);
    // if(fs.exists(new Path(OUTPUT_PARH))){
    //     fs.delete(new Path(OUTPUT_PARH));
    // }

    Job job = new Job(conf, "mydb");
    // database connection settings
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/hadoop", "root", "123456");

    FileInputFormat.addInputPaths(job, INPUT_PATH);
    FileInputFormat.addInputPaths(job, INPUT_PATH1);

    job.setMapperClass(UserDBMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DBUserWritable.class);

    job.setReducerClass(UserDBReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(DBUserWritable.class);

    // FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PARH));
    // configure the output: database table and column names
    DBOutputFormat.setOutput(job, "user_tables", "name", "sex", "age", "department");
    job.setOutputFormatClass(DBOutputFormat.class);
    boolean re = job.waitForCompletion(true);
    System.out.println(re);
}

}
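Independent of the answers below, one thing worth checking in this driver: DBConfiguration.configureDB(conf, ...) runs after new Job(conf, "mydb"), and the Job constructor takes a copy of the Configuration it is given, so the connection settings may never reach the job at all, which would leave DBOutputFormat unable to open a connection. A minimal sketch of the safer ordering (same values as the code above):

Configuration conf = new Configuration();
// configure the DB connection BEFORE the Job copies the Configuration
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost/hadoop", "root", "123456");
Job job = new Job(conf, "mydb");
// ... rest of the job setup as above ...
// or, equivalently, configure the copy the job actually uses:
// DBConfiguration.configureDB(job.getConfiguration(), ...);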

【Error】P.S. This is a table join. Writing the output to the local filesystem works fine; writing to the database fails with the error below:
17/11/10 11:39:11 WARN output.FileOutputCommitter: Output Path is null in cleanupJob()
17/11/10 11:39:11 WARN mapred.LocalJobRunner: job_local1812680657_0001
java.lang.Exception: java.io.IOException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.io.IOException
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.getRecordWriter(DBOutputFormat.java:185)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.<init>(ReduceTask.java:541)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:614)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/11/10 11:39:12 INFO mapreduce.Job: Job job_local1812680657_0001 running in uber mode : false
17/11/10 11:39:12 INFO mapreduce.Job: map 100% reduce 0%
17/11/10 11:39:12 INFO mapreduce.Job: Job job_local1812680657_0001 failed with state FAILED due to: NA
17/11/10 11:39:12 INFO mapreduce.Job: Counters: 35
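Reading the trace: the IOException is thrown from DBOutputFormat.getRecordWriter, which is where the output format opens its JDBC connection, so the job fails before a single record is written; the usual causes are the MySQL driver jar missing from the task classpath, the connection settings never reaching the job (see the note under the driver code), or a bad URL/credentials. A standalone check of the JDBC side, outside MapReduce, can narrow it down (a sketch; URL and credentials copied from the driver above):

// plain-Java JDBC sanity check, no Hadoop involved
Class.forName("com.mysql.jdbc.Driver");              // throws -> driver jar missing
java.sql.Connection c = java.sql.DriverManager.getConnection(
        "jdbc:mysql://localhost/hadoop", "root", "123456"); // throws -> URL/credentials/server problem
c.close();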


6 answers (newest shown)

  • 荣洋 answered 2017-11-10 06:59

    Could it be that the port number 3306 is missing? jdbc:mysql://localhost/hadoop
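    If the port is indeed the issue, spelling it out in the URL passed to configureDB would look like this (a sketch; database name and credentials as in the question):

    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
            "jdbc:mysql://localhost:3306/hadoop", "root", "123456");

    Note, though, that the MySQL JDBC driver defaults to port 3306 when none is given, so this alone usually isn't the cause unless the server listens on a non-standard port.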
