如何过滤 spark 链接 mongodb 时 加载数据量 20C

目前使用 的是mongodb官方提供的一个链接spark的jar包,代码如下
SparkConf conf = new SparkConf().set(...)//设置初始化属性
JavaSparkContext jsc = new JavaSparkContext(conf);
//下面这步加载数据
Dataset explicitDF = MongoSpark.load(jsc).toDF();
由于是这样直接加载的,根本 没地方在加载之前写过滤条件来控制加载的数据量的,所以导致直接加载了整表的数据,十分缓慢,各位有 什么方法或者建议提供参考下么,谢谢啦

1

2个回答

val spark = SparkSession.builder

.appName(this.getClass.getName().stripSuffix("$"))

.getOrCreate()

val inputUri="mongodb://test:pwd123456@192.168.0.1:27017/test.articles"

val df = spark.read.format("com.mongodb.spark.sql").options(

Map("spark.mongodb.input.uri" -> inputUri,

"spark.mongodb.input.partitioner" -> "MongoPaginateBySizePartitioner",

"spark.mongodb.input.partitionerOptions.partitionKey" -> "_id",

"spark.mongodb.input.partitionerOptions.partitionSizeMB"-> "32"))

.load()

val currentTimestamp = System.currentTimeMillis()

val originDf = df.filter(df("updateTime") < currentTimestamp && df("updateTime") >= currentTimestamp - 1440 * 60 * 1000)

.select("_id", "content", "imgTotalCount").toDF("id", "content", "imgnum")

0

可以使用mongoexport导出需要的信息到一个文件中,然后再用spark解析文件,速度也要快不少。

