spark RDD中的元组如何按照指定格式保存到HDFS上? 5C

请教一个问题:spark数据清洗的结果为RDD[(String, String)]类型的rdd,在这个RDD中,每一个元素都是一个元组。元组的key值是文件名,value值是文件内容,我现在想把整个RDD保存在HDFS上,让RDD中的每一个元素保存为一个文件,其中key值作为文件名,而value值作为文件内容。

应该如何实现呢?

RDD好像不支持遍历,只能通过collect()方法保存为一个数组,再进行遍历,但是这样可能会把内存撑爆,目前的做法是先把RDD通过saveAsTextFile方法保存在HDFS上,然后再使用FSDataInputStream输入流对保存后的part文件进行遍历读取,使用输出流写到HDFS上,这样很耗时。

请问有没有好一点的方法,可以直接把RDD的内容写到HDFS上呢?

0

3个回答

RDD有forEachPartition,然后你可以在遍历每个partition的时候,取出对应的key和value值,然后通过实例化的FileSystem生成对应的文件,这样就可以了。但是估计效率不高

0

简单类型展开变成表不就行了吗 使用的时候在做成元组

0

RDD好像不支持遍历,只能通过collect()方法保存为一个数组,再进行遍历,但是这样可能会把内存撑爆,目前的做法是先把RDD通过saveAsTextFile方法保存在HDFS上,然后再使用FSDataInputStream输入流对保存后的part文件进行遍历读取,使用输出流写到HDFS上,这样很耗时

