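I'm running a Spark program locally in IDEA. Whenever I call operators such as groupBy or groupByKey, the job becomes very slow.
The Web UI shows that the stages involved have more than 200 tasks, yet I only sent a single record and the parallelism is not high. The code is as follows: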
// Source: read the stream from Kafka
val source = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets","latest")
.load()
// Processing
import spark.implicits._
val process = source
.select("value")
.as[String]
.map(e => e.split(" "))
.map(e =>(e(0),e(1)))
.selectExpr("CAST(_1 AS STRING) as key",
"CAST(_2 AS DOUBLE) as value")
.as[(String,Double)]
.groupBy("key") // this operator is especially slow
.mean("value")
// Output
val query = process.writeStream
.format("console")
.outputMode("complete")
.start()
query.awaitTermination()
Below is the task monitoring view from the Web UI:
![Image description](https://img-ask.csdn.net/upload/202006/24/1593004600_193955.jpg)
Each message takes more than a minute to process. How can I optimize this?
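
One possible lead, offered as a guess rather than a confirmed diagnosis: the task count of about 200 matches the default value of `spark.sql.shuffle.partitions`, which is 200. A groupBy aggregation shuffles into that many partitions, so every micro-batch may schedule roughly 200 tasks even for a single record. The sketch below lowers the setting for a small local run. The object name, app name, `local[4]` master, and the value `4` are illustrative assumptions, not taken from the original program.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone sketch: build a local session with fewer shuffle partitions.
object ShufflePartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")                          // assumed: 4 local cores
      .appName("shuffle-partitions-sketch")        // assumed name
      .config("spark.sql.shuffle.partitions", "4") // default is 200
      .getOrCreate()

    // Print the effective value to confirm the override before starting any query.
    println(spark.conf.get("spark.sql.shuffle.partitions"))

    spark.stop()
  }
}
```

If this turns out to be the cause, the same `.config(...)` line would go on the builder that creates the `spark` session used above. Note that a streaming aggregation restarted from an existing checkpoint keeps the partition count it was first started with, so a fresh checkpoint location may be needed to see a difference.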