代码缝合 2024-03-19 19:29 采纳率: 22.2%
浏览 4
已结题

eclipse paho mqtt 消息消费问题

上游会给我发两个消息 第一个消息是任务信息 第二个是用户信息,其中有用户id和任务id,我的需求是 我要在消费第二个消息的时候 再去第一个消息中寻找属于我自己用户的任务 并保存在数据库中

  • 写回答

1条回答 默认 最新

  • 码农阿豪@新空间代码工作室 Java领域优质创作者 2024-03-19 19:33
    关注
    让阿豪来帮你解答,本回答参考chatgpt3.5编写提供,如果还有疑问可以评论或留言
    问题解答:
    1. 消费者需要监听两个消息队列,分别为任务信息队列和用户信息队列,并分别实现消息消费的回调函数。
    2. 在消费用户信息的回调函数中,获取用户id和任务id的信息。
    3. 在消费任务信息的回调函数中,通过任务id查找该任务是否属于本消费者的用户。
    4. 如果任务属于本消费者的用户,则将任务保存到数据库中。 案例: 假设任务信息和用户信息分别存储在两个消息队列中。队列消息的格式如下: 任务信息队列消息格式:{"task_id": 1, "user_id": 2, "task_name": "任务1"} 用户信息队列消息格式:{"user_id": 2} 消费者代码如下:
    import json
    import pika
    import MySQLdb
    conn = MySQLdb.connect(host='localhost', port=3306, user='user', passwd='passwd', db='db')
    cursor = conn.cursor()
    def callback_task(ch, method, properties, body):
        task_info = json.loads(body.decode())
        # 判断任务是否属于本消费者的用户
        if task_info["user_id"] == user_id:
            # 存储任务到数据库
            sql = "INSERT INTO task (task_id, task_name) VALUES (%d, '%s')" % (task_info["task_id"], task_info["task_name"])
            cursor.execute(sql)
            conn.commit()
            print("保存任务成功!")
    def callback_user(ch, method, properties, body):
        user_info = json.loads(body.decode())
        global user_id
        user_id = user_info["user_id"]
        print("收到用户信息:", user_info)
    # 连接消息队列
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel1 = connection.channel()
    channel2 = connection.channel()
    # 监听任务信息队列和用户信息队列
    channel1.basic_consume(queue='task_queue', on_message_callback=callback_task, auto_ack=True)
    channel2.basic_consume(queue='user_queue', on_message_callback=callback_user, auto_ack=True)
    # 开始消费消息
    channel1.start_consuming()
    channel2.start_consuming()
    

    在消费用户信息的回调函数中获取用户id,然后将其赋值给全局变量user_id。 在消费任务信息的回调函数中,判断任务是否属于本消费者的用户。如果是,则将任务保存到数据库中。 需要注意的是,在代码中需要正确地连接到MySQL数据库和RabbitMQ消息队列,并且消费者需要具有正确的权限。

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 3月19日
  • 创建了问题 3月19日

悬赏问题

  • ¥25 使用cube ai 导入onnx模型时报错
  • ¥15 关于#微信小程序#的问题:用一个网页显示所有关联的微信小程序数据,包括每个小程序的用户访问量
  • ¥15 root的安卓12系统上,如何使apk获得root或者高级别的系统权限?
  • ¥20 关于#matlab#的问题:如果用MATLAB函数delayseq可以对分数延时,但是延时后波形较原波形有幅度上的改变
  • ¥15 使用华为ENSP软件模拟实现该实验拓扑
  • ¥15 通过程序读取主板上报税口的数据
  • ¥15 matlab修改为并行
  • ¥15 大数据应用基础专业问题
  • ¥20 数据分析出错了,希望有能人看看,解决一下
  • ¥15 尝试访问%1服务的windows注册表时遇到问题。必须先解决此问题,然后才能运行安装过程。(请确认您正在使用管理员权限运行)373