捡垃圾的ITboy 2025-06-04 12:20 采纳率: 0%
浏览 16

关于#debezium#的问题,如何解决?(标签-数据库|关键词-connect)

使用debezium连接mysql innodb cluster数据库集群和kafka时候出现了问题
连接使用的是kafka connect
kafka集群是控制器节点和broker节点分离的方式部署的
broker配置如下

node.id=4
process.roles=broker
controller.quorum.voters=1@192.168.3.131:9093,2@192.168.3.132:9093,3@192.168.3.133:9093

listeners=BROKER://:9092
advertised.listeners=BROKER://192.168.3.131:9092

listener.security.protocol.map=BROKER:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
inter.broker.listener.name=BROKER
controller.listener.names=CONTROLLER

sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN

super.users=User:admin
allow.everyone.if.no.acl.found=true
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

log.dirs=/opt/kafka/kafkab/kafka_2.13-4.0.0/data


采用的是scram的形式,具体的账密在sasl文件里

debezium的json配置如下

{
  "name": "mysql-connector-master-slave",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": 1,
    "topic.prefix": "mydb-",
    "database.user": "debezium",
    "database.password": "FX1zcqlifqnm",
    "database.server.id": "1",
    "database.hostname": "192.168.3.101",
    "database.port": "6446",
    "database.server.name": "myCluster",
    "database.history.storage.mode": "Kafka",
    "schema.history.internal.kafka.bootstrap.servers":"192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092",
    "schema.history.internal.kafka.topic": "schema-changes.myCluster",
    "database.history.kafka.bootstrap.servers": "192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092",
    "database.history.kafka.topic": "schema-changes.myCluster",
    "database.history.skip.unparseable.ddl": "true",
    "database.history.kafka.security.protocol": "SASL_PLAINTEXT",
    "database.history.kafka.sasl.mechanism": "SCRAM-SHA-512",
    "database.history.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\";",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(.*)\\.(.*)",
    "transforms.route.replacement": "${2}.${1}",

    "offset.storage.file.filename": "/tmp/offsets-master-slave.dat",
    "offset.flush.interval.ms": 60000,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.replicaHost": "192.168.3.101",
    "database.replicaPort": "6447",
    "database.replicaUser": "debezium",
    "database.replicaPassword": "FX1zcqlifqnm",
    "status.monitoring.enabled": "true",
    "health.check.interval.ms": 30000,
    "failover.timeout": 5000,
    "failover.retries": 3
  }
}

但开启连接器后一直报错超时
具体的broker的server日志如下
org.apache.kafka.common.errors.InvalidRequestException: Unexpected Kafka request of type METADATA during SASL handshake.
[2025-06-04 04:04:17,760] WARN [SocketServer listenerType=BROKER, nodeId=4] Unexpected error from /192.168.3.131 (channelId=192.168.3.131:9092-192.168.3.131:38734-2-33); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.errors.InvalidRequestException: Unexpected Kafka request of type METADATA during SASL handshake.
[2025-06-04 04:04:19,134] WARN [SocketServer listenerType=BROKER, nodeId=4] Unexpected error from /192.168.3.131 (channelId=192.168.3.131:9092-192.168.3.131:38744-0-34); closing connection (org.apache.kafka.common.network.Selector)

请问到底该怎么办呀,琢磨了好几天了

  • 写回答

