只一刀 2016-06-22 06:08 Acceptance rate: 0%
Views: 7329

Joining an RDD with a DataFrame in Spark

Here is the code I wrote:

 /*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package com.shine.ncc

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.Tokenizer
import org.ansj.splitWord.analysis.ToAnalysis
import org.ansj.util.FilterModifWord
import java.util.Arrays
import org.apache.spark.mllib.feature.HashingTF
import scala.collection.JavaConversions._
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.feature.IDFModel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object NetworkNewsClassify1 {
  var sameModel: NaiveBayesModel = null

  /** Case class for converting RDD to DataFrame */
  case class Record(content: String, time: String, title: String)


  /** Lazily instantiated singleton instance of SQLContext */
  object SQLContextSingleton {

    @transient  private var instance: SQLContext = _

    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }

  def main(args: Array[String]) {
//    if (args.length < 2) {
//      System.err.println("Usage: NetworkWordCount <hostname> <port>")
//      System.exit(1)
//    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkNewsClassify")
    sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and receive the JSON news records
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val myNaiveBayesModel = NaiveBayesModel.load(ssc.sparkContext, "D:/myNaiveBayesModel")
    // Convert each received batch of JSON lines into an RDD
    lines.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val newsDF = sqlContext.read.json(rdd)
      newsDF.count()
      val featurizedData = newsDF.map{
          line => 
            val temp = ToAnalysis.parse(line.getAs[String]("title"))
            // Register stop words
            FilterModifWord.insertStopWords(Arrays.asList("r","n"))
            // Register stop natures (parts of speech to filter out)
            FilterModifWord.insertStopNatures("w",null,"ns","r","u","e")
            val filter = FilterModifWord.modifResult(temp)
            // Keep only the token text, dropping the part-of-speech tags
            val words = for (i <- Range(0, filter.size())) yield filter.get(i).getName
            //println(words.mkString("  ;  "));
            // Compute per-document term frequencies via the hashing trick
            new HashingTF(500000).transform(words)
      }.cache()
      if(featurizedData.count()>0){
        // Compute the TF-IDF weight of each term
        val idf = new IDF()
        val idfModel = idf.fit(featurizedData)
        val tfidfData = idfModel.transform(featurizedData)
        // Predict the category of each article with the Naive Bayes model
        val resultData = myNaiveBayesModel.predict(tfidfData)
        println(resultData)

        // Join the prediction results with the rows of newsDF
        //**??? I don't know how to implement this...**
        // Save the news records to HBase

      }

    })


    ssc.start()
    ssc.awaitTermination()
  }
}

Here newsDF holds the news records, with the fields (title, body, date), and resultData holds the news categories predicted by the Naive Bayes model. I now want to attach the prediction results to newsDF as a type column (a join) and save the combined records to HBase. How do I implement this merge?
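One way to do this, given the code above: resultData was produced from newsDF through map-like transformations only (map, IDF transform, predict), so the two RDDs still line up row for row and can be zipped rather than joined on a key. Below is a minimal sketch that would go inside the foreachRDD block right after resultData is computed; the HBase table name "news", the column family "cf", and the use of title as the row key are placeholder assumptions, so substitute your own schema.

// Zip each news row with its predicted label. zip is safe here because
// resultData was derived from newsDF via map-like transforms only, so both
// RDDs have the same partitioning and the same row count per partition.
val resultDF = newsDF.rdd.zip(resultData).map { case (row, label) =>
  (row.getAs[String]("title"),
   row.getAs[String]("body"),
   row.getAs[String]("date"),
   label)
}.toDF("title", "body", "date", "type")  // the prediction becomes the "type" column

// Write each partition to HBase, opening one HTable per partition so the
// non-serializable connection objects are created on the executors.
resultDF.rdd.foreachPartition { rows =>
  val hbaseConf = HBaseConfiguration.create()
  val table = new HTable(hbaseConf, "news")  // assumed table name
  rows.foreach { row =>
    // Assumed row key: the title; use a genuinely unique key in practice
    val put = new Put(Bytes.toBytes(row.getAs[String]("title")))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("body"),
            Bytes.toBytes(row.getAs[String]("body")))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"),
            Bytes.toBytes(row.getAs[String]("date")))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("type"),
            Bytes.toBytes(row.getAs[Double]("type").toString))
    table.put(put)
  }
  table.close()
}

If you prefer an explicit DataFrame join instead, call zipWithIndex on both newsDF.rdd and resultData, convert each to a DataFrame keyed by that index, and join on the index column; zipping directly is just the cheaper option when the row alignment is guaranteed.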


1 answer

  • CSDN-Ada助手 (CSDN-AI official account) 2022-10-27 15:49
    I don't know whether you have solved this problem yet. If not:

    If you have already solved it, please consider sharing the solution as a blog post and linking it in the comments, to help more people ^-^
