Boolean_01 2025-07-11 11:13 采纳率: 25%
浏览 26

"with torch.cuda.stream()" 为什么会阻塞,而不是异步的?

起因

我在尝试实现dualpipe中的两个model chunk前向和反向之间的overlap。我打算先用torch.cuda.stream简单实现,一下:

def overlapped_forward_backward(
    module0: torch.nn.Module,
    inputs0: List[torch.Tensor],
    labels0: Optional[List[torch.Tensor]],
    loss_masks0: Optional[List[torch.Tensor]],
    loss1: Optional[torch.Tensor],
    outputs1: Optional[List[torch.Tensor]],
    output_grads1: Optional[List[torch.Tensor]],
    forward_step_func: Callable,
    is_last_stage0: bool,
) -> tuple[torch.Tensor, Optional[torch.Tensor]]:
    """
    You should implement custom forward-backward overlap strategy.
    The code below is just an example.
    """
    device = inputs0[0].device
    if not hasattr(overlapped_forward_backward, 'backward_streams'):
        overlapped_forward_backward.backward_streams = {}
    if device not in overlapped_forward_backward.backward_streams:
        overlapped_forward_backward.backward_streams[device] = torch.cuda.Stream(device=device)
    backward_stream = overlapped_forward_backward.backward_streams[device]

    with torch.cuda.stream(backward_stream):
        if loss1 is not None:
            loss1.backward()
            loss1.detach_()
        else:
            run_backward(outputs1, output_grads1)

    if len(inputs0) == 1:
        from megatron.core.utils import get_attr_wrapped_model
        set_input_tensor = get_attr_wrapped_model(module0, "set_input_tensor")
        set_input_tensor(inputs0)
    if is_last_stage0:
        inputs0_with_labels_loss_masks = list(inputs0)
        inputs0_with_labels_loss_masks.append(labels0)
        inputs0_with_labels_loss_masks.append(loss_masks0)
        outputs0, loss_func = forward_step_func(inputs0_with_labels_loss_masks, module0)
    else:
        outputs0, loss_func = forward_step_func(inputs0, module0)
    outputs0 = [outputs0] if isinstance(outputs0, torch.Tensor) else outputs0
    if is_last_stage0:
        loss0 = loss_func(outputs0[0])[0]
    else:
        loss0 = None

    torch.cuda.current_stream().wait_stream(backward_stream)

    return outputs0, loss0

我发现前向和反向并没有overlap,反向那块代码运行时间和没有加stream的情况是相同的。

小实验

于是我做了下面这个小实验:


import torch
import time

# GPU warmup
a = torch.randn(10000, 10000, device='cuda')
b = torch.randn(10000, 10000, device='cuda')
c = torch.mm(a, b)

# Count calculation time
calc_start = time.time()
a = torch.randn(10000, 10000, device='cuda')
b = torch.randn(10000, 10000, device='cuda')
for i in range(100):
    c = torch.mm(a, b)
calc_end = time.time()
print(f"calc time: {calc_end - calc_start}")

# Count stream time
calc_stream = torch.cuda.Stream()
torch.cuda.synchronize()
stream_start = time.time()
with torch.cuda.stream(calc_stream):
    a = torch.randn(10000, 10000, device='cuda')
    b = torch.randn(10000, 10000, device='cuda')
    for i in range(100):
        c = torch.mm(a, b)
stream_end = time.time()
print(f"stream time: {stream_end - stream_start}")

torch.cuda.synchronize()

测试了用stream和不用stream的时间,发现时间是差不多的,stream的时间反而更长:

img


如果增加循环数量的话,使不使用stream的时间都会相应的增加。

问题

按照我的理解,"with torch.cuda.stream()"应该是异步的,不会阻塞,其中的代码块运行时间应该趋近于零。这样才可以和后续的程序并行计算或通信。但是实验结果貌似说明它是阻塞的,整个计算做完之后才会往后继续执行。我代码哪里不对吗,还是有其他可能原因,请指点一下!

  • 写回答

