I submitted two HBase BulkLoad jobs to the cluster, writing the same data into two different HBase tables. Only the second table ends up containing data; the first table is empty.
import java.io.IOException;
import java.net.URI;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GeneratehfileDriver {

    public static void main(String[] args) throws Exception {
        // 30546298 row(s) in 1099.9770 seconds
        // Get the HBase configuration and open a connection to the target table.
        // The table was created beforehand in the HBase shell with:
        //   create 'BulkLoad','Info'
        // Note that HBase table and column family names are case-sensitive.
        Configuration conf = HBaseConfiguration.create();
        // Use a unique temporary directory for the partitions file so that two
        // jobs running at the same time do not collide on the same path.
        conf.set("hbase.fs.tmp.dir", "partitions_" + UUID.randomUUID());
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("fs.defaultFS", "hdfs://master.hadoop.com:9000");
        conf.set("hbase.zookeeper.quorum",
                "master.hadoop.com:2181,slave1.hadoop.com:2181,slave2.hadoop.com:2181");
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(args[1]));
        Admin admin = conn.getAdmin();
        final String inputFile = args[0];   // HDFS input path
        final String outputFile = args[2];  // HDFS output directory for the generated HFiles
        // Delete the output directory if it already exists, otherwise the job fails.
        try {
            FileSystem fs = FileSystem.get(URI.create(outputFile), conf);
            fs.delete(new Path(outputFile), true);
            fs.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        final Path outputPath = new Path(outputFile);
        // Configure the job: jar, mapper class, and map output key/value types.
        Job job = Job.getInstance(conf, "RegularBulkLoad");
        job.setJarByClass(GeneratehfileDriver.class);
        job.setMapperClass(Generatehfile.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Note: configureIncrementalLoad below overrides this with the table's region count.
        job.setNumReduceTasks(4);
        // Input: combine small input files into splits of at most 150 MB and
        // read the input directory recursively.
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 150);
        CombineTextInputFormat.setInputDirRecursive(job, true);
        CombineTextInputFormat.addInputPath(job, new Path(inputFile));
        // Output: HFiles ready for bulk loading.
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Configure the job for an incremental load into the given table: this
        // sets the reducer count to the table's region count and installs
        // TotalOrderPartitioner so each reducer produces HFiles for one region.
        HFileOutputFormat2.configureIncrementalLoad(job, table,
                conn.getRegionLocator(TableName.valueOf(args[1])));
        // Once the MapReduce job has completed, tell the RegionServers where to
        // find the generated HFiles and load them into HBase.
        if (job.waitForCompletion(true)) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(outputPath, admin, table,
                    conn.getRegionLocator(TableName.valueOf(args[1])));
        }
        table.close();
        admin.close();
        conn.close();
    }
}
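
The mapper class Generatehfile referenced by the driver is not shown. For context, here is a minimal sketch of what such a mapper could look like, given the map output types set above (ImmutableBytesWritable / Put). The tab-separated input layout, the rowkey in the first field, and the "c1" column qualifier are assumptions for illustration, not taken from the original program:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Generatehfile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    // Column family must match the one created in the shell: create 'BulkLoad','Info'
    private static final byte[] FAMILY = Bytes.toBytes("Info");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed layout (hypothetical): rowkey \t value1 \t value2 ...
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) {
            return; // skip malformed lines
        }
        byte[] rowKey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowKey);
        // "c1" is an assumed qualifier; replace with the real column names.
        put.addColumn(FAMILY, Bytes.toBytes("c1"), Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}

Because the map output value class is Put, configureIncrementalLoad wires in PutSortReducer automatically, so the driver does not need to set a reducer class.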