tylrr 2024-03-28 20:53 采纳率: 84.6%
浏览 24
已结题

使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。

使用rabbitMQ 消息队列保存了待处理的URL,然后使用 一个浏览器库 进行多线程处理,遇到的问题。

例如队列中有13个url。 我的代码规定了最多用5个线程进行处理,程序运行结束时会剩下3个url没有处理。 我对rabbitMQ不熟悉,应该是这个逻辑错了。

img

async def processTask(message: aio_pika.abc.AbstractIncomingMessage):
    try:
        # 将消息添加到列表中
        messagesList.append(message)
        #当集齐5条消息时进行多线程处理
        if len(messagesList) == 5:
            messages_to_process = messagesList[:]  # 创建一个副本,避免修改原始列表
            # 确定线程池的最大工作线程数
            max_workers = 5 if len(messages_to_process) >= 5 else len(messages_to_process)
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
                                         messagesList}
                for future in as_completed(futures):
                    messageItem = messages_to_process.pop(0)
                    try:
                        resultDict = future.result()
                        if resultDict:
                            await messageItem.ack()
                            # 将结果字符串发送到结果队列中
                            logger.info(f'任务{messageItem.delivery_tag}的结果是: {resultDict}')
                        else:
                            await messageItem.reject(requeue=True)
                    except Exception as e:
                        logger.error(f'任务{messageItem.delivery_tag}发生错误: {e}')
                        await messageItem.reject(requeue=True)
            linkList.clear()
            messagesList.clear()

        #当消息队列不足5条消息时,把剩余的消息数量作为线程数量进行处理
        remaining_messages = queue.declaration_result.message_count
        if len(messagesList) == remaining_messages:
            messages_to_process = messagesList[:]  # 创建一个副本,避免修改原始列表
            # 确定线程池的最大工作线程数
            max_workers = len(messages_to_process)
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
                           messagesList}
                for future in as_completed(futures):
                    messageItem = messages_to_process.pop(0)
                    try:
                        resultDict = future.result()
                        if resultDict:
                            await messageItem.ack()
                            # 将结果字符串发送到结果队列中
                            logger.info(f'任务{messageItem.delivery_tag}的结果是: {resultDict}')
                        else:
                            await messageItem.reject(requeue=True)
                    except Exception as e:
                        logger.error(f'任务{messageItem.delivery_tag}发生错误: {e}')
                        await messageItem.reject(requeue=True)
            linkList.clear()
            messagesList.clear()

        await asyncio.sleep(0.1)

    except Exception as e:
        print(e)


请不要用AI作答!
请不要用AI作答
请不要用AI作答

  • 写回答

