Dawn_zhu 2020-05-26 19:42 采纳率: 0%
浏览 396
已结题

使用 Python 协程多并发下载 任务数卡住不动

在使用 python 协程下载图片中,最终协程的任务数 卡在 97 一直循环,不知道哪里出了问题,有大佬知道什么情况吗,困扰我好久
下图是我运行结果,附上代码。
图片说明

# -*- coding: utf-8 -*-
import requests
from lxml import etree
import time
import os
import pandas as pd
import asyncio
import aiohttp
import aiomysql
from random import randint
import cchardet
import aiofiles
import logging


class sikupicture_Spider(object):
    def __init__(self):
        # self.seens_url = []
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.PriorityQueue()
        self._workers = 0  # 当前工作数
        self._max_workers = 150  # 最大工作数
        self.overtime = {}  # {url: times,} 记录失败的URL的次数
        self.overtime_threshold = 4
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
        }
        self.list_content = []

    async def init_url(self):
        info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
        for ite in info.itertuples():
            await self.queue.put((randint(1, 5), getattr(ite, 'url')))

    async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if status_code == 403:
                    print("url-403", url)
                    if url in self.overtime:
                        self.overtime[url] += 1
                        if self.overtime[url] > self.overtime_threshold:
                            pass
                        await self.queue.put((randint(1, 5), url))
                    else:
                        self.overtime[url] = 1
                        await self.queue.put((randint(1, 5), url))
                    status_code = 0
                    html = None
                if binary:
                    text = await resp.read()
                    encoding = cchardet.detect(text)
                    html = text.encode(encoding, errors='ignore')
                else:
                    html = await resp.text()

        except TimeoutError:
            print("url-overtime", url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if binary:
                    html = await resp.read()
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", img_url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                else:
                    await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    def parse_source(self, source):
        try:
            response_1 = etree.HTML(source)
        except Exception as err:
            logging.error(f'parse error:{err}')
            url = ""
        else:
            img_url = response_1.xpath("//a[@href='javascript:;']/@supsrc")[0] if len(
                response_1.xpath("//a[@href='javascript:;']/@supsrc")[0]) else ""
        return img_url

    async def process(self, session, url, timeout):
        status, source = await self.fetch(session, url, timeout)
        file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
        if status == 200:
            img_url = self.parse_source(source)
            img_status, img_source = await self.download_img(session, img_url, timeout, url)
            if img_status == 200:
                async with aiofiles.open("F:\\dawnzhu\\picture\\"+file_name+".jpg", "wb") as f:
                    await f.write(img_source)
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status, "img_status", img_status)
        else:
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status,)

    async def loop_crawl(self):
        await self.init_url()
        timeout = aiohttp.ClientTimeout(total=20)
        conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)
        session = aiohttp.ClientSession(connector=conn, timeout=timeout)
        while True:
            if self._workers >= self._max_workers:
                print("work 的判断")
                await asyncio.sleep(5)
                continue
            if self.queue.empty():
                print("队列是否为空....", self._workers)
                await asyncio.sleep(5)
                if self._workers == 0:
                    break
                continue
            _, url = await self.queue.get()
            asyncio.ensure_future(self.process(session, url, timeout))
            self._workers += 1
            print("队列剩余数量", self.queue.qsize(), self._workers)
        await session.close()

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            self.loop.close()

if __name__ == '__main__':
    sp = sikupicture_Spider()
    sp.run()

  • 写回答

1条回答

  • zqbnqsdsmd 2020-09-17 00:13
    关注
    评论

报告相同问题?

悬赏问题

  • ¥50 如何用脚本实现输入法的热键设置
  • ¥20 我想使用一些网络协议或者部分协议也行,主要想实现类似于traceroute的一定步长内的路由拓扑功能
  • ¥30 深度学习,前后端连接
  • ¥15 孟德尔随机化结果不一致
  • ¥15 apm2.8飞控罗盘bad health,加速度计校准失败
  • ¥15 求解O-S方程的特征值问题给出边界层布拉休斯平行流的中性曲线
  • ¥15 谁有desed数据集呀
  • ¥20 手写数字识别运行c仿真时,程序报错错误代码sim211-100
  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)