spark RDD中的元组如何按照指定格式保存到HDFS上? 5C

请教一个问题:spark数据清洗的结果为RDD[(String, String)]类型的rdd,在这个RDD中,每一个元素都是一个元组。元组的key值是文件名,value值是文件内容,我现在想把整个RDD保存在HDFS上,让RDD中的每一个元素保存为一个文件,其中key值作为文件名,而value值作为文件内容。

应该如何实现呢?

RDD好像不支持遍历,只能通过collect()方法保存为一个数组,再进行遍历,但是这样可能会把内存撑爆,目前的做法是先把RDD通过saveAsTextFile方法保存在HDFS上,然后再使用FSDataInputStream输入流对保存后的part文件进行遍历读取,使用输出流写到HDFS上,这样很耗时。

请问有没有好一点的方法,可以直接把RDD的内容写到HDFS上呢?

3个回答

RDD有forEachPartition,然后你可以在遍历每个partition的时候,取出对应的key和value值,然后通过实例化的FileSystem生成对应的文件,这样就可以了。但是估计效率不高

简单类型展开变成表不就行了吗 使用的时候在做成元组

RDD好像不支持遍历,只能通过collect()方法保存为一个数组,再进行遍历,但是这样可能会把内存撑爆,目前的做法是先把RDD通过saveAsTextFile方法保存在HDFS上,然后再使用FSDataInputStream输入流对保存后的part文件进行遍历读取,使用输出流写到HDFS上,这样很耗时

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决?

我的Spark Streaming代码如下所示: ``` val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2) val words = lines.filter(examtep(_)) words.foreachRDD(exam(_)) //some other code def exam(rdd:RDD[SparkFlumeEvent]):Unit={ if(rdd.count()>0) { println("****Something*****") val newrdd=rdd.map(sfe=>{ val tmp=new String(sfe.event.getBody.array()) tmp }) newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest") } } ``` 当words.foreachRDD(exam(_))中每次执行exam()方法的时候,都会执行newrdd.saveAsTextFile("/user/''''''"),但是HDFS上Temperaturetest文件夹里的内容每次都会被覆盖掉,只保存着最后一次saveAsTextFIle的内容,怎样才能让所有数据都存储到Temperaturetest中呢??

对Spark RDD中的数据进行处理

Spark新手。 现在在程序中生成了一个VertexRDD[(String,String)]. 其中的值是如下这种形式的: (3477,267 6106 7716 8221 18603 19717 28189) (2631,18589 18595 25725 26023 26026 27866) (10969,18591 25949 25956 26041) (10218,9320 19950 20493 26031) (5860,18583 18595 25725 26233) (11501,1551 26187 27170) (5717,2596 5187 5720 18583 25725) (950,19667 20493 25725 26024 26033 26192 27279 27281) (13397,19943 26377) (2899,4720 8411 19081 20100 20184 20270 20480 20493 20573 20574 25891) (11424,19816 19819 19841 20244 27098) (8951,5914 18609 26057) (1909,8797 18608 19785 19786 27531) (12807,20040 20608 27159)(后面用到的数据) (17953,1718 6112 18603 18608) 前面的值是key,后面的一串字符是value(由空格隔开) 现在我想对于这个RDD,将每一条数据value中的空格隔开的每个值取出并两两组合,形成一个新的key-value的数据,然后形成一个新的RDD,比如 对(12807,20040 20608 27159)这一条数据,处理后得到的是 (20040,20608) (20040,27159) (20608,27159) 怎么才能实现?求问

Spark RDD和HDFS数据一致性问题

这里想问个问题。 我用Spark SQL从HDFS load上来了一张表。 然后我现在有如下两种情况: 1. 新增数据都是通过Spark SQL load进去的 - 这时候我HDFS和RDD上面的数据是否一致 2. 我数据是直接load到了HDFS上面(例如是个分区表,增加了一个分区) - 这时候我HDFS和RDD上面的数据是否一致 麻烦给出详细的原理过程或者参考链接

关于spark RDD求平均的问题

