使用rabbitMQ 消息队列保存了待处理的URL,然后使用 一个浏览器库 进行多线程处理,遇到的问题。
例如队列中有13个url。 我的代码规定了最多用5个线程进行处理,程序运行结束时会剩下3个url没有处理。 我对rabbitMQ不熟悉,应该是这个逻辑错了。
async def processTask(message: aio_pika.abc.AbstractIncomingMessage):
try:
# 将消息添加到列表中
messagesList.append(message)
#当集齐5条消息时进行多线程处理
if len(messagesList) == 5:
messages_to_process = messagesList[:] # 创建一个副本,避免修改原始列表
# 确定线程池的最大工作线程数
max_workers = 5 if len(messages_to_process) >= 5 else len(messages_to_process)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
messagesList}
for future in as_completed(futures):
messageItem = messages_to_process.pop(0)
try:
resultDict = future.result()
if resultDict:
await messageItem.ack()
# 将结果字符串发送到结果队列中
logger.info(f'任务{messageItem.delivery_tag}的结果是: {resultDict}')
else:
await messageItem.reject(requeue=True)
except Exception as e:
logger.error(f'任务{messageItem.delivery_tag}发生错误: {e}')
await messageItem.reject(requeue=True)
linkList.clear()
messagesList.clear()
#当消息队列不足5条消息时,把剩余的消息数量作为线程数量进行处理
remaining_messages = queue.declaration_result.message_count
if len(messagesList) == remaining_messages:
messages_to_process = messagesList[:] # 创建一个副本,避免修改原始列表
# 确定线程池的最大工作线程数
max_workers = len(messages_to_process)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
messagesList}
for future in as_completed(futures):
messageItem = messages_to_process.pop(0)
try:
resultDict = future.result()
if resultDict:
await messageItem.ack()
# 将结果字符串发送到结果队列中
logger.info(f'任务{messageItem.delivery_tag}的结果是: {resultDict}')
else:
await messageItem.reject(requeue=True)
except Exception as e:
logger.error(f'任务{messageItem.delivery_tag}发生错误: {e}')
await messageItem.reject(requeue=True)
linkList.clear()
messagesList.clear()
await asyncio.sleep(0.1)
except Exception as e:
print(e)
请不要用AI作答!
请不要用AI作答
请不要用AI作答