执行摘要
- 一句话:实现 Elastic EP 失败进程恢复与动态重新加入
- 推荐动作:值得精读,特别是对分布式系统和容错机制感兴趣的开发者。该 PR 展示了如何在不阻塞在线服务的前提下恢复失败节点,设计上的权衡(如同步与异步、性能与可靠性)具有教育意义。但需注意当前实现依赖 Mooncake 库的内部语义,通用性受限;性能优化留给后续 PR。
功能与动机
作为 #11657 的后续,该 PR 使 SGLang 能够动态添加先前失败的进程,恢复最佳吞吐量。当节点失败时,系统管理员可以使用附加标志 --elastic-ep-rejoin 重新启动节点,而剩余的健康进程继续服务现有请求,并定期轮询重新启动进程的状态,一旦准备就绪即可无缝重新加入现有进程组,从而最小化服务中断。
实现拆解
- 新增 CLI 参数与初始化断言:在
server_args.py 添加 --elastic-ep-rejoin 标志,在 _handle_elastic_ep 中增加 pp_size == 1 断言,确保弹性 EP 当前仅支持单管道阶段。
- 分布式初始化适配:在
parallel_state.py 的 GroupCoordinator、init_world_group、init_model_parallel_group、init_distributed_environment 中新增 recovered_rank 参数,使进程组创建时传递 Mooncake 恢复选项;同时跳过恢复进程的消息队列初始化,改为在恢复完成后重建。
- 恢复辅助函数:在
elastic_ep.py 添加核心恢复逻辑,包括 _wait_for_peer_state(轮询 Mooncake 后端确认对等就绪)、_iter_live_parallel_groups(按 unique_name 排序保证集体操作顺序一致)、_map_global_to_group_local_ranks、_maybe_create_message_queue、_refresh_ep_members、try_recover_ranks(对每个活组执行 recover_ranks)以及 join_process_groups(供重新加入进程调用)。
- 模型运行器集成:在
ModelRunner.__init__ 中,检测 elastic_ep_rejoin 后执行 join_process_groups、广播专家位置元数据、重置 ElasticEPState;新增 maybe_recover_ep_ranks 方法在每个前向迭代中检查活跃秩状态,发现失败时调用 try_recover_ranks 并同步 seed 与 expert location。
- 专家位置元数据同步:在
expert_location.py 增加 broadcast_global_expert_location_metadata 函数,通过分布式广播将健康源的设备张量和 CPU 副本同步给所有秩,确保恢复后 expert routing 一致。
- 数据并行控制器调整:
_broadcast_ports_as_server 在弹性 EP 启用时不再关闭 ZMQ 套接字,而是启动守护线程 _reply_ports_as_server 持续响应恢复秩的端口请求。
- EPLB 管理器重置:在
eplb_manager.py 增加 reset_generator 方法,使恢复后调度从一致状态恢复。
- 文档更新:在
docs/advanced_features/server_arguments.md 记录新参数。
关键文件:
python/sglang/srt/elastic_ep/elastic_ep.py(模块 弹性 EP;类别 source;类型 core-logic;符号 reset, _get_process_group_backend, _iter_live_parallel_groups, _map_global_to_group_local_ranks): 核心实现文件,新增了恢复辅助函数和状态管理方法,是整个 PR 的基石。
python/sglang/srt/model_executor/model_runner.py(模块 模型执行器;类别 source;类型 data-contract;符号 maybe_recover_ep_ranks, _get_healthy_expert_location_src_rank): 在模型初始化中集成 rejoin 路径,并在每个前向迭代中检查回复发恢复。
python/sglang/srt/eplb/expert_location.py(模块 专家路由;类别 source;类型 core-logic;符号 broadcast_global_expert_location_metadata): 新增 broadcast_global_expert_location_metadata 函数,确保恢复后专家位置元数据一致。
python/sglang/srt/managers/data_parallel_controller.py(模块 数据并行控制;类别 source;类型 entrypoint;符号 _reply_ports_as_server): 修改端口广播逻辑,为恢复进程提供持久化的 REP 服务线程。
python/sglang/srt/distributed/parallel_state.py(模块 分布式状态;类别 source;类型 dependency-wiring;符号 init_world_group, init_model_parallel_group, init_distributed_environment, GroupCoordinator.init): 分布式初始化传递 recovered_rank 标志,控制进程组创建和消息队列初始化。
python/sglang/srt/server_args.py(模块 服务参数;类别 source;类型 core-logic;符号 elastic_ep_rejoin): 添加新参数和 pp_size==1 断言。
python/sglang/srt/eplb/eplb_manager.py(模块 EPLB 管理器;类别 source;类型 core-logic;符号 reset_generator): 新增 reset_generator 方法,支持恢复后重置 EPLB 状态。
docs/advanced_features/server_arguments.md(模块 文档;类别 docs;类型 documentation): 记录新参数 --elastic-ep-rejoin 的文档说明。
关键符号:reset, _get_process_group_backend, _iter_live_parallel_groups, _map_global_to_group_local_ranks, _wait_for_peer_state, _maybe_create_message_queue, _refresh_ep_members, try_recover_ranks, join_process_groups, maybe_recover_ep_ranks, _get_healthy_expert_location_src_rank, broadcast_global_expert_location_metadata, _reply_ports_as_server, reset_generator
关键源码片段
python/sglang/srt/model_executor/model_runner.py
在模型初始化中集成 rejoin 路径,并在每个前向迭代中检查回复发恢复。
def maybe_recover_ep_ranks(self):
# TODO(perf): `active_ranks.all()` 在 CUDA 张量上会触发主机 - 设备同步,
# 且此函数在前向路径上。此检查仅在启用 --elastic-ep-backend 时运行,
# 因此同步开销不会传播到其他配置。留给未来弹性 EP 路径优化。
if self.tp_group.active_ranks.all() and self.tp_group.active_ranks_cpu.all():
# 所有秩健康,无需恢复
return
# 将设备张量拷贝到 CPU 并转换为 numpy 数组用于掩码计算
tp_active_ranks = self.tp_group.active_ranks.detach().cpu().numpy()
tp_active_ranks_cpu = self.tp_group.active_ranks_cpu.detach().numpy()
# 取两者交集:仅当两个视图都标记为不活跃时才需要恢复
tp_active_ranks &= tp_active_ranks_cpu
# 注意:ranks_to_recover 使用 tp_group 内索引。当前 Mooncake 弹性 EP
# 实现假设 --pp-size=1,因此 tp-group 索引等于全局秩索引。
ranks_to_recover = [
i for i in range(len(tp_active_ranks)) if not tp_active_ranks[i]
]
if not ranks_to_recover:
return
# 调用恢复辅助函数,该函数会在每个并行组上执行集体操作
try_recover_ranks(ranks_to_recover)
# 选择一个健康的秩作为元数据广播源
healthy_local_indices = [
i for i in range(len(tp_active_ranks)) if tp_active_ranks[i]
]
src_rank = self.tp_group.ranks[healthy_local_indices[0]]
# 广播专家位置元数据,确保所有秩一致
broadcast_global_expert_location_metadata(src_rank=src_rank)
# 重置状态管理器,使活跃秩重新标记为全 1
ElasticEPStateManager.instance().reset()
python/sglang/srt/eplb/expert_location.py
新增 broadcast_global_expert_location_metadata 函数,确保恢复后专家位置元数据一致。
def broadcast_global_expert_location_metadata(
src_rank: int = 0, group: Optional[torch.distributed.ProcessGroup] = None
):
"""广播全局 ExpertLocationMetadata 从 src_rank 到所有秩。
用于 Elastic EP 秩恢复后确保所有秩(包括新恢复的)共享相同的专家位置元数据。
"""
metadata = get_global_expert_location_metadata()
assert metadata is not None, "ExpertLocationMetadata not initialized"
# 确保设备张量是连续的,以便就地广播
metadata.physical_to_logical_map = metadata.physical_to_logical_map.contiguous()
metadata.logical_to_all_physical_map = (
metadata.logical_to_all_physical_map.contiguous()
)
metadata.logical_to_all_physical_map_num_valid = (
metadata.logical_to_all_physical_map_num_valid.contiguous()
)
if metadata.logical_to_rank_dispatch_physical_map is not None:
metadata.logical_to_rank_dispatch_physical_map = (
metadata.logical_to_rank_dispatch_physical_map.contiguous()
)
device_tensors = [
metadata.physical_to_logical_map,
metadata.logical_to_all_physical_map,
metadata.logical_to_all_physical_map_num_valid,
]
if metadata.logical_to_rank_dispatch_physical_map is not None:
device_tensors.append(metadata.logical_to_rank_dispatch_physical_map)
# 逐个张量广播,利用 PyTorch 分布式原语
for tensor in device_tensors:
torch.distributed.broadcast(tensor, src=src_rank, group=group)
# 广播完成后更新 CPU 副本,避免后续 GPU -> CPU 同步
metadata.physical_to_logical_map_cpu = metadata.physical_to_logical_map.cpu()
metadata.logical_to_all_physical_map_cpu = (
metadata.logical_to_all_physical_map.cpu()
)
评论区精华
风险与影响
- 风险:
- 性能回归:前向路径中的 CUDA 同步(
active_ranks.all())可能增加延迟,但仅限于启用弹性 EP 的配置。
- 分布式死锁:如果 Mooncake 库的同步保证有 bug 或版本不兼容,可能导致集群死锁。
- ZMQ 线程泄漏:
_reply_ports_as_server 线程在进程退出时不会优雅关闭,可能导致端口占用。
- 元数据不一致:广播源秩若尚未完全恢复可能广播陈旧数据;当前实现动态选择健康秩,但
--elastic-ep-rejoin 标志在滚动重启场景下可能失效。
- 缺少测试覆盖:无自动化测试,仅靠手动多节点验证。
- 影响:对用户:启用弹性 EP 的部署可获得节点级容错,系统管理员可在不停止服务的情况下替换节点。对系统:引入每前向迭代的健康检查(微小开销),但仅在 elastic EP 模式生效。对团队:此 PR 是系列的第 6 个,表明弹性 EP 功能趋于成熟,后续可在此基础上优化性能。
- 风险标记:CUDA 同步开销, 缺少自动化测试, ZMQ 线程泄漏, Mooncake 版本依赖
关联脉络
- PR #11657 [Elastic EP] ...: 此 PR 的基础,引入了弹性 EP 框架和 Mooncake 后端支持。
参与讨论