Prhub

#40730 [EPLB] Remove asyncio infrastructure from Async EPLB

原始 PR 作者 SageMoore 合并时间 2026-04-24 08:21 文件变更 3 提交数 3 评论 0 代码增减 +16 / -26

执行摘要

移除 Async EPLB 中未使用的 asyncio 基础设施

移除 Async EPLB 中未使用的 asyncio 基础设施,简化代码,减少不必要的依赖和复杂度。

建议快速合并。这是一次干净的重构,降低了代码复杂度,便于后续维护。值得关注的是通过移除 asyncio 并保留显式同步点来保持正确性的模式。

讨论亮点

无实质 review 讨论,仅包含 bot 自动评注和 maintainer 的批准。

实现拆解

  1. 移除 asyncio 导入和事件循环:在 vllm/distributed/eplb/async_worker.py 中,删除了 import asyncio,以及 thread_target 中的事件循环创建、设置、运行和关闭逻辑,直接同步调用 transfer_run_periodically

  2. transfer_run_periodically 去除 async:该函数改为普通同步函数,内部对 transfer_layer 的调用去掉了 await

  3. transfer_layer 去除 async:在 vllm/distributed/eplb/rebalance_execute.py 中,transfer_layerasync def 改为 def

  4. 测试适配:在 tests/distributed/test_eplb_execute.py 中,移除 import asyncio,将对 transfer_layer 的调用从 asyncio.run(transfer_layer(...)) 改为直接调用 transfer_layer(...)

  5. 功能保持:同步化后,原 transfer_layer 内部通过 cuda_stream.synchronize() 保持了执行顺序保证,行为不变。

文件 模块 状态 重要度
vllm/distributed/eplb/async_worker.py 分布式 modified 6.83
vllm/distributed/eplb/rebalance_execute.py 分布式 modified 5.12
tests/distributed/test_eplb_execute.py 测试 modified 4.25

关键符号

transfer_layer transfer_run_periodically

关键源码片段

vllm/distributed/eplb/async_worker.py core-logic

核心变更文件:移除了 asyncio 事件循环管理,将 `transfer_run_periodically` 改为同步函数。

# vllm/distributed/eplb/async_worker.py
# 去除 asyncio 依赖后,后台线程直接同步调用 transfer_run_periodically
def start_async_worker(
    state: "EplbState",
    is_profile: bool = False,
) -> threading.Thread:
    eplb_group = get_eplb_group().device_group
    device_index = state.cuda_device_index
    assert state.is_async
​
    def thread_target() -> None:
        assert device_index is not None
        torch.accelerator.set_device_index(device_index)
        cuda_stream = torch.cuda.Stream(device=device_index)
        try:
            # 直接同步调用,无需 asyncio 事件循环
            transfer_run_periodically(
                state=state,
                eplb_group=eplb_group,
                cuda_stream=cuda_stream,
                is_profile=is_profile,
            )
        except Exception as exc:
            logger.exception("async loop error (Rank %d): %s", rank, str(exc))
​
    thread = threading.Thread(target=thread_target, daemon=True)
    thread.start()
    return thread# 原 async def 改为 def,内部不再使用 await
def transfer_run_periodically(
    state: "EplbState",
    eplb_group: ProcessGroup,
    cuda_stream: torch.cuda.Stream,
    is_profile: bool = False,
) -> None:
    while True:
        state.rearrange_event.wait(stream=cuda_stream)
        # ... 内部循环中直接调用 transfer_layer,不再 await
        while model_state.rebalanced and layer_idx < num_layers:
            transfer_metadata = transfer_layer(
                old_layer_indices=... ,
                new_layer_indices=... ,
                # ... 其他参数
            )
            cuda_stream.synchronize() # 显式同步以保证顺序

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

风险极低。该 PR 仅为移除未使用的 asyncio 包装,不改变同步语义。transfer_layer 内部原本就是同步操作,去除 async/await 后通过显式 cuda_stream.synchronize() 确保顺序,无回归风险。

影响范围:仅限 EPLB 模块的三个文件,不影响其他模块。
影响程度:低。对外部行为无影响,简化了代码维护。

微小变更

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论