r527665047 2021-07-12 08:52 采纳率: 0%
浏览 96

python中multiprocessing.pool和pandas如何结合使用?

我有一个DataFrame表"all_urls_df",有十亿条数据,一共两列(title列,url列)。对url列进行检测,在DF表中新增status列,如果检测返回值是200,status列值为1,否则为0。

我想采用多进程的方式来实现该功能,部分代码如下:

def pandas_data_washed(df):
pool = multiprocessing.Pool(processes=35)
all_urls_df = df
all_urls_df["status"] = all_urls_df.apply(lambda x: 1 if pool.apply_async(check_url_ok, (x.url,)) else 0, axis=1)
pool.close()
pool.join()


def check_url_ok(url):
    """检测连接是否可用"""
    useragent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36"
    # print("正在检查URL %s" % url)

    try:

        result = requests.get(url, headers={"User-Agent": useragent}, timeout=(5, 5))
        if result.status_code == 200:

            print("访问正常 %s" % url)
            return True
        else:
            print("访问超时 %s" % url)
            return False
    except requests.exceptions.ConnectionError as e:
        # print("URL %s 访问超时" % url)
        # print("URL访问超时")
        print("访问超时 %s" % url)
        return False

现在遇到的问题如下,如果我用pool.apply,"all_urls_df"的status列出来的结果是正确的,但是数据是逐条检测,无法实现进程池并行效果;如果我用pool.apply_async,可以实现并行检测,但是status列的结果却全部都是1。请问我的代码是哪里写错了呢,该如何调整呢?

  • 写回答

2条回答 默认 最新

  • python收藏家 2021-07-12 09:03
    关注

    apply_async 返回的是对象,不能直接if pool.apply_async...

    评论

报告相同问题?

问题事件

  • 修改了问题 7月12日
  • 创建了问题 7月12日

悬赏问题

  • ¥15 matlab数据降噪处理,提高数据的可信度,确保峰值信号的不损失?
  • ¥15 怎么看我在bios每次修改的日志
  • ¥15 python+mysql图书管理系统
  • ¥15 Questasim Error: (vcom-13)
  • ¥15 船舶旋回实验matlab
  • ¥30 SQL 数组,游标,递归覆盖原值
  • ¥15 为什么我的数据接收的那么慢呀有没有完整的 hal 库并 代码呀有的话能不能发我一份并且我用 printf 函数显示处理之后的数据,用 debug 就不能运行了呢
  • ¥20 gitlab 中文路径,无法下载
  • ¥15 用动态规划算法均分纸牌
  • ¥30 udp socket,bind 0.0.0.0 ,如何自动选取用户访问的服务器IP来回复数据