wangweislk 2017-10-11 11:27
3891 views
Question closed

Spark: error when writing to HBase via bulk load with saveAsNewAPIHadoopFile

Exception:
Caused by: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:152)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1125)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Core code:
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "wwtest");
    conf.set(ConfigUtils.getHbaseZK()._1(), ConfigUtils.getHbaseZK()._2());
    conf.set(ConfigUtils.getHbaseZKPort()._1(), ConfigUtils.getHbaseZKPort()._2());
    Job job = Job.getInstance();
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("wwtest");
    HRegionLocator regionLocator = new HRegionLocator(tableName, (ClusterConnection) conn);
    Table realTable = ((ClusterConnection) conn).getTable(tableName);

    HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator);

    SparkSession spark = SparkUtils.initSparkSessionESConf(HbaseBulkLoad.class.getName(), "local[3]");
    JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
    ArrayList<Integer> integers = new ArrayList<>();
    integers.add(1);
    integers.add(2);
    integers.add(3);
    JavaRDD<Integer> parallelize = javaSparkContext.parallelize(integers);
    JavaPairRDD<ImmutableBytesWritable, Put> mapToPair = parallelize.mapToPair(new PairFunction<Integer, ImmutableBytesWritable, Put>() {
        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(Integer integer) throws Exception {

            /*KeyValue kv = new KeyValue(Bytes.toBytes(integer), "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/
            Put put = new Put(Bytes.toBytes(integer));
            /*put.addColumn("info".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/
            put.add(new KeyValue(Bytes.toBytes(integer), "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer)));

            /*put.addImmutable("info".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/

            return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(integer)), put);
        }
    }).sortByKey();

    mapToPair.saveAsNewAPIHadoopFile("/tmp/wwtest", ImmutableBytesWritable.class,
            Put.class, HFileOutputFormat2.class, job.getConfiguration());
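
From the stack trace, HFileOutputFormat2's RecordWriter appears to cast the RDD value to Cell, which Put does not implement. Below is a rough, unverified sketch of the same pipeline emitting KeyValue (which is a Cell) instead of Put, reusing the parallelize RDD, the job configuration, and the "cf"/"c1" column from the commented-out line above; the variable name cellPairs is just illustrative:

    JavaPairRDD<ImmutableBytesWritable, KeyValue> cellPairs = parallelize.mapToPair(new PairFunction<Integer, ImmutableBytesWritable, KeyValue>() {
        @Override
        public Tuple2<ImmutableBytesWritable, KeyValue> call(Integer integer) throws Exception {
            byte[] rowKey = Bytes.toBytes(integer);
            // KeyValue implements Cell, so HFileOutputFormat2's writer should accept it directly
            KeyValue kv = new KeyValue(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes(integer));
            return new Tuple2<>(new ImmutableBytesWritable(rowKey), kv);
        }
    }).sortByKey();

    // Value class changed from Put.class to KeyValue.class to match the RDD;
    // presumably job.setMapOutputValueClass(KeyValue.class) would need the same change.
    cellPairs.saveAsNewAPIHadoopFile("/tmp/wwtest", ImmutableBytesWritable.class,
            KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());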

0 answers