5条回答 默认 最新

  • 檀越@新空间 2025-07-11 11:18
    关注

    上午好☀️☀️☀️️
    本答案参考通义千问

    你的问题非常典型,涉及到 PyTorch 的 CUDA 流(stream)机制 以及其在异步执行中的行为。你提到的 with torch.cuda.stream() 并没有实现预期的异步执行效果,反而看起来像是阻塞的,这背后有多个可能的原因。


    核心问题:with torch.cuda.stream() 是异步的,但需要配合 cuda.synchronize() 或其他机制才能看到效果

    1. with torch.cuda.stream(...): 是异步的,但不会自动启动计算

    • with torch.cuda.stream(stream): 只是将后续操作调度到指定的流中,并不会立即执行
    • 如果你在同一个流中连续执行多个操作(如循环),那么这些操作会按顺序执行并不会并行
    • 因此,如果你只在一个流中做大量计算,时间上和主线程(默认流)是一样的。

    重点:
    with torch.cuda.stream(...): 不会自动异步执行任务,它只是将操作放入该流中,等待该流被调度执行。


    🧠 为什么你的实验结果看起来像阻塞?

    2. 实验代码中没有使用 synchronize() 来测量正确的时间

    stream_start = time.time()
    with torch.cuda.stream(calc_stream):
        a = torch.randn(10000, 10000, device='cuda')
        b = torch.randn(10000, 10000, device='cuda')
        for i in range(100):
            c = torch.mm(a, b)
    stream_end = time.time()
    

    这段代码中,time.time() 记录的是 Python 主线程的时间,而 with torch.cuda.stream(...) 中的操作是异步的,Python 会在进入这个块后立即返回,不等待 GPU 完成计算。因此,stream_end - stream_start 实际上只记录了 Python 线程的时间,而不是 GPU 执行时间。

    重点:
    要测量 GPU 操作的实际耗时,必须使用 torch.cuda.synchronize() 来确保所有操作完成后再计时。


    🔍 正确的实验方法

    修改后的实验代码如下:

    import torch
    import time
    
    # GPU warmup
    a = torch.randn(10000, 10000, device='cuda')
    b = torch.randn(10000, 10000, device='cuda')
    c = torch.mm(a, b)
    
    # Count calculation time
    calc_start = time.time()
    a = torch.randn(10000, 10000, device='cuda')
    b = torch.randn(10000, 10000, device='cuda')
    for i in range(100):
        c = torch.mm(a, b)
    calc_end = time.time()
    print(f"calc time: {calc_end - calc_start}")
    
    # Count stream time
    calc_stream = torch.cuda.Stream()
    torch.cuda.synchronize()  # 确保之前操作完成
    stream_start = time.time()
    
    with torch.cuda.stream(calc_stream):
        a = torch.randn(10000, 10000, device='cuda')
        b = torch.randn(10000, 10000, device='cuda')
        for i in range(100):
            c = torch.mm(a, b)
    
    # 等待流中的任务完成
    torch.cuda.synchronize(calc_stream)
    stream_end = time.time()
    print(f"stream time: {stream_end - stream_start}")
    

    重点:
    必须使用 torch.cuda.synchronize() 来确保流中的任务完成后再计时,否则时间无法反映实际的 GPU 执行时间。


    🛠️ 如何真正实现“前向和反向之间的 overlap”?

    你希望在 前向计算反向计算 之间实现 overlap(重叠),这需要以下几点:

    ✅ 解决方案步骤如下:

    1. 创建两个独立的 CUDA 流

      • 一个用于前向计算(default stream)
      • 一个用于反向计算(backward stream)
    2. 将前向计算放在 default stream 中,反向计算放在另一个流中。

    3. 使用 wait_stream() 来控制依赖关系

      • 在反向计算开始前,让 backward stream 等待 forward stream 完成。
      • 这样可以确保梯度计算只在前向完成后才开始。
    4. 使用 torch.cuda.synchronize() 来同步流,确保计算完成后再继续后续操作。


    ✅ 示例修改后的代码(优化版)

    def overlapped_forward_backward(
        module0: torch.nn.Module,
        inputs0: List[torch.Tensor],
        labels0: Optional[List[torch.Tensor]],
        loss_masks0: Optional[List[torch.Tensor]],
        loss1: Optional[torch.Tensor],
        outputs1: Optional[List[torch.Tensor]],
        output_grads1: Optional[List[torch.Tensor]],
        forward_step_func: Callable,
        is_last_stage0: bool,
    ) -> tuple[torch.Tensor, Optional[torch.Tensor]]:
        """
        You should implement custom forward-backward overlap strategy.
        The code below is just an example.
        """
        device = inputs0[0].device
        if not hasattr(overlapped_forward_backward, 'backward_streams'):
            overlapped_forward_backward.backward_streams = {}
        if device not in overlapped_forward_backward.backward_streams:
            overlapped_forward_backward.backward_streams[device] = torch.cuda.Stream(device=device)
        backward_stream = overlapped_forward_backward.backward_streams[device]
    
        # 前向计算在 default stream
        if len(inputs0) == 1:
            from megatron.core.utils import get_attr_wrapped_model
            set_input_tensor = get_attr_wrapped_model(module0, "set_input_tensor")
            set_input_tensor(inputs0)
        if is_last_stage0:
            inputs0_with_labels_loss_masks = list(inputs0)
            inputs0_with_labels_loss_masks.append(labels0)
            inputs0_with_labels_loss_masks.append(loss_masks0)
            outputs0, loss_func = forward_step_func(inputs0_with_labels_loss_masks, module0)
        else:
            outputs0, loss_func = forward_step_func(inputs0, module0)
        outputs0 = [outputs0] if isinstance(outputs0, torch.Tensor) else outputs0
        if is_last_stage0:
            loss0 = loss_func(outputs0[0])[0]
        else:
            loss0 = None
    
        # 将反向计算放入新的流中
        with torch.cuda.stream(backward_stream):
            if loss1 is not None:
                loss1.backward()
                loss1.detach_()
            else:
                run_backward(outputs1, output_grads1)
    
        # 等待反向流完成(可选)
        # torch.cuda.synchronize(backward_stream)
    
        return outputs0, loss0
    

    重点:
    将前向计算放在 default stream,反向计算放在独立的流中,通过 wait_stream() 控制依赖,即可实现 overlap。


    ✅ 总结

    | 问题 | 原因 | 解决方案 | |------|------|----------| | with torch.cuda.stream(...) 阻塞 | 未使用 synchronize() 测量 GPU 时间 | 使用 torch.cuda.synchronize() 后再计时 | | 前向和反向不能 overlap | 没有分离流或控制依赖 | 使用独立流 + wait_stream() 控制依赖 | | with torch.cuda.stream(...) 表现像同步 | 流中操作顺序执行 | 将不同任务分配到不同流中 |


    📌 最终建议

    • 务必使用 torch.cuda.synchronize() 来测量 GPU 操作的实际耗时
    • 合理分配流,将不同的计算任务(如前向、反向、通信)分配到不同的流中。
    • 使用 wait_stream() 控制依赖关系,以实现真正的 overlap。

    如有更多关于 PyTorch 流机制的问题,欢迎继续提问!

    评论

报告相同问题?

问题事件

  • 创建了问题 7月11日