sidney 2024-07-16 20:57 采纳率: 0%
浏览 4

分布式dask无法完成计算tsfresh extract_features

我有一个时间序列dataframe,1千个id,2万行,100列,用tsfresh的MultiprocessingDistributor,参数n_workers=12,在一台64G的单机上花40分钟就可以计算完全量的extract_features。
但如果用ClusterDaskDistributor,worker在4台64G机器上,scheduler是单独一个64G机器,这些机器都没有其他任务,最终却怎么也算不完,sheduler机器最终因为outofmemory把整个任务自我kill了。
如果使用MinimalFCParameters,只扩张10倍左右的因子,MultiprocessingDistributor和ClusterDaskDistributor都能正常结束。
另外,从dashboard上看,worker都能正常结束计算,最后在gather结果的时候,总是不停地增加内存消耗,直至把内存耗尽。但实际上这个任务在单机上用多进程的MultiprocessingDistributor顶峰期才使用了30多G内存就计算完成了。
请问,可能是哪里出了问题?谢谢!

版本信息
python 3.10
tsfresh:0.20.2
dask:2024.7.0
pandas:2.2.2
操作系统:ubuntu 22.04.1 LTS (Jammy Jellyfish)


```python
Distributor = ClusterDaskDistributor("10.23.173.171:8786")
X = extract_features(timeseries_container=data_roll,
                     column_id='id',
                     column_sort='TRADE_DATE',
                     distributor=Distributor)

```

  • 写回答

5条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-07-16 20:57
    关注

    引自免费微信小程序:皆我百晓生

    根据您提供的信息,我认为可能的问题出在调度器(例如Dask的ClusterDistributor)和数据集大小之间。虽然您已经尝试使用不同的调度器,并且在单机上使用多进程时可以正常运行,但在多节点集群上,似乎遇到了一些挑战。

    首先,让我们分析一下这个问题的关键点:

    1. 使用的是Dask的ClusterDistributed调度器。
    2. 数据集大小约为1千个ID和2万行,每行有100个特征。
    3. 您在单机上使用多进程(MultiProcessingDistributed)时可以正常工作,表明系统有足够的资源。
    4. 在多节点集群上,尽管调度器(比如ClusterDistributed)可以正确地分配任务给每个节点,但整体来说,计算并没有达到预期的时间效率。

    分析可能的原因

    1. 内存限制:由于您的系统只有一个CPU核心和较少的RAM,这可能导致在多节点环境中,即使调度器能够有效地将任务分发到各个节点,但由于内存不足,最终导致所有任务无法同时执行而被终止。
    2. 网络延迟:尽管您提到没有其他任务,但可能存在网络延迟或其他通信瓶颈,导致某些节点无法及时接收到或发送任务请求。
    3. 资源竞争:当多个任务同时进行时,可能会发生资源竞争,尤其是在大型数据集的情况下,这可能会影响系统的性能。
    4. 任务调度算法:Dask中的ClusterDistributed调度器通常具有一定的并发处理能力,但它可能没有充分利用这一点来最大化任务的并行性。
    5. 硬件配置:如果您正在使用的是较旧的硬件设备,它们可能不足以支持高负载环境下的高效运行。

    解决方案

    为了解决这些问题,您可以考虑以下几个策略:

    1. 提高硬件配置:确保您的集群至少包含足够的CPU核心数和内存来处理大量的计算任务。
    2. 优化任务调度算法:通过调整Dask的默认设置(如num_workers),以及在实际应用中观察和调整,以找到最优的资源利用方式。
    3. 监控和优化网络延迟:检查是否有网络连接中断或延迟,特别是对于远程任务请求和响应。
    4. 使用更高级的任务调度工具:考虑使用专门设计用于大规模计算任务的调度工具,如JoblibParallel等,这些工具通常具有更好的并发处理能力和故障恢复机制。
    5. 适当的数据结构选择:在处理大量数据时,可能需要考虑使用适合的存储和计算数据结构,例如分布式数据库或缓存技术。

    实现示例

    为了演示如何使用joblib库实现类似的操作,我们可以创建一个简单的Python脚本,使用joblib.Paralleljoblib.delayed函数来并行化计算过程。

    import joblib
    from dask.distributed import Client, LocalCluster
    
    # 创建本地Dask集群
    cluster = LocalCluster(n_workers=12)
    
    # 创建Client
    client = Client(cluster)
    
    # 准备数据
    timeseries_container = ...
    
    # 计算提取特征
    def compute_extract_features(timeseries):
        # 这里应该调用tsfresh的extract_features函数
        pass
    
    # 使用joblib并行化计算
    with client:
        results = client.compute(
            [compute_extract_features(timeseries) for timeseries in timeseries_container]
        )
    
    # 获取并行化的结果
    for result in results:
        print(result)
    

    这段代码展示了如何使用joblib.Parallel并行化compute_extract_features函数的结果。注意,这里我们只是简单地打印结果,而在实际应用中,您可能需要进一步处理结果或保存它们。

    请根据您的具体需求和环境调整上述建议。希望这些建议对解决您的问题有所帮助。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 7月16日

悬赏问题

  • ¥15 35114 SVAC视频验签的问题
  • ¥15 impedancepy
  • ¥15 在虚拟机环境下完成以下,要求截图!
  • ¥15 求往届大挑得奖作品(ppt…)
  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见