Boolean_01 2025-07-10 19:44 采纳率: 25%
浏览 21

"with torch.cuda.stream()" 为什么会阻塞,而不是异步的?

起因

我在尝试实现dualpipe中的两个model chunk前向和反向之间的overlap。我打算先用torch.cuda.stream简单实现,一下:

def overlapped_forward_backward(
    module0: torch.nn.Module,
    inputs0: List[torch.Tensor],
    labels0: Optional[List[torch.Tensor]],
    loss_masks0: Optional[List[torch.Tensor]],
    loss1: Optional[torch.Tensor],
    outputs1: Optional[List[torch.Tensor]],
    output_grads1: Optional[List[torch.Tensor]],
    forward_step_func: Callable,
    is_last_stage0: bool,
) -> tuple[torch.Tensor, Optional[torch.Tensor]]:
    """
    You should implement custom forward-backward overlap strategy.
    The code below is just an example.
    """
    device = inputs0[0].device
    if not hasattr(overlapped_forward_backward, 'backward_streams'):
        overlapped_forward_backward.backward_streams = {}
    if device not in overlapped_forward_backward.backward_streams:
        overlapped_forward_backward.backward_streams[device] = torch.cuda.Stream(device=device)
    backward_stream = overlapped_forward_backward.backward_streams[device]

    with torch.cuda.stream(backward_stream):
        if loss1 is not None:
            loss1.backward()
            loss1.detach_()
        else:
            run_backward(outputs1, output_grads1)

    if len(inputs0) == 1:
        from megatron.core.utils import get_attr_wrapped_model
        set_input_tensor = get_attr_wrapped_model(module0, "set_input_tensor")
        set_input_tensor(inputs0)
    if is_last_stage0:
        inputs0_with_labels_loss_masks = list(inputs0)
        inputs0_with_labels_loss_masks.append(labels0)
        inputs0_with_labels_loss_masks.append(loss_masks0)
        outputs0, loss_func = forward_step_func(inputs0_with_labels_loss_masks, module0)
    else:
        outputs0, loss_func = forward_step_func(inputs0, module0)
    outputs0 = [outputs0] if isinstance(outputs0, torch.Tensor) else outputs0
    if is_last_stage0:
        loss0 = loss_func(outputs0[0])[0]
    else:
        loss0 = None

    torch.cuda.current_stream().wait_stream(backward_stream)

    return outputs0, loss0

我发现前向和反向并没有overlap,反向那块代码运行时间和没有加stream的情况是相同的。

小实验

于是我做了下面这个小实验:


import torch
import time

# GPU warmup
a = torch.randn(10000, 10000, device='cuda')
b = torch.randn(10000, 10000, device='cuda')
c = torch.mm(a, b)

# Count calculation time
calc_start = time.time()
a = torch.randn(10000, 10000, device='cuda')
b = torch.randn(10000, 10000, device='cuda')
for i in range(100):
    c = torch.mm(a, b)
calc_end = time.time()
print(f"calc time: {calc_end - calc_start}")

# Count stream time
calc_stream = torch.cuda.Stream()
torch.cuda.synchronize()
stream_start = time.time()
with torch.cuda.stream(calc_stream):
    a = torch.randn(10000, 10000, device='cuda')
    b = torch.randn(10000, 10000, device='cuda')
    for i in range(100):
        c = torch.mm(a, b)
stream_end = time.time()
print(f"stream time: {stream_end - stream_start}")

torch.cuda.synchronize()

测试了用stream和不用stream的时间,发现时间是差不多的,stream的时间反而更长:

img

问题

按照我的理解,"with torch.cuda.stream()"应该是异步的,不会阻塞,其中的代码块运行时间应该趋近于零。这样才可以和后续的程序并行计算或通信。但是实验结果貌似说明它是阻塞的,整个计算做完之后才会往后继续执行。我代码哪里不对吗,还是有其他可能原因,请指点一下!

  • 写回答

6条回答 默认 最新

  • 阿里嘎多学长 2025-07-10 19:54
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    with torch.cuda.stream() 为什么会阻塞,而不是异步的?

    你使用 torch.cuda.stream() 试图实现异步计算,但是实际上它并没有实现异步计算,而是阻塞的原因是因为 CUDA Stream 的实现机制。

    torch.cuda.stream() 创建了一个 CUDA Stream 对象,这个对象可以控制 GPU 上的计算流程,但是它并不是真正的异步计算机制。实际上,它只是将计算流程排队到 GPU 上,然后等待计算完成。

    在你的代码中,使用 with torch.cuda.stream(stream) 语句创建了一个 CUDA Stream 对象,然后将计算流程排队到这个对象上。但是,这个对象并没有真正地异步计算,而是阻塞的,因为它需要等待计算完成。

    如果你想实现真正的异步计算,可以使用 torch.cuda.async_copy_to_devicetorch.cuda.async_copy_from_device 函数,它们可以异步地将数据从 CPU 复制到 GPU 或从 GPU 复制到 CPU。

    例如:

    import torch
    import torch.cuda
    
    # 创建 CUDA Stream 对象
    stream = torch.cuda.Stream()
    
    # 将数据从 CPU 复制到 GPU
    torch.cuda.async_copy_to_device(data, device)
    
    # 在 GPU 上执行计算
    torch.cuda.synchronize(stream)
    
    # 将结果从 GPU 复制到 CPU
    torch.cuda.async_copy_from_device(result, device)
    

    在上面的代码中,我们使用 torch.cuda.async_copy_to_devicetorch.cuda.async_copy_from_device 函数异步地将数据从 CPU 复制到 GPU 和从 GPU 复制到 CPU,然后使用 torch.cuda.synchronize 函数等待计算完成。

    结论

    torch.cuda.stream() 创建的 CUDA Stream 对象并不是真正的异步计算机制,而是阻塞的。要实现真正的异步计算,可以使用 torch.cuda.async_copy_to_devicetorch.cuda.async_copy_from_device 函数。

    评论

报告相同问题?

问题事件

  • 创建了问题 7月10日