I've run into a problem using Debezium to stream changes from a MySQL InnoDB Cluster into Kafka.
The connection goes through Kafka Connect.
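The Connect worker's own client security settings in the worker properties file look roughly like this (a sketch from memory; the real file may differ slightly):

# worker properties (sketch, not the exact file)
bootstrap.servers=192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="cgp45sEdqj4iIT1g";
# the producer/consumer/admin clients used for connector tasks get the same
# three settings via the producer. / consumer. / admin. prefixes, e.g.:
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="cgp45sEdqj4iIT1g";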
The Kafka cluster runs in KRaft mode, with the controller nodes and broker nodes deployed as separate processes.
The broker configuration is as follows:
node.id=4
process.roles=broker
controller.quorum.voters=1@192.168.3.131:9093,2@192.168.3.132:9093,3@192.168.3.133:9093
listeners=BROKER://:9092
advertised.listeners=BROKER://192.168.3.131:9092
listener.security.protocol.map=BROKER:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
inter.broker.listener.name=BROKER
controller.listener.names=CONTROLLER
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN
super.users=User:admin
allow.everyone.if.no.acl.found=true
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
log.dirs=/opt/kafka/kafkab/kafka_2.13-4.0.0/data
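The controller nodes use a matching configuration, roughly like this (reconstructed for node 1; I don't have the exact file at hand, so treat every line as approximate):

# controller node 1 (sketch, reconstructed)
node.id=1
process.roles=controller
controller.quorum.voters=1@192.168.3.131:9093,2@192.168.3.132:9093,3@192.168.3.133:9093
listeners=CONTROLLER://:9093
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT
controller.listener.names=CONTROLLER
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.controller.protocol=PLAIN
# PLAIN credentials for the controller listener live in the JAAS file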
Authentication uses SCRAM; the actual usernames and passwords live in the SASL (JAAS) file.
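Concretely, the JAAS file contains the SCRAM login module entry, roughly like this (sketch; layout from memory):

// kafka_server_jaas.conf (sketch)
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="cgp45sEdqj4iIT1g";
};

In KRaft mode the SCRAM credential can be bootstrapped when formatting the storage, along these lines (the <cluster-id> placeholder stands in for my real cluster id):

bin/kafka-storage.sh format -t <cluster-id> -c config/server.properties \
    --add-scram 'SCRAM-SHA-512=[name=admin,password=cgp45sEdqj4iIT1g]'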
The Debezium connector JSON configuration is as follows:
{
  "name": "mysql-connector-master-slave",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": 1,
    "topic.prefix": "mydb-",
    "database.user": "debezium",
    "database.password": "FX1zcqlifqnm",
    "database.server.id": "1",
    "database.hostname": "192.168.3.101",
    "database.port": "6446",
    "database.server.name": "myCluster",
    "database.history.storage.mode": "Kafka",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092",
    "schema.history.internal.kafka.topic": "schema-changes.myCluster",
    "database.history.kafka.bootstrap.servers": "192.168.3.131:9092,192.168.3.132:9092,192.168.3.133:9092",
    "database.history.kafka.topic": "schema-changes.myCluster",
    "database.history.skip.unparseable.ddl": "true",
    "database.history.kafka.security.protocol": "SASL_PLAINTEXT",
    "database.history.kafka.sasl.mechanism": "SCRAM-SHA-512",
    "database.history.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\";",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(.*)\\.(.*)",
    "transforms.route.replacement": "${2}.${1}",
    "offset.storage.file.filename": "/tmp/offsets-master-slave.dat",
    "offset.flush.interval.ms": 60000,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.replicaHost": "192.168.3.101",
    "database.replicaPort": "6447",
    "database.replicaUser": "debezium",
    "database.replicaPassword": "FX1zcqlifqnm",
    "status.monitoring.enabled": "true",
    "health.check.interval.ms": 30000,
    "failover.timeout": 5000,
    "failover.retries": 3
  }
}
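One thing I'm not sure about: the Debezium 2.x docs describe the schema-history client security as prefixed pass-through options, not the database.history.kafka.* keys I used above. Should the "config" section instead contain entries like these (untested sketch, same credentials as above)?

"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "SCRAM-SHA-512",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\";",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.sasl.mechanism": "SCRAM-SHA-512",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"cgp45sEdqj4iIT1g\";"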
In any case, after starting the connector it keeps failing with timeouts.
The broker's server.log shows the following:
[2025-06-04 04:04:17,760] WARN [SocketServer listenerType=BROKER, nodeId=4] Unexpected error from /192.168.3.131 (channelId=192.168.3.131:9092-192.168.3.131:38734-2-33); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.errors.InvalidRequestException: Unexpected Kafka request of type METADATA during SASL handshake.
[2025-06-04 04:04:19,134] WARN [SocketServer listenerType=BROKER, nodeId=4] Unexpected error from /192.168.3.131 (channelId=192.168.3.131:9092-192.168.3.131:38744-0-34); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.errors.InvalidRequestException: Unexpected Kafka request of type METADATA during SASL handshake.
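To rule out the credentials themselves, is the right next step to hit the broker listener directly with the console clients? Something like a client.properties:

# client.properties (sketch, same admin credential as above)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="cgp45sEdqj4iIT1g";

and then:

bin/kafka-console-producer.sh --bootstrap-server 192.168.3.131:9092 \
    --topic test --producer.config client.properties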
What should I do here? I've been puzzling over this for days.