weixin_54686435 2021-07-22 19:16 采纳率: 61.5%
浏览 26
已结题

多进程的爬虫,如何实现两个进程之间的通信


import requests
from lxml import etree
from threading import Thread, Lock
from fake_useragent import UserAgent
from queue import Queue


class Get_url(Thread):
    """Worker thread: pulls listing-page URLs from a shared queue, scrapes the
    detail-page links from each page and appends them to lianjia.txt.

    :param url_queue: queue.Queue of listing-page URLs, shared by all workers
    :param lock: threading.Lock guarding the shared output file
    """

    def __init__(self, url_queue, lock):
        Thread.__init__(self)
        self.url_queue = url_queue  # fixed internal typo: was `url_quequ`
        self.lock = lock

    def run(self):
        # Drain the queue. empty()/get() is racy in general, but acceptable here
        # because the queue is fully populated before any worker starts.
        while not self.url_queue.empty():
            url = self.url_queue.get()
            headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'}
            resp = requests.get(url, headers=headers)
            e = etree.HTML(resp.text)
            contents = e.xpath('//div[@class="info clear"]/div/a/@href')
            # Bug fix: the lock was accepted but never used, so concurrent workers
            # could interleave their writes to the shared file. Serialize appends.
            with self.lock:
                with open('lianjia.txt', 'a', encoding='utf-8') as f:
                    for content in contents:
                        f.write(content + ',')


class Get_html(Thread):
    """Worker thread: pulls detail-page URLs from a queue and prints the
    room name parsed from each page.

    :param url_queues: queue.Queue of detail-page URLs, shared by all workers
    :param locks: threading.Lock (currently unused by this worker)
    """

    def __init__(self, url_queues, locks):
        Thread.__init__(self)
        # Bug fix: the original assigned the module-level globals `url_queue`
        # and `lock` instead of the constructor parameters, so every instance
        # silently bound the wrong queue (or raised NameError outside main).
        self.url_queues = url_queues
        self.locks = locks

    def run(self):
        # Drain the queue; it is fully populated before workers start.
        while not self.url_queues.empty():
            url = self.url_queues.get()
            headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'}
            resp = requests.get(url, headers=headers)
            e = etree.HTML(resp.text)
            name = e.xpath('//div[@class="room"]/div[1]/text()')
            print(name)


if __name__ == '__main__':
    # Seed the shared queue with the 99 listing pages, then fan out 3 workers.
    base_url = 'https://bj.lianjia.com/ershoufang/pg{}/'
    lock = Lock()
    url_queue = Queue()
    for page in range(1, 100):
        url_queue.put(base_url.format(page))

    workers = [Get_url(url_queue, lock) for _ in range(3)]
    for worker in workers:
        worker.start()

    # Second stage (detail-page parsing) left disabled, as in the original:
    # locks = Lock()
    # url_queues = Queue()
    # for url in all_url:
    #     url_queues.put(url)
    # for m in range(10):
    #     get_html = Get_html(url_queues, locks)
    #     get_html.start()

我该如何将我获取到的url给到下面的解析呢?这两个进程之间该如何通信?

  • 写回答

