执行摘要
- 一句话:为Engine API暴露子进程PID列表
- 推荐动作:值得精读,特别是对于需要直接集成SGLang Engine且需要进程健康监控的团队;设计简单,无副作用。
功能与动机
一些调用者直接使用Engine作为库(无HTTP服务器),并运行自己的健康监控来验证所有SGLang子进程是否存活。之前无法从Engine API发现子进程PID,导致监控无法可靠监测子进程状态。
实现拆解
- 在
SchedulerInitResult数据类中新增all_child_pids: List[int]字段,用于存储所有子进程PID;
- 在
Engine类中新增get_all_child_pids()方法返回该列表;
- 修改
_launch_scheduler_processes和_launch_subprocesses方法,在启动scheduler和detokenizer进程后收集其PID,并对于dp_size>1场景从DP控制器管道消息中获取子调度器PID;
- 在
data_parallel_controller.py中添加scheduler_pids常量,并在管道消息中传入子调度器PID;
- 新增测试文件
test_engine_child_pids.py验证返回值是活进程、包含至少2个PID(scheduler+detokenizer)、无重复。
关键文件:
python/sglang/srt/entrypoints/engine.py(模块 Engine入口;类别 source;类型 core-logic;符号 get_all_child_pids, SchedulerInitResult): 核心修改:添加all_child_pids字段、get_all_child_pids方法、收集子进程PID的逻辑。
python/sglang/srt/managers/data_parallel_controller.py(模块 DP控制器;类别 source;类型 entrypoint;符号 SCHEDULER_PIDS_ARG): 新增SCHEDULER_PIDS_ARG常量和在管道消息中传递子调度器PID,支持dp_size>1场景。
test/registered/core/test_engine_child_pids.py(模块 进程PID测试;类别 test;类型 test-coverage;符号 TestEngineChildPids): 新增测试文件,验证新API的正确性(PID存活、至少包含scheduler和detokenizer、无重复)。
关键符号:get_all_child_pids, SchedulerInitResult.all_child_pids, run_data_parallel_controller_process
关键源码片段
python/sglang/srt/entrypoints/engine.py
核心修改:添加all_child_pids字段、get_all_child_pids方法、收集子进程PID的逻辑。
# 在 SchedulerInitResult 数据类中新增 all_child_pids 字段
@dataclasses.dataclass
class SchedulerInitResult:
scheduler_infos: List[Dict[str, Any]]
all_child_pids: List[int] = dataclasses.field(default_factory=list) # 新增:存储所有子进程 PID
wait_for_ready: Callable[[], None] = lambda: None
wait_for_completion: Callable[[], None] = lambda: None
engine_info_bootstrap_server: Optional[Any] = None
# Engine 类中新增公共方法
def get_all_child_pids(self) -> List[int]:
"""Returns a list of all child process PIDs."""
return self._scheduler_init_result.all_child_pids
# 在 _launch_scheduler_processes 中收集 scheduler 和 DP controller 的子进程 PID
def _launch_scheduler_processes(self, ...):
# ... 启动 scheduler 进程后
all_child_pids = [proc.pid for proc in scheduler_procs]
# 在 wait_for_ready 中,如果 dp_size > 1,从 DP controller 管道消息中提取子调度器 PID
def wait_for_ready():
infos = _wait_for_scheduler_ready(...)
if server_args.dp_size > 1:
for info in infos:
if SCHEDULER_PIDS_ARG in info:
all_child_pids.extend(info[SCHEDULER_PIDS_ARG])
# ...
# 在 _launch_subprocesses 中收集 detokenizer PID
detoken_proc.start()
scheduler_init_result.all_child_pids.append(detoken_proc.pid)
python/sglang/srt/managers/data_parallel_controller.py
新增SCHEDULER_PIDS_ARG常量和在管道消息中传递子调度器PID,支持dp_size>1场景。
# 新增常量
SCHEDULER_PIDS_ARG = "scheduler_pids"
# 在 run_data_parallel_controller_process 中收集 scheduler 的 PID 并通过管道发送
def run_data_parallel_controller_process(...):
controller = DataParallelController(...)
scheduler_pids = [
proc.pid for proc in controller.scheduler_procs if proc is not None
]
pipe_writer.send({
"status": "ready",
"max_total_num_tokens": controller.max_total_num_tokens,
"max_req_input_len": controller.max_req_input_len,
SCHEDULER_PIDS_ARG: scheduler_pids, # 新增:传递子调度器 PID
})
test/registered/core/test_engine_child_pids.py
新增测试文件,验证新API的正确性(PID存活、至少包含scheduler和detokenizer、无重复)。
class TestEngineChildPids(CustomTestCase):
def test_get_all_child_pids_returns_live_pids(self):
engine = sgl.Engine(model_path=DEFAULT_SMALL_MODEL_NAME_FOR_TEST, random_seed=42)
try:
pids = engine.get_all_child_pids()
# 验证返回类型和数量
self.assertIsInstance(pids, list)
self.assertGreater(len(pids), 0)
# 验证每个 PID 都是当前进程的后代
current_proc = psutil.Process(os.getpid())
child_pids = {c.pid for c in current_proc.children(recursive=True)}
for pid in pids:
self.assertIn(pid, child_pids, f"PID {pid} is not a child")
finally:
engine.shutdown()
def test_child_pids_include_scheduler_and_detokenizer(self):
engine = sgl.Engine(...)
try:
pids = engine.get_all_child_pids()
# dp_size=1 时至少应有 scheduler 和 detokenizer 两个 PID
self.assertGreaterEqual(len(pids), 2)
finally:
engine.shutdown()
def test_child_pids_no_duplicates(self):
engine = sgl.Engine(...)
try:
pids = engine.get_all_child_pids()
self.assertEqual(len(pids), len(set(pids)))
finally:
engine.shutdown()
评论区精华
审核者kpham-sgl对engine.py中使用常量SCHEDULER_PIDS_ARG提出了细微意见(可直接使用字符串),但结论是使用常量也无妨,PR已合并。
- 是否应使用字符串字面量而非常量 (style): 保留常量定义,但 reviewer 认可。
风险与影响
- 风险:风险较低。需确保在dp_size>1场景下,DP控制器管道消息中的scheduler_pids在
wait_for_ready时已可用;当前实现是在控制器内部收集,时序可靠。另外,如果子进程在调用get_all_child_pids之前已结束,返回的PID可能无效,但测试已覆盖实时性验证。
- 影响:仅影响通过
Engine类直接使用SGLang的调用者,为其提供了健康的子进程监控能力;对于通过HTTP服务器使用的方式无影响。影响范围较小,正向增益。
- 风险标记:子进程可能提前退出导致PID无效
关联脉络
参与讨论