I've recently reached the proxy-pool part of my web-scraping studies. The book I've been following is Cui Qingcai's, but since the code in the book is fairly old, I cloned the source code from his GitHub (updated in June). Then the proxypool library started throwing an error, as follows:
But I have already installed the proxypool library with pip, and when I opened it up I found that the package contains only a single code file:
I've searched through a lot of posts on CSDN without finding an answer. I'm wondering whether this library was updated and these methods were removed, and that is what causes the error. I'd be grateful if anyone could shed some light on this, thank you very much! I've run into quite a few difficulties while learning and I'm counting on everyone's help; I'm sure the light is just around the corner!
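To help narrow it down, here is a minimal check I can run (my own diagnostic, not something from the book): it prints where the `proxypool` package being imported actually lives and which names it exposes. If the path points into site-packages rather than into the cloned repository, then the pip-installed package (the one with only a single code file) is what Python is importing, not the repo's code.

```python
# Minimal diagnostic (my own idea, not from the book): show which proxypool
# package Python actually imports and what names it exposes.
import proxypool

print(proxypool.__file__)  # path of the imported package
print([name for name in dir(proxypool) if not name.startswith('_')])  # public names
```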
For convenience, here is the source code:
```python
# storage module
import redis
from proxypool.exceptions import PoolEmptyException
from proxypool.schemas.proxy import Proxy
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \
PROXY_SCORE_INIT
from random import choice
from typing import List
from loguru import logger
from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies
REDIS_CLIENT_VERSION = redis.__version__
IS_REDIS_VERSION_2 = REDIS_CLIENT_VERSION.startswith('2.')
class RedisClient(object):
"""
redis connection client of proxypool
"""
def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=REDIS_DB, **kwargs):
"""
init redis client
:param host: redis host
:param port: redis port
:param password: redis password
"""
self.db = redis.StrictRedis(host=host, port=port, password=password, db=db, decode_responses=True, **kwargs)
def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:
"""
add proxy and set it to init score
:param proxy: proxy, ip:port, like 8.8.8.8:88
:param score: int score
:return: result
"""
if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
logger.info(f'invalid proxy {proxy}, throw it')
return
if not self.exists(proxy):
if IS_REDIS_VERSION_2:
return self.db.zadd(REDIS_KEY, score, proxy.string())
return self.db.zadd(REDIS_KEY, {proxy.string(): score})
def random(self) -> Proxy:
"""
get random proxy
firstly try to get proxy with max score
if not exists, try to get proxy by rank
if not exists, raise error
:return: proxy, like 8.8.8.8:8
"""
# try to get proxy with max score
proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX , PROXY_SCORE_MAX)
if len(proxies):
return convert_proxy_or_proxies(choice(proxies))
# else get proxy by rank
proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN , PROXY_SCORE_MAX)
if len(proxies):
return convert_proxy_or_proxies(choice(proxies))
# else raise error
raise PoolEmptyException
def decrease(self, proxy: Proxy) -> int:
"""
decrease score of proxy, if small than PROXY_SCORE_MIN, delete it
:param proxy: proxy
:return: new score
"""
if IS_REDIS_VERSION_2:
self.db.zincrby(REDIS_KEY, proxy.string(), -1)
else:
self.db.zincrby(REDIS_KEY, -1, proxy.string())
score = self.db.zscore(REDIS_KEY, proxy.string())
logger.info(f'{proxy.string()} score decrease 1, current {score}')
if score <= PROXY_SCORE_MIN:
logger.info(f'{proxy.string()} current score {score}, remove')
self.db.zrem(REDIS_KEY, proxy.string())
def exists(self, proxy: Proxy) -> bool:
"""
if proxy exists
:param proxy: proxy
:return: if exists, bool
"""
return not self.db.zscore(REDIS_KEY, proxy.string()) is None
def max(self, proxy: Proxy) -> int:
"""
set proxy to max score
:param proxy: proxy
:return: new score
"""
logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')
if IS_REDIS_VERSION_2:
return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())
return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})
def count(self) -> int:
"""
get count of proxies
:return: count, int
"""
return self.db.zcard(REDIS_KEY)
def all(self) -> List[Proxy]:
"""
get all proxies
:return: list of proxies
"""
return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))
def batch(self, cursor, count) -> List[Proxy]:
"""
get batch of proxies
:param cursor: scan cursor
:param count: scan count
:return: list of proxies
"""
cursor, proxies = self.db.zscan(REDIS_KEY, cursor, count=count)
return cursor, convert_proxy_or_proxies([i[0] for i in proxies])
if __name__ == '__main__':
conn = RedisClient()
result = conn.random()
print(result)
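# Note (my own comment, not from the repo): running this module directly needs a
# local Redis instance reachable at REDIS_HOST:REDIS_PORT; conn.random() raises
# PoolEmptyException when no proxies have been stored under REDIS_KEY yet.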
# getter module
from retrying import retry
import requests
from loguru import logger
from proxypool.setting import GET_TIMEOUT
from fake_headers import Headers
import time
class BaseCrawler(object):
urls = []
@retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None, wait_fixed=2000)
def fetch(self, url, **kwargs):
try:
headers = Headers(headers=True).generate()
kwargs.setdefault('timeout', GET_TIMEOUT)
kwargs.setdefault('verify', False)
kwargs.setdefault('headers', headers)
response = requests.get(url, **kwargs)
if response.status_code == 200:
response.encoding = 'utf-8'
return response.text
except requests.ConnectionError:
return
@logger.catch
def crawl(self):
"""
crawl main method
"""
for url in self.urls:
logger.info(f'fetching {url}')
html = self.fetch(url)
time.sleep(.5)
for proxy in self.parse(html):
logger.info(f'fetched proxy {proxy.string()} from {url}')
yield proxy
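# Note (my own summary, not part of the repo code): a concrete crawler only needs
# to define a `urls` list and implement parse(html) yielding Proxy objects; the
# crawlers below (Zhandaye, 89ip, Kuaidaili, Taiyangdaili) all follow this pattern.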
# zhandaye crawler
from pyquery import PyQuery as pq
from proxypool.schemas.proxy import Proxy
from proxypool.crawlers.base import BaseCrawler
from loguru import logger
import re
BASE_URL = 'https://www.zdaye.com/dayProxy/{page}.html'
MAX_PAGE = 5 * 2
class ZhandayeCrawler(BaseCrawler):
"""
zhandaye crawler,https://www.zdaye.com/dayProxy.html
"""
urls_catalog = [BASE_URL.format(page=page) for page in range(1, MAX_PAGE)]
headers = {
'User-Agent': 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36'
}
urls = []
ignore = True
def crawl(self):
self.crawl_catalog()
yield from super().crawl()
def crawl_catalog(self):
for url in self.urls_catalog:
logger.info(f'fetching {url}')
html = self.fetch(url, headers=self.headers)
self.parse_catalog(html)
def parse_catalog(self, html):
"""
parse html file to get proxies
:return:
"""
doc = pq(html)
for item in doc('#J_posts_list .thread_item div div p a').items():
url = 'https://www.zdaye.com' + item.attr('href')
logger.info(f'get detail url: {url}')
self.urls.append(url)
def parse(self, html):
doc = pq(html)
trs = doc('.cont br').items()
for tr in trs:
line = tr[0].tail
match = re.search(r'(\d+\.\d+\.\d+\.\d+):(\d+)', line)
if match:
host = match.group(1)
port = match.group(2)
yield Proxy(host=host, port=port)
if __name__ == '__main__':
crawler = ZhandayeCrawler()
for proxy in crawler.crawl():
print(proxy)
# 89ip crawler
from proxypool.schemas.proxy import Proxy
from proxypool.crawlers.base import BaseCrawler
import re
MAX_NUM = 9999
BASE_URL = 'http://api.89ip.cn/tqdl.html?api=1&num={MAX_NUM}&port=&address=&isp='.format(MAX_NUM=MAX_NUM)
class Ip89Crawler(BaseCrawler):
"""
89ip crawler, http://api.89ip.cn
"""
urls = [BASE_URL]
def parse(self, html):
"""
parse html file to get proxies
:return:
"""
ip_address = re.compile('([\d:\.]*)<br>')
hosts_ports = ip_address.findall(html)
for addr in hosts_ports:
addr_split = addr.split(':')
if (len(addr_split) == 2):
host = addr_split[0]
port = addr_split[1]
yield Proxy(host=host, port=port)
if __name__ == '__main__':
crawler = Ip89Crawler()
for proxy in crawler.crawl():
print(proxy)
# kuaidaili crawler
from proxypool.crawlers.base import BaseCrawler
from proxypool.schemas.proxy import Proxy
import re
from pyquery import PyQuery as pq
BASE_URL = 'https://www.kuaidaili.com/free/{type}/{page}/'
MAX_PAGE = 300
class KuaidailiCrawler(BaseCrawler):
"""
kuaidaili crawler, https://www.kuaidaili.com/
"""
urls = [BASE_URL.format(type=type, page=page) for type in ('intr', 'inha') for page in range(1, MAX_PAGE + 1)]
def parse(self, html):
"""
parse html file to get proxies
:return:
"""
doc = pq(html)
for item in doc('table tr').items():
td_ip = item.find('td[data-title="IP"]').text()
td_port = item.find('td[data-title="PORT"]').text()
if td_ip and td_port:
yield Proxy(host=td_ip, port=td_port)
if __name__ == '__main__':
crawler = KuaidailiCrawler()
for proxy in crawler.crawl():
print(proxy)
# taiyangdaili crawler
from proxypool.schemas.proxy import Proxy
from proxypool.crawlers.base import BaseCrawler
from pyquery import PyQuery as pq
BaseUrl = 'http://www.taiyanghttp.com/free/page{num}'
MAX_PAGE = 5
class TaiyangdailiCrawler(BaseCrawler):
"""
taiyangdaili crawler, http://www.taiyanghttp.com/free/
"""
urls = [BaseUrl.format(num=i) for i in range(1, 6)]
def parse(self, html):
"""
parse html file to get proxies
:return:
"""
doc = pq(html)
trs = doc('#ip_list .tr.ip_tr').items()
for tr in trs:
host = tr.find('div:nth-child(1)').text()
port = tr.find('div:nth-child(2)').text()
yield Proxy(host=host, port=port)
if __name__ == '__main__':
crawler = TaiyangdailiCrawler()
for proxy in crawler.crawl():
print(proxy)
# proxy utils module
from proxypool.schemas import Proxy
def is_valid_proxy(data):
if data.__contains__(':'):
ip = data.split(':')[0]
port = data.split(':')[1]
return is_ip_valid(ip) and is_port_valid(port)
else:
return is_ip_valid(data)
def is_ip_valid(ip):
a = ip.split('.')
if len(a) != 4:
return False
for x in a:
if not x.isdigit():
return False
i = int(x)
if i < 0 or i > 255:
return False
return True
def is_port_valid(port):
return port.isdigit()
def convert_proxy_or_proxies(data):
"""
convert list of str to valid proxies or proxy
:param data:
:return:
"""
if not data:
return None
# if list of proxies
if isinstance(data, list):
result = []
for item in data:
# skip invalid item
item = item.strip()
if not is_valid_proxy(item): continue
host, port = item.split(':')
result.append(Proxy(host=host, port=int(port)))
return result
if isinstance(data, str) and is_valid_proxy(data):
host, port = data.split(':')
return Proxy(host=host, port=int(port))
# settings module
import platform
from os.path import dirname, abspath, join
from environs import Env
from loguru import logger
from proxypool.utils.parse import parse_redis_connection_string
env = Env()
env.read_env()
# definition of flags
IS_WINDOWS = platform.system().lower() == 'windows'
# definition of dirs
ROOT_DIR = dirname(dirname(abspath(__file__)))
LOG_DIR = join(ROOT_DIR, env.str('LOG_DIR', 'logs'))
# definition of environments
DEV_MODE, TEST_MODE, PROD_MODE = 'dev', 'test', 'prod'
APP_ENV = env.str('APP_ENV', DEV_MODE).lower()
APP_DEBUG = env.bool('APP_DEBUG', True if APP_ENV == DEV_MODE else False)
APP_DEV = IS_DEV = APP_ENV == DEV_MODE
APP_PROD = IS_PROD = APP_ENV == PROD_MODE
APP_TEST = IS_TEST = APP_ENV == TEST_MODE
# redis host
REDIS_HOST = env.str('REDIS_HOST', '127.0.0.1')
# redis port
REDIS_PORT = env.int('REDIS_PORT', 6379)
# redis password, if no password, set it to None
REDIS_PASSWORD = env.str('REDIS_PASSWORD', None)
# redis db, if no choice, set it to 0
REDIS_DB = env.int('REDIS_DB', 0)
# redis connection string, like redis://[password]@host:port or rediss://[password]@host:port/0
REDIS_CONNECTION_STRING = env.str('REDIS_CONNECTION_STRING', None)
if REDIS_CONNECTION_STRING:
REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB = parse_redis_connection_string(REDIS_CONNECTION_STRING)
# redis hash table key name
REDIS_KEY = env.str('REDIS_KEY', 'proxies:universal')
# definition of proxy scores
PROXY_SCORE_MAX = 100
PROXY_SCORE_MIN = 0
PROXY_SCORE_INIT = 10
# definition of proxy number
PROXY_NUMBER_MAX = 50000
PROXY_NUMBER_MIN = 0
# definition of tester cycle, it will test every CYCLE_TESTER second
CYCLE_TESTER = env.int('CYCLE_TESTER', 20)
# definition of getter cycle, it will get proxy every CYCLE_GETTER second
CYCLE_GETTER = env.int('CYCLE_GETTER', 100)
GET_TIMEOUT = env.int('GET_TIMEOUT', 10)
# definition of tester
TEST_URL = env.str('TEST_URL', 'http://www.baidu.com')
TEST_TIMEOUT = env.int('TEST_TIMEOUT', 10)
TEST_BATCH = env.int('TEST_BATCH', 20)
# only save anonymous proxy
TEST_ANONYMOUS = True
# TEST_HEADERS = env.json('TEST_HEADERS', {
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
# })
TEST_VALID_STATUS = env.list('TEST_VALID_STATUS', [200, 206, 302])
# definition of api
API_HOST = env.str('API_HOST', '0.0.0.0')
API_PORT = env.int('API_PORT', 5555)
API_THREADED = env.bool('API_THREADED', True)
# flags of enable
ENABLE_TESTER = env.bool('ENABLE_TESTER', True)
ENABLE_GETTER = env.bool('ENABLE_GETTER', True)
ENABLE_SERVER = env.bool('ENABLE_SERVER', True)
# logger.add(env.str('LOG_RUNTIME_FILE', join(LOG_DIR, 'runtime.log')), level='DEBUG', rotation='1 week', retention='20 days')
# logger.add(env.str('LOG_ERROR_FILE', join(LOG_DIR, 'error.log')), level='ERROR', rotation='1 week')
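# For reference (my own example, not from the repo): the settings above are read
# with environs, so they can be overridden via environment variables or a .env
# file, e.g.
#   REDIS_HOST=127.0.0.1
#   REDIS_PORT=6379
#   REDIS_KEY=proxies:universal
#   APP_ENV=prod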
# scheduler module
import time
import multiprocessing
from proxypool.processors.server import app
from proxypool.processors.getter import Getter
from proxypool.processors.tester import Tester
from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \
ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
from loguru import logger
if IS_WINDOWS:
multiprocessing.freeze_support()
tester_process, getter_process, server_process = None, None, None
class Scheduler():
"""
scheduler
"""
def run_tester(self, cycle=CYCLE_TESTER):
"""
run tester
"""
if not ENABLE_TESTER:
logger.info('tester not enabled, exit')
return
tester = Tester()
loop = 0
while True:
logger.debug(f'tester loop {loop} start...')
tester.run()
loop += 1
time.sleep(cycle)
def run_getter(self, cycle=CYCLE_GETTER):
"""
run getter
"""
if not ENABLE_GETTER:
logger.info('getter not enabled, exit')
return
getter = Getter()
loop = 0
while True:
logger.debug(f'getter loop {loop} start...')
getter.run()
loop += 1
time.sleep(cycle)
def run_server(self):
"""
run server for api
"""
if not ENABLE_SERVER:
logger.info('server not enabled, exit')
return
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
def run(self):
global tester_process, getter_process, server_process
try:
logger.info('starting proxypool...')
if ENABLE_TESTER:
tester_process = multiprocessing.Process(target=self.run_tester)
logger.info(f'starting tester, pid {tester_process.pid}...')
tester_process.start()
if ENABLE_GETTER:
getter_process = multiprocessing.Process(target=self.run_getter)
logger.info(f'starting getter, pid{getter_process.pid}...')
getter_process.start()
if ENABLE_SERVER:
server_process = multiprocessing.Process(target=self.run_server)
logger.info(f'starting server, pid{server_process.pid}...')
server_process.start()
tester_process.join()
getter_process.join()
server_process.join()
except KeyboardInterrupt:
logger.info('received keyboard interrupt signal')
tester_process.terminate()
getter_process.terminate()
server_process.terminate()
finally:
# must call join method before calling is_alive
tester_process.join()
getter_process.join()
server_process.join()
logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
logger.info('proxy terminated')
if __name__ == '__main__':
scheduler = Scheduler()
scheduler.run()
```
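For completeness, this is how I understand the pool is meant to be started when running the cloned repository directly, rather than through the pip-installed package (a sketch based on the scheduler code above; the import path `proxypool.scheduler` is my assumption about the repository layout):

```python
# Entry-point sketch: start getter, tester and API server together via the
# Scheduler shown above. Assumes the cloned repository's proxypool package
# (not the pip-installed one) is on the import path.
from proxypool.scheduler import Scheduler

if __name__ == '__main__':
    Scheduler().run()
```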