While integrating Spark with HBase, I first read the HBase data into the following RDD:
val sourceRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
Then, after a series of transformations, I end up with this RDD:
val dataRDD: RDD[util.LinkedList[(ImmutableBytesWritable, Put)]] = sourceRDD.map(mapper(sourceFamily, sourceQualifier, sparse, targetFamily, targetQualifier))
Now I want to write dataRDD back to HBase. However, to call dataRDD.saveAsNewAPIHadoopDataset(job.getConfiguration), the RDD's element type needs to be (ImmutableBytesWritable, Put), so I have to convert RDD[util.LinkedList[(ImmutableBytesWritable, Put)]] into RDD[(ImmutableBytesWritable, Put)]. How can I do that?
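One way to do this, as a minimal sketch: `flatMap` flattens each per-record collection into individual elements, and `asScala` (from `scala.collection.JavaConverters`) bridges the `java.util.LinkedList` into a Scala `Iterable` that `flatMap` accepts. The surrounding `sc`, `job`, and `dataRDD` are the ones from the question above.

```scala
import scala.collection.JavaConverters._

// flatMap emits every element of each LinkedList individually,
// turning RDD[util.LinkedList[(ImmutableBytesWritable, Put)]]
// into RDD[(ImmutableBytesWritable, Put)]:
val flatRDD: RDD[(ImmutableBytesWritable, Put)] =
  dataRDD.flatMap(list => list.asScala)

// flatRDD now has the element type saveAsNewAPIHadoopDataset expects:
flatRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
```

Alternatively, if you control `mapper`, you could have it return a Scala collection (or call `sourceRDD.flatMap` directly) and skip the `LinkedList` plus the extra flattening pass altogether.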