[BugFix] Eagerly abort cancelled final-step requests (#29987)

Currently, when a request is cancelled while it is executing its final
step, "completion" is handled via normal stop processing (e.g. length
or stop token), so the abort has no effect. This is usually harmless,
but when a kv connector is involved, the connector considers the
request to have completed successfully rather than having been aborted.

This is problematic for disaggregated prefill, which frees kv cache
blocks when a request is aborted but not when it completes
successfully. Since the cancelled request will never be sent to the
decode side, its kv cache blocks remain pinned until the fall-back
timeout expires. The problem is exacerbated when many requests are
cancelled and/or there are large prefills whose forward pass takes a
long time, since that widens the window.

This PR fixes the problem by processing pending aborts immediately
before processing model output each step. We process only aborts, not
new requests, since for latency it's preferable to handle model outputs
before new incoming requests; the sketch below illustrates the ordering.
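
In outline, the per-step ordering this change establishes looks like the
following sketch (illustrative only; `executor`, `execute_model`, and the
standalone `step` function are stand-in names, not the exact vLLM API):

import queue

def step(scheduler, executor, scheduler_output,
         aborts_queue: "queue.Queue[list[str]]"):
    # 1) Run the forward pass; client aborts may arrive while it runs.
    model_output = executor.execute_model(scheduler_output)

    # 2) Eagerly apply any aborts that arrived during execution, so a
    #    request cancelled on its final step is recorded as aborted
    #    rather than treated as a normal completion.
    request_ids: list[str] = []
    while not aborts_queue.empty():
        request_ids.extend(aborts_queue.get_nowait())
    if request_ids:
        scheduler.abort_requests(request_ids)

    # 3) Only then process the model output; brand-new requests are
    #    handled later still, since output latency matters more.
    return scheduler.update_from_output(scheduler_output, model_output)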

Fixes #26400.

Signed-off-by: Nick Hill <nhill@redhat.com>
Author:    Nick Hill
Date:      2025-12-05 09:28:32 -08:00
Committed: GitHub
Parent:    78c44fd722
Commit:    dc264bcea1

3 changed files with 352 additions and 6 deletions


@@ -204,6 +204,8 @@ class EngineCore:
         )
         self.async_scheduling = vllm_config.scheduler_config.async_scheduling
+        self.aborts_queue = queue.Queue[list[str]]()
+
         # Mark the startup heap as static so that it's ignored by GC.
         # Reduces pause times of oldest generation collections.
         freeze_gc_heap()
@@ -347,6 +349,9 @@ class EngineCore:
         if model_output is None:
             model_output = self.model_executor.sample_tokens(grammar_output)
 
+        # Before processing the model output, process any aborts that happened
+        # during the model execution.
+        self._process_aborts_queue()
         engine_core_outputs = self.scheduler.update_from_output(
             scheduler_output, model_output
         )
@@ -440,6 +445,9 @@ class EngineCore:
         with self.log_error_detail(scheduler_output):
             model_output = future.result()
 
+        # Before processing the model output, process any aborts that happened
+        # during the model execution.
+        self._process_aborts_queue()
         engine_core_outputs = self.scheduler.update_from_output(
             scheduler_output, model_output
         )
@@ -458,6 +466,18 @@ class EngineCore:
 
         return engine_core_outputs, model_executed
 
+    def _process_aborts_queue(self):
+        if not self.aborts_queue.empty():
+            request_ids = []
+            while not self.aborts_queue.empty():
+                ids = self.aborts_queue.get_nowait()
+                if isinstance(ids, str):
+                    # Should be a list here, but also handle a string just in case.
+                    ids = (ids,)
+                request_ids.extend(ids)
+            # More efficient to abort all as a single batch.
+            self.abort_requests(request_ids)
+
     def shutdown(self):
         self.structured_output_manager.clear_backend()
         if self.model_executor:
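
The drain-and-batch pattern in _process_aborts_queue can be exercised on
its own; a minimal sketch with a plain queue.Queue, where the abort_all
callback is a stand-in for self.abort_requests:

import queue

def drain_aborts(q: queue.Queue, abort_all) -> None:
    """Collect every queued abort and issue them as one batch."""
    if q.empty():
        return
    request_ids: list[str] = []
    while not q.empty():
        ids = q.get_nowait()
        if isinstance(ids, str):  # tolerate a bare id, as the PR does
            ids = (ids,)
        request_ids.extend(ids)
    # One batched call is cheaper than aborting ids one at a time.
    abort_all(request_ids)

q: queue.Queue = queue.Queue()
q.put_nowait(["req-1", "req-2"])
q.put_nowait(["req-3"])
drain_aborts(q, lambda ids: print("aborting", ids))
# -> aborting ['req-1', 'req-2', 'req-3']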
@@ -871,9 +891,13 @@ class EngineCoreProc(EngineCore):
             and not self.scheduler.has_requests()
             and not self.batch_queue
         ):
-            if logger.isEnabledFor(DEBUG) and self.input_queue.empty():
-                logger.debug("EngineCore waiting for work.")
-                waited = True
+            if self.input_queue.empty():
+                # Drain aborts queue; all aborts are also processed via input_queue.
+                with self.aborts_queue.mutex:
+                    self.aborts_queue.queue.clear()
+                if logger.isEnabledFor(DEBUG):
+                    logger.debug("EngineCore waiting for work.")
+                waited = True
             req = self.input_queue.get()
             self._handle_client_request(*req)
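
The clear-under-mutex drain above relies on CPython's queue.Queue
internals: mutex is the lock guarding all of the queue's bookkeeping and
queue is the underlying deque. A minimal sketch of the same atomic-discard
idiom (standalone, not the vLLM code):

import queue

q: queue.Queue = queue.Queue()
for i in range(3):
    q.put_nowait([f"req-{i}"])

# Atomically discard everything queued so far; holding q.mutex keeps
# producers and q.empty()/q.qsize() out while the deque is cleared.
with q.mutex:
    q.queue.clear()

assert q.empty()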
@@ -1027,6 +1051,13 @@ class EngineCoreProc(EngineCore):
                 else:
                     request = generic_decoder.decode(data_frames)
 
+                if request_type == EngineCoreRequestType.ABORT:
+                    # Aborts are added to *both* queues, which allows us to
+                    # eagerly process aborts while also ensuring ordering in
+                    # the input queue to avoid leaking requests. This is ok
+                    # because aborting in the scheduler is idempotent.
+                    self.aborts_queue.put_nowait(request)
+
                 # Push to input queue for core busy loop.
                 self.input_queue.put_nowait((request_type, request))
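
The dual-queue arrangement is safe precisely because scheduler aborts are
idempotent: an abort applied eagerly from aborts_queue and again later via
input_queue has the same effect as applying it once. A condensed sketch of
the protocol (all names here are illustrative, not the vLLM API):

import queue

input_queue: queue.Queue = queue.Queue()
aborts_queue: queue.Queue = queue.Queue()
aborted: set = set()  # stand-in for scheduler state

def abort_requests(request_ids):
    # Idempotent: re-aborting an already-aborted id is a no-op.
    aborted.update(request_ids)

def on_client_abort(request_ids):
    # Input thread: fan each abort into *both* queues, as the PR does.
    aborts_queue.put_nowait(request_ids)
    input_queue.put_nowait(("ABORT", request_ids))

on_client_abort(["req-42"])

# Busy loop, eager path: apply aborts before handling model output.
while not aborts_queue.empty():
    abort_requests(aborts_queue.get_nowait())

# Ordered path: the same abort arrives again via input_queue and is
# applied a second time, harmlessly.
_, ids = input_queue.get()
abort_requests(ids)
assert aborted == {"req-42"}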