执行摘要
移除 Async EPLB 中未使用的 asyncio 基础设施
移除 Async EPLB 中未使用的 asyncio 基础设施,简化代码,减少不必要的依赖和复杂度。
建议快速合并。这是一次干净的重构,降低了代码复杂度,便于后续维护。值得关注的是通过移除 asyncio 并保留显式同步点来保持正确性的模式。
无实质 review 讨论,仅包含 bot 自动评注和 maintainer 的批准。
移除 Async EPLB 中未使用的 asyncio 基础设施,简化代码,减少不必要的依赖和复杂度。
建议快速合并。这是一次干净的重构,降低了代码复杂度,便于后续维护。值得关注的是通过移除 asyncio 并保留显式同步点来保持正确性的模式。
无实质 review 讨论,仅包含 bot 自动评注和 maintainer 的批准。
移除 asyncio 导入和事件循环:在 vllm/distributed/eplb/async_worker.py 中,删除了 import asyncio,以及 thread_target 中的事件循环创建、设置、运行和关闭逻辑,直接同步调用 transfer_run_periodically。
transfer_run_periodically 去除 async:该函数改为普通同步函数,内部对 transfer_layer 的调用去掉了 await。
transfer_layer 去除 async:在 vllm/distributed/eplb/rebalance_execute.py 中,transfer_layer 从 async def 改为 def。
测试适配:在 tests/distributed/test_eplb_execute.py 中,移除 import asyncio,将对 transfer_layer 的调用从 asyncio.run(transfer_layer(...)) 改为直接调用 transfer_layer(...)。
功能保持:同步化后,原 transfer_layer 内部通过 cuda_stream.synchronize() 保持了执行顺序保证,行为不变。
| 文件 | 模块 | 状态 | 重要度 |
|---|---|---|---|
vllm/distributed/eplb/async_worker.py |
分布式 | modified | 6.83 |
vllm/distributed/eplb/rebalance_execute.py |
分布式 | modified | 5.12 |
tests/distributed/test_eplb_execute.py |
测试 | modified | 4.25 |
vllm/distributed/eplb/async_worker.py
core-logic
核心变更文件:移除了 asyncio 事件循环管理,将 `transfer_run_periodically` 改为同步函数。
# vllm/distributed/eplb/async_worker.py
# 去除 asyncio 依赖后,后台线程直接同步调用 transfer_run_periodically
def start_async_worker(
state: "EplbState",
is_profile: bool = False,
) -> threading.Thread:
eplb_group = get_eplb_group().device_group
device_index = state.cuda_device_index
assert state.is_async
def thread_target() -> None:
assert device_index is not None
torch.accelerator.set_device_index(device_index)
cuda_stream = torch.cuda.Stream(device=device_index)
try:
# 直接同步调用,无需 asyncio 事件循环
transfer_run_periodically(
state=state,
eplb_group=eplb_group,
cuda_stream=cuda_stream,
is_profile=is_profile,
)
except Exception as exc:
logger.exception("async loop error (Rank %d): %s", rank, str(exc))
thread = threading.Thread(target=thread_target, daemon=True)
thread.start()
return thread
# 原 async def 改为 def,内部不再使用 await
def transfer_run_periodically(
state: "EplbState",
eplb_group: ProcessGroup,
cuda_stream: torch.cuda.Stream,
is_profile: bool = False,
) -> None:
while True:
state.rearrange_event.wait(stream=cuda_stream)
# ... 内部循环中直接调用 transfer_layer,不再 await
while model_state.rebalanced and layer_idx < num_layers:
transfer_metadata = transfer_layer(
old_layer_indices=... ,
new_layer_indices=... ,
# ... 其他参数
)
cuda_stream.synchronize() # 显式同步以保证顺序
当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。
风险极低。该 PR 仅为移除未使用的 asyncio 包装,不改变同步语义。transfer_layer 内部原本就是同步操作,去除 async/await 后通过显式 cuda_stream.synchronize() 确保顺序,无回归风险。
影响范围:仅限 EPLB 模块的三个文件,不影响其他模块。
影响程度:低。对外部行为无影响,简化了代码维护。
当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。
参与讨论