Prhub

#27072 hicache: publish split write-through fragments

原始 PR 作者 ishandhanani 合并时间 2026-06-04 05:52 文件变更 5 提交数 3 评论 10 代码增减 +279 / -29

执行摘要

修复 HiCache 节点分裂时 write-through 事件丢失

在HiCache的write-through模式下,CPU事件异步发布。如果节点在提交host索引后、DMA确认到达前被分裂(例如因插入前缀匹配的新序列),则pending的ack仍指向原节点,只能发布后缀的事件,前缀的事件被静默丢失。该PR旨在消除这一竞态条件。

建议相关人员精读实现,特别是_replace_pending_write_through_node中列表替换的逻辑和_finish_write_through_ack中对storage的持久化方式。设计上通过publish_nodes列表而非直接修改ack回调,是一种简洁的解耦方案。

讨论亮点

来自chatgpt-codex-connector的Review指出(路径unified_radix_cache.py):当enable_storage开启且节点分裂后,_finish_write_through_ack仅对lock_node调用write_backup_storage,而lock_node此时只持有后缀,前缀的CPU事件虽已发布但从未持久化到存储后端,可能导致数据丢失。该问题在第三个提交中修复:改为对publish_nodes中所有节点调用write_backup_storage。

实现拆解

  1. 新增节点字段:在 TreeNodeUnifiedTreeNode 类中添加 write_through_pending_id 属性,标记节点是否有待处理的write-through发布。
  2. 扩展字典结构:将 ongoing_write_through 字典的值从 (node, lock_params/bi) 扩展为 (node, lock_params/bi, publish_nodes),其中 publish_nodes 记录需要发布事件的节点列表,初始为 [node]
  3. 提取三个辅助方法_track_write_through_node 负责录入待发布节点;_replace_pending_write_through_node 在节点分裂时被调用,用新节点替换待发布列表中的旧节点;_finish_write_through_ack 在DMA确认时遍历 publish_nodes 为每个节点生成 BlockStored 事件。
  4. 挂接到分裂流程:在 _split_node 中,当分裂的子节点已backuped(即有pending write-through)时,调用 _replace_pending_write_through_node
  5. 替换原write_backup和writing_check中的逻辑write_backup 中改用 _track_write_through_nodewriting_check 中改用 _finish_write_through_ack,并在 enable_storage 情况下对每个publish节点执行 write_backup_storage,确保前缀也被持久化。
  6. 添加单元测试:新增 test/registered/unit/mem_cache/test_hiradix_cache_unit.py,在 test_unified_radix_cache_unittest.py 中添加新方法,模拟分裂pending write-through场景,验证事件发布正确性。
文件 模块 状态 重要度
python/sglang/srt/mem_cache/unified_radix_cache.py 缓存层 modified 7.76
python/sglang/srt/mem_cache/hiradix_cache.py 缓存层 modified 7.66
test/registered/unit/mem_cache/test_hiradix_cache_unit.py 缓存层测试 added 7.24
test/registered/unit/mem_cache/test_unified_radix_cache_unittest.py 缓存层测试 modified 5.49
python/sglang/srt/mem_cache/radix_cache.py 缓存层 modified 4.35

关键符号

_track_write_through_node _replace_pending_write_through_node _finish_write_through_ack _split_node write_backup writing_check

关键源码片段

python/sglang/srt/mem_cache/unified_radix_cache.py core-logic

实现了核心的三个辅助方法,并修改了 _split_node 和 write_backup 等关键路径。

