dongshu_cheng 2023-01-31 14:43 采纳率: 100%
浏览 17
已结题

spark structured steaming与kafka集成offset管理方式

spark structured steaming与kafka集成,管理offset的方式有哪几种呢?目的是从故障中恢复流应用程序,比如重启consumer。
我知道的是:1. 利用spark checkpoint 文件,2. 提交offset给kafka。
还有别的方法吗? 哪种是常用的方案呢?

期待回复,非常感谢!!

  • 写回答

2条回答 默认 最新

  • m0_54204465 2023-01-31 15:26
    关注

    Spark Structured Streaming 集成 Kafka 的 offset 管理方式有如下几种:

    利用 Spark checkpoint 文件
    提交 offset 给 Kafka
    常用的方案是使用 Spark checkpoint,因为它可以提供更好的故障恢复性和高可用性,同时也不依赖于 Kafka 的 offset 管理。但是,提交 offset 给 Kafka 也是一个不错的选择,特别是在简单的场景中。在Spark Structured Streaming与Kafka集成时,这两种方法是比较常见的offset管理方法。有时也会通过将offset存储在外部数据存储中(如HDFS,RDBMS)来实现offset管理,以将offset存储在MySQL数据库为例:

    # define offsetRanges variable
    val offsetRanges = Array(
      // topic, partition, inclusive starting offset, exclusive ending offset
      TopicAndPartition("topicA", 0, 0, 100),
      TopicAndPartition("topicA", 1, 0, 100),
      TopicAndPartition("topicB", 0, 0, 100),
      TopicAndPartition("topicB", 1, 0, 100)
    )
    
    # Update offsets in database
    offsetRanges.foreach { case (topicAndPartition, offset) =>
      val sql = s"INSERT INTO offsets (topic, partition, offset) VALUES ('${topicAndPartition.topic}', ${topicAndPartition.partition}, $offset) ON DUPLICATE KEY UPDATE offset = $offset"
      conn.createStatement().execute(sql)
    }
    
    # Fetch current offsets from database
    val currentOffsets = offsetRanges.map { topicAndPartition =>
      val sql = s"SELECT offset FROM offsets WHERE topic = '${topicAndPartition.topic}' AND partition = ${topicAndPartition.partition}"
      val resultSet = conn.createStatement().executeQuery(sql)
      resultSet.next()
      val offset = resultSet.getLong("offset")
      topicAndPartition -> offset
    }
    
    

    但这不太常见。
    下面是一个使用 Apache Spark Structured Streaming 集成 Apache Kafka 的简单例子:

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic1")
      .load()
    
    val query = df
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
    
    

    在这个例子中,Spark 从 Kafka 的 topic1 读取数据并写入到控制台。对于 offset 管理,默认情况下 Spark 使用 checkpoint 文件进行管理。可以使用 .option("checkpointLocation", "path/to/checkpoint/dir") 来设置 checkpoint 位置。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 2月9日
  • 已采纳回答 2月1日
  • 创建了问题 1月31日

悬赏问题

  • ¥15 任务A:大数据平台搭建(容器环境)怎么做呢?
  • ¥15 r语言神经网络自变量重要性分析
  • ¥15 基于双目测规则物体尺寸
  • ¥15 wegame打不开英雄联盟
  • ¥15 公司的电脑,win10系统自带远程协助,访问家里个人电脑,提示出现内部错误,各种常规的设置都已经尝试,感觉公司对此功能进行了限制(我们是集团公司)
  • ¥15 救!ENVI5.6深度学习初始化模型报错怎么办?
  • ¥30 eclipse开启服务后,网页无法打开
  • ¥30 雷达辐射源信号参考模型
  • ¥15 html+css+js如何实现这样子的效果?
  • ¥15 STM32单片机自主设计