-2
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
spark rdd根据key保存进不同的文件夹
1、首先rdd必须是(key,value)形式。本例中是根据createTimeStr作为key. 这个值是yyyy-MM-dd形式 val mrdd = ds.map(x => {         val jsonObject = JSON.parseObject(x._2)         //2017-07-18 14:16:13         val createTi
Spark RDD 按Key保存到不同文件
基本需求 将Keyed RDD[(Key,Value)]按Key保存到不同文件。 测试数据 数据格式:id,studentId,language,math,english,classId,departmentId 1,111,68,69,90,Class1,Economy 2,112,73,80,96,Class1,Economy 3,113,90,74,75,Class1,Economy 4...
spark读hdfs文件实现wordcount并将结果存回hdfs
package iie.udps.example.operator.spark; import scala.Tuple2; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.ap
spark在HDFS上保存/读取 map
保存: 序列化以后保存位ObjectFile val sc =spark.sparkContext var EncodeMap = scala.collection.mutable.Map[String,Map[String,Int]]() sc.parallelize(EncodeMap.toSeq).saveAsObjectFile(feature_map_path) 读取: 先定义Map结...
Spark把RDD数据保存到一个单个文件中
Spark是当前最流行的分布式数据处理框架之一,相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录),Spark就无能为力了。         有网友给出建议,用rddx.rep
spark 通过 RDD 从HDFS文件加载JSON文件到sql表
RDD定义RDD全称是Resilient Distributed Dataset, 是spark的核心抽象层,通过它可以读取多种文件,这里演示如何读取hdfs文件。所有spark的工作都是发生在RDD上,比如创建新的RDD,转换已有的RDD,对现有的RDD计算求得结果。RDD在spark中是不可变的(immutable)对象集合,RDD可以被划分成多个分区,存放在不同的节点。创建RDD有两种方法,
如何应对SparkSQL DataFrame保存到hdfs时出现的过多小文件问题
原因就不解释了,总之是因为多线程并行往hdfs写造成的(因为每个DataFrame/RDD分成若干个Partition,这些partition可以被并行处理)。 其结果就是一个存下来的文件,其实是hdfs中一个目录,在这个目录下才是众多partition对应的文件,最坏的情况是出现好多size为0的文件。 如果确实想避免小文件,可以在save之前把DaraFrame的partition设为0:
将rdd存储到本地的一个文件中
spark中,将文件存储为单个文件
Spark1.4从HDFS读取文件运行Java语言WordCounts并将结果保存至HDFS
习惯印象笔记客户端记录的方便。想想这几天搭建Spark环境,分析spark运行过程,分析程序编写过程的迷茫,记录一篇从Hadoop2.4.0 HDFS系统中读取文件,并使用java语言编译运行的WordCount过程与大家分享吧。 本次实验相关信息如下: 操作系统:Ubuntu 14 Hadoop版本:2.4.0 Spark版本:1.4.0 运行前提是Hadoop和Spark均已正确安装
spark 加载多个目录; RDD输出到hdfs文件压缩
(1)  spark textFile加载多个目录:   其实很简单,将多个目录(对应多个字符串),用,作为分隔符连接起来    val inputPath = List("hdfs://localhost:9000/test/hiveTest", "hdfs://localhost:9000/test/hiveTest2")                     .mkString(",
11.6 hdfs读取json数据,转成DataFrame保存到hdfs
读取json或者parquet文件创建一个DataFrame DataFrame存储到某一个路径下,默认存储格式是parquet SaveMode.Overwrite:重写 SparkConf conf = new SparkConf() .setAppName("SaveModeTest") .setMaster("local"); JavaSparkContext sc = new...
如何应对SparkSQL DataFrame保存到hdfs时出现的过多小文件问题
原因就不解释了,总之是因为多线程并行往hdfs写造成的(因为每个DataFrame/RDD分成若干个Partition,这些partition可以被并行处理)。 其结果就是一个存下来的文件,其实是hdfs中一个目录,在这个目录下才是众多partition对应的文件,最坏的情况是出现好多size为0的文件。 如果确实想避免小文件,可以在save之前把DaraFrame的partition设为0:
java spark rdd 保存为一份文件
JavaRDD product = ....; product.repartition(1).saveAsTextFile(outPutPath1);
rdd和DF数据存入MYSQL
1.通过RDD函数批量存入数据 object RDDtoMysql { def myFun(iterator: Iterator[(String, Int)]): Unit = { var conn: Connection = null var ps: PreparedStatement = null val sql = "insert into sparktomys
spark根据key输出到多个目录
项目中需要将spark的输出按id输出到不同的目录中,即实现在spark中的多路输出。我们可以调用saveAsHadoopFile函数并自定义一个OutputFormat类,就可以达到上述目的。import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.{FileSystem, Path} import org.ap
Spark SQL基础学习【三】以json的方式存储
我们可以把查询的结果以json方式存储
将java RDD结果写入Hive表中
情况一:只需插入一列 JavaRDD titleParticiple = ....; /**  * 将分词结果保存到Hive表,供数据探查使用  * */    HiveContext hiveCtx = new HiveContext(jsc);    SQLContext sqlCtx = new SQLContext(jsc); /**    
spark RDD中的元组如何按照指定格式保存到HDFS上?
请教一个问题:spark数据清洗的结果为RDD[(String, String)]类型的rdd,在这个RDD中,每一个元素都是rnrn一个元组。元组的key值是文件名,value值是文件内容,我现在想把整个RDD保存在HDFS上,让RDD中的每一rnrn个元素保存为一个文件,其中key值作为文件名,而value值作为文件内容。rnrn应该如何实现呢?rnrnRDD好像不支持遍历,只能通过collect()方法保存为一个数组,再进行遍历,但是这样可能会把内存撑爆,rnrn目前的做法是先把RDD通过saveAsTextFile方法保存在HDFS上,然后再使用FSDataInputStream输入流对保存rnrn后的part文件进行遍历读取,使用输出流写到HDFS上,这样很耗时。rnrn请问有没有好一点的方法,可以直接把RDD的内容写到HDFS上呢?
RDD 存储方式
RDD 存储类型      RDD可以设置不同类型存储方式,只存硬盘、只存内存等。      Spark的持久化级别 持久化级别 含义解释 MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重
spark Streaming 直接消费Kafka数据,保存到 HDFS 实战编程实践
最近在学习spark streaming 相关知识,现在总结一下 主要代码如下 def createStreamingContext():StreamingContext ={ val sparkConf = new SparkConf().setAppName("myStreamingText").setMaster(ConfigInfo.MasterConfig) sparkCo...
spark1.6使用:读取本地外部数据,把RDD转化成DataFrame,保存为parquet格式,读取csv格式
一、先开启Hadoop和spark 略 二、启动spark-shell spark-shell --master local[2] --jars /usr/local/src/spark-1.6.1-bin-hadoop2.6/libext/com.mysql.jdbc.Driver.jar 1.读取spark目录下面的logs日志作为测试: val alllog=sc.textFile...
在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉的解决方案
我的Spark Streaming代码如下所示: 全选复制放进笔记val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2) val words = lines.filter(examtep(_)) words.foreachRDD(exam(_)) //some
Spark处理HDFS数据,并将结果存储在Hive中
接昨天未完待续,继续: 首先,我要完成功能是:将下面的电影的links数据,在Spark上处理,处理结果存入到Hive中 image.png 这个是最后成功的图 image 功能流程如下图: image.png 涉及的代码如下: object ETL { def main(args: Array[String]...
Spark核心编程:创建RDD(集合、本地文件、HDFS文件)
Spark核心编程:创建RDD(集合、本地文件、HDFS文件)
使用Spark分析日志,使用关联算法分析结果,保存到hdfs
object EsPfgTest { //时间间隔,单位为分钟. val grapTime=5 //将时间间隔转化为毫秒 val grapTimeToMill=(grapTime*60000).toString.toLong //一整天时间换算成毫秒 val grapTimeAllDayToMill=(24*60*60*1000).toString.toLong def
第96讲 通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统(数据库)中
1:写到外部数据源,表面上看是spark去写,实际上就是jvm去操作。jvm写数据库,spark streaming就可以写数据库。jvm如果能写到Hbase或者Redius中,Spark也能。 2:spark streaming中我们使用Dstream.foreachRDD(),来把Dstream中的数据发送到外部的文件系统中,外部文件系统主要是数据库,Hbase,Redius,数据库比较少量
使用SparkSQL分析图书信息
(1)创建RDD (2)将RDD转为DataFrame (3)调用registerTempTable,注册为表,表名为:tb_book (4)使用使用sql语句查询前15条 (5)模糊查询书名包含“微积分”的书 (6)输出图书的前10行的name和price字段信息 (7)统计书名包含“微积分”的书的数量 (8)查询评分大于9的图书,,且只展示前10条 (9)计算所有书名包含“微积分”的评分平均值 (10)把书目按照评分从高到低进行排列,且只展示前15条 (11)把图书按照出版社进行分组,统计出不同出版社图书的总数 (12)将书名包含“微积分”的书记录保存到本地或HDFS上,且保存的格式为csv,文件名为:学号.csv (13)然后再从该csv文件加载,创建DataFrame,并查询和显示
RDD会全部放到内存里吗?
不会 spark是基于内存计算的,但是不会将数据全都加载进内存 RDD包含内存数据和磁盘数据
spark不能在遍历rdd过程中修改全局map
误区一:不能在遍历rdd过程中修改全局map 这个是mutable包下map,是可修改的 将foreach改成map也不行 解决方法: 先将rdd转成map,然后修改map,最后再转成rdd保存出来 这样可能无法实现分布式,在master机器上完成计算 本次需求背景: Spark代码输出的rdd (pkg1, 1) (pkg2, 1) 但我想要的输出: (pkg1, ...
spark写数据到ES
  1、查询ES中ID为1的记录   2、重新写入一条ID为1的数据 import org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.spark.rdd.EsSparkobject WriteES {   def main(args: Array[String]) {     val conf = ne...
sparksql读取hive中的数据保存到hdfs中
package wondersgroup_0905_Test import org.apache.spark.sql.SparkSession object sparkHive { def main(args: Array[String]): Unit = { //数据库名称 val database ="hivetest" //表名称 val table...
将应用程序提交到spark环境并将结果输出到hdfs
我们在“hadoop学习1--hadoop2.7.3集群环境搭建”  “spark学习1--centOS7.2下基于hadoop2.7.3的spark2.0集群环境搭建”  中已经将hadoop、spark的集群环境都搭建起来了,jdk用的是1.7版本的。        1.启动hadoop集群    centOS7服务器3台    master    192.16
spark RDD系列------2.HadoopRDD分区的创建以及计算
Spark经常需要从hdfs读取文件生成RDD,然后进行计算分析。这种从hdfs读取文件生成的RDD就是HadoopRDD。那么HadoopRDD的分区是怎么计算出来的?如果从hdfs读取的文件非常大,如何高效的从hdfs加载文件生成HadoopRDD呢?本篇文章探讨这两个问题。     SparkContext.objectFile方法经常用于从hdfs加载文件,从加载hdfs文件到生成Had
Spark从HDFS读取数据并转存MySQL
Spark从HDFS读入数据,简单处理并存入MySQLimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession} /** * Created by admin_ on 27/03/2018. * 1. 创建sparkSession * 2. 获取sparkContext * 3. 读取HDFS数据文件 * 4....
使用pyspark解析json文件,并将统计结果写入InfluxDB中
使用pyspark解析json文件,并将统计结果写入InfluxDB中
Spark--数据读取与保存
1、动机 有时候数据量会大到本机可能无法存储,这时就需要探索别的读取和保存方法了。 Spark支持很多种输入源和输出源。一部分原因是Spark本身是基于Hadoop生态圈二构建的,so spark可以通过Hadoop MapReduce 所使用的InputFormat 和 OutPutFormat 接口访问数据,而大部分常见的文件格式与存储系统(S3,HDFS,Cassandra,HBase等)
通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中
本博文主要内容包括: 技术实现foreachRDD与foreachPartition解析 foreachRDD与foreachPartition实现实战 一:技术实现foreach解析:1、首先我们看一下Output Operations on DStreams提供的API: SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的
Spark-sql结果保存指定位置
//1.读取数据,将每一行的数据使用列分隔符分割 val lineRDD = sc.textFile("hdfs://node1.itcast.cn:9000/person.txt", 1).map(_.split(" ")) //2.定义case class(相当于表的schema) case class Person(id:Int, name:String, age:Int)
Spark:用saveAsTable保存为hive默认纯文本文件
spark中Dataset的的saveAsTable方法可以把数据持久化到hive中,其默认是用parquet格式保存数据文件的,若是想让其保存为其他格式,可以用format方法配置。 如若想保存的数据文件格式为hive默认的纯文本文件: df.write.mode(SaveMode.Append).format("hive").saveAsTable("test") format支持的格式有:...
Spark实战(一):spark读取本地文件输出到Elasticsearch
对于spark的典型应用场景为批处理,一般由基本数据源(文件系统如:hdfs)或者高级数据源(flume、kafka)作为spark的数据接入端。输出一样可以是文件系统或数据库等等。本文介绍一个用java写的demo程序,功能是从本地接收数据,经过spark处理之后输出到Elasticsearch。 先上代码: maven <dependency> <groupId...
文章热词 机器学习教程 Objective-C培训 交互设计视频教程 颜色模型 设计制作学习
相关热词 mysql关联查询两次本表 native底部 react extjs glyph 图标 如何制作网页上收费视频 python元组菜鸟教程