大家好,我使用aio_pika库时遇到一个问题。 我的程序逻辑是 从消息队列中获取50个url消息,并作为入参传给处理函数进行一次性处理。然后批量确认。我查了几个AI,他们给出的答案都是错的(给出的方法过时或不对)。请问大家怎么样解决。谢谢
import aio_pika
import time
import asyncio
# 调用100个url消息,获得结果后进行确认
async def checkLink(linkList: list[tuple]):
try:
linksListItem = [i[0] for i in linkList]
urlList = [i["url"] for i in linksListItem]
resultStr = await myAPI(urlList) # 输出查询结果
if resultStr: #正常获得查询结果
index = 0
result_dict = {}
responseList = json.loads(resultStr) # 输出结果list
#请问如何修改以下语句实现消息的确认? 在aio_pika最新版本的库中channel貌似没有basic_ack_multiple 方法
await channel.basic_ack_multiple(delivery_tags=[message.delivery_tag for message in messagesList])
return result_dict
else:
return False
except Exception as e:
logger.error(f"checkLink异常: {e}")
logger.error(traceback.format_exc())
return False
async def Task(message: aio_pika.abc.AbstractIncomingMessage, ):
try:
# 将消息添加到列表中
messagesList.append((pickle.loads(message.body) , message.delivery_tag))
# 如果消息列表的长度达到100,或者队列为空,就处理消息
message_count = queue.declaration_result.message_count
if len(messagesList) ==100 or message_count == 0:
resultDict = await checkLink(messagesList)
if resultDict:
#logger.info(json.dumps(resultDict))
#print("\n")
# 将结果字符串发送到结果队列中
await resultExchange.publish(
aio_pika.Message(
body=pickle.dumps(resultDict),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
headers={"type": "Result"}
),
routing_key="allResult"
)
for i in messagesList:
await message.ack()
else:
# 拒绝并重新排队消息。
for i in messagesList:
await i.reject(requeue=True)
logger.error("异常")
linkList.clear()
messagesList.clear()
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"出现异常: {e}")
logger.error(traceback.format_exc())
async def consumer():
try:
# 创建连接
connection = await aio_pika.connect_robust(host='localhost', port=5672, login='admin', password='123123',
virtualhost='my_vhost', )
# 创建信道
global channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=100)
# 声明被消费队列
global queue
queueName = "myqueue"
queue = await channel.declare_queue(queueName, durable=True)
# 声明结果队列
rstqueueName = "Result"
global resultQuequ
resultQuequ = await channel.declare_queue(rstqueueName, durable=True)
# 声明结果交换机
global resultExchange
resultExchange = await channel.declare_exchange(name='resultExchange', type='direct')
# 将结果队列和交换机绑定在一起
await resultQuequ.bind(resultExchange, routing_key="allResult")
# 开启消息确认机制。将 no_ack 参数设置为 False 时,需在处理完消息后,调用 message.ack() 方法来向消息队列发送确认信号。
await queue.consume(Task, no_ack=False)
try:
# Wait until terminate
await asyncio.Future()
finally:
await connection.close()
except Exception as e:
logger.error(f"出现异常: {e}")
logger.error(traceback.format_exc())
async def main():
try:
# 创建一个任务来消费队列中的任务
consume_task = asyncio.create_task(consumer())
# 等待任务完成
await consume_task
except Exception as e:
logger.error(f"出现异常: {e}")
logger.error(traceback.format_exc())
if __name__ == '__main__':
asyncio.run(main())