缓缓归_ · 2021-03-31 15:37 · acceptance rate: 0% · 30 views

Two HBase BulkLoad jobs submitted, but only the second one takes effect

I submitted two HBase BulkLoad jobs to the cluster, writing the same input data into two different HBase tables. Only the second table ends up with data; the first table is empty.

// Imports reconstructed from usage (HBase 1.x-era client and mapreduce APIs).
import java.io.IOException;
import java.net.URI;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GeneratehfileDriver {

    public static void main(String[] args) throws Exception {
        //30546298 row(s) in 1099.9770 seconds


        /**
         * Get the HBase configuration and connect to the target table. The table was
         * created beforehand in the shell with: create 'BulkLoad','Info'.
         * Note that HBase table and family names are case-sensitive.
         */
        Configuration conf= HBaseConfiguration.create();
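        // HFileOutputFormat2.configureIncrementalLoad() writes the TotalOrderPartitioner
        // partitions file under hbase.fs.tmp.dir, so giving each job its own random
        // directory keeps two concurrently running BulkLoad jobs from overwriting each
        // other's partitions file.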
        conf.set("hbase.fs.tmp.dir", "partitions_" + UUID.randomUUID());
        //conf.set("hbase.zookeeper.quorum", "master.hadoop.com:2181,slave1.hadoop.com:2181,slave2.hadoop.com:2181");
        conf.set("zookeeper.znode.parent","/hbase");
        //conf.set("fs.defaultFS", "hdfs://localhost:9000");
        /*conf.set("fs.defaultFS", "hdfs://hadoop100:9000");
        conf.set("hbase.zookeeper.quorum","hadoop100:2181,hadoop101:2181,hadoop102:2181");*/
        conf.set("fs.defaultFS", "hdfs://master.hadoop.com:9000");
        conf.set("hbase.zookeeper.quorum","master.hadoop.com:2181,slave1.hadoop.com:2181,slave2.hadoop.com:2181");
        Connection conn=ConnectionFactory.createConnection(conf);

        Table table=conn.getTable(TableName.valueOf(args[1]));
        //Table table=conn.getTable(TableName.valueOf("RG"));

        Admin admin=conn.getAdmin();

        final String InputFile = args[0]; // input path

        //final String InputFile="/user/hive/warehouse/taxidata/20121008000058.txt"; // input path

        //final String OutputFile="/HBaseTest/Regularoutput";
        final String OutputFile = args[2]; // output path for the generated HFiles
        //final String InputFile="hdfs://master.hadoop.com:9000/fortest";
        //final String OutputFile="hdfs://master.hadoop.com:9000/HBaseTest/output";
        try {
            // Clear any previous output. Note: do not close() the FileSystem here --
            // FileSystem.get() returns a cached, shared instance, and closing it can
            // break later HDFS access from this driver (e.g. writing the partitions file).
            FileSystem fs = FileSystem.get(URI.create(OutputFile), conf);
            fs.delete(new Path(OutputFile), true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
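        // Caution: if the two concurrently submitted jobs were given the same output
        // directory in args[2], this delete would wipe the other job's HFiles before
        // they are bulk-loaded -- which matches the symptom described above (only the
        // second table ends up with data). Each job needs its own output directory.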
        final Path OutputPath=new Path(OutputFile);

        // Set the job's classes
        Job job=Job.getInstance(conf,"RegularBulkLoad");
        job.setJarByClass(GeneratehfileDriver.class);
        job.setMapperClass(Generatehfile.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setNumReduceTasks(4);
        // Set the input/output formats and paths
       // job.setInputFormatClass(TextInputFormat.class);

        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 1024*1024*150);
        CombineTextInputFormat.setInputDirRecursive(job, true);
        CombineTextInputFormat.addInputPath(job, new Path(InputFile));

        job.setOutputFormatClass(HFileOutputFormat2.class);

        //FileInputFormat.setInputPaths(job, InputFile);
        FileOutputFormat.setOutputPath(job, OutputPath);

        FileInputFormat.setInputDirRecursive(job,true);

        // Configure the MapReduce job to perform an incremental load into the given table.
        HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf(args[1])));
        //HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf("RG")));
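        // Note: configureIncrementalLoad() also installs TotalOrderPartitioner, picks
        // the reducer (PutSortReducer, since the map output value is Put), and sets the
        // number of reduce tasks to the table's current region count, overriding the
        // setNumReduceTasks(4) call above.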

        // Once the MapReduce job completes, tell the RegionServers where to find the
        // generated HFiles and load them into HBase.
        if (job.waitForCompletion(true)) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(OutputPath, admin, table, conn.getRegionLocator(TableName.valueOf(args[1])));
            //loader.doBulkLoad(OutputPath, admin, table, conn.getRegionLocator(TableName.valueOf("RG")));
        }

        // Release client resources.
        table.close();
        admin.close();
        conn.close();
    }
}
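The mapper class Generatehfile referenced by the driver is not included in the post. For reference, here is a minimal sketch of a compatible mapper, matching the driver's map output types (ImmutableBytesWritable / Put); the tab-separated layout, row-key choice, and column names c1, c2, ... are assumptions, and only the 'Info' family comes from the quoted create statement:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of an HFile-generating mapper matching the driver's
// setMapOutputKeyClass(ImmutableBytesWritable.class) / setMapOutputValueClass(Put.class).
public class Generatehfile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static final byte[] FAMILY = Bytes.toBytes("Info"); // family from: create 'BulkLoad','Info'

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed layout: tab-separated, first field is the row key.
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) {
            return; // skip malformed lines
        }
        byte[] rowKey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowKey);
        for (int i = 1; i < fields.length; i++) {
            // Hypothetical column names c1, c2, ... -- adjust to the real schema.
            put.addColumn(FAMILY, Bytes.toBytes("c" + i), Bytes.toBytes(fields[i]));
        }
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}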

1 answer

  • 久绊A (Rising Star Creator, full-stack) · 2023-01-21 15:22
    The code itself runs without errors.
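    A quick way to confirm where the rows actually went is to scan the first few rows of each table after both jobs finish. A minimal sketch (the table names are whatever was passed as args[1] to each job):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CheckTables {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf)) {
                for (String name : args) { // pass the two table names
                    try (Table t = conn.getTable(TableName.valueOf(name));
                         ResultScanner scanner = t.getScanner(new Scan())) {
                        Result[] first = scanner.next(5); // peek at the first few rows
                        System.out.println(name + ": " + first.length + " row(s) in first batch");
                        for (Result r : first) {
                            System.out.println("  " + Bytes.toString(r.getRow()));
                        }
                    }
                }
            }
        }
    }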
