捡垃圾的ITboy 2025-06-04 15:28

Problem encountered while setting up Debezium

I ran into a problem using Debezium to connect a MySQL InnoDB Cluster to Kafka.
The connection goes through Kafka Connect.
The Kafka cluster is deployed in KRaft mode, with the controller nodes separated from the broker nodes.
The broker configuration is as follows:

node.id=4
process.roles=broker
controller.quorum.voters=1@192.168.3.131:9093,2@192.168.3.132:9093,3@192.168.3.133:9093

listeners=BROKER://:9092
advertised.listeners=BROKER://192.168.3.131:9092

listener.security.protocol.map=BROKER:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
inter.broker.listener.name=BROKER
controller.listener.names=CONTROLLER

sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN

super.users=User:admin
allow.everyone.if.no.acl.found=true
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

log.dirs=/opt/kafka/kafkab/kafka_2.13-4.0.0/data


Authentication uses SCRAM; the actual usernames and passwords are in the SASL JAAS file.
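
(For reference, such a SASL file is a standard JAAS config. A minimal sketch of what one typically contains for this listener layout — the PLAIN entries for the controller listener below are illustrative placeholders, not taken from the actual file:)

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="cgp45sEdqj4iIT1g";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="controller"
    password="controller-secret"
    user_controller="controller-secret";
};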

The Debezium connector JSON is as follows:

{
  "name": "mysql-connector-master-slave",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": 1,
    "topic.prefix": "mydb-",
    "database.user": "debezium",
    "database.password": "FX1zcqlifqnm",
    "database.server.id": "1",
    "database.hostname": "192.168.3.101",
    "database.port": "6446",
    "database.server.name": "myCluster",
    "database.history.storage.mode": "Kafka",
    "schema.history.internal.kafka.bootstrap.servers":"192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092",
    "schema.history.internal.kafka.topic": "schema-changes.myCluster",
    "database.history.kafka.bootstrap.servers": "192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092",
    "database.history.kafka.topic": "schema-changes.myCluster",
    "database.history.skip.unparseable.ddl": "true",
    "database.history.kafka.security.protocol": "SASL_PLAINTEXT",
    "database.history.kafka.sasl.mechanism": "SCRAM-SHA-512",
    "database.history.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\";",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(.*)\\.(.*)",
    "transforms.route.replacement": "${2}.${1}",

    "offset.storage.file.filename": "/tmp/offsets-master-slave.dat",
    "offset.flush.interval.ms": 60000,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.replicaHost": "192.168.3.101",
    "database.replicaPort": "6447",
    "database.replicaUser": "debezium",
    "database.replicaPassword": "FX1zcqlifqnm",
    "status.monitoring.enabled": "true",
    "health.check.interval.ms": 30000,
    "failover.timeout": 5000,
    "failover.retries": 3
  }
}

But after enabling the connector it keeps failing with timeouts.
The broker's server log shows:
org.apache.kafka.common.errors.InvalidRequestException: Unexpected Kafka request of type METADATA during SASL handshake.
[2025-06-04 04:04:17,760] WARN [SocketServer listenerType=BROKER, nodeId=4] Unexpected error from /192.168.3.131 (channelId=192.168.3.131:9092-192.168.3.131:38734-2-33); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.errors.InvalidRequestException: Unexpected Kafka request of type METADATA during SASL handshake.
[2025-06-04 04:04:19,134] WARN [SocketServer listenerType=BROKER, nodeId=4] Unexpected error from /192.168.3.131 (channelId=192.168.3.131:9092-192.168.3.131:38744-0-34); closing connection (org.apache.kafka.common.network.Selector)

What should I do? I've been puzzling over this for days.


6 answers

  • 檀越@新空间 2025-06-04 15:29

    Good afternoon 🌅🌅🌅
    This answer draws on Tongyi Qianwen (通义千问)

    From your description, the core issue is that the Debezium connector cannot complete SASL authentication when connecting to the Kafka brokers. Here is an analysis of the problem, followed by concrete steps to resolve it.


    Problem analysis

    1. Key point in the error log

      • Unexpected Kafka request of type METADATA during SASL handshake: the broker expected a SASL handshake on this connection, but the client sent a plain METADATA request first.
      • This is the classic symptom of a client connecting to a SASL_PLAINTEXT listener with security.protocol=PLAINTEXT, i.e. the client's SASL settings do not match the broker's.
    2. Broker configuration

      • The broker's listeners and advertised.listeners use the custom listener name BROKER://, and listener.security.protocol.map maps BROKER to SASL_PLAINTEXT, so port 9092 genuinely requires SASL; the broker side of this setup looks consistent.
      • The problem is therefore on the client side: some client is reaching this listener without performing SASL.
    3. Debezium / Kafka Connect configuration

      • The connector JSON only configures SASL for the schema history client. The failing connection in the log originates from 192.168.3.131 itself, which points at the Kafka Connect worker's own embedded clients (for the offsets, configs and status topics) connecting without SASL; those are configured in connect-distributed.properties, not in the connector JSON.
      • The username and password in database.history.kafka.sasl.jaas.config must match a SCRAM credential that actually exists on the brokers; super.users only grants that principal superuser ACLs, it does not create or validate the credential.
    4. Network or firewall issues

      • Make sure the brokers' listener addresses and ports are open in the firewall and reachable from the Connect worker.

    Solution

    Work through the following steps:

    1. Check the Kafka broker configuration

    Verify the key broker settings below. Your existing named-listener layout is fine; in particular, keep the CONTROLLER entry in listener.security.protocol.map, because controller.listener.names=CONTROLLER must still resolve to a protocol even on a broker-only node:

    # The named listeners are fine; the protocol map is what turns on SASL for them
    listeners=BROKER://:9092
    advertised.listeners=BROKER://192.168.3.131:9092
    
    # Keep the CONTROLLER mapping alongside BROKER
    listener.security.protocol.map=BROKER:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
    inter.broker.listener.name=BROKER
    
    # SCRAM-SHA-512 for clients and inter-broker traffic
    sasl.enabled.mechanisms=SCRAM-SHA-512
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    
    # Superuser, and allow access where no ACL matches
    super.users=User:admin
    allow.everyone.if.no.acl.found=true
    
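
    In KRaft mode, the admin SCRAM credential must exist on the cluster before any SCRAM login can succeed. When first standing the cluster up, one bootstrap option (a sketch; the cluster ID and config path are placeholders) is to register it at format time — step 3 below shows the kafka-configs.sh route for an already-running cluster:

    # Kafka 3.5+: create the SCRAM credential while formatting the storage directory
    bin/kafka-storage.sh format -t <CLUSTER_ID> \
      -c config/server.properties \
      --add-scram 'SCRAM-SHA-512=[name=admin,password=cgp45sEdqj4iIT1g]'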

    2. Check the Debezium configuration

    The JSON you posted mixes legacy Debezium 1.x property names (database.history.*) with the 2.x names (schema.history.internal.*), and several entries (database.history.storage.mode, database.replicaHost, database.replicaPort, database.replicaUser, database.replicaPassword, status.monitoring.enabled, health.check.interval.ms, failover.timeout, failover.retries) are not Debezium MySQL connector properties at all. offset.storage.file.filename and offset.flush.interval.ms are worker-level settings that are ignored inside a connector config, and database.server.name was removed in 2.x in favor of topic.prefix. Assuming Debezium 2.x, a cleaned-up config that also passes SASL through to the schema history producer and consumer looks like this:

    {
      "name": "mysql-connector-master-slave",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": 1,
        "topic.prefix": "mydb-",
        "database.hostname": "192.168.3.101",
        "database.port": "6446",
        "database.user": "debezium",
        "database.password": "FX1zcqlifqnm",
        "database.server.id": "1",
        "schema.history.internal.kafka.bootstrap.servers": "192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092",
        "schema.history.internal.kafka.topic": "schema-changes.myCluster",
        "schema.history.internal.skip.unparseable.ddl": "true",
        "schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
        "schema.history.internal.producer.sasl.mechanism": "SCRAM-SHA-512",
        "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\";",
        "schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
        "schema.history.internal.consumer.sasl.mechanism": "SCRAM-SHA-512",
        "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\";",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "(.*)\\.(.*)",
        "transforms.route.replacement": "${2}.${1}",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
      }
    }
    
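
    Equally important: the failing connection in your log comes from the Connect worker host itself, so the worker's own clients must speak SASL too. A minimal sketch of the relevant connect-distributed.properties entries, assuming the worker authenticates as the same admin SCRAM user:

    bootstrap.servers=192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092
    
    # The worker's admin/coordination clients
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="cgp45sEdqj4iIT1g";
    
    # The embedded producers and consumers used for connector data and the
    # offsets/configs/status topics need the same settings
    producer.security.protocol=SASL_PLAINTEXT
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="cgp45sEdqj4iIT1g";
    consumer.security.protocol=SASL_PLAINTEXT
    consumer.sasl.mechanism=SCRAM-SHA-512
    consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="cgp45sEdqj4iIT1g";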

    3. Verify the SASL/SCRAM credentials

    The username and password in the JAAS config must match a SCRAM credential that actually exists on the brokers. super.users=User:admin only grants the admin principal superuser ACLs once it has authenticated; it does not create the credential. Concretely:

    • The broker lists User:admin in super.users
    • The JAAS config authenticates as admin with the password cgp45sEdqj4iIT1g, which must have been registered as that user's SCRAM-SHA-512 credential (see the commands below)
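
    You can (re)create and inspect the credential with the stock tooling. A sketch, where the hypothetical admin-client.properties contains working SASL settings for a user that can already authenticate:

    # Register (or overwrite) the SCRAM-SHA-512 credential for user "admin"
    bin/kafka-configs.sh --bootstrap-server 192.168.3.131:9092 \
      --command-config admin-client.properties \
      --alter --add-config 'SCRAM-SHA-512=[password=cgp45sEdqj4iIT1g]' \
      --entity-type users --entity-name admin
    
    # Confirm which mechanisms are stored for that user
    bin/kafka-configs.sh --bootstrap-server 192.168.3.131:9092 \
      --command-config admin-client.properties \
      --describe --entity-type users --entity-name admin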

    4. Check network connectivity

    Make sure the brokers' listener addresses and ports are reachable from the Connect worker. Test basic connectivity with:

    telnet 192.168.3.131 9092
    

    If the connection fails, check the firewall rules or network configuration.
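
    Beyond raw TCP reachability, the quickest end-to-end SASL check is a metadata call through an authenticated client. A sketch, assuming a hypothetical client.properties that carries the same three SASL settings shown for the worker above:

    bin/kafka-topics.sh --bootstrap-server 192.168.3.131:9092 \
      --command-config client.properties --list
    
    # Success prints the topic list; if the broker logs "Unexpected Kafka request
    # of type METADATA during SASL handshake" again, the client settings still
    # do not match the listener.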


    5. Restart the Debezium connector

    After making the changes above, re-deploy the connector and watch the logs. If the problem persists, post the complete Connect worker log for further analysis.
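
    Re-deployment goes through the Connect REST API. A sketch, assuming the worker listens on the default port 8083 and the corrected JSON is saved as mysql-connector-master-slave.json:

    # Remove the old instance, register the corrected one, then check its status
    curl -X DELETE http://localhost:8083/connectors/mysql-connector-master-slave
    curl -X POST -H "Content-Type: application/json" \
      --data @mysql-connector-master-slave.json \
      http://localhost:8083/connectors
    curl http://localhost:8083/connectors/mysql-connector-master-slave/status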


    Summary

    With the broker listeners verified, the Connect worker and connector SASL settings aligned, and the SCRAM credential confirmed to exist, the InvalidRequestException during the SASL handshake should disappear. If it does not, share the complete logs and configuration files for further analysis.

