xhredeem 2024-04-18 00:13 采纳率: 13.3%
浏览 11
已结题

kafka topic 所有分副本数修改

背景:
有数十个topic
每个topic有几十个分区
因为历史原因只能 命令行修改分区副本数,不是修改分区数
查看网上的教程基本上是下面这种json格式修改
"version":1,
"partitions":[{"topic":"onetopic","partition":0,"replicas":[0,1]},
{"topic":"onetopic","partition":1,"replicas":[0,1]},
{"topic":"onetopic","partition":2,"replicas":[1,0]},
{"topic":"onetopic","partition":3,"replicas":[1,0]}]
这种情况下,一个topic就要写几十行,因为有几十个分区,甚至上百个分区,是不方便的,有没有快捷有效的办法

  • 写回答

27条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-04-18 00:13
    关注
    获得0.30元问题酬金

    引自免费微信小程序:皆我百晓生

    要批量修改Kafka topic的分区副本数,您可以编写一个脚本程序,根据现有的topic列表及其分区信息,动态生成所需的JSON配置文件。这里提供一种使用Python实现的解决方案,包括以下几个步骤:

    1. 获取现有topic列表及分区信息:使用kafka-topics.sh脚本来列出所有topic及其分区信息。

    2. 生成修改副本数的JSON配置:基于上述信息,编写Python脚本,动态创建符合要求的JSON配置文件。

    3. 执行副本数修改操作:使用kafka-reassign-partitions.sh脚本,结合生成的JSON配置文件,进行副本数的批量修改。

    以下是具体实现过程:

    步骤1:获取现有topic列表及分区信息

    运行以下命令,将输出保存到文件(例如:topics_partitions.txt)中:

    ./bin/kafka-topics.sh --bootstrap-server <your-bootstrap-server> --list | xargs -I{} ./bin/kafka-topics.sh --bootstrap-server <your-bootstrap-server> --describe {} > topics_partitions.txt
    

    步骤2:生成修改副本数的JSON配置

    使用以下Python脚本读取topics_partitions.txt,并生成JSON配置文件(例如:reassignment.json):

    import json
    
    # 需要修改的目标副本数
    target_replication_factor = 2
    
    with open('topics_partitions.txt', 'r') as f:
        content = f.readlines()
    
    reassignments = []
    for line in content:
        if 'PartitionCount' not in line and 'ReplicationFactor' not in line:
            topic, partition, _, replicas_str = line.strip().split()
            replicas = replicas_str.split(',')
            # 忽略已达到目标副本数的分区
            if len(replicas) == target_replication_factor:
                continue
    
            reassignments.append({
                "topic": topic,
                "partition": int(partition),
                "replicas": list(map(int, replicas)),
            })
    
    # 添加版本信息
    config = {
        "version": 1,
        "partitions": reassignments,
    }
    
    with open('reassignment.json', 'w') as f:
        json.dump(config, f, indent=4)
    

    步骤3:执行副本数修改操作

    使用以下命令执行分区副本数的批量修改:

    ./bin/kafka-reassign-partitions.sh --zookeeper <your-zookeeper> --reassignment-json-file reassignment.json --execute
    

    或者,如果您使用的是Kafka 2.6.0及以上版本且配置了advertised.listeners包含PLAINTEXT://SSL://,可以使用Bootstrap Server替换ZooKeeper:

    ./bin/kafka-reassign-partitions.sh --bootstrap-server <your-bootstrap-server> --reassignment-json-file reassignment.json --execute
    

    请确保在执行前备份您的集群状态,并在生产环境中谨慎操作。同时,根据实际情况调整上述代码中的路径、服务器地址以及目标副本数(target_replication_factor)。

    评论 编辑记录

报告相同问题?

问题事件

  • 系统已结题 4月26日
  • 创建了问题 4月18日

悬赏问题

  • ¥15 (标签-UDP|关键词-client)
  • ¥15 关于库卡officelite无法与虚拟机通讯的问题
  • ¥15 qgcomp混合物线性模型分析的代码出现错误:Model aliasing occurred
  • ¥100 已有python代码,要求做成可执行程序,程序设计内容不多
  • ¥15 目标检测项目无法读取视频
  • ¥15 GEO datasets中基因芯片数据仅仅提供了normalized signal如何进行差异分析
  • ¥100 求采集电商背景音乐的方法
  • ¥15 数学建模竞赛求指导帮助
  • ¥15 STM32控制MAX7219问题求解答
  • ¥20 在本地部署CHATRWKV时遇到了AttributeError: 'str' object has no attribute 'requires_grad'