Skip to content

Commit c648342

Browse files
ShangmingCai, whybeyoung
authored and committed
[PD] Fix abort_request for PD disaggregation (#8352)
Signed-off-by: Shangming Cai <[email protected]> Co-authored-by: ybyang <[email protected]>
1 parent 06324fc commit c648342

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

python/sglang/srt/disaggregation/mooncake/conn.py

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1074,6 +1074,14 @@ def failure_exception(self):
10741074
)
10751075
raise KVTransferError(self.bootstrap_room, failure_reason)
10761076

1077+
def abort(self):
    """Abort this KV transfer and mark it as failed.

    Records a failure for ``self.bootstrap_room`` on ``self.kv_mgr`` with
    the reason ``"Aborted by AbortReq."`` and then sets
    ``self.conclude_state`` to ``KVPoll.Failed``.
    """
    record_failure = self.kv_mgr.record_failure
    record_failure(self.bootstrap_room, "Aborted by AbortReq.")

    # The request has been aborted, so pin the concluded status to failure
    # explicitly.
    self.conclude_state = KVPoll.Failed
1084+
10771085

10781086
class MooncakeKVReceiver(BaseKVReceiver):
10791087
_ctx = zmq.Context()
@@ -1403,6 +1411,14 @@ def failure_exception(self):
14031411
)
14041412
raise KVTransferError(self.bootstrap_room, failure_reason)
14051413

1414+
def abort(self):
    """Abort this KV transfer and mark it as failed.

    Reports the abort to ``self.kv_mgr`` as a failure of
    ``self.bootstrap_room`` (reason: ``"Aborted by AbortReq."``), then
    sets ``self.conclude_state`` to ``KVPoll.Failed``.
    """
    abort_reason = "Aborted by AbortReq."
    self.kv_mgr.record_failure(self.bootstrap_room, abort_reason)

    # An aborted request must end in the failed state, so set it
    # explicitly here.
    self.conclude_state = KVPoll.Failed
1421+
14061422

14071423
class MooncakeKVBootstrapServer(BaseKVBootstrapServer):
14081424
def __init__(self, port: int):

python/sglang/srt/managers/scheduler.py

Lines changed: 31 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2440,6 +2440,37 @@ def abort_request(self, recv_req: AbortReq):
24402440
req.grammar.cancel()
24412441
req.set_finish_with_abort("Aborted by AbortReq.")
24422442

2443+
# Delete requests not in the waiting queue when PD disaggregation is enabled
2444+
if self.disaggregation_mode == DisaggregationMode.PREFILL:
2445+
# Abort requests that have not yet been bootstrapped
2446+
for i, req in enumerate(self.disagg_prefill_bootstrap_queue.queue):
2447+
logger.debug(f"Abort bootstrap queue request. {req.rid=}")
2448+
if recv_req.abort_all or req.rid.startswith(recv_req.rid):
2449+
if hasattr(req.disagg_kv_sender, "abort"):
2450+
req.disagg_kv_sender.abort()
2451+
2452+
# Abort in-flight requests
2453+
for i, req in enumerate(self.disagg_prefill_inflight_queue):
2454+
logger.debug(f"Abort inflight queue request. {req.rid=}")
2455+
if recv_req.abort_all or req.rid.startswith(recv_req.rid):
2456+
if hasattr(req.disagg_kv_sender, "abort"):
2457+
req.disagg_kv_sender.abort()
2458+
2459+
elif self.disaggregation_mode == DisaggregationMode.DECODE:
2460+
# Abort requests that have not yet finished preallocation
2461+
for i, decode_req in enumerate(self.disagg_decode_prealloc_queue.queue):
2462+
logger.debug(f"Abort prealloc queue request. {decode_req.req.rid=}")
2463+
if recv_req.abort_all or decode_req.req.rid.startswith(recv_req.rid):
2464+
if hasattr(decode_req.kv_receiver, "abort"):
2465+
decode_req.kv_receiver.abort()
2466+
2467+
# Abort requests waiting for kvcache to release tree cache
2468+
for i, decode_req in enumerate(self.disagg_decode_transfer_queue.queue):
2469+
logger.debug(f"Abort transfer queue request. {decode_req.req.rid=}")
2470+
if recv_req.abort_all or decode_req.req.rid.startswith(recv_req.rid):
2471+
if hasattr(decode_req.kv_receiver, "abort"):
2472+
decode_req.kv_receiver.abort()
2473+
24432474
# Delete requests in the running batch
24442475
if self.cur_batch is self.running_batch or self.cur_batch is None:
24452476
reqs = self.running_batch.reqs

0 commit comments

Comments
 (0)