我这个代码感觉没问题,为啥我生产处来后,然后kafka那边消费不到
from kafka import KafkaProducer
import json
# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=['192.168.1.220:9093'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# 准备要发送的数据
message = {"id": 1, "content": "Hello, Kafka!"}
# 发送数据并等待所有异步消息发送完成
future = producer.send('test', message)
try:
record_metadata = future.get(timeout=60) # 等待直到消息被确认或抛出异常
print(f"Message sent to {record_metadata.topic} partition={record_metadata.partition} offset={record_metadata.offset}")
except Exception as e:
print(f"An error occurred: {e}")
# 关闭生产者
producer.close()
执行老报错
An error occurred: KafkaTimeoutError: Batch for TopicPartition(topic='test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time