题目是:爬取蜗牛笔记上全部帖子信息
http://www.woniuxy.com/note/page-1
具体爬取信息:标题 日期 阅读数量 文章类型
应该怎么改目前运行啥也没有
spider_main
from task2_woniu.url_manager_zwj import URLManager
from task2_woniu.data_output_zwj import DataOutput
from task2_woniu.html_downloader_zwj import HTMLDownloader
from task2_woniu.html_parser_zwj import HTMLParser
import time
class SpiderMain:
def __init__(self):
self.manager=URLManager()
self.parser=HTMLParser()
self.downloader=HTMLDownloader()
self.output=DataOutput()
def crawl(self,root_url):
pass
def crawl(self,root_url):
self.manager.add_new_url(root_url)
while self.manager.has_new_url:
try:
new_url=self.manager.get_new_url()
print("开始抓取{}".format(new_url))
html=self.downloader.download(url=new_url)
new_urls,data=self.parser.parser(new_url,html)
if data:
#self.output.save_csv(data)
self.output.collect_data(data)
if new_urls:
self.manager.add_new_urls(new_urls)
print("已抓取()个链接,还有()个未抓取",new_urls,data)
time.sleep(5)
except Exception as e:
print(e,",crawl failed")
self.output.output_file()
if __name__=='__main__':
spider_main=SpiderMain()
spider_main.crawl("http://www.woniuxy.com/note/page-1")
html_parser
import re
from bs4 import BeautifulSoup
from urllib.parse import urljoin
class HTMLParser:
def __init__(self):
self.base_url="http://www.woniuxy.com/"
def parser(self,page_url,html_content):
try:
html=etree.HTML(html_content)
new_urls=None
new_data=None
if ("page" in page_url):
new_urls=self._get_new_urls(html)
else:
new_data=self._get_new_data(html)
return new_urls,new_data
except Exception:
return [page_url],None
def _get_new_urls(self,html):
new_urls=set()
detail_links=html.xpath("//div[@id='content']/a")
for link in detail_links:
new_url=link.get('href')
new_full_url=urljoin(self.base_url,new_url)
print('新文章链接:'+new_full_url)
new_urls.add(new_full_url)
if len(html.xpath("//*[text()='下一页']")) !=0:
next_page_link=html.xpath("//*[text()='下一页']")[0]
print(next_page_link)
print('下一页链接:'+urljoin(self.base_url,next_page_link.get('href')))
new_urls.add(urljoin(self.base_url,next_page_link.get('href')))
return new_urls
else:
return ""
def _get_new_data(self,html):
bs = BeautifulSoup(html, 'lxml')
data=dict()
title=bs.find_all(class_="col-lg-10 col-md-10 col-sm-10 col-xs-10 title")[0].string.strip()
data['title']=title.get_text()
#data['title']=html.xpath("//div[contains(@class,'title')]")[0].text.strip()
info = bs.find_all(class_="col-lg-12 col-md-12 col-sm-12 col-xs-12 info")
date_num = info[0].string.strip().find("日期:")
read_num = info[0].string.strip().find("阅读:")
end_num = info[0].string.strip().find("消耗积分:")
type_num=info[0].string.strip().find("文章类型:")
date = info[0].string.strip()[date_num + 3:read_num].strip()
read = info[0].string.strip()[read_num + 3:end_num - 4].strip()
type=info[0].string.strip()[type_num].strip()
data['date']=date.get_text()
#data['date']=html.xpath("//div[contains(@class,'date')]")[0].text.strip()
data['readcount']=read.get_text()
#data['readcount']=html.xpath("//div[contains(@class,'readcount')]")[0].text.strip()
data['type']=type.get_text()
return data
url_manager
class URLManager:
def __init__(self):
# 定义用于保存待爬取URL地址的集合
self.new_urls=set()
# 定义用于保存已爬取URL地址的集合
self.old_urls=set()
#判断集合中是否还有待爬取的url地址
def has_new_url(self):
#判断是否还有待爬的新url地址
return self.new_url_size() != 0
#获取一个待爬取的url地址
def get_new_url(self):
#从待爬url地址列表取出一个地址
new_url=self.new_urls.pop()
#然后将这个url地址放入到已爬队列中
self.old_urls.add(new_url)
return new_url
#新增一个待爬取的url地址
def add_new_url(self,url):
if url and url not in self.old_urls:#url非空且不在已爬url网址集合中
self.new_urls.add(url)#添加到待爬网址集合中
def add_new_urls(self,urls):
if urls is None or len(urls) == 0:
print('url地址为空或长度为0,添加new urls失败')
return
for url in urls:
self.add_new_url(url)
def new_url_size(self):
return len(self.new_urls)
def old_url_size(self):
return len(self.old_urls)
html_downloder
import requests
from faker import Faker
class HTMLDownloader:
def __init__(self):
fake=Faker()
self.user_agent=fake.user_agent()
def download(self,url):
if url:
headers={'User-Agent':self.user_agent}
r=requests.get(url,headers=headers)
if r.status_code ==200:
r.encoding="utf-8"
return r.text
return None
data_output
import csv
class DataOutput:
def __init__(self):
self.datas=[]
def store_data(self,data):
if data is None:
return
self.datas.append(data)
def output_file(self):
header = ['标题', '发布时间', '阅读次数']
with open('zwj.csv', "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=header)
writer.writeheader()
writer.writerows(self.datas)
print("数据已经写入成功!!!")