只一刀 2016-06-22 06:08 · Acceptance rate: 0%
7329 views

Joining an RDD with a DataFrame in Spark

Here is the code I have written:

 /*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package com.shine.ncc

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.Tokenizer
import org.ansj.splitWord.analysis.ToAnalysis
import org.ansj.util.FilterModifWord
import java.util.Arrays
import org.apache.spark.mllib.feature.HashingTF
import scala.collection.JavaConversions._
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.feature.IDFModel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object NetworkNewsClassify1 {
  var sameModel: NaiveBayesModel = null

  /** Case class for converting RDD to DataFrame */
  case class Record(content: String, time: String, title: String)


  /** Lazily instantiated singleton instance of SQLContext */
  object SQLContextSingleton {

    @transient  private var instance: SQLContext = _

    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }

  def main(args: Array[String]) {
//    if (args.length < 2) {
//      System.err.println("Usage: NetworkWordCount <hostname> <port>")
//      System.exit(1)
//    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkNewsClassify")
    sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port to receive the JSON news records
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val myNaiveBayesModel = NaiveBayesModel.load(ssc.sparkContext, "D:/myNaiveBayesModel")
    // Process each received batch of JSON strings as an RDD
    lines.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val newsDF = sqlContext.read.json(rdd)
      newsDF.count()
      val featurizedData = newsDF.map { line =>
            // tokenize the title with the Ansj analyzer
            val temp = ToAnalysis.parse(line.getAs[String]("title"))
            // register stop words
            FilterModifWord.insertStopWords(Arrays.asList("r", "n"))
            // register stop natures (part-of-speech tags to filter out)
            FilterModifWord.insertStopNatures("w", null, "ns", "r", "u", "e")
            // (this registration runs once per record; it could be hoisted out of the map)
            val filter = FilterModifWord.modifResult(temp)
            // keep only the token text, dropping the part-of-speech tags
            val words = for (i <- Range(0, filter.size())) yield filter.get(i).getName
            // compute the term frequencies of this document
            new HashingTF(500000).transform(words)
      }.cache()
      if (featurizedData.count() > 0) {
        // compute the TF-IDF weight of each term
        val idf = new IDF()
        val idfModel = idf.fit(featurizedData)
        val tfidfData = idfModel.transform(featurizedData)
        // predict the category of each news item
        val resultData = myNaiveBayesModel.predict(tfidfData)
        resultData.collect().foreach(println)

        // join the prediction results back onto newsDF
        // **??? I don't know how to implement this...**
        // save the news records to HBase

      }

    })


    ssc.start()
    ssc.awaitTermination()
  }
}

Here newsDF holds the news records, with the fields (title, body, date), and resultData holds the news categories predicted by the Naive Bayes model. I now want to attach the predictions to newsDF as a type field (a join) and save the combined records to HBase. How do I implement this merge?
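Since resultData is derived from newsDF by a chain of one-to-one transformations (the map that builds featurizedData, the IDF transform, and predict), both sides have the same number of elements in the same partition layout, so RDD.zip can pair each news row with its prediction without a keyed join. Below is a minimal sketch under that assumption; it reuses the sqlContext.implicits._ already imported inside foreachRDD, and the output column name type matches the question:

// pair each news row with its predicted label; zip requires both RDDs to
// have identical partition counts and per-partition element counts, which
// holds here because resultData comes from newsDF via one-to-one maps
val labeledDF = newsDF.rdd.zip(resultData).map { case (row, label) =>
  (row.getAs[String]("title"), row.getAs[String]("body"),
   row.getAs[String]("date"), label)
}.toDF("title", "body", "date", "type")

If zip ever fails because the partition layouts diverge, a safer fallback is to call zipWithIndex on both RDDs and join on the generated index.

For the HBase part, here is a sketch using the pre-1.0 client API that the question's imports (HTable, Put, Bytes) suggest; the table name "news", the column family "cf", and using the title as the row key are assumptions to adapt:

labeledDF.rdd.foreachPartition { rows =>
  // open one connection per partition, not per record
  val hbaseConf = HBaseConfiguration.create()
  val table = new HTable(hbaseConf, "news")                       // assumed table name
  rows.foreach { row =>
    val put = new Put(Bytes.toBytes(row.getAs[String]("title")))  // assumed row key
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("body"),
            Bytes.toBytes(row.getAs[String]("body")))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"),
            Bytes.toBytes(row.getAs[String]("date")))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("type"),
            Bytes.toBytes(row.getAs[Double]("type").toString))
    table.put(put)
  }
  table.close()
}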

