头顶榴莲树 2020-06-24 21:19 采纳率: 0%
浏览 230

如何在StructuredStreaming中使用groupBy、groupByKey等算子时降低Task数量(调优)

我在本地IDEA上跑spark程序,每当调用groupBy、groupByKey等算子时就非常耗时
,从WebUI上看涉及的stage有200多个任务,但我就用了一条数据,并发度也不高,代码如下所示:
//接入
val source = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets","latest")
.load()

//处理
import spark.implicits._
val process = source
  .select("value")
  .as[String]
  .map(e => e.split(" "))
  .map(e =>(e(0),e(1)))
  .selectExpr("CAST(_1 AS STRING) as key",
        "CAST(_2 AS DOUBLE) as value")
  .as[(String,Double)]
  .groupBy("key")//这个算子特别耗时
  .mean("value")

//输出
val query = process.writeStream
    .format("console")
    .outputMode("complete")
    .start()
query.awaitTermination()

    下面时WebUI上的Task监控

    ![图片说明](https://img-ask.csdn.net/upload/202006/24/1593004600_193955.jpg)

     每条消息要处理一分多钟,我该如何优化?
  • 写回答

1条回答 默认 最新

  • dabocaiqq 2020-08-14 15:48
    关注
    评论

报告相同问题?

悬赏问题

  • ¥65 永磁型步进电机PID算法
  • ¥15 sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?
  • ¥88 找成都本地经验丰富懂小程序开发的技术大咖
  • ¥15 如何处理复杂数据表格的除法运算
  • ¥15 如何用stc8h1k08的片子做485数据透传的功能?(关键词-串口)
  • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
  • ¥15 latex怎么处理论文引理引用参考文献
  • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?
  • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?
  • ¥15 乘性高斯噪声在深度学习网络中的应用