6条回答 默认 最新

  • 一杯年华@编程空间 2025-06-04 15:12
    关注

    一杯年华@编程空间帮您解答,本答案结合 ChatGPT4.0和DeepSeek整理

    我曾经遇到过类似的问题,当时也是使用Debezium连接MySQL集群和Kafka时出现了超时报错,结合你提供的日志和配置信息来看,核心问题可能出在Kafka的SASL认证配置与Debezium连接器的交互上。以下是具体分析和解决方案:

    问题分析

    从Broker日志中的 InvalidRequestException: Unexpected Kafka request of type METADATA during SASL handshake 可以看出,在SASL认证握手阶段,连接器提前发送了METADATA请求,这通常是因为Kafka的监听器配置、认证机制或连接器的JAAS配置不匹配导致的。可能的原因包括:

    1. 监听器名称与认证协议不匹配:Kafka的inter.broker.listener.namecontroller.listener.names配置可能未正确关联到SASL协议。
    2. JAAS配置缺失或错误:Debezium中用于Kafka认证的sasl.jaas.config可能未正确指定监听器名称,或账号权限不足。
    3. 连接器与Kafka版本兼容性问题:Debezium版本与Kafka的SASL机制不兼容(如SCRAM-SHA-512的支持问题)。

    解决方案

    方案一:修正Kafka监听器与认证配置的关联

    Kafka配置中需要明确指定不同监听器对应的安全协议,确保连接器使用的监听器(如BROKER)与认证机制匹配。

    操作步骤

    1. 检查Kafka的listener.security.protocol.map配置:确保BROKER监听器对应的协议为SASL_PLAINTEXT(如你当前配置),但需确认inter.broker.listener.name是否与连接器实际使用的监听器一致。
    2. 在Debezium配置中指定监听器名称:Kafka的SASL认证需要明确监听器名称,否则可能默认使用CONTROLLER监听器(不对外开放)。
      在连接器的database.history.kafka.sasl.jaas.config中添加serverName参数,指定使用BROKER监听器:
      database.history.kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required 
      username="admin" 
      password="cgp45sEdqj4iIT1g" 
      serverName="BROKER";
      
      (注意:若Kafka的advertised.listeners使用的是BROKER监听器,此处serverName需与之一致。)

    代码片段

    {
      "config": {
        // ... 其他配置
        "database.history.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\" serverName=\"BROKER\";",
        // 新增:指定Kafka连接使用的监听器名称(可选,根据实际情况调整)
        "schema.history.internal.kafka.listener.name": "BROKER",
        "database.history.kafka.listener.name": "BROKER"
      }
    }
    

    方案二:简化认证配置,使用统一的SASL机制

    若Kafka的控制器和Broker节点均使用SCRAM-SHA-512认证(当前配置中controller使用PLAIN可能导致不一致),可统一认证机制以避免握手冲突。

    操作步骤

    1. 修改Kafka的controller认证机制:将controllersasl.mechanism.controller.protocolPLAIN改为SCRAM-SHA-512,确保与BROKER一致。
      sasl.mechanism.controller.protocol=SCRAM-SHA-512
      
    2. 检查Debezium的认证配置是否匹配:确保连接器的sasl.mechanism与Kafka集群统一为SCRAM-SHA-512,并移除可能冲突的配置(如super.users的配置是否允许admin用户访问)。

    代码片段

    # Kafka broker配置修改(示例)
    sasl.mechanism.controller.protocol=SCRAM-SHA-512  # 原为PLAIN
    

    最优方案推荐:方案一(修正监听器与认证配置)

    原因

    • 你的Kafka集群采用控制器与Broker分离部署,CONTROLLER监听器仅用于内部控制器通信,而连接器应通过BROKER监听器(对外服务)连接。
    • 通过在JAAS配置中指定serverName="BROKER",明确告知连接器使用正确的监听器进行认证,可直接解决“认证阶段发送METADATA请求”的冲突问题。

    验证步骤

    1. 重启Kafka Broker节点使配置生效。
    2. 在Debezium连接器配置中添加serverName参数后重新启动连接器。
    3. 观察Broker日志,若不再出现InvalidRequestException,且连接器成功建立连接,则问题解决。

    希望以上方案能帮你解决问题!如果仍有报错,可以提供更详细的连接器日志或Kafka认证日志,我会进一步分析。请楼主采纳,如有问题请继续留言!

    评论

报告相同问题?

问题事件

  • 修改了问题 6月4日
  • 创建了问题 6月4日