不溜過客 · 2025-11-15 09:20

How can I crawl the full article list of a specified WeChat Official Account?

How can one get around the WeChat Official Account anti-crawling mechanisms and reliably fetch the full historical article list of a given account? Common problems include: request rate limits that get the IP banned; login state (cookies or tokens) expiring; encrypted interface parameters (dynamic values such as uin and key that are hard to reproduce); pages that depend on JavaScript rendering (requiring a headless browser); and discontinuous pagination in the account search results. In addition, some article lists are served over HTTPS with Referer and User-Agent checks, so requests replayed straight from a packet capture often fail. The typical technical challenge here is designing a highly available crawler architecture that supports incremental crawling and deduplication while staying legal and compliant.

2 answers

  • 宝码香车 (Quality Creator, JavaScript) · 2025-11-15 10:48

    (This answer was drafted with reference to DeepSeek.)

    How to crawl the full article list of a specified Official Account

    Core idea

    Given the technical characteristics of the WeChat Official Account platform, a combination of several strategies is recommended for stable crawling.

    Technical approach

    1. Basic request configuration

    import requests
    import time
    import random
    
    
    # Request header configuration
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Referer': 'https://mp.weixin.qq.com/',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive'
    }
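
    A related point from the question is keeping the login state (cookies/token) alive. Putting headers like these on a requests.Session lets cookies copied from a logged-in browser persist across calls; the cookie name and value below are hypothetical placeholders, and no network request is made in this sketch:

```python
import requests

session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Referer': 'https://mp.weixin.qq.com/',
})
# Hypothetical cookie value copied from a logged-in browser session
session.cookies.set('example_session_cookie', 'value-from-browser')

# Every subsequent session.get(...) on this session reuses the headers and cookies
```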
    

    2. Proxy IP pool management

    class ProxyPool:
        def __init__(self):
            self.proxies = []
            self.current_index = 0
        
        def add_proxy(self, proxy):
            self.proxies.append(proxy)
        
        def get_proxy(self):
            if not self.proxies:
                return None
            proxy = self.proxies[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.proxies)
            return proxy
        
        def remove_proxy(self, proxy):
            if proxy in self.proxies:
                self.proxies.remove(proxy)
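
    A rotated proxy is handed to requests as a per-scheme mapping. The standalone sketch below mirrors the round-robin index logic of ProxyPool.get_proxy; the proxy addresses are hypothetical:

```python
proxies = ['http://10.0.0.1:8080', 'http://10.0.0.2:8080']  # hypothetical addresses
index = 0

def next_proxy():
    """Round-robin over the pool, mirroring ProxyPool.get_proxy."""
    global index
    if not proxies:
        return None
    proxy = proxies[index]
    index = (index + 1) % len(proxies)
    # requests expects a mapping per scheme, e.g. requests.get(url, proxies=...)
    return {'http': proxy, 'https': proxy}

first = next_proxy()
second = next_proxy()
third = next_proxy()  # wraps around to the first proxy again
```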
    

    3. Request rate limiting

    import time
    from collections import defaultdict
    
    
    class RequestLimiter:
        def __init__(self, max_requests_per_minute=30):
            self.max_requests = max_requests_per_minute
            self.request_times = defaultdict(list)
        
        def wait_if_needed(self, domain):
            current_time = time.time()
            domain_times = self.request_times[domain]
            
            # Drop records older than one minute
            domain_times = [t for t in domain_times if current_time - t < 60]
            self.request_times[domain] = domain_times
            
            if len(domain_times) >= self.max_requests:
                sleep_time = 60 - (current_time - domain_times[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
            
            self.request_times[domain].append(current_time)
    

    4. Headless browser approach

    import time
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.common.by import By
    
    
    def create_headless_browser():
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument(f'--user-agent={headers["User-Agent"]}')
        
        driver = webdriver.Chrome(options=chrome_options)
        return driver
    
    
    def get_articles_with_selenium(public_account_url):
        driver = create_headless_browser()
        try:
            driver.get(public_account_url)
            time.sleep(3)  # wait for the page to load
            
            # Scroll down to trigger lazy-loading of more articles
            for i in range(5):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)

            # Extract article info (Selenium 4 API: find_elements(By.CSS_SELECTOR, ...))
            articles = driver.find_elements(By.CSS_SELECTOR, '.weui_media_box')
            article_list = []
            for article in articles:
                title_el = article.find_element(By.CSS_SELECTOR, '.weui_media_title')
                title = title_el.text
                # WeChat's history page stores the URL in a custom 'hrefs' attribute
                link = title_el.get_attribute('hrefs')
                date = article.find_element(By.CSS_SELECTOR, '.weui_media_extra_info').text
                article_list.append({
                    'title': title,
                    'link': link,
                    'date': date
                })
            
            return article_list
        finally:
            driver.quit()
    

    5. Data storage and deduplication

    import sqlite3
    import hashlib
    
    
    class ArticleStorage:
        def __init__(self, db_path='articles.db'):
            self.conn = sqlite3.connect(db_path)
            self.create_table()
        
        def create_table(self):
            cursor = self.conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS articles (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    link TEXT UNIQUE NOT NULL,
                    publish_date TEXT,
                    content_hash TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            self.conn.commit()
        
        def generate_hash(self, content):
            return hashlib.md5(content.encode('utf-8')).hexdigest()
        
        def is_duplicate(self, link, content):
            content_hash = self.generate_hash(content)
            cursor = self.conn.cursor()
            cursor.execute('SELECT id FROM articles WHERE link = ? OR content_hash = ?', 
                          (link, content_hash))
            return cursor.fetchone() is not None
        
        def save_article(self, title, link, publish_date, content):
            if not self.is_duplicate(link, content):
                content_hash = self.generate_hash(content)
                cursor = self.conn.cursor()
                cursor.execute('''
                    INSERT INTO articles (title, link, publish_date, content_hash)
                    VALUES (?, ?, ?, ?)
                ''', (title, link, publish_date, content_hash))
                self.conn.commit()
                return True
            return False
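
    The link/hash deduplication logic can be sanity-checked standalone against an in-memory database; this sketch mirrors ArticleStorage in miniature, with hypothetical URLs:

```python
import sqlite3
import hashlib

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE articles (link TEXT UNIQUE, content_hash TEXT)')

def is_duplicate(link, content):
    h = hashlib.md5(content.encode('utf-8')).hexdigest()
    row = conn.execute('SELECT 1 FROM articles WHERE link = ? OR content_hash = ?',
                       (link, h)).fetchone()
    return row is not None

def save(link, content):
    """Insert only if neither the link nor the content hash has been seen."""
    if is_duplicate(link, content):
        return False
    h = hashlib.md5(content.encode('utf-8')).hexdigest()
    conn.execute('INSERT INTO articles VALUES (?, ?)', (link, h))
    return True

first_seen = save('https://example.com/a', 'hello')  # True: new link and content
same_hash = save('https://example.com/b', 'hello')   # False: duplicate content hash
same_link = save('https://example.com/a', 'world')   # False: duplicate link
```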
    

    6. Full crawler architecture

    import logging
    from concurrent.futures import ThreadPoolExecutor
    
    
    class WeChatCrawler:
        def __init__(self):
            self.proxy_pool = ProxyPool()
            self.limiter = RequestLimiter()
            self.storage = ArticleStorage()
            self.setup_logging()
        
        def setup_logging(self):
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(levelname)s - %(message)s'
            )
        
        def crawl_public_account(self, account_name):
            """Main crawl flow"""
            # try_api_crawl / try_browser_crawl / fetch_article_content are
            # placeholders left for the reader to implement
            try:
                # Strategy 1: try a direct API request first
                articles = self.try_api_crawl(account_name)
                if not articles:
                    # Strategy 2: fall back to a headless browser
                    articles = self.try_browser_crawl(account_name)

                # Persist the results
                for article in articles:
                    self.process_article(article)

            except Exception as e:
                logging.error(f"Failed to crawl account {account_name}: {e}")
        
        def process_article(self, article):
            """Process a single article"""
            try:
                content = self.fetch_article_content(article['link'])
                if content:
                    self.storage.save_article(
                        article['title'],
                        article['link'],
                        article['date'],
                        content
                    )
            except Exception as e:
                logging.error(f"Failed to process article {article['title']}: {e}")
        
        def run_batch_crawl(self, account_list, max_workers=3):
            """Crawl multiple accounts in parallel"""
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                for account in account_list:
                    future = executor.submit(self.crawl_public_account, account)
                    futures.append(future)

                for future in futures:
                    try:
                        future.result(timeout=300)  # 5-minute timeout per account
                    except Exception as e:
                        logging.error(f"Task failed: {e}")
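
    The two-strategy fallback inside crawl_public_account can be exercised in isolation with stubbed fetchers; api_fetch and browser_fetch below are hypothetical stand-ins for the real implementations:

```python
def crawl_with_fallback(account, api_fetch, browser_fetch):
    """Try the cheap API path first; fall back to the browser path if it yields nothing."""
    articles = api_fetch(account)
    if not articles:
        articles = browser_fetch(account)
    return articles

# Stubs simulating an API path that is blocked and a browser path that succeeds
def blocked_api(account):
    return []  # e.g. rate-limited or an expired key parameter

def working_browser(account):
    return [{'title': 'demo', 'link': 'https://example.com/1', 'date': '2025-11-15'}]

result = crawl_with_fallback('some_account', blocked_api, working_browser)
```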
    

    Notes

    1. Compliance
    • Respect robots.txt
    • Throttle the request rate to avoid putting pressure on the target servers
    • Use the data only for personal study and research, not for commercial purposes
    2. Technical points
    • Use a session to keep the login state alive
    • Implement a request retry mechanism
    • Rotate the User-Agent periodically
    • Monitor for IP bans and switch proxies promptly
    3. Data quality
    • Crawl incrementally
    • Deduplicate the data
    • Clean anomalous records
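
    The "request retry mechanism" point above can be sketched as a small decorator with exponential backoff; the delays and the flaky_fetch function are illustrative only:

```python
import time
import functools

def retry(max_attempts=3, base_delay=0.1):
    """Retry a function with exponential backoff on any exception."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the error
                    time.sleep(base_delay * (2 ** attempt))  # 0.1s, 0.2s, ...
        return wrapper
    return decorator

calls = {'n': 0}

@retry(max_attempts=3, base_delay=0.01)
def flaky_fetch():
    """Simulates a request that fails twice, then succeeds."""
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError('simulated timeout')
    return 'ok'

result = flaky_fetch()  # succeeds on the third attempt
```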
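
    "Crawl incrementally" typically means remembering the newest stored publish date and only processing articles published after it. A minimal sketch with hypothetical dates (in practice, stored_latest would come from a query like max(publish_date) on the articles table):

```python
from datetime import date

def incremental_filter(articles, last_seen):
    """Keep only articles published strictly after the last stored date."""
    return [a for a in articles if a['date'] > last_seen]

stored_latest = date(2025, 11, 10)  # hypothetical value read from the database
fetched = [
    {'title': 'old post', 'date': date(2025, 11, 1)},
    {'title': 'new post', 'date': date(2025, 11, 14)},
]

new_articles = incremental_filter(fetched, stored_latest)  # only 'new post' remains
```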

    This architecture combines the efficiency of direct requests with the compatibility of a headless browser, and can achieve relatively stable data collection while staying within the platform's rules.

    This answer was selected by the asker as the best answer.

Question events

  • Answer accepted on Nov 16
  • Question created on Nov 15