在使用 python 协程下载图片中,最终协程的任务数 卡在 97 一直循环,不知道哪里出了问题,有大佬知道什么情况吗,困扰我好久
下图是我运行结果,附上代码。
# -*- coding: utf-8 -*-
import requests
from lxml import etree
import time
import os
import pandas as pd
import asyncio
import aiohttp
import aiomysql
from random import randint
import cchardet
import aiofiles
import logging
class sikupicture_Spider(object):
def __init__(self):
# self.seens_url = []
self.loop = asyncio.get_event_loop()
self.queue = asyncio.PriorityQueue()
self._workers = 0 # 当前工作数
self._max_workers = 150 # 最大工作数
self.overtime = {} # {url: times,} 记录失败的URL的次数
self.overtime_threshold = 4
self.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
}
self.list_content = []
async def init_url(self):
info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
for ite in info.itertuples():
await self.queue.put((randint(1, 5), getattr(ite, 'url')))
async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
_headers = self.headers
if headers:
_headers = headers
try:
async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
status_code = resp.status
if status_code == 403:
print("url-403", url)
if url in self.overtime:
self.overtime[url] += 1
if self.overtime[url] > self.overtime_threshold:
pass
await self.queue.put((randint(1, 5), url))
else:
self.overtime[url] = 1
await self.queue.put((randint(1, 5), url))
status_code = 0
html = None
if binary:
text = await resp.read()
encoding = cchardet.detect(text)
html = text.encode(encoding, errors='ignore')
else:
html = await resp.text()
except TimeoutError:
print("url-overtime", url)
if url in self.overtime:
self.overtime[url] += 1
if self.overtime[url] > self.overtime_threshold:
pass
await self.queue.put((randint(1, 5), url))
else:
self.overtime[url] = 1
await self.queue.put((randint(1, 5), url))
status_code = 0
html = None
return status_code, html
async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
_headers = self.headers
if headers:
_headers = headers
try:
async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
status_code = resp.status
if binary:
html = await resp.read()
else:
html = await resp.text()
except TimeoutError:
print("url-overtime", img_url)
if url in self.overtime:
self.overtime[url] += 1
if self.overtime[url] > self.overtime_threshold:
pass
else:
await self.queue.put((randint(1, 5), url))
else:
self.overtime[url] = 1
await self.queue.put((randint(1, 5), url))
status_code = 0
html = None
return status_code, html
def parse_source(self, source):
try:
response_1 = etree.HTML(source)
except Exception as err:
logging.error(f'parse error:{err}')
url = ""
else:
img_url = response_1.xpath("//a[@href='javascript:;']/@supsrc")[0] if len(
response_1.xpath("//a[@href='javascript:;']/@supsrc")[0]) else ""
return img_url
async def process(self, session, url, timeout):
status, source = await self.fetch(session, url, timeout)
file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
if status == 200:
img_url = self.parse_source(source)
img_status, img_source = await self.download_img(session, img_url, timeout, url)
if img_status == 200:
async with aiofiles.open("F:\\dawnzhu\\picture\\"+file_name+".jpg", "wb") as f:
await f.write(img_source)
self._workers -= 1
print("任务完成", self._workers, "url_status", status, "img_status", img_status)
else:
self._workers -= 1
print("任务完成", self._workers, "url_status", status,)
async def loop_crawl(self):
await self.init_url()
timeout = aiohttp.ClientTimeout(total=20)
conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)
session = aiohttp.ClientSession(connector=conn, timeout=timeout)
while True:
if self._workers >= self._max_workers:
print("work 的判断")
await asyncio.sleep(5)
continue
if self.queue.empty():
print("队列是否为空....", self._workers)
await asyncio.sleep(5)
if self._workers == 0:
break
continue
_, url = await self.queue.get()
asyncio.ensure_future(self.process(session, url, timeout))
self._workers += 1
print("队列剩余数量", self.queue.qsize(), self._workers)
await session.close()
def run(self):
try:
self.loop.run_until_complete(self.loop_crawl())
except KeyboardInterrupt:
self.loop.close()
if __name__ == '__main__':
sp = sikupicture_Spider()
sp.run()