执行摘要
- 一句话:新增 Mamba 延迟额外缓冲策略,减少内存占用
- 推荐动作:值得精读,特别是理解 Mamba 调度器的状态管理设计。关注
_handle_finish_state_updated_req 的重构、mamba_lazy_prealloc_at_boundary 的边界处理和 release_kv_cache 的 is_insert 参数传递,这些是模式复用和抽象的关键。
功能与动机
原有 extra_buffer 策略需要 2 个额外缓冲来避免在 decode track 边界、overlap schedule 和请求完成同时发生时的状态损坏,这是一种 tradeoff。在高吞吐场景下,常驻 2 个缓冲造成不必要的内存压力,extra_buffer_lazy 按需分配第二个槽位,仅在边界时临时申请,从而降低常驻内存占用。
实现拆解
- 新增调度策略字符串:在
server_args.py 的 MAMBA_SCHEDULER_STRATEGY_CHOICES 中增加 "extra_buffer_lazy",并添加 enable_mamba_extra_buffer_lazy() 属性供其他模块查询。
- 内存池懒分配:在
memory_pool.py 的 HybridReqToTokenPool 中新增 enable_mamba_extra_buffer_lazy 参数;新方法 _alloc_ping_pong_buffer 在懒模式下只预分配 1 个槽位(第二个设为 -1);增加 set_mamba_ping_pong_slot 和 donate_mamba_ping_pong_slot 用于按需设置/归还临时槽位。
- 调度器边界预分配:在
schedule_batch.py 的 Req 类添加 mamba_lazy_is_insert 标志;新增 mamba_lazy_prealloc_at_boundary() 方法,在每次 prepare_for_decode 时检查是否处于 track 边界,若是则尝试分配第二个槽位,分配失败时尝试 evict 一个 mamba 状态后重试,仍失败则标记该请求跳过缓存插入。
- 后处理适配:在
batch_result_processor.py 中,将原来的 _handle_finished_req 替换为 _handle_finish_state_updated_req(在 update_finish_state 之后调用);新函数内部调用 _mamba_prefix_cache_update,并在释放缓存时根据 req.mamba_lazy_is_insert 决定是否实际插入 radix cache;新增 mamba_lazy_post_decode_at_boundary 负责在解码后清理临时槽位。
- 缓存释放与插入控制:在
mem_cache/common.py 的 release_kv_cache 增加 is_insert 参数;mamba_radix_cache.py 和 unified_cache_components/mamba_component.py 根据此参数决定是否跳过插入。
- 测试覆盖:新增
test/registered/models_e2e/test_qwen3_next_models.py,包含 4 个测试类:TestQwen3NextLazyExtraBuffer、TestQwen3NextLazyExtraBufferLargePage、以及两个手动运行的 AllocFail 变体,通过环境变量模拟分配失败场景,验证 GSM8K 精度、KL 散度和前缀缓存分支。同时修改 test/manual/4-gpu-models/test_qwen3_next_models.py 增加懒模式测试入口。
关键文件:
python/sglang/srt/managers/scheduler_components/batch_result_processor.py(模块 批处理器;类别 source;类型 core-logic;符号 _handle_finished_req, _handle_finish_state_updated_req, _mamba_check_track_boundary, mamba_lazy_post_decode_at_boundary): 核心变更:将 _handle_finished_req 替换为 _handle_finish_state_updated_req,新增 _mamba_check_track_boundary 和 mamba_lazy_post_decode_at_boundary,实现懒模式的后处理逻辑。
python/sglang/srt/mem_cache/memory_pool.py(模块 内存池;类别 source;类型 core-logic;符号 get_mamba_ping_pong_keep_idx, _alloc_ping_pong_buffer, set_mamba_ping_pong_slot, donate_mamba_ping_pong_slot): 引入了懒分配所需的内存池方法:_alloc_ping_pong_buffer(只分配 1 个槽位)、set_mamba_ping_pong_slot、donate_mamba_ping_pong_slot、get_mamba_ping_pong_keep_idx,以及对 ReqToTokenPool 类的类变量扩展。
python/sglang/srt/managers/schedule_batch.py(模块 调度批处理;类别 source;类型 core-logic;符号 mamba_lazy_prealloc_at_boundary): 新增 mamba_lazy_prealloc_at_boundary 方法,在 decode 准备阶段按需分配第二个 ping-pong 槽位;同时为 Req 添加 mamba_lazy_is_insert 标志,用于控制是否插入 radix cache。
关键符号:_handle_finish_state_updated_req, mamba_lazy_post_decode_at_boundary, get_mamba_ping_pong_keep_idx, _alloc_ping_pong_buffer, set_mamba_ping_pong_slot, donate_mamba_ping_pong_slot, mamba_lazy_prealloc_at_boundary, enable_mamba_extra_buffer_lazy, _mamba_check_track_boundary
关键源码片段
python/sglang/srt/managers/scheduler_components/batch_result_processor.py
核心变更:将 _handle_finished_req 替换为 _handle_finish_state_updated_req,新增 _mamba_check_track_boundary 和 mamba_lazy_post_decode_at_boundary,实现懒模式的后处理逻辑。
# python/sglang/srt/managers/scheduler_components/batch_result_processor.py
def _handle_finish_state_updated_req(
self,
req: Req,
batch: ScheduleBatch,
result: GenerationBatchResult,
i: int,
logits_output: LogitsProcessorOutput,
):
# 在 update_finish_state 之后调用,此时 req.finished() 已可用
# 用于 mamba_lazy_post_decode_at_boundary 内部的判断
self._mamba_prefix_cache_update(req, batch, result, i)
# 以下原 _handle_finished_req 逻辑不变 ...
if (
self.server_args.disaggregation_decode_enable_offload_kvcache
and not req.finished()
):
...
# 释放缓存时根据懒模式决定是否插入 radix cache
is_insert = (
req.mamba_lazy_is_insert
if get_global_server_args().enable_mamba_extra_buffer_lazy()
else True
)
release_kv_cache(req, self.tree_cache, is_insert=is_insert)
req.time_stats.set_completion_time()
def _mamba_prefix_cache_update(self, req, batch, result, i):
"""更新 Mamba track 状态,在 ping-pong 边界处处理。"""
if req.mamba_ping_pong_track_buffer is None:
return
lazy = get_global_server_args().enable_mamba_extra_buffer_lazy()
at_boundary, track_seqlen = self._mamba_check_track_boundary(req, batch, result, i)
if not at_boundary:
return
# 非懒模式:直接交换 ping-pong 索引
# 懒模式:保持索引不变,后处理中释放临时槽位
if not lazy:
req.mamba_next_track_idx = (
batch.req_to_token_pool.get_mamba_ping_pong_other_idx(
req.mamba_next_track_idx
)
)
else:
# 后续由 mamba_lazy_post_decode_at_boundary 清理临时槽位
pass
python/sglang/srt/mem_cache/memory_pool.py
引入了懒分配所需的内存池方法:_alloc_ping_pong_buffer(只分配 1 个槽位)、set_mamba_ping_pong_slot、donate_mamba_ping_pong_slot、get_mamba_ping_pong_keep_idx,以及对 ReqToTokenPool 类的类变量扩展。
# python/sglang/srt/mem_cache/memory_pool.py
class ReqToTokenPool:
enable_mamba_extra_buffer_lazy: bool = False # 类变量,标记懒模式
def _alloc_ping_pong_buffer(self, req: "Req"):
"""为请求分配 ping-pong 缓冲。
懒模式只分配 1 个槽位,第二个设为 -1(按需分配);
非懒模式分配全部槽位(通常是 2 个)。
"""
n = 1 if self.enable_mamba_extra_buffer_lazy else self.mamba_ping_pong_track_buffer_size
slots = self.mamba_pool.alloc(n)
assert slots is not None, (
"Not enough space for mamba ping pong idx, "
"try to increase --mamba-full-memory-ratio."
)
buf = torch.full(
(self.mamba_ping_pong_track_buffer_size,),
-1,
dtype=slots.dtype,
device=slots.device,
)
buf[:n] = slots
req.mamba_ping_pong_track_buffer = buf
req.mamba_next_track_idx = 0
def set_mamba_ping_pong_slot(self, req: "Req", idx: int, value):
"""设置 ping-pong 缓冲的某个槽位值,并同步设备端映射。"""
req.mamba_ping_pong_track_buffer[idx] = value
# 同步设备端映射 ...(略)
def donate_mamba_ping_pong_slot(self, req: "Req", idx: int):
"""归还 ping-pong 缓冲的某个槽位到内存池。"""
slot = req.mamba_ping_pong_track_buffer[idx].item()
if slot != -1:
self.mamba_pool.free(slot)
req.mamba_ping_pong_track_buffer[idx] = -1
def get_mamba_ping_pong_keep_idx(self, req: "Req") -> int:
"""返回保存当前 track 状态的 ping-pong 索引。
懒模式下有效状态在 next_track_idx(未交换),
非懒模式下在另一个索引(已交换)。
"""
if self.enable_mamba_extra_buffer_lazy:
return req.mamba_next_track_idx
return self.get_mamba_ping_pong_other_idx(req.mamba_next_track_idx)
python/sglang/srt/managers/schedule_batch.py
新增 mamba_lazy_prealloc_at_boundary 方法,在 decode 准备阶段按需分配第二个 ping-pong 槽位;同时为 Req 添加 mamba_lazy_is_insert 标志,用于控制是否插入 radix cache。
# python/sglang/srt/managers/schedule_batch.py
class ScheduleBatch:
def mamba_lazy_prealloc_at_boundary(self, mamba_track_interval: int):
"""在 track 边界为懒模式请求预分配第二个 ping-pong 槽位。
遍历所有请求,若当前 seq_len 是 track_interval 的整数倍,
且第二个槽位尚未占用(值为 -1),则尝试分配。
分配失败时 evict 一个 mamba 状态后重试,仍失败则标记
mamba_lazy_is_insert = False,跳过缓存插入。
"""
pool = self.req_to_token_pool
for i, req in enumerate(self.reqs):
buf = req.mamba_ping_pong_track_buffer
assert buf is not None
if self.seq_lens_cpu[i].item() % mamba_track_interval != 0:
continue
other_idx = 1 - req.mamba_next_track_idx
if buf[other_idx].item() != -1:
# overlap 时前一轮后处理尚未释放,跳过
continue
if envs.SGLANG_TEST_MAMBA_LAZY_ALLOC_FAIL.get():
new_slot = None
else:
new_slot = pool.mamba_pool.alloc(1)
if new_slot is None:
# 尝试 evict 一个 mamba 状态后重试
self.tree_cache.evict(EvictParams(num_tokens=0, mamba_num=1))
new_slot = pool.mamba_pool.alloc(1)
if new_slot is not None:
pool.set_mamba_ping_pong_slot(req, other_idx, new_slot[0])
req.mamba_next_track_idx = other_idx
else:
# 分配失败,标记不插入缓存
req.mamba_lazy_is_insert = False
评论区精华
PR 未产生有效 review 讨论,仅有作者触发的 CI 重新运行注释。
风险与影响
关联脉络
参与讨论