0
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
spark读取mongodb数据
val spark = SparkSession.builder .appName(this.getClass.getName().stripSuffix("$")) .getOrCreate() val df = spark.read.format("com.mongodb.spark.sql").options( Map
spark 连接 mongodb 使用例子
这个主要是spark 用JAVA语言连接mysql , mongodb 数据库的 CRUD 例子;附件有运行使用的截图,运行入口在Test目录的测试用例;仅供参考!
Spark连接MongoDB使用教程
一、前期准备 源自MongoDB官方文档,https://docs.mongodb.com/spark-connector/v1.1/getting-started/  二、编程实现 1. maven工程添加依赖 maven中央仓库搜索:http://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector  ...
Spark从Mysql中根据条件查询数据并写入到Mongodb
Spark支持的数据源非常之多,例如textfile,Hive,jdbc,sequence file等等,这里我想展示的是一个从Mysql中读取数据经过处理分析后存入Mongodb的例子。语言自然是Scala,下面直接放上代码: package com.test import com.mongodb.{MongoClient, MongoClientURI} import org.apach...
【Spark五十】Spark读写MongoDB
    1. Spark写入MongoDB的实例代码 如下代码实现将RDD写入到MongoDB的spark数据库的oc集合(Collection)中    package spark.examples.db import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkContext, Sp...
MongoDB大数据处理
MongoDB大数据处理权威指南 MongoDB大数据处理权威指南 MongoDB大数据处理权威指南
Spark MongoDB查询遇到的withPipeline类型不匹配问题
这两天在用Spark查询MongoDB数据。之前别人做了一些功能所以代码拿过来修改下直接用。但是发现个问题,源程序只是获得数据做Map操作,而我要做SQL查询。官方文档的示范程序要用rdd.withPipeline(Seq(matchQuery,projection1))实现查询条件的设置。但是我总是报错。 查了下以为文档有问题,或者是版本不兼容...
MongoDB在58同城百亿量级数据下的应用实践
58同城作为中国最大的生活服务平台,涵盖了房产、招聘、二手、二手车、黄页等核心业务。58同城发展之初,大规模使用关系型数据库(SQL Server、MySQL等),随着业务扩展速度增加,数据量和并发量演变的越来越有挑战,此阶段58的数据存储架构也需要相应的调整以更好的满足业务快速发展的需求。 MongoDB经过几个版本的迭代,到2.0.0以后,变的越来越稳定,它具备的高性能、高扩展性、Auto-
MongoDB on SparkSql的读取和写入操作(Scala版本)
MongoDB on SparkSql的读取和写入操作(Scala版本) 1.1 添加依赖 需要添加一下依赖: &amp;lt;!-- spark 连接 mongo的连接器 --&amp;gt; &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;org.mongodb.spark&amp;lt;/groupId&amp;gt; &amp;lt;artifactId&amp;gt;mongo-spark-conn...
MongoDB 用时间筛选_id字段
下面的代码来自stackoverflow,在MongoDB shell中运行> function objectIdWithTimestamp(timestamp) { ... // Convert string date to Date object (otherwise assume timestamp is a date) ... if (typeof(timestamp) =
Spark处理远程mongodb数据报错
报错:Failed with exception java.io.IOException:java.io.IOException: Unable to calculate input splits: not authorized on loan to execute command { splitVector: &quot;loan.operatorMongoModel&quot;, keyPattern: { _i...
spark+mongodb大数据框架搭建
spark+mongodb大数据框架搭建
spark与MogoDB不得不说的故事
一.背景 spark2.x Scala 2.11.x 截取pom.xml &amp;lt;dependencies&amp;gt; &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;org.mongodb.spark&amp;lt;/groupId&amp;gt; &amp;lt;artifactId&amp;gt;mongo-spark-connector_2.11&amp;lt;/artifactId&amp;gt; &amp;lt;v...
spark+mongo Scala开发环境搭建
Scala开发mongo-spark应用,环境搭建。
Scala+Spark:对文件进行滤重
需求来源:有俩文件,里面存着很多公司信息,但是有重复的,由于数据量太大,因此,决定写个小脚本,用spark跑一下先看一下文件格式,origin为原始公司信息数据,spider为爬虫抓取的数据整体逻辑:文件求并集 ==&amp;gt; map 对公司名做hash用以做索引 ==&amp;gt; 根据公司名的hash值做groupby,取首位数据 ==&amp;gt; 保存文件1、主文件:2、工具包:processinfo:...
MongoDB + Spark: 完整的大数据解决方案
原文链接 Spark介绍 按照官方的定义,Spark 是一个通用,快速,适用于大规模数据的处理引擎。 通用性:我们可以使用Spark SQL来执行常规分析, Spark Streaming 来流数据处理, 以及用Mlib来执行机器学习等。Java,python,scala及R语言的支持也是其通用性的表现之一。快速: 这个可能是Spark成功的最初原因之一,主要归功于其基于
spark入门——读取gz文件并过滤(包括开发环境安装)
安装jdkjdk下载网址 根据需要选择操作系统以及位数,一般就安装在C盘。然后配置环境变量 CLASSPATH .;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar 注意前面有个点 JAVA_HOME C:\Program Files\Java\jdk1.8.0_111 Path %JAVA_HOME%\bin;%JAVA_HOME%\jre\b
分布式爬虫:使用Scrapy抓取数据
Scrapy是Python开发的一个快速,高层次的屏幕抓取和web抓取框架,用于抓取web站点并从页面中提取结构化的数据。Scrapy用途广泛,可以用于数据挖掘、监测和自动化测试。 官方主页: http://www.scrapy.org/中文文档:Scrapy 0.22 文档GitHub项目主页:https://github.com/scrapy/scrapy Scrapy 使用了 Twi
spark从mongodb导入数据到hive
1、首先添加mongo-spark依赖,官网地址 https://docs.mongodb.com/spark-connector/ org.mongodb.spark mongo-spark-connector_2.10 1.1.0 2、直接上代码 object Mongo2Hive {
spark连接mongodb(权限认证)示例
SparkSession spark = SparkSession.builder() .appName("spot") .config("spark.mongodb.output.uri", "mongodb://spark:spark@" + mgohost + ":27017/admin") .config("spark.mongodb.output.database","demo")
mongodb中简单的根据时间过滤进行查询
目的:查询当某天的文档信息db.getCollection('test').find ({ &quot;name&quot; : { &quot;$regex&quot; : &quot;王&quot; }, &quot;is_history&quot; : 0, &quot;adddate&quot;:{&quot;$gte&quot;:ISODate(&quot;2018-01-02T00:00:00Z&quot;),&quot;$lte&quot;:ISODate(&quot;
如何对10亿数据量级的mongoDB作高效的全表扫描
本文链接: http://quentinXXZ.iteye.com/blog/2149440 一、正常情况下,不应该有这种需求 首先,大家应该有个概念,标题中的这个问题,在大多情况下是一个伪命题,不应该被提出来。要知道,对于一般较大数据量的数据库,全表查询,这种操作一般情况下是不应该出现的,在做正常查询的时候,如果是范围查询,你至少应该要加上limit。 说一下,我的应用场景:用
spark处理mongodb数据(python版)
mongodb是一种文档型数据库,作为一个适用于敏捷开发的数据库,mongodb的数据模式可以随着应用程序的发展而灵活地更新。但是mongodb适合一次查询的需求,对于统计、分析(尤其是在需要跨表、跨库的情况下)并不是太方便,我们可以用spark来处理mongodb数据。
mongodb 聚合查询每天统计
// SQL Here   db.getCollection('wechat_message').aggregate(       [              {   $project : { day : {$substr: ["$sendTime", 0, 10] }}},                   {   $group   : { _id : "$day",  number
Spark读取mongoDB数据写入Hive普通表和分区表
版本: spark 2.2.0 hive 1.1.0 scala 2.11.8 hadoop-2.6.0-cdh5.7.0 jdk 1.8 MongoDB 3.6.4 一 原始数据及Hive表 MongoDB数据格式 { &amp;amp;amp;quot;_id&amp;amp;amp;quot; : ObjectId(&amp;amp;amp;quot;5af65d86222b639e0c2212f3&amp;amp;amp;quot;), &amp;amp;amp
Scala连接mongodb数据库
mongodb是一种面向文档的数据库,Scala是一种函数式编程语言,由于项目的关系,需要使用Scala连接mongodb数据库。下面介绍具体的方法。 就像Java连接MySQL数据库需要JDBC一样,Scala连接mongodb数据库也需要一种中间件,这里使用casbah,这里给出了casbah的一些指导。 使用casbah需要下载casbah的jar包,由于已经在Intellij Idea
spark sql 处理mongodb 数据库中的数据
spark 关于处理mongodb、json数据的技术。
Spark通过JDBC加载部分数据、添加过滤条件
当我们需要使用SparkSQL通过JDBC方式连接MySQL、Oracle、Greenplum等来实现对数据的操作时,可能在某些情况下并不需要加载全量的数据表。例如: 只需要其中的部分字段 按照条件进行筛选后的数据 此时就需要在JDBC连接时对option(“dbtable”, tablename)属性值进行修改,参看spark官网给出的属性介绍:(spark2.3 jdbc-to-other...
MongoDB on SparkSql的读取和写入操作(Scala版本)之二
参考:https://blog.csdn.net/qq_33689414/article/details/83421766 MongoDB on SparkSql的读取和写入操作(Scala版本) 1.1 添加依赖 需要添加一下依赖: &amp;lt;!-- spark 连接 mongo的连接器 --&amp;gt; &amp;lt;dependency&amp;gt;     &amp;lt;groupId&amp;gt;org.mong...
mongodb密码特殊字符的解决方法
一般是这么连接的: mongoose.connect(&quot;mongodb://username:password@127.X.X.X:27017/db&quot;); 但是,如果你的密码里面设置了特殊字符,比如‘@’,‘%’,可能使得 mongodb 连接不能被正常解析,字符转义也没什么效果,从而导致连接失败。 有2个方法可以规避这个问题: 1.更换连接格式 mongoose.connect( &quot;mongo...
spark写入数据到mongodb
将HIVE库里的表数据通过saprk写入到mongodb库里的实现 1、到mongodb的官网上下载mongo-spark-connector_2.10-1.1.0 2、下载mongodb-driver-core-3.6.0.jar ,mongodb-driver-3.6.0.jar,bson-3.6.0.jar 三者的版本必须一致 使用scala连接mongodb,官网上有相关的文档说明
Scala远程连接MongoDB读取数据
    使用用户名和密码远程连接MongoDB数据库,用Java和Scala连接其实原理相同,都是JDBC,用MongoDB的连接驱动,只是语法上稍有区别而已,而在类、方法的调用上一模一样。      在此,分享一下Scala连接MongoDB查看数据的Code,语法结构上稍作修改就可以用Java实现。首先,下载连接驱动,添加到工程里,下载地址:mongo-java-driver-3.7.0.ja...
Spark大数据常见错误分享总结(来自苏宁)
Spark trouble shooting 经验分享 错误总结
目前Spark Application处理的数据量和性能
今年最值得开心的事情,就是Spark Application在客户局点跑的效果。虽然里面涉及的算法由于涉密所以不能透露,但是性能杠杠的还是值得高兴一下的。 每秒钟的数据量大概为40万~80万条。 实时Spark Application的性能(开5分钟的时间窗口): 5分钟内可以处理完,没有延迟和堆积。 离线Spark Application的性能(一天跑一次,一次处理前一天的数...
MongoDB数据量大于2亿后遇到的问题 及原因分析
MongoDB数据量大于2亿后遇到的问题 及原因分析 一、数据增长情况     每月增长量最大达到了1.9亿,每天增长约300W-500W     (增长数据具体可看页尾) 二、遇到的情况及解决方法     1.数据量过大,并且都集中在一个表,所以此表数据插入变慢。         表索引越多越明显,         优化处理方法:         
Spark读取HDFS数据分区参考
refer: https://www.jianshu.com/p/182901f03296 本文以读取 parquet 文件 / parquet hive table 为例: hive metastore 和 parquet 转化的方式通过 spark.sql.hive.convertMetastoreParquet 控制,默认为 true。 如果...
mongodb常用数据查询筛选命令行操作
下面是我整理了关于mongodb数据表命令行操作数据的插入、修改、删除、求和、最大值、最小值、平均值、排序、分片查询、in和or等用法的查询在查询指定数据库表的情况下我们可以使用 show collections查询全部数据可以使用find()方法查询查询数据库表中的数据条数,带查询条件统计数量:db.sidcust.count({"age":{$gte:23,$lte:28}}); 全部:db.
Spark---算子调优之filter过后使用coalesce减少分区数量
默认情况下,经过了这种filter之后,RDD中的每个partition的数据量,可能都不太一样了。(原本每个partition的数据量可能是差不多的) 问题: 1、每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样数量的task,来进行处理;有点浪费task计算资源。 2、每个partition的数据量不一样,会导致后面的每个tas
MongoDB+大数据,几近完美
Background I joined Couchbase in December of 2013. However, I've been passionate about big data technology for years. I started writing about NoSQL in September of 2009 (link), and I wrote about bi
MongoDB使用Java Driver访问数据库
 下面简要概述使用MongoDB的Java驱动程序访问数据库。 快速导览        使用MongoDB的Java驱动非常简单,只要确保将mongo.jar文件添加到classpath中即可。1).获取连接:        为了连接到MongoDB,需要知道要连接的数据库名称。如果库不存在,MongoDB将创建一个新的库。另外,在连接时需要指定服务器地址和端口。下面展示三种连接本地机器...
文章热词 机器学习教程 Objective-C培训 交互设计视频教程 颜色模型 设计制作学习
相关热词 mysql关联查询两次本表 native底部 react extjs glyph 图标 如何制作网页链接视频教程 如何制作网页链接视频