某高校为大数据相关专业的学生开设了多门课程,为了能够实时地查找出目标课程,需要在IntelliJIDEA 中使用Spark Streaming 编程实现从一台服务器的8888端口上接收课程数据,课程数据需手动在服务器的8888端口输入,输入的课程数据如表6-7所示,每一条数据有2个字段,分别表示课程编号和课程名称,以空格分隔。现目标课程是“Hadoop”和“Spark”, 需要查询两门课程及对应课程编号。
表6-7 输入的课程数据
121 Hadoop
123 Java
069 HBase
223 Spark
078 Hive
3.实现思路及步骤
(1) 在 IntelliJ IDEA 中配置好 Spark Streaming 开发环境
(2) 启动 IntelliJ IDEA, 并进行 Spark Streaming 编程。
(3)在一台服务器(master节点) 中查看是否安装了 nc软件, 若没有安装 nc软件,则先安装 nc软件。
(4) 在 master节点上用 nc启动8888端口。
(5)在 IntelliJ IDEA 中使用socketTextStream 监听8888端口, 获取数据。
(6)使用map()方法将每一条数据以空格分割,并转化成“(课程名称,课程编号)”的形式。
(7)创建数组, 将要查找的“Hadoop”和“Spark”两门课程标记为 true, 形式如“("Hadoop", true)”, 并使用 parallelize 把数组转化成RDD。
(8)使用leftOuterJoin()方法对步骤(6)得到的 RDD 数据与步骤(7)得到的 RDD数据进行左外连接,最终形成“(课程名称,(课程编号,true))”形式的数据。
(9) 使用getOrElse 函数判断数据是否含有“true”字段, 并使用filter 把含有“true”的数据筛选出来。
求解答!可以把我看看吗
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
4条回答 默认 最新
关注 引自免费微信小程序:皆我百晓生
首先,你需要确保已经在IntelliJ IDEA中配置好了Apache Spark的开发环境。如果你还没有设置,可以参考官方文档配置IntelliJ IDEA。接下来,按照以下步骤编写Spark Streaming程序来解决这个问题:
- 打开IntelliJ IDEA并创建一个新的Scala项目。
- 添加Spark依赖到项目的
build.sbt
文件中(如果你使用的是SBT构建系统):
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.3.0"
或者如果你使用Maven,添加到
pom.xml
文件中:<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.3.0</version> </dependency> </dependencies>
- 创建一个新的Scala源码文件,例如
CourseFinder.scala
,然后开始编写代码:
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ object CourseFinder { def main(args: Array[String]): Unit = { // 设置Spark配置 val conf = new SparkConf().setAppName("CourseFinder").setMaster("local[*]") val ssc = new StreamingContext(conf, Milliseconds(1000)) // 安装nc软件并启动监听8888端口(这一步应该在服务器上完成) // 在master节点运行:sudo apt-get install netcat || yum install ncat // 在master节点运行:nc -lk 8888 // 创建socketTextStream监听8888端口 val lines = ssc.socketTextStream("localhost", 8888) // 解析每一行数据,转换为(课程名称, 课程编号)格式 val courses = lines.map(line => line.split(" ").map(_.trim)).map(p => (p(1), p(0))) // 创建包含待查找课程及其状态的RDD val searchCourses = Array(("Hadoop", true), ("Spark", true)).toDF("courseName", "found") // 左外连接查找课程 val joined = courses.leftOuterJoin(searchCourses).select($"courseName", $"found".as("status")) // 过滤出找到的课程 val filtered = joined.filter($"status".isin(true)) // 输出结果 filtered.print() // 启动StreamingContext ssc.start() ssc.awaitTermination() } }
-
在IntelliJ IDEA中编译并运行这个程序。注意,这个程序会持续运行直到你手动停止它。当在服务器的8888端口输入课程数据时,程序会过滤出包含"Hadoop"和"Spark"的课程。
-
确保你已经在服务器上正确安装了nc软件并启动了8888端口的监听服务。然后可以在命令行中向服务器发送课程数据,例如:
echo "121 Hadoop" | nc localhost 8888 echo "123 Java" | nc localhost 8888 echo "069 HBase" | nc localhost 8888 echo "223 Spark" | nc localhost 8888 echo "078 Hive" | nc localhost 8888
- 结果会在IntelliJ IDEA的控制台输出,显示找到的课程名称及其状态。
注意:这个示例代码假设你已经有一个正在运行的Spark集群。如果是在本地运行,只需将
conf.setMaster("local[*]")
保持不变。如果你在远程集群上运行,需要将conf.setMaster("local[*]")
替换为集群的实际地址,例如conf.setMaster("spark://your-master-node:7077")
。解决 无用评论 打赏 举报 编辑记录
悬赏问题
- ¥15 35114 SVAC视频验签的问题
- ¥15 impedancepy
- ¥15 在虚拟机环境下完成以下,要求截图!
- ¥15 求往届大挑得奖作品(ppt…)
- ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
- ¥50 浦育平台scratch图形化编程
- ¥20 求这个的原理图 只要原理图
- ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
- ¥20 微信的店铺小程序如何修改背景图
- ¥15 UE5.1局部变量对蓝图不可见