def _track_write_through_node(
    self,
    node: UnifiedTreeNode,
    lock_params: Optional[DecLockRefParams],
) -> None:
    # 标记节点有 pending 的 write-through,并将自身作为初始发布节点
    node.write_through_pending_id = node.id
    self.ongoing_write_through[node.id] = (node, lock_params, [node])def _replace_pending_write_through_node(
    self, old_node: UnifiedTreeNode, new_nodes: list[UnifiedTreeNode]
) -> None:
    # 节点分裂时,将 old_node 从待发布列表中替换为分裂后的新节点
    ack_id = old_node.write_through_pending_id
    if ack_id is None:
        return
    pending = self.ongoing_write_through.get(ack_id)
    if pending is None:
        return
    lock_node, lock_params, publish_nodes = pending
    updated_nodes = []
    replaced = False
    for node in publish_nodes:
        if node is old_node:
            updated_nodes.extend(new_nodes)
            replaced = True
        else:
            updated_nodes.append(node)
    if not replaced:
        return
    for node in new_nodes:
        node.write_through_pending_id = ack_id
    self.ongoing_write_through[ack_id] = (lock_node, lock_params, updated_nodes)def _finish_write_through_ack(self, ack_id: int) -> None:
    # DMA 确认时,为所有待发布节点生成 CPU 存储事件
    lock_node, lock_params, publish_nodes = self.ongoing_write_through.pop(ack_id)
    for node in publish_nodes:
        if node.write_through_pending_id == ack_id:
            node.write_through_pending_id = None
        self._record_store_event(node, medium=StorageMedium.CPU)
    if lock_params is not None:
        self.dec_lock_ref(lock_node, lock_params)
    if self.enable_storage:
        # 对每个碎片都调用 write_backup_storage,确保前缀也被持久化
        for node in publish_nodes:
            self.write_backup_storage(node)
test/registered/unit/mem_cache/test_hiradix_cache_unit.py test-coverage

新增完整的测试类,覆盖分裂 pending write-through 的核心场景。

def test_split_pending_write_through_publishes_fragments(self):
    cache, allocator = self._build_cache()
    cache.take_events()
​
    self._insert(cache, allocator, [1, 2, 3, 4])
    node = self._leaf_for(cache, [1, 2, 3, 4])
    backed_up = cache.write_backup(node, write_back=True)
    self.assertGreater(backed_up, 0)
​
    # 分裂节点,此时 write-through DMA 尚未确认
    self._insert(cache, allocator, [1, 2, 5, 6])
    self.assertEqual(self._stored_cpu_events(cache), [])
​
    cache.writing_check(write_back=True)
​
    # 两个碎片都应发布事件,且 parentage 正确
    stored_cpu = self._stored_cpu_events(cache)
    self.assertEqual(
        [list(e.token_ids) for e in stored_cpu],
        [[1, 2], [3, 4]],
    )
    self.assertIsNone(stored_cpu[0].parent_block_hash)
    self.assertEqual(stored_cpu[1].parent_block_hash, stored_cpu[0].block_hashes[0])

评论区精华

unified radix cache storage 持久化遗漏前缀 正确性

chatgpt-codex-connector 指出在 enable_storage 场景下,_finish_write_through_ack 仅对 lock_node 调用 write_backup_storage,而 lock_node 在分裂后只持有后缀,导致前缀片段的 CPU 事件被发布但未持久化到存储后端。

结论:作者在第三个提交中修复:改为对 publish_nodes 中所有节点调用 write_backup_storage。 · 已解决

风险与影响

主要风险包括:

  • 潜在死锁或竞态:新增的write_through_pending_id和节点列表替换逻辑可能引入新的并发问题,尤其在多线程或分布式环境下需仔细验证。
  • 性能开销:publish_nodes列表增加了少量内存和遍历开销,但由于write-through路径本身涉及DMA和event读取,额外开销可忽略。
  • 兼容性:ongoing_write_through字典结构变化,任何外部访问该字典的代码(目前无)需要适配。
  • 测试覆盖:测试仅覆盖了简单的分裂场景,未覆盖嵌套分裂、多节点pending等复杂情况。

直接影响所有使用HiCache write-through特性的用户和设备,修复了事件丢失的bug,增强了缓存一致性。对不使用HiCache或使用write-back策略的用户无影响。系统无需配置变更,升级后自动生效。

竞态条件修复 核心数据结构变更 需要验证并发安全

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论