wendy_stt 2024-07-23 16:23 采纳率: 66.7%
浏览 4

python这边生产出kafka信息,kafka那边咋消费不到

我这个代码感觉没问题,为啥我生产处来后,然后kafka那边消费不到

from kafka import KafkaProducer  
import json  

# 创建Kafka生产者  
producer = KafkaProducer(bootstrap_servers=['192.168.1.220:9093'],  
                          value_serializer=lambda m: json.dumps(m).encode('utf-8'))  

# 准备要发送的数据  
message = {"id": 1, "content": "Hello, Kafka!"}  

# 发送数据并等待所有异步消息发送完成  
future = producer.send('test', message)  
try:  
    record_metadata = future.get(timeout=60)  # 等待直到消息被确认或抛出异常  
    print(f"Message sent to {record_metadata.topic} partition={record_metadata.partition} offset={record_metadata.offset}")  
except Exception as e:  
    print(f"An error occurred: {e}")  

# 关闭生产者  
producer.close()

执行老报错
An error occurred: KafkaTimeoutError: Batch for TopicPartition(topic='test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time

  • 写回答

3条回答 默认 最新

  • 阿里嘎多学长 2024-07-23 16:23
    关注

    以下内容由AIGC及阿里嘎多学长共同生成、有用望采纳:


    遇到Kafka消息生产和消费问题时,可以按照以下步骤进行排查:

    1. 检查生产者代码

      • 确保bootstrap_servers地址和端口正确无误。
      • 检查value_serializer是否正确将消息序列化成Kafka能够接收的格式。
    2. 检查消费者配置

      • 确保消费者连接的服务器地址和端口与生产者一致。
      • 确保消费者订阅的主题名称与生产者发送的主题一致。
    3. 网络连接检查

      • 使用pingtelnet命令测试Kafka服务器的网络连通性。
      • 检查防火墙设置,确保没有规则阻止了生产者和消费者之间的通信。
    4. Kafka服务器日志

      • 查看Kafka服务器的日志文件,通常位于/var/log/kafka/目录下,寻找可能的错误或警告信息。
    5. 确认主题存在

      • 使用Kafka命令行工具检查主题是否存在:kafka-topics.sh --list --bootstrap-server 192.168.1.220:9093
      • 如果主题不存在,使用kafka-topics.sh --create --topic test --bootstrap-server 192.168.1.220:9093 --partitions 1 --replication-factor 1创建主题。
    6. 调整timeout参数

      • 如果future.get(timeout=60)超时,可能需要增加timeout值,或者检查Kafka服务器的性能和负载情况。
    7. 深入调查

      • 如果以上步骤都无法解决问题,可能需要检查Kafka的详细配置,比如server.properties文件中的设置。
      • 考虑查看Kafka的监控工具,如Kafka Manager或Confluent Control Center,以获取更多信息。
    8. 社区帮助

      • 如果问题依然无法解决,可以在Kafka社区论坛或Stack Overflow等平台上寻求帮助,提供详细的错误信息和配置细节。
    9. 代码审查

      • 审查生产者代码,确保没有逻辑错误,比如是否在发送消息后立即关闭了生产者,这可能导致消息发送不完整。
    10. 消费者代码检查

      • 确保消费者代码正确处理了消息接收逻辑,没有出现异常或错误处理不当的情况。

    按照这些步骤进行排查,通常可以定位并解决Kafka消息生产和消费的问题。如果问题依然存在,可能需要更详细的信息来进行进一步的分析。

    评论 编辑记录

报告相同问题?

问题事件

  • 修改了问题 7月23日
  • 创建了问题 7月23日

悬赏问题

  • ¥15 35114 SVAC视频验签的问题
  • ¥15 impedancepy
  • ¥15 在虚拟机环境下完成以下,要求截图!
  • ¥15 求往届大挑得奖作品(ppt…)
  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见