25条回答 默认 最新

  • 专家-皆知 2024-03-28 20:53
    关注

    引自免费微信小程序:皆我百晓生

    在处理使用rabbitMQ发送的队列和多个浏览器线程并发抓取数据的问题时,可能会出现以下几种情况:

    1. 消息队列只保存了5个任务: 当队列中的任务数达到5时,我们尝试将这些任务分配给不同的线程进行处理,但是由于ThreadPoolExecutor默认的工作线程数量(max_workers)限制为5,实际产生的线程数可能小于5。在这种情况下,可能会导致某些未处理的任务丢失或在较短的时间内完成,这可能会导致队列数量变为3,即队列中仍有3个未处理的任务。

    2. 在线程获取到消息后未及时处理: 在多线程爬取过程中,当线程获取到一个待处理的消息时,如果没有立即将其放入结果队列中,而是被丢弃在任务队列中等待其他线程处理,那么未处理的消息就会继续积压,影响后续处理任务的效率。这会导致在多线程并发处理的过程中,先执行完主线程的任务,再进行后续任务,最终造成处理队列中剩余的3个未处理的任务。

    3. 异步操作失败或异常: 多个浏览器线程并行请求同一组URL,可能出现同步错误、网络问题、进程通信错误等问题,导致一些任务无法正常发送或接收到结果,从而影响到整个多线程爬取任务的处理。这些异步操作如发送POST请求、解析HTML、API调用等都可能引发这种问题。

    4. 队列实现复杂度: 队列的设计和实现方式可能会影响其性能。当队列过大时,可能导致链表或其他动态数据结构的维护变得困难,甚至可能导致锁争抢、死锁等问题,降低整体的处理效率。

    针对上述问题,我们可以采取以下解决方案:

    1. 调整线程池大小: 对于实际运行中发现的不超过5个任务的情况,我们可以尝试设置更高的线程池大小。具体来说,可以使用ThreadPoolExecutormaximum_workers参数,将线程池的最大工作线程数设置为num_threads + 1,例如:

    import concurrent.futures
    
    # 设置最大线程数
    max_workers = 6
    executor = ThreadPoolExecutor(max_workers=max_workers)
    

    这样即使队列中的任务数量超过5,也不会超出最大线程数限制,保证所有任务都能按照预期处理。但需要注意的是,增大线程池大小可能会增加系统资源消耗,应根据实际环境条件合理设定,并考虑到其他相关因素,如CPU利用率、内存使用情况等。

    2. 优化消息分发策略: 对于队列长度达到5或更长的情况下,我们需要优化消息分发策略,确保每个线程至少有一个完整的任务。一种可能的方法是使用队列的weight属性,设置每条任务的优先级,使得包含更多任务的组优先处理。这样,优先级较高的任务将首先发送到队列,确保在处理过程中不会因少数重要任务而空闲过多时间。

    from aixent.utils import weight
    
    # 创建权重队列
    weights_queue = Queue(weight=weight)
    
    def pageCollect(message_item: aio_pika.abc.AmqpMessage):
        message_item.weight = weights_queue.size + 1
        message_queue.put(message_item)
    

    在这个版本的函数中,我们将优先级添加到权重队列,它在处理完成后自动减少对应的权重值,使待处理的任务有机会进一步处理。然后,当需要处理队列的最后一个未处理的任务时,我们只需要从权重队列中取出对应的Queue.Item,进行处理即可。

    3. 异步编程模型: 对于涉及到网页请求和渲染的异步任务,可以采用基于事件驱动的异步编程模型,比如使用aiohttp库。在这个模型中,请求和服务层之间的交互通常是通过事件触发的,而不是单线程直接操作客户端资源,大大提高了并发处理能力。

    from aiohttp import web, ClientSession
    import json
    
    async def process_task(event, *args):
        url = event['url']
        async with ClientSession(**event.get('kwargs')) as session:
            response = await session.get(url)
            data = await response.json()
            await task_queue.put(data)
    
    async def task_queue():
        tasks = []
        while True:
            task = await task_queue.popleft()
            tasks.append(task)
            if tasks:
                await asyncio.gather(*tasks)
            else:
                break
    
    class Application(web.Application):
        @web.event
        def on_event(self, event):
            task_queue()
    
    if __name__ == '__main__':
        app = Application()
        app.run(host='0.0.0.0', port=80)
    

    在这个版本的代码中,我们在on_event方法中创建了一个事件队列task_queue(),用于存储待处理的任务。每次新的URL请求触发事件时,我们都会从队列中取出一个任务,对其进行处理并将其添加回队列。然后,我们进入一个无限循环,直到队列为空为止。这样,在多线程环境中,即使队列中有大量的未处理任务,也可以保持线程间的有序连接,提高处理效率。

    4. 使用长轮询和高可用性技术: 如果服务器端负载过高,可以考虑使用长轮询和高可用性的技术来优化访问频率,避免频繁阻塞线程。长轮询是一种将请求发送至远程服务器的间隔延时技术,它可以在一定时间内重复发送同一个请求,如果请求次数达到阈值,就切换到其他节点发送请求,以保证系统的服务稳定性。此外,还可以利用分布式缓存如Redis、Memcached等来缓存热门URL和结果,减少数据库查询次数,提升读写性能。

    5. 构建更健壮的测试框架: 使用像pytest这样的Python测试框架进行集成测试和单元测试,覆盖多线程下的各项功能和逻辑,确保在所有并发环境下都能正确地遍历队列,检查任务是否成功发送,以及结果是否正确显示。同时,可以通过添加断言、期望值等方式在函数、类、模块层次上实现断言规则,确保在整个应用流程中任务的状态符合预期。

    以上就是在处理使用rabbitMQ发送的队列和多个浏览器线程并发抓取数据问题时,避免队列溢出、线程混乱、异步操作失败等问题的一些建议和解决方案。在实际应用中,根据具体需求和技术栈的特点,结合上述方法进行灵活组合和配置,可有效提高多线程爬取的性能和可靠性。

    评论 编辑记录

报告相同问题?

问题事件

  • 系统已结题 4月5日
  • 修改了问题 3月28日
  • 创建了问题 3月28日

悬赏问题

  • ¥170 如图所示配置eNSP
  • ¥20 docker里部署springboot项目,访问不到扬声器
  • ¥15 netty整合springboot之后自动重连失效
  • ¥15 悬赏!微信开发者工具报错,求帮改
  • ¥20 wireshark抓不到vlan
  • ¥20 关于#stm32#的问题:需要指导自动酸碱滴定仪的原理图程序代码及仿真
  • ¥20 设计一款异域新娘的视频相亲软件需要哪些技术支持
  • ¥15 stata安慰剂检验作图但是真实值不出现在图上
  • ¥15 c程序不知道为什么得不到结果
  • ¥15 键盘指令混乱情况下的启动盘系统重装