Dawn_zhu 2020-05-26 19:42 采纳率: 0%
浏览 396
已结题

使用 Python 协程多并发下载 任务数卡住不动

在使用 python 协程下载图片中,最终协程的任务数 卡在 97 一直循环,不知道哪里出了问题,有大佬知道什么情况吗,困扰我好久
下图是我运行结果,附上代码。
图片说明

# -*- coding: utf-8 -*-
import requests
from lxml import etree
import time
import os
import pandas as pd
import asyncio
import aiohttp
import aiomysql
from random import randint
import cchardet
import aiofiles
import logging


class sikupicture_Spider(object):
    def __init__(self):
        # self.seens_url = []
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.PriorityQueue()
        self._workers = 0  # 当前工作数
        self._max_workers = 150  # 最大工作数
        self.overtime = {}  # {url: times,} 记录失败的URL的次数
        self.overtime_threshold = 4
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
        }
        self.list_content = []

    async def init_url(self):
        info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
        for ite in info.itertuples():
            await self.queue.put((randint(1, 5), getattr(ite, 'url')))

    async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if status_code == 403:
                    print("url-403", url)
                    if url in self.overtime:
                        self.overtime[url] += 1
                        if self.overtime[url] > self.overtime_threshold:
                            pass
                        await self.queue.put((randint(1, 5), url))
                    else:
                        self.overtime[url] = 1
                        await self.queue.put((randint(1, 5), url))
                    status_code = 0
                    html = None
                if binary:
                    text = await resp.read()
                    encoding = cchardet.detect(text)
                    html = text.encode(encoding, errors='ignore')
                else:
                    html = await resp.text()

        except TimeoutError:
            print("url-overtime", url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if binary:
                    html = await resp.read()
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", img_url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                else:
                    await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    def parse_source(self, source):
        try:
            response_1 = etree.HTML(source)
        except Exception as err:
            logging.error(f'parse error:{err}')
            url = ""
        else:
            img_url = response_1.xpath("//a[@href='javascript:;']/@supsrc")[0] if len(
                response_1.xpath("//a[@href='javascript:;']/@supsrc")[0]) else ""
        return img_url

    async def process(self, session, url, timeout):
        status, source = await self.fetch(session, url, timeout)
        file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
        if status == 200:
            img_url = self.parse_source(source)
            img_status, img_source = await self.download_img(session, img_url, timeout, url)
            if img_status == 200:
                async with aiofiles.open("F:\\dawnzhu\\picture\\"+file_name+".jpg", "wb") as f:
                    await f.write(img_source)
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status, "img_status", img_status)
        else:
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status,)

    async def loop_crawl(self):
        await self.init_url()
        timeout = aiohttp.ClientTimeout(total=20)
        conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)
        session = aiohttp.ClientSession(connector=conn, timeout=timeout)
        while True:
            if self._workers >= self._max_workers:
                print("work 的判断")
                await asyncio.sleep(5)
                continue
            if self.queue.empty():
                print("队列是否为空....", self._workers)
                await asyncio.sleep(5)
                if self._workers == 0:
                    break
                continue
            _, url = await self.queue.get()
            asyncio.ensure_future(self.process(session, url, timeout))
            self._workers += 1
            print("队列剩余数量", self.queue.qsize(), self._workers)
        await session.close()

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            self.loop.close()

if __name__ == '__main__':
    sp = sikupicture_Spider()
    sp.run()

  • 写回答

1条回答 默认 最新

  • zqbnqsdsmd 2020-09-17 00:13
    关注
    评论

报告相同问题?

悬赏问题

  • ¥15 请教一下各位,为什么我这个没有实现模拟点击
  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作