执行摘要
- 一句话:修复MORI IO KV连接器API路由错误,确保预填充和解码节点使用正确端点。
- 推荐动作:该PR值得精读,重点关注代理服务器的路由重构设计,以及如何通过参数化API路径来避免硬编码,这对于构建灵活的服务端点有借鉴意义。
功能与动机
根据PR描述,当前MORI IO KV连接器已损坏,因为请求URL错误地使用了/v1/completions后缀,而正确应为/v1。此外,处理器未区分/v1/completions和/v1/chat/completions路由,导致问题。此PR旨在修复这些问题,确保连接器能正确工作,并通过lm_eval测试验证。
实现拆解
- 重构代理服务器路由处理:在
examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py中,将原单一handle_request函数拆分为handle_completions_request和handle_chat_completions_request两个独立路由函数,它们调用新的handle_request函数并传入API路径参数(/completions或/chat/completions)。
- 动态构建请求端点URL:在
handle_request函数中,使用传入的api参数动态拼接预填充和解码实例的端点URL(例如prefill_request_url = prefill_instance_endpoint["request_address"] + api),确保使用正确的API路径。
- 更新KV连接器注册地址:在
vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_connector.py的_ping方法中,将http_request_address从f"http://{self.request_address}/v1/completions"改为f"http://{self.request_address}/v1",以匹配代理服务器期望的基地址。
- 改进错误处理和修复语法错误:在代理服务器中,增强
send_request_to_prefill和stream_decode_response函数的错误消息,包含更多响应详情;并修复了review中发现的语法错误(多余括号)。
关键文件:
examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py(模块 代理服务器;类别 source;类型 core-logic;符号 handle_completions_request, handle_request, handle_chat_completions_request): 代理服务器的主逻辑文件,重构了路由处理以支持正确的API端点,是关键变更所在。
vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_connector.py(模块 KV连接器;类别 source;类型 configuration): KV连接器实现,更新了注册时的请求地址,确保与代理服务器匹配。
关键符号:handle_completions_request, handle_request, handle_chat_completions_request, _ping
关键源码片段
examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py
代理服务器的主逻辑文件,重构了路由处理以支持正确的API端点,是关键变更所在。
@app.route("/v1/completions", methods=["POST"])
async def handle_completions_request():
# 处理标准补全请求,调用通用处理器并传入 API 路径
return await handle_request("/completions", request)
@app.route("/v1/chat/completions", methods=["POST"])
async def handle_chat_completions_request():
# 处理聊天补全请求,区分路由以避免混淆
return await handle_request("/chat/completions", request)
async def handle_request(api: str, request: Request):
# 通用请求处理器,接收 API 路径参数
try:
# ... 原有逻辑 ...
# 动态构建预填充请求 URL,避免硬编码
prefill_request_url = prefill_instance_endpoint["request_address"] + api
send_prefill_task = asyncio.create_task(
send_request_to_prefill(
prefill_request_url, # 使用拼接后的 URL
req_data_to_prefill,
request_id,
decode_instance_endpoint,
selected_prefill_dp_rank,
)
)
# 类似地处理解码请求 URL
decode_request_url = decode_instance_endpoint["request_address"] + api
decode_request_task = asyncio.create_task(
start_decode_request(decode_request_url, req_data, request_id)
)
# ... 其余逻辑 ...
except Exception as e:
# 错误处理
return await make_response(...)
vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_connector.py
KV连接器实现,更新了注册时的请求地址,确保与代理服务器匹配。
def _ping(self, zmq_context):
# 构建 HTTP 请求地址,用于向代理服务器注册实例
# 修复前:http_request_address = f"http://{self.request_address}/v1/completions"
http_request_address = f"http://{self.request_address}/v1" # 移除 /completions 后缀,使用基地址
role = "P" if self.is_producer else "D"
retry_count = 0
index = 1
with zmq_context.socket(zmq.DEALER) as sock:
sock.connect(f"tcp://{self.proxy_ip}:{self.proxy_ping_port}")
while True:
try:
data = {
"type": "register",
"role": role,
"index": str(index),
"request_address": http_request_address, # 使用修正后的地址
"handshake_port": self.handshake_port,
"notify_port": self.notify_port,
"dp_size": self.moriio_config.dp_size,
"tp_size": self.moriio_config.tp_size,
"transfer_mode": self.mode.name,
}
# ... 发送注册数据 ...
except Exception as e:
# 错误处理
pass
评论区精华
review中主要讨论了代码正确性:
风险与影响
- 风险:技术风险较低:
- 回归风险:变更集中在MORI IO连接器的代理服务器和注册逻辑,属于特定功能模块,但若其他组件依赖原URL格式,可能引入兼容性问题。
- 性能风险:无显著性能影响,主要是路由和URL构建逻辑调整。
- 安全风险:无新增安全漏洞,错误信息改进可能泄露内部URL细节,但属于调试信息,风险可控。
- 兼容性风险:连接器注册地址变更可能影响已部署实例,需确保所有相关组件同步更新。
- 影响:影响范围有限但关键:
- 用户影响:修复后,使用MORI IO KV连接器的用户将能正常进行预填充和解码请求,避免因API路由错误导致的失败。
- 系统影响:仅影响分布式KV传输中的MORI IO连接器模块,不涉及核心推理路径。
- 团队影响:提供了更清晰的错误信息和模块化路由,便于未来维护和扩展。
- 风险标记:API路由变更, 缺少测试覆盖
关联脉络
- PR #34907 [ROCm][P/D][MORI] Add MORI IO KV connector: PR body中引用了该PR的Justfile和测试指令,表明当前PR是基于早期MORI IO连接器实现的修复。
参与讨论