wangweislk 2017-10-11 11:27
3891 views
Closed

Error when using Spark's saveAsNewAPIHadoopFile to bulk-load into HBase

Exception:
Caused by: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:152)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1125)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Core code:
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "wwtest");
    conf.set(ConfigUtils.getHbaseZK()._1(), ConfigUtils.getHbaseZK()._2());
    conf.set(ConfigUtils.getHbaseZKPort()._1(), ConfigUtils.getHbaseZKPort()._2());
    Job job = Job.getInstance();
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("wwtest");
    HRegionLocator regionLocator = new HRegionLocator(tableName, (ClusterConnection) conn);
    Table realTable = ((ClusterConnection) conn).getTable(tableName);

    HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator);

    SparkSession spark = SparkUtils.initSparkSessionESConf(HbaseBulkLoad.class.getName(), "local[3]");
    JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
    ArrayList<Integer> integers = new ArrayList<>();
    integers.add(1);
    integers.add(2);
    integers.add(3);
    JavaRDD<Integer> parallelize = javaSparkContext.parallelize(integers);
    JavaPairRDD<ImmutableBytesWritable, Put> mapToPair = parallelize.mapToPair(new PairFunction<Integer, ImmutableBytesWritable, Put>() {
        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(Integer integer) throws Exception {

            /*KeyValue kv = new KeyValue(Bytes.toBytes(integer), "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/
            Put put = new Put(Bytes.toBytes(integer));
            /*put.addColumn("info".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/
            put.add(new KeyValue(Bytes.toBytes(integer), "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer)));

            /*put.addImmutable("info".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/

            return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(integer)), put);
        }
    }).sortByKey();

    mapToPair.saveAsNewAPIHadoopFile("/tmp/wwtest", ImmutableBytesWritable.class,
            Put.class, HFileOutputFormat2.class, job.getConfiguration());
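
The stack trace points at HFileOutputFormat2's record writer, which casts every value it receives to Cell. A Put is a container of cells rather than a Cell itself, so the cast fails as soon as the first pair is written. Below is a minimal sketch (not a verified fix) of a KeyValue-based variant, since KeyValue implements Cell; it reuses the variables from the code above (parallelize, job, conf, conn, realTable, regionLocator) and keeps the same family "cf", qualifier "c1" and output path /tmp/wwtest. The commented doBulkLoad call at the end is an assumed standard follow-up step, not something taken from the question.

    JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileRdd = parallelize.mapToPair(
            new PairFunction<Integer, ImmutableBytesWritable, KeyValue>() {
                @Override
                public Tuple2<ImmutableBytesWritable, KeyValue> call(Integer integer) throws Exception {
                    byte[] rowKey = Bytes.toBytes(integer);
                    // KeyValue implements Cell, which is the value type HFileOutputFormat2 expects
                    KeyValue kv = new KeyValue(rowKey, "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));
                    return new Tuple2<>(new ImmutableBytesWritable(rowKey), kv);
                }
            }).sortByKey(); // HFiles must be written in row-key order

    hfileRdd.saveAsNewAPIHadoopFile("/tmp/wwtest", ImmutableBytesWritable.class,
            KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());

    /* Assumed follow-up (standard HBase 1.x API): hand the generated HFiles to the region
       servers so they show up in the "wwtest" table.
    new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/wwtest"),
            conn.getAdmin(), realTable, regionLocator);
    */

If the Job setup above is kept, setMapOutputValueClass(KeyValue.class) would match the new value type, although the ClassCastException itself comes from the runtime value handed to the writer, not from that setting.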

0 answers
