上游会给我发两个消息 第一个消息是任务信息 第二个是用户信息,其中有用户id和任务id,我的需求是 我要在消费第二个消息的时候 再去第一个消息中寻找属于我自己用户的任务 并保存在数据库中
1条回答 默认 最新
关注
让阿豪来帮你解答,本回答参考chatgpt3.5编写提供,如果还有疑问可以评论或留言
问题解答:- 消费者需要监听两个消息队列,分别为任务信息队列和用户信息队列,并分别实现消息消费的回调函数。
- 在消费用户信息的回调函数中,获取用户id和任务id的信息。
- 在消费任务信息的回调函数中,通过任务id查找该任务是否属于本消费者的用户。
- 如果任务属于本消费者的用户,则将任务保存到数据库中。 案例: 假设任务信息和用户信息分别存储在两个消息队列中。队列消息的格式如下: 任务信息队列消息格式:{"task_id": 1, "user_id": 2, "task_name": "任务1"} 用户信息队列消息格式:{"user_id": 2} 消费者代码如下:
import json import pika import MySQLdb conn = MySQLdb.connect(host='localhost', port=3306, user='user', passwd='passwd', db='db') cursor = conn.cursor() def callback_task(ch, method, properties, body): task_info = json.loads(body.decode()) # 判断任务是否属于本消费者的用户 if task_info["user_id"] == user_id: # 存储任务到数据库 sql = "INSERT INTO task (task_id, task_name) VALUES (%d, '%s')" % (task_info["task_id"], task_info["task_name"]) cursor.execute(sql) conn.commit() print("保存任务成功!") def callback_user(ch, method, properties, body): user_info = json.loads(body.decode()) global user_id user_id = user_info["user_id"] print("收到用户信息:", user_info) # 连接消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel1 = connection.channel() channel2 = connection.channel() # 监听任务信息队列和用户信息队列 channel1.basic_consume(queue='task_queue', on_message_callback=callback_task, auto_ack=True) channel2.basic_consume(queue='user_queue', on_message_callback=callback_user, auto_ack=True) # 开始消费消息 channel1.start_consuming() channel2.start_consuming()
在消费用户信息的回调函数中获取用户id,然后将其赋值给全局变量user_id。 在消费任务信息的回调函数中,判断任务是否属于本消费者的用户。如果是,则将任务保存到数据库中。 需要注意的是,在代码中需要正确地连接到MySQL数据库和RabbitMQ消息队列,并且消费者需要具有正确的权限。
解决 无用评论 打赏 举报
悬赏问题
- ¥25 使用cube ai 导入onnx模型时报错
- ¥15 关于#微信小程序#的问题:用一个网页显示所有关联的微信小程序数据,包括每个小程序的用户访问量
- ¥15 root的安卓12系统上,如何使apk获得root或者高级别的系统权限?
- ¥20 关于#matlab#的问题:如果用MATLAB函数delayseq可以对分数延时,但是延时后波形较原波形有幅度上的改变
- ¥15 使用华为ENSP软件模拟实现该实验拓扑
- ¥15 通过程序读取主板上报税口的数据
- ¥15 matlab修改为并行
- ¥15 大数据应用基础专业问题
- ¥20 数据分析出错了,希望有能人看看,解决一下
- ¥15 尝试访问%1服务的windows注册表时遇到问题。必须先解决此问题,然后才能运行安装过程。(请确认您正在使用管理员权限运行)373