tylrr 2024-02-03 14:41 采纳率: 84.6%
浏览 5
已结题

[Python+rabbitMQ]如何调用aio_pika的方法进行多条消息确认?

大家好,我使用aio_pika库时遇到一个问题。 我的程序逻辑是 从消息队列中获取50个url消息,并作为入参传给处理函数进行一次性处理。然后批量确认。我查了几个AI,他们给出的答案都是错的(给出的方法过时或不对)。请问大家怎么样解决。谢谢

import aio_pika
import time
import asyncio

# 调用100个url消息,获得结果后进行确认
async def checkLink(linkList: list[tuple]):
    try:
        linksListItem = [i[0] for i in linkList]
        urlList = [i["url"] for i in linksListItem]
        resultStr = await myAPI(urlList)   # 输出查询结果
        if resultStr:                                   #正常获得查询结果
            index = 0
            result_dict = {}
            responseList = json.loads(resultStr)  # 输出结果list

            #请问如何修改以下语句实现消息的确认?  在aio_pika最新版本的库中channel貌似没有basic_ack_multiple 方法
            await channel.basic_ack_multiple(delivery_tags=[message.delivery_tag for message in messagesList])
     
            return result_dict
        else:
            return False
    except Exception as e:
        logger.error(f"checkLink异常: {e}")
        logger.error(traceback.format_exc())
        return False

async def Task(message: aio_pika.abc.AbstractIncomingMessage, ):
    try:
        # 将消息添加到列表中
        messagesList.append((pickle.loads(message.body) , message.delivery_tag))
        # 如果消息列表的长度达到100,或者队列为空,就处理消息
        message_count = queue.declaration_result.message_count
        if len(messagesList) ==100 or message_count == 0:
            resultDict = await checkLink(messagesList)
            if resultDict:
                #logger.info(json.dumps(resultDict))
                #print("\n")
                # 将结果字符串发送到结果队列中
                await resultExchange.publish(
                    aio_pika.Message(
                        body=pickle.dumps(resultDict),
                        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
                        headers={"type": "Result"}
                    ),
                    routing_key="allResult"
                )
                for i in messagesList:
                    await message.ack()
            else:
                # 拒绝并重新排队消息。
                for i in messagesList:
                    await i.reject(requeue=True)
                logger.error("异常")
            linkList.clear()
            messagesList.clear()
        await asyncio.sleep(0.1)
    except Exception as e:
        logger.error(f"出现异常: {e}")
        logger.error(traceback.format_exc())


async def consumer():
    try:
        # 创建连接
        connection = await aio_pika.connect_robust(host='localhost', port=5672, login='admin', password='123123',
                                                   virtualhost='my_vhost', )   
        # 创建信道
        global channel
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=100)
        # 声明被消费队列
        global queue
        queueName = "myqueue"
        queue = await channel.declare_queue(queueName, durable=True)
        # 声明结果队列
        rstqueueName = "Result"
        global resultQuequ
        resultQuequ = await channel.declare_queue(rstqueueName, durable=True)
        # 声明结果交换机
        global resultExchange
        resultExchange = await channel.declare_exchange(name='resultExchange', type='direct')
        # 将结果队列和交换机绑定在一起
        await resultQuequ.bind(resultExchange, routing_key="allResult")

        # 开启消息确认机制。将 no_ack 参数设置为 False 时,需在处理完消息后,调用 message.ack() 方法来向消息队列发送确认信号。
        await queue.consume(Task, no_ack=False)
        try:
            # Wait until terminate
            await asyncio.Future()
        finally:
            await connection.close()
    except Exception as e:
        logger.error(f"出现异常: {e}")
        logger.error(traceback.format_exc())


async def main():
    try:
        # 创建一个任务来消费队列中的任务
        consume_task = asyncio.create_task(consumer())
        # 等待任务完成
        await consume_task
    except Exception as e:
        logger.error(f"出现异常: {e}")
        logger.error(traceback.format_exc())


if __name__ == '__main__':
    asyncio.run(main())
  • 写回答

2条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-02-03 14:43
    关注

    引自免费微信小程序:皆我百晓生

    在您提供的代码中,checkLink 函数尝试批量确认多个消息。但是,aio_pika 库确实没有提供类似 basic_ack_multiple 的方法来一次性确认多条消息。为了实现批量确认的功能,您需要遍历 messagesList 中的消息,并逐个调用 message.ack()

    针对您的需求,您可以修改 Task 函数,在处理完 50 条消息后,对这些消息进行逐一确认。以下是调整后的 Task 函数:

    async def Task(message: aio_pika.IncomingMessage):
        try:
            # 将消息添加到列表中
            messagesList.append((pickle.loads(message.body), message.delivery_tag))
            
            # 当消息列表的长度达到50时(而不是100),或者队列为空,就处理消息
            if len(messagesList) == 50 or (queue.declaration_result.message_count == 0 and len(messagesList) > 0):
                resultDict = await checkLink([item[0] for item in messagesList])
                
                if resultDict:
                    # 发送结果到结果队列
                    await resultExchange.publish(
                        aio_pika.Message(
                            body=pickle.dumps(resultDict),
                            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
                            headers={"type": "Result"}
                        ),
                        routing_key="allResult"
                    )
    
                    # 确认所有已处理的消息
                    for item in messagesList:
                        await item[1].ack()
    
                else:
                    # 拒绝并重新排队所有未确认的消息
                    for item in messagesList:
                        await item[1].reject(requeue=True)
    
                messagesList.clear()
    
            await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"出现异常: {e}")
            logger.error(traceback.format_exc())
    

    请注意上述修改点:

    1. if 语句中将条件更改为当消息列表长度为50时触发批量处理。
    2. 调整 checkLink 函数的参数传递方式,只传入url列表。
    3. 在成功处理消息并发送结果到结果队列之后,遍历 messagesList 并对每个消息调用 .ack() 方法进行确认。
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 2月21日
  • 已采纳回答 2月13日
  • 创建了问题 2月3日

悬赏问题

  • ¥15 有了解d3和topogram.js库的吗?有偿请教
  • ¥100 任意维数的K均值聚类
  • ¥15 stamps做sbas-insar,时序沉降图怎么画
  • ¥15 unity第一人称射击小游戏,有demo,在原脚本的基础上进行修改以达到要求
  • ¥15 买了个传感器,根据商家发的代码和步骤使用但是代码报错了不会改,有没有人可以看看
  • ¥15 关于#Java#的问题,如何解决?
  • ¥15 加热介质是液体,换热器壳侧导热系数和总的导热系数怎么算
  • ¥100 嵌入式系统基于PIC16F882和热敏电阻的数字温度计
  • ¥15 cmd cl 0x000007b
  • ¥20 BAPI_PR_CHANGE how to add account assignment information for service line