1条回答 默认 最新

  • weixin_45864635 2021-07-23 13:36
    关注

    给你个多线程的实例,至少我用它下载了一部电视剧
    # 下载m3u8视频(多线程下载)
    import re, requests, os, json, datetime
    import threading
    from threading import Thread
    import queue
    from lxml import etree
    from urllib.parse import unquote
    # import ssl

    requests.packages.urllib3.disable_warnings()
    
    
    class M3u8(object):
        """Download an m3u8 (HLS) video and save it as a single .mp4 file.

        The whole workflow is driven from __init__ via self.run(): fetch the
        playlist, resolve segment URLs with one of four fallback parse
        strategies, download the .ts segments with a bounded pool of threads,
        retry failed segments recorded in an error file, then concatenate the
        segments into one .mp4 and delete them.
        """

        def __init__(self, url_m3u8, name, path):
            """
            Download an m3u8 video with multiple threads and save it as .mp4.
            :param url_m3u8: URL of the m3u8 playlist
            :param name: file name to save the video under
            :param path: directory to save into (created automatically if missing)
            """
            self.url_m3u8 = url_m3u8
            self.path = path
            self.name = name
            # self.cpu_count = multiprocessing.cpu_count()   # CPU count, for a multiprocessing variant
            self.thread_pool = 15  # size of the download thread pool
            self.start_time = datetime.datetime.now().replace(microsecond=0)
            self.error_file = self.path + '\\' + self.name + '_error.txt'  # JSON-lines log of failed segment downloads
            self.path_ts = self.path + '\\' + 'ts'  # working dir for .ts segments; merged into the .mp4 and removed afterwards
            try:
                os.makedirs(self.path_ts)
            except:
                # NOTE(review): bare except also hides real failures such as permission errors
                print(self.path_ts + ':目录已存在')
            self.headers = {'Accept': '*/*',
                            'Accept-Language': 'zh-CN,zh;q=0.9',
                            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_4 like Mac OS X) AppleWebKit/605.1.15 '
                                          '(KHTML, like Gecko) Version/13.1 Mobile/15E148 Safari/604.1'}
            # NOTE(review): kicking off the entire download from __init__ is a heavy side effect
            self.run()

        def run(self):
            # The m3u8 playlist URL can be used directly
            # 1. Fetch the playlist text

            req = self.get(self.url_m3u8)

            # 2. Resolve the list of .ts segment URLs
            # Try parse1 first
            ts_list = self.parse1(self.url_m3u8, req)
            if ts_list:
                print('parse1解析成功')
            if not ts_list:
                # parse1 returned an empty list: fall back to parse2
                ts_list = self.parse2(self.url_m3u8, req)
                if ts_list:
                    print('parse2成功解析')
            if not ts_list:
                # parse2 returned an empty list: fall back to parse3
                ts_list = self.parse3(self.url_m3u8, req)
                if ts_list:
                    print('parse3成功解析')
            if not ts_list:
                # parse3 returned an empty list: fall back to parse4
                ts_list = self.parse4(self.url_m3u8, req)
                if ts_list:
                    print('parse4成功解析')

            # 3. Download all segments with a pool of threads
            self.download_process(ts_list)

            # 4. Re-download any segments recorded in the error file
            if os.path.exists(self.error_file):
                self.error()

            # 5.1 Sort the downloaded .ts files numerically (1.ts, 2.ts, ...)
            ls = os.listdir(self.path_ts)
            num_ls = []
            for i in ls:
                try:
                    st = re.search(r'\d+', i).group(0)
                    st = int(st)
                    num_ls.append(st)
                except:
                    # file names without any digits are skipped
                    pass
            num_ls = sorted(num_ls)
            new_ls = []

            for i in num_ls:
                new_str = str(i) + '.ts'
                new_ls.append(new_str)
            ls = new_ls
            # 5.2 Concatenate the segments into the final .mp4
            print('一共%d个文件' % len(ls))
            with open(self.path + '\\' + self.name + '.mp4', 'ab') as fp:
                for i in ls:
                    with open(self.path_ts + '\\' + i, 'rb') as ts:
                        fp.write(ts.read())
            # 5.3 Delete the .ts files and their working directory
            for i in ls:
                os.remove(self.path_ts + '\\' + i)
            os.removedirs(self.path_ts)
            end = datetime.datetime.now().replace(microsecond=0)
            print("耗时:%s" % (end - self.start_time))

        def get(self, url):
            """GET *url* (TLS verification disabled) and return the body text; raise on non-200 status."""
            headers = {'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_4 like Mac OS X) AppleWebKit/605.1.15 '
                                     '(KHTML, like Gecko) Version/13.1 Mobile/15E148 Safari/604.1'}
            req = requests.get(url, headers=headers, verify=False)
            if req.status_code == 200:
                return req.text
            else:
                ex = Exception('请求状态为:', req.status_code)
                raise ex

        def parse1(self, url, req):
            # Strategy 1: base URL (playlist URL minus its file name) + matched relative path;
            # no second m3u8 request is needed
            url_1 = url.replace(url.split('/')[-1], '')
            # Extract .ts entries (and an optional key URL) straight from the playlist text
            re_list = re.findall(r'\n(h.+ts)', req)
            key = re.findall(r'(h.+key)',req)
            if key:
                re_list.append(key[0])
            ts_list = []
            # Relative paths: prefix each entry with the base URL
            if re_list:
                if 'http' not in re_list[0]:
                    for i in re_list:
                        mv_url = url_1 + i
                        ts_list.append(mv_url)
                    return ts_list
                # Absolute URLs: return the matches as-is
                else:
                    ts_list = re_list
                    return ts_list
            else:
                return ts_list

        def parse2(self, url, req):
            # Strategy 2: base URL + matched path points at a second m3u8 file,
            # which must be fetched and re-parsed (via parse1)
            url_1 = url.replace(url.split('/')[-1], '')
            lines = req.split('\n')
            for line in lines:
                # NOTE(review): line[0] raises IndexError on empty lines produced by
                # split('\n'), and (line[0] != "") can never be False for a non-empty
                # line — this was probably meant to be `line != ""`.
                if (line[0] != '#') and (line[0] != ""):
                    # print(line)
                    url_2 = line
                    new_url = url_1 + url_2
                    # print('new link', new_url)
                    try:  # on a request error, return an empty list
                        req = self.get(new_url)
                        ts_list = self.parse1(new_url, req)
                        return ts_list
                    except:
                        ts_list = []
                        return ts_list
            # NOTE(review): implicitly returns None (not []) when no candidate line is found

        def parse3(self, url, req):
            # Strategy 3: scheme + host + matched path; the second m3u8 file must be
            # fetched and parsed
            url_1 = url.split('/')[0] + '//' + url.split('/')[2]
            lines = req.split('\n')
            ts_list = []
            for line in lines:
                # NOTE(review): same empty-line IndexError hazard as in parse2
                if (line[0] != '#') and (line[0] != ""):
                    url_2 = line
                    new_url = url_1 + url_2
                    # print('new link', new_url)
                    try:  # when the new link responds normally
                        req_new = self.get(new_url)
                        re_list = re.findall(r'\n(.+ts)', req_new)
                        if 'http' not in re_list[0]:
                            # relative paths: rebuild each URL as host + matched path
                            for i in re_list:
                                mv_url = url_1 + i
                                ts_list.append(mv_url)
                            return ts_list
                        else:
                            # absolute URLs: return the matches unchanged
                            ts_list = re_list
                            return ts_list
                    except:  # on a request error, return the (possibly empty) list
                        return ts_list

        def parse4(self, url, req):
            # Strategy 4: scheme + host + matched path; no second m3u8 request is needed
            url_1 = url.split('/')[0] + '//' + url.split('/')[2]
            # Extract .ts entries directly
            ts_list = []
            re_list = re.findall(r'\n(.+ts)', req)
            try:
                if 'http' not in re_list[0]:
                    # relative paths: rebuild each URL as host + matched path
                    for i in re_list:
                        mv_url = url_1 + i
                        ts_list.append(mv_url)
                    return ts_list
                else:
                    # absolute URLs: return the matches unchanged
                    ts_list = re_list
                    return ts_list
            except:
                # NOTE(review): last-resort parser; exit() kills the whole process here
                print('parse4解析出错,请检查链接是否正确,响应内容为:\n%s' % req)
                exit()

        def download_process(self, urls):
            """Queue every segment URL and download them with at most self.thread_pool worker threads."""
            q = queue.Queue()
            for num, url in enumerate(urls, 1):
                ts_flie_name = self.path_ts + "\\" + str(num) + '.ts'  # segment target path ("flie" typo kept for compatibility)
                jindu = [num, len(urls)]  # progress pair: [current index, total]
                q_dic = {}
                q_dic['url'] = url
                q_dic['ts_flie_name'] = ts_flie_name
                q_dic['jindu'] = jindu
                q.put(q_dic)

            # threads already alive before spawning workers (main thread etc.)
            start_num = len(threading.enumerate())

            thread_pool = self.thread_pool + start_num
            # Spawn one worker per queued segment, keeping the live-thread count
            # bounded; existing segment files are skipped (resume support).
            # NOTE(review): this is a busy-wait loop — it spins at 100% CPU while full.
            while True:
                if threading.active_count() <= thread_pool:
                    q_dic = q.get()
                    if not os.path.exists(q_dic['ts_flie_name']):
                        Thread(target=self.download, args=(q_dic['url'], q_dic['ts_flie_name'], q_dic['jindu'])).start()
                    if q.empty():
                        break
            # busy-wait until all worker threads have finished
            while True:
                if threading.active_count() <= start_num:
                    break
            print('---end---')

        def download(self, url, ts_flie_name, jindu):
            """Download one .ts segment to *ts_flie_name*; on a request error, append the job to the error file for later retry."""
            print(self.name + ':进度%d/%d' % (jindu[0], jindu[1]), end=" ")
            print("下载:", url, '地址:', ts_flie_name)
            try:
                response = requests.get(url, stream=True, verify=False, timeout=20, headers=self.headers)
            except Exception as e:
                print("异常请求:%s" % e.args)
                error_dic = {}
                error_dic['url'] = url
                error_dic['ts_flie_name'] = ts_flie_name

                with open(self.error_file, 'a', encoding='utf-8') as fp:
                    fp.write(json.dumps(error_dic, ensure_ascii=False) + "\n")
                return


            # stream the body to disk in 1 KiB chunks
            with open(ts_flie_name, "wb") as file:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        file.write(chunk)

        def error(self):
            """Re-download the segments listed in the error file (one JSON object per line)."""
            print('重新下载出错视频')
            with open(self.error_file, 'r', encoding='utf-8') as fp:
                lines = fp.readlines()
            with open(self.error_file, 'a', encoding='utf-8') as fp:
                fp.write('\n' + '-' * 50 + '以上出错视频已下载' + '-' * 50)

            q = queue.Queue()
            for num, line in enumerate(lines, 1):
                line = line.strip()
                # NOTE(review): the separator line appended above is not valid JSON, so
                # processing an already-marked error file would crash json.loads here
                dic = json.loads(line)
                dic['jindu'] = [num, len(lines)]
                q.put(dic)

            # same bounded busy-wait thread pool as download_process
            start_num = len(threading.enumerate())
            thread_pool = self.thread_pool + start_num
            while True:
                if threading.active_count() <= thread_pool:
                    q_dic = q.get()
                    Thread(target=self.download, args=(q_dic['url'], q_dic['ts_flie_name'], q_dic['jindu'])).start()
                    if q.empty():
                        break
            # busy-wait until all retry threads finish
            while True:
                if threading.active_count() <= start_num:
                    break
            print('---end---')
    
    
    if __name__ == '__main__':
        # Scrape the episode list page, extract (title, m3u8-url) pairs, and
        # download every episode that is not already on disk.
        path = 'c:\\aa5'
        url = 'https://www.y3600.com/hanju/86.html'
        headers = {'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_4 like Mac OS X) AppleWebKit/605.1.15 '
                                 '(KHTML, like Gecko) Version/13.1 Mobile/15E148 Safari/604.1'}
        page = requests.get(url, headers=headers, verify=False)
        tree = etree.HTML(page.text)
        onclick_attrs = tree.xpath("//div[@id='playlist']/div[2]/ul[@class='order42']/li/a/@onclick")
        titles = tree.xpath("//div[@id='playlist']/div[2]/ul[@class='order42']/li/a/text()")
        print(titles)
        # Each onclick attribute embeds a percent-encoded m3u8 link with '^' in
        # place of '%': pull it out, restore '%', then URL-decode it.
        links = [unquote(re.findall(r'(h.+3u8)', attr)[0].replace('^', '%')) for attr in onclick_attrs]
        for episode in zip(titles, links):
            print(episode)
            if os.path.exists('c:\\aa5\\' + episode[0] + '.mp4'):
                continue
            else:
                m3u8 = M3u8(episode[1], episode[0], path)
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 7月31日
  • 已采纳回答 7月23日
  • 创建了问题 7月22日

悬赏问题

  • ¥15 关于#stm32#的问题:CANOpen的PDO同步传输问题
  • ¥20 yolov5自定义Prune报错,如何解决?
  • ¥15 电磁场的matlab仿真
  • ¥15 mars2d在vue3中的引入问题
  • ¥50 h5唤醒支付宝并跳转至向小荷包转账界面
  • ¥15 算法题:数的划分,用记忆化DFS做WA求调
  • ¥15 chatglm-6b应用到django项目中,模型加载失败
  • ¥15 CreateBitmapFromWicBitmap内存释放问题。
  • ¥30 win c++ socket
  • ¥15 C# datagridview 栏位进度