hi, 假设我有一个spark RDD里面记录的是(时段,分数,次数) 我现在想求:每个时段的平均分数,即:同一个时段下,总分数 / 总次数 不知有什么好方法没有,因为我发现无论是action操作也好,转换成其他Rdd也好, 总没有满意方法,只能分成两个rdd然后关联处理 求大侠帮忙,谢谢

从hdfs中读取数据并用spark操作时出现问题

我从集群环境的hdfs中读取数据,然后处理数据时出现问题,在循环里面添加的对象在循环外就没有了,初学spark和scala,请大佬指点. object Test { case class Passenger(name: String, txn_date: String, txn_time: String, txn_station: String, ticket_type: String, trans_code: String, sub: String, txn_station_id: String) def main(args: Array[String]): Unit = { val inputFile = "hdfs://Master:9000/user/hadoop/input/tmp.txt" val conf = new SparkConf().setAppName("WordCount") val sc = new SparkContext(conf) val text = sc.textFile(inputFile) //25 lines like "000025643 " "20141201" "060912" "0328" "88" "22" "" from hdfs val Passengers = new ArrayBuffer[Passenger]() for (line <- text) { val points = for (i <- 0 until (line.length) if (line.charAt(i) == '"')) yield { i } val items = for (i <- 0 until (points.length) if (i % 2 == 0)) yield { if (!line.slice(points(i).toString.toInt + 1, points(i + 1).toString.toInt).equals("")) { line.slice(points(i).toString.toInt + 1, points(i + 1).toString.toInt).trim } else "null" } val tmp:Passenger=new Passenger(items(0).trim, items(1), items(2), items(3), items(4), items(5), "null", items(6)) println(tmp) //it is Passenger(000026853,20141201,060921,0325,88,21,null,null) [no problem] Passengers.append(tmp) println(Passengers.length) //1,2,3.....25 [no problem] } println("----------------------------" + Passengers.length) //it is 0!!!! why? val passengersArray = Passengers.toArray val customersRDD = sc.parallelize(passengersArray) val customersDF = customersRDD.toDF() } } ``` ```

spark shell在存运算结果到hdfs时报java.io.IOException: Not a file: hdfs://mini1:9000/spark/res

scala> sc.textFile("hdfs://mini1:9000/spark").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://mini1:9000/spark/res2") 执行上面的代码出错,这个目录在hdfs下是有的,而且就算没有也会创建。还有就是我运行的代码中是保存到res2目录 ,这里为什么报没有res目录 18/11/05 19:06:44 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes java.io.IOException: Not a file: hdfs://mini1:9000/spark/res at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:320) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:330) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:39) at $iwC$$iwC$$iwC.<init>(<console>:41) at $iwC$$iwC.<init>(<console>:43) at $iwC.<init>(<console>:45) at <init>(<console>:47) at .<init>(<console>:51) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

spark rdd 取前20条 saveAsHadoopDataset

如题,现在有一个需求,已经排序好了的数据我需要前20条存入Hbase,我的代码如下: certnoRDD.map(x=>{ val key=x._1.split("_")(0)+"_CETRNOTOP20_"+c.get(Calendar.YEAR)+"_"+(c.get(Calendar.MONTH)+1) (key,x._1.split("_")(1),x._2) }).map(convertRDD(HbaseTableName)(hbaseTableFamily)("INFOCONTENT")("NUM")).saveAsHadoopDataset(jobConfig) //关联关系保存 def convertRDD(tableName: String)(columnFamily: String)(columnName1: String)(columnName2: String) (triple: (String,String, Int)) = { val p = new Put(HbaseKeyUtils.convertRowKey(tableName, triple._1)) p.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName1), Bytes.toBytes(String.valueOf(triple._2))) p.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName2), Bytes.toBytes(String.valueOf(triple._3))) (new ImmutableBytesWritable, p) } 但我使用take或者top之后无法再使用saveAsHadoopDataset,求大神给指点下

spark中我需要判断一个rdd中的元素在另一个rdd中的位置

现在我遇到了个问题,我有两个rdd,我希望判断第一个rdd中的元素在第二个rdd中的第几个位置,如果没有就默认为0,请问这能做到吗?

把RDD存入文件,得到的文件内容如下,如何才能正确存入RDD的内容?

``` val result=data.select("termIndices").rdd.map { case Row(termIndices: mutable.WrappedArray[int]) => val res=termIndices.iterator.map{ t=>termMap.get(t) } res.toArray } result.saveAsTextFile("file:///D:/data/ld/ld3") ``` 打开文件,文件中的内容结果为 ``` [Lscala.Option;@7af280cd [Lscala.Option;@12e3ec85 [Lscala.Option;@4aaab25e [Lscala.Option;@6bbfebcf [Lscala.Option;@7bc42a7a ```

scala编写从hdfs文件取数据通过phoenix将数据批量插入到hbase,报错对象没有序列化

求大牛指导指导问题所在 批量插入的方法如下 class PhoenixClient_v2(host:String,port:String) extends Serializable { try{ Class.forName("org.apache.phoenix.jdbc.PhoenixDriver") } catch{ case e :ClassNotFoundException => println(e) } def InsertOfBatch(phoenixSQL:String,Key:Array[String],Column:Array[String],filepath:String=null,dataArr:Array[Map[String,Any]]=null) = { val sparkConf = new SparkConf().setMaster("local").setAppName("hdfs2hbase") val sc = new SparkContext(sparkConf) //从json格式的hdfs文件中读取 if (filepath != null) { val url = "jdbc:phoenix:" + host + ":" + port val conn = DriverManager.getConnection(url) val ps = conn.prepareStatement(phoenixSQL) try { //利用spark从文件系统或者hdfs上读取文件 val Rdd_file = sc.textFile(filepath) var HashKey:String = (hashCode()%100).toString //从rdd中取出相应数据到ps中 Rdd_file.map{y=> Key.foreach(x => if (x == "Optime") HashKey += "|"+new JSONObject(y).getString(x).split(" ")(0) else HashKey += "|"+new JSONObject(y).getString(x)) ps.setString(1,HashKey) Column.foreach(x => ps.setString(Column.indexOf(x) + 2, new JSONObject(y).getString(x))) ps.addBatch() } ps.executeBatch() conn.commit() } catch{ case e: SQLException => println("Insert into phoenix from file failed:" + e.toString) }finally { conn.close() ps.close() } } //读取Map[String,Any]映射的集合 if (dataArr != null){ val url = "jdbc:phoenix:" + host + ":" + port val conn = DriverManager.getConnection(url) val ps = conn.prepareStatement(phoenixSQL) try { val Rdd_Arr = sc.parallelize(dataArr) var HashKey:String = (hashCode()%100).toString Rdd_Arr.map{y=> Key.foreach(x => if (x == "Optime") HashKey += "|"+y.get(x).toString.split(" ")(0) else HashKey += "|"+y.get(x) ) ps.setString(1,HashKey) Column.foreach(x => ps.setString(Column.indexOf(x) + 2, y.get(x).toString)) ps.addBatch() } ps.executeBatch() conn.commit() } catch{ case e: SQLException => println("Insert into phoenix from file failed:" + e.toString) }finally { conn.close() ps.close() } } } } 调用的方式如下 object PhoenixTest{ def main(args: Array[String]): Unit = { val pc = new PhoenixClient_v2("zk1","2181") println("select result :"+pc.Select_Sql("select * from \"OP_record\"").toString) pc.CreateTable("create table if not exists \"OP_record\"(\"pk\" varchar(100) not null primary key, \"cf1\".\"curdonate\" varchar(20),\"cf1\".\"curcharge\" varchar(20),\"cf1\".\"uniqueid\" varchar(20))","OP_record") val KeyArr = Array("serverid","userid","Optime") val ColumnArr = Array("curdonate","curcharge","uniqueid") pc.InsertOfBatch("upsert into \"OP_record\" values(?,?,?,?)",KeyArr,ColumnArr,"hdfs://zk1:9000/chen/hive_OP_record_diamonds_20170702.log.1499028228.106") } } 报错信息如下: Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) at org.apache.spark.rdd.RDD.map(RDD.scala:317) at scala.Hbase.PhoenixClient_v2.InsertOfBatch(PhoenixClient_v2.scala:123) at scala.Hbase.PhoenixTest$.main(PhoenixClient_v2.scala:230) at scala.Hbase.PhoenixTest.main(PhoenixClient_v2.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.phoenix.jdbc.PhoenixPreparedStatement Serialization stack: - object not serializable (class: org.apache.phoenix.jdbc.PhoenixPreparedStatement, value: upsert into "OP_record" values(?,?,?,?)) - field (class: scala.Hbase.PhoenixClient_v2$$anonfun$InsertOfBatch$1, name: ps$1, type: interface java.sql.PreparedStatement) - object (class scala.Hbase.PhoenixClient_v2$$anonfun$InsertOfBatch$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) ... 21 more 17/07/06 16:35:26 INFO SparkContext: Invoking stop() from shutdown hook 17/07/06 16:35:26 INFO SparkUI: Stopped Spark web UI at http://192.168.12.243:4040 17/07/06 16:35:26 INFO DAGScheduler: Stopping DAGScheduler 17/07/06 16:35:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 17/07/06 16:35:27 INFO MemoryStore: MemoryStore cleared 17/07/06 16:35:27 INFO BlockManager: BlockManager stopped 17/07/06 16:35:27 INFO BlockManagerMaster: BlockManagerMaster stopped 17/07/06 16:35:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 17/07/06 16:35:27 INFO SparkContext: Successfully stopped SparkContext 17/07/06 16:35:27 INFO ShutdownHookManager: Shutdown hook called 17/07/06 16:35:27 INFO ShutdownHookManager: Deleting directory /tmp/spark-18cc3337-cdda-498f-a7f3-124f239a6bfe

spark的rdd 可以看做数组吗?那么 可以随机取里面的数据吗?

``` Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Python version 2.7.9 (default, Sep 25 2018 20:42:16) SparkSession available as 'spark'. >>> sc=spark.read.text('/tmp/temp_file_5.part.gz') >>> sc.count() 19839 >>> 我想将这个文件分成4分, 0-5000,5000-10000,15000-19839 怎么将这个rrd分成4份了? 我想取 第h行的数据,能有好的办法吗? ```

spark 中rdd与dataframe的合并(join)

以下是我写的代码: ``` /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // scalastyle:off println package com.shine.ncc import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.mllib.classification.NaiveBayesModel import org.apache.spark.rdd.RDD import org.apache.spark.streaming.Time import org.apache.spark.sql.SQLContext import org.apache.spark.SparkContext import org.apache.spark.ml.feature.Tokenizer import org.ansj.splitWord.analysis.ToAnalysis import org.ansj.util.FilterModifWord import java.util.Arrays import org.apache.spark.mllib.feature.HashingTF import scala.collection.JavaConversions._ import org.apache.spark.mllib.feature.IDF import org.apache.spark.mllib.feature.IDFModel import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes object NetworkNewsClassify1 { var sameModel = null /** Case class for converting RDD to DataFrame */ case class Record(content: String,time:String,title:String) /** Lazily instantiated singleton instance of SQLContext */ object SQLContextSingleton { @transient private var instance: SQLContext = _ def getInstance(sparkContext: SparkContext): SQLContext = { if (instance == null) { instance = new SQLContext(sparkContext) } instance } } def main(args: Array[String]) { // if (args.length < 2) { // System.err.println("Usage: NetworkWordCount <hostname> <port>") // System.exit(1) // } StreamingExamples.setStreamingLogLevels() // Create the context with a 1 second batch size val sparkConf = new SparkConf().setAppName("NetworkNewsClassify") sparkConf.setMaster("local[2]"); val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a socket stream on target ip:port and count the 获取json信息 val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val myNaiveBayesModel = NaiveBayesModel.load(ssc.sparkContext, "D:/myNaiveBayesModel") //将接送转换成rdd lines.foreachRDD((rdd: RDD[String], time: Time) => { // Get the singleton instance of SQLContext val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits._ val newsDF = sqlContext.read.json(rdd) newsDF.count(); val featurizedData = newsDF.map{ line => val temp = ToAnalysis.parse(line.getAs("title")) //加入停用词 FilterModifWord.insertStopWords(Arrays.asList("r","n")) //加入停用词性???? FilterModifWord.insertStopNatures("w",null,"ns","r","u","e") val filter = FilterModifWord.modifResult(temp) //此步骤将会只取分词,不附带词性 val words = for(i<-Range(0,filter.size())) yield filter.get(i).getName //println(words.mkString(" ; ")); //计算每个词在文档中的词频 new HashingTF(500000).transform(words) }.cache() if(featurizedData.count()>0){ //计算每个词的TF-IDF val idf = new IDF() val idfModel = idf.fit(featurizedData) val tfidfData = idfModel.transform(featurizedData); //分类预测 val resultData = myNaiveBayesModel.predict(tfidfData) println(resultData) //将result结果与newsDF信息join在一起 //**??? 不会实现了。。。** //保存新闻到hbase中 } }) ssc.start() ssc.awaitTermination() } } ``` 其中newsDF是新闻信息,包含字段(title,body,date),resultData 是通过贝叶斯模型预测的新闻类型,我现在希望把result结果作为一个type字段与newsDF合并(join),保存到hbase中,这个合并的操作怎么做呢

可否让spark算子执行到某一步时,通过某些控制条件,让整个spark程序停止,或者在那一步保存结果到文件?

有这么一个需求:让rdd1执行map(或其他算子),当出现满足条件的情况时,控制整个spark程序停止,或是保存停止结果到文件(优先保存结果到文件),而不继续执行后续步骤 ``` //示例,大概就是这么一个意思:当出现值为5的行,停止程序或是保存结果 val rdd2 = rdd1.map(x =>{ if(x==5){ //整个spark程序停止,或是保存停止结果到文件 } ........... } ) rdd2.count() ``` 请问有什么可以实现的方法吗?

spark pair RDD创建操作

对于一个文件,每一行如下: ID\t value:value(value的数量不固定) 如何创建RDD使得每一个value对应于一个ID? 希望是python的spark解答

为什么Spark只会惰性计算RDD?

为什么Spark只会惰性计算RDD?为什么只有第一次在一个行动操作中用到时,才会真正计算?

spark读取不了本地文件是怎么回事

``` textFile=sc.textFile("file:///home/hduser/pythonwork/ipynotebook/data/test.txt") stringRDD=textFile.flatMap(lambda line:line.split(' ')) stringRDD.collect() ``` 我此路径下是有test文件的: ![图片说明](https://img-ask.csdn.net/upload/201805/18/1526634813_44673.png) 错误是: ``` Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 8.0 failed 4 times, most recent failure: Lost task 1.3 in stage 8.0 (TID 58, 192.168.56.103, executor 1): java.io.FileNotFoundException: File file:/home/hduser/pythonwork/ipynotebook/data/test.txt does not exist 。 。 。 Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599) 。 。 。 Caused by: java.io.FileNotFoundException: File file:/home/hduser/pythonwork/ipynotebook/data/test.txt does not exist ``` 而且发现若我把代码中test.txt随便改一个名字,比如ttest.txt(肯定是没有的文件) 错误竟然发生了变化: ``` Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/hduser/pythonwork/ipynotebook/data/tesst.txt at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.collect(RDD.scala:938) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153) at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745) ``` 注意: 此时我是以spark集群跑的:'spark://emaster:7077' 若是以本地跑就可以找到本地的那个test.txt文件 找hdfs文件系统的文件可以找到(在spark集群跑情况下) 。。。处由于字数显示省略了些不重要的错误提示,若想知道的话可以回复我 跪求大神帮助~感激不尽!!!

spark分组排序提取前N个值

求教各位大神: 本人用scalas+spark开发,用RDD实现以下需求时遇到困难! 数据: 用户 位置 天数 user1 L1 28 user1 L2 20 user1 L3 15 user2 L1 30 user2 L2 15 user3 L5 3 user3 L6 18 user4 L7 4 通过spark RDD怎样实现按用户分组提取每个用户天数最大的位置 希望数据结果: RDD: array((user1,L1,28),(user2,L1,30),(user3 , L6,18),(user4,,7 4)) 这里主体是根据用户分组计算最大天数,并把位置带出来,研究半天无果,求大神指教

spark计算hdfs上的文件时报错

scala> val rdd = sc.textFile("hdfs://...") scala> rdd.count java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AppendRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;

在中国程序员是青春饭吗?

今年,我也32了 ,为了不给大家误导,咨询了猎头、圈内好友,以及年过35岁的几位老程序员……舍了老脸去揭人家伤疤……希望能给大家以帮助,记得帮我点赞哦。 目录: 你以为的人生 一次又一次的伤害 猎头界的真相 如何应对互联网行业的「中年危机」 一、你以为的人生 刚入行时,拿着傲人的工资,想着好好干,以为我们的人生是这样的: 等真到了那一天,你会发现,你的人生很可能是这样的: ...

程序员请照顾好自己,周末病魔差点一套带走我。

程序员在一个周末的时间,得了重病,差点当场去世,还好及时挽救回来了。

Java基础知识面试题(2020最新版)

文章目录Java概述何为编程什么是Javajdk1.5之后的三大版本JVM、JRE和JDK的关系什么是跨平台性?原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的主类有何不同?Java应用程序与小程序之间有那些差别?Java和C++的区别Oracle JDK 和 OpenJDK 的对比基础语法数据类型Java有哪些数据类型switc...

和黑客斗争的 6 天!

互联网公司工作,很难避免不和黑客们打交道,我呆过的两家互联网公司,几乎每月每天每分钟都有黑客在公司网站上扫描。有的是寻找 Sql 注入的缺口,有的是寻找线上服务器可能存在的漏洞,大部分都...

Intellij IDEA 实用插件安利

1. 前言从2020 年 JVM 生态报告解读 可以看出Intellij IDEA 目前已经稳坐 Java IDE 头把交椅。而且统计得出付费用户已经超过了八成(国外统计)。IDEA 的...

搜狗输入法也在挑战国人的智商!

故事总是一个接着一个到来...上周写完《鲁大师已经彻底沦为一款垃圾流氓软件!》这篇文章之后,鲁大师的市场工作人员就找到了我,希望把这篇文章删除掉。经过一番沟通我先把这篇文章从公号中删除了...

总结了 150 余个神奇网站,你不来瞅瞅吗?

原博客再更新,可能就没了,之后将持续更新本篇博客。

副业收入是我做程序媛的3倍,工作外的B面人生是怎样的?

提到“程序员”,多数人脑海里首先想到的大约是:为人木讷、薪水超高、工作枯燥…… 然而,当离开工作岗位,撕去层层标签,脱下“程序员”这身外套,有的人生动又有趣,马上展现出了完全不同的A/B面人生! 不论是简单的爱好,还是正经的副业,他们都干得同样出色。偶尔,还能和程序员的特质结合,产生奇妙的“化学反应”。 @Charlotte:平日素颜示人,周末美妆博主 大家都以为程序媛也个个不修边幅,但我们也许...

MySQL数据库面试题(2020最新版)

文章目录数据库基础知识为什么要使用数据库什么是SQL?什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有有几种录入格式?分别有什么区别?数据类型mysql有哪些数据类型引擎MySQL存储引擎MyISAM与InnoDB区别MyISAM索引与InnoDB索引的区别?InnoDB引擎的4大特性存储引擎选择索引什么是索引?索引有哪些优缺点?索引使用场景(重点)...

如果你是老板,你会不会踢了这样的员工?

有个好朋友ZS,是技术总监,昨天问我:“有一个老下属,跟了我很多年,做事勤勤恳恳,主动性也很好。但随着公司的发展,他的进步速度,跟不上团队的步伐了,有点...

我入职阿里后,才知道原来简历这么写

私下里,有不少读者问我:“二哥,如何才能写出一份专业的技术简历呢?我总感觉自己写的简历太烂了,所以投了无数份,都石沉大海了。”说实话,我自己好多年没有写过简历了,但我认识的一个同行,他在阿里,给我说了一些他当年写简历的方法论,我感觉太牛逼了,实在是忍不住,就分享了出来,希望能够帮助到你。 01、简历的本质 作为简历的撰写者,你必须要搞清楚一点,简历的本质是什么,它就是为了来销售你的价值主张的。往深...

魂迁光刻,梦绕芯片,中芯国际终获ASML大型光刻机

据羊城晚报报道,近日中芯国际从荷兰进口的一台大型光刻机,顺利通过深圳出口加工区场站两道闸口进入厂区,中芯国际发表公告称该光刻机并非此前盛传的EUV光刻机,主要用于企业复工复产后的生产线扩容。 我们知道EUV主要用于7nm及以下制程的芯片制造,光刻机作为集成电路制造中最关键的设备,对芯片制作工艺有着决定性的影响,被誉为“超精密制造技术皇冠上的明珠”,根据之前中芯国际的公报,目...

优雅的替换if-else语句

场景 日常开发,if-else语句写的不少吧??当逻辑分支非常多的时候,if-else套了一层又一层,虽然业务功能倒是实现了,但是看起来是真的很不优雅,尤其是对于我这种有强迫症的程序"猿",看到这么多if-else,脑袋瓜子就嗡嗡的,总想着解锁新姿势:干掉过多的if-else!!!本文将介绍三板斧手段: 优先判断条件,条件不满足的,逻辑及时中断返回; 采用策略模式+工厂模式; 结合注解,锦...

离职半年了,老东家又发 offer,回不回?

有小伙伴问松哥这个问题,他在上海某公司,在离职了几个月后,前公司的领导联系到他,希望他能够返聘回去,他很纠结要不要回去? 俗话说好马不吃回头草,但是这个小伙伴既然感到纠结了,我觉得至少说明了两个问题:1.曾经的公司还不错;2.现在的日子也不是很如意。否则应该就不会纠结了。 老实说,松哥之前也有过类似的经历,今天就来和小伙伴们聊聊回头草到底吃不吃。 首先一个基本观点,就是离职了也没必要和老东家弄的苦...

2020阿里全球数学大赛:3万名高手、4道题、2天2夜未交卷

阿里巴巴全球数学竞赛( Alibaba Global Mathematics Competition)由马云发起,由中国科学技术协会、阿里巴巴基金会、阿里巴巴达摩院共同举办。大赛不设报名门槛,全世界爱好数学的人都可参与,不论是否出身数学专业、是否投身数学研究。 2020年阿里巴巴达摩院邀请北京大学、剑桥大学、浙江大学等高校的顶尖数学教师组建了出题组。中科院院士、美国艺术与科学院院士、北京国际数学...

为什么你不想学习?只想玩?人是如何一步一步废掉的

不知道是不是只有我这样子,还是你们也有过类似的经历。 上学的时候总有很多光辉历史,学年名列前茅,或者单科目大佬,但是虽然慢慢地长大了,你开始懈怠了,开始废掉了。。。 什么?你说不知道具体的情况是怎么样的? 我来告诉你: 你常常潜意识里或者心理觉得,自己真正的生活或者奋斗还没有开始。总是幻想着自己还拥有大把时间,还有无限的可能,自己还能逆风翻盘,只不是自己还没开始罢了,自己以后肯定会变得特别厉害...

百度工程师,获利10万,判刑3年!

所有一夜暴富的方法都写在刑法中,但总有人心存侥幸。这些年互联网犯罪高发,一些工程师高技术犯罪更是引发关注。这两天,一个百度运维工程师的案例传遍朋友圈。1...

程序员为什么千万不要瞎努力?

本文作者用对比非常鲜明的两个开发团队的故事,讲解了敏捷开发之道 —— 如果你的团队缺乏统一标准的环境,那么即使勤劳努力,不仅会极其耗时而且成果甚微,使用...

为什么程序员做外包会被瞧不起?

二哥,有个事想询问下您的意见,您觉得应届生值得去外包吗?公司虽然挺大的,中xx,但待遇感觉挺低,马上要报到,挺纠结的。

当HR压你价,说你只值7K,你该怎么回答?

当HR压你价,说你只值7K时,你可以流畅地回答,记住,是流畅,不能犹豫。 礼貌地说:“7K是吗?了解了。嗯~其实我对贵司的面试官印象很好。只不过,现在我的手头上已经有一份11K的offer。来面试,主要也是自己对贵司挺有兴趣的,所以过来看看……”(未完) 这段话主要是陪HR互诈的同时,从公司兴趣,公司职员印象上,都给予对方正面的肯定,既能提升HR的好感度,又能让谈判气氛融洽,为后面的发挥留足空间。...

面试:第十六章:Java中级开发

HashMap底层实现原理,红黑树,B+树,B树的结构原理 Spring的AOP和IOC是什么?它们常见的使用场景有哪些?Spring事务,事务的属性,传播行为,数据库隔离级别 Spring和SpringMVC,MyBatis以及SpringBoot的注解分别有哪些?SpringMVC的工作原理,SpringBoot框架的优点,MyBatis框架的优点 SpringCould组件有哪些,他们...

面试阿里p7,被按在地上摩擦,鬼知道我经历了什么?

面试阿里p7被问到的问题(当时我只知道第一个):@Conditional是做什么的?@Conditional多个条件是什么逻辑关系?条件判断在什么时候执...

无代码时代来临,程序员如何保住饭碗?

编程语言层出不穷,从最初的机器语言到如今2500种以上的高级语言,程序员们大呼“学到头秃”。程序员一边面临编程语言不断推陈出新,一边面临由于许多代码已存在,程序员编写新应用程序时存在重复“搬砖”的现象。 无代码/低代码编程应运而生。无代码/低代码是一种创建应用的方法,它可以让开发者使用最少的编码知识来快速开发应用程序。开发者通过图形界面中,可视化建模来组装和配置应用程序。这样一来,开发者直...

面试了一个 31 岁程序员,让我有所触动,30岁以上的程序员该何去何从?

最近面试了一个31岁8年经验的程序猿,让我有点感慨,大龄程序猿该何去何从。

大三实习生,字节跳动面经分享,已拿Offer

说实话,自己的算法,我一个不会,太难了吧

程序员垃圾简历长什么样?

已经连续五年参加大厂校招、社招的技术面试工作,简历看的不下于万份 这篇文章会用实例告诉你,什么是差的程序员简历! 疫情快要结束了,各个公司也都开始春招了,作为即将红遍大江南北的新晋UP主,那当然要为小伙伴们做点事(手动狗头)。 就在公众号里公开征简历,义务帮大家看,并一一点评。《启舰:春招在即,义务帮大家看看简历吧》 一石激起千层浪,三天收到两百多封简历。 花光了两个星期的所有空闲时...

《Oracle Java SE编程自学与面试指南》最佳学习路线图2020年最新版(进大厂必备)

正确选择比瞎努力更重要!

字节跳动面试官竟然问了我JDBC?

轻松等回家通知

面试官:你连SSO都不懂,就别来面试了

大厂竟然要考我SSO,卧槽。

实时更新:计算机编程语言排行榜—TIOBE世界编程语言排行榜(2020年6月份最新版)

内容导航: 1、TIOBE排行榜 2、总榜(2020年6月份) 3、本月前三名 3.1、C 3.2、Java 3.3、Python 4、学习路线图 5、参考地址 1、TIOBE排行榜 TIOBE排行榜是根据全世界互联网上有经验的程序员、课程和第三方厂商的数量,并使用搜索引擎(如Google、Bing、Yahoo!)以及Wikipedia、Amazon、YouTube统计出排名数据。

阿里面试官让我用Zk(Zookeeper)实现分布式锁

他可能没想到,我当场手写出来了

立即提问
相关内容推荐