Exception:
Caused by: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:152)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1125)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Core code:
Configuration conf = HBaseConfiguration.create();
conf.set(TableOutputFormat.OUTPUT_TABLE, "wwtest");
conf.set(ConfigUtils.getHbaseZK()._1(), ConfigUtils.getHbaseZK()._2());
conf.set(ConfigUtils.getHbaseZKPort()._1(), ConfigUtils.getHbaseZKPort()._2());
Job job = Job.getInstance();
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
Connection conn = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("wwtest");
HRegionLocator regionLocator = new HRegionLocator(tableName, (ClusterConnection) conn);
Table realTable = ((ClusterConnection) conn).getTable(tableName);
HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator);
SparkSession spark = SparkUtils.initSparkSessionESConf(HbaseBulkLoad.class.getName(), "local[3]");
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
ArrayList<Integer> integers = new ArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
JavaRDD<Integer> parallelize = javaSparkContext.parallelize(integers);
JavaPairRDD<ImmutableBytesWritable, Put> mapToPair = parallelize.mapToPair(
        new PairFunction<Integer, ImmutableBytesWritable, Put>() {
            @Override
            public Tuple2<ImmutableBytesWritable, Put> call(Integer integer) throws Exception {
                /*KeyValue kv = new KeyValue(Bytes.toBytes(integer), "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/
                Put put = new Put(Bytes.toBytes(integer));
                /*put.addColumn("info".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/
                put.add(new KeyValue(Bytes.toBytes(integer), "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer)));
                /*put.addImmutable("info".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));*/
                return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(integer)), put);
            }
        }).sortByKey();
mapToPair.saveAsNewAPIHadoopFile("/tmp/wwtest", ImmutableBytesWritable.class,
        Put.class, HFileOutputFormat2.class, job.getConfiguration());
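
The ClassCastException comes from HFileOutputFormat2's record writer, which casts every value it receives to Cell, while the RDD above emits Put values. A likely fix is to have the RDD emit a Cell implementation such as KeyValue and pass KeyValue.class as the value class when saving. The sketch below is a minimal rewrite of just the mapToPair/save part under that assumption, reusing `parallelize` and `job` from the code above and keeping the same column family ("cf") and output path; the variable name `hfileCells` is only illustrative.

// Emit KeyValue (a Cell implementation) instead of Put, since
// HFileOutputFormat2's record writer casts each value to Cell before writing.
JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileCells = parallelize.mapToPair(
        new PairFunction<Integer, ImmutableBytesWritable, KeyValue>() {
            @Override
            public Tuple2<ImmutableBytesWritable, KeyValue> call(Integer integer) throws Exception {
                byte[] rowKey = Bytes.toBytes(integer);
                KeyValue kv = new KeyValue(rowKey, "cf".getBytes(), "c1".getBytes(), Bytes.toBytes(integer));
                return new Tuple2<>(new ImmutableBytesWritable(rowKey), kv);
            }
        }).sortByKey(); // HFiles must be written in sorted row-key order

hfileCells.saveAsNewAPIHadoopFile("/tmp/wwtest", ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());

Note that writing the HFiles only stages the data under /tmp/wwtest; they still need to be bulk loaded into the table afterwards, for example with LoadIncrementalHFiles (the exact class and package depend on the HBase version in use).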