[BugFix] Async scheduling: handle model forward errors more cleanly (#31611)
Signed-off-by: njhill <nickhill123@gmail.com>
This commit is contained in:
@@ -179,7 +179,7 @@ class EngineCore:
|
||||
# to eliminate pipeline bubbles.
|
||||
self.batch_queue_size = self.model_executor.max_concurrent_batches
|
||||
self.batch_queue: (
|
||||
deque[tuple[Future[ModelRunnerOutput], SchedulerOutput]] | None
|
||||
deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
|
||||
) = None
|
||||
if self.batch_queue_size > 1:
|
||||
logger.info("Batch queue is enabled with size %d", self.batch_queue_size)
|
||||
@@ -337,16 +337,6 @@ class EngineCore:
|
||||
)
|
||||
raise err
|
||||
|
||||
def _log_err_callback(self, scheduler_output: SchedulerOutput):
|
||||
"""Log error details of a future that's not expected to return a result."""
|
||||
|
||||
def callback(f, sched_output=scheduler_output):
|
||||
with self.log_error_detail(sched_output):
|
||||
result = f.result()
|
||||
assert result is None
|
||||
|
||||
return callback
|
||||
|
||||
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
|
||||
"""Schedule, execute, and make output.
|
||||
|
||||
@@ -423,8 +413,6 @@ class EngineCore:
|
||||
# No sampling required (no requests scheduled).
|
||||
future = cast(Future[ModelRunnerOutput], exec_future)
|
||||
else:
|
||||
exec_future.add_done_callback(self._log_err_callback(scheduler_output))
|
||||
|
||||
if not scheduler_output.pending_structured_output_tokens:
|
||||
# We aren't waiting for any tokens, get any grammar output
|
||||
# and sample immediately.
|
||||
@@ -441,7 +429,7 @@ class EngineCore:
|
||||
|
||||
if not deferred_scheduler_output:
|
||||
# Add this step's future to the queue.
|
||||
batch_queue.appendleft((future, scheduler_output))
|
||||
batch_queue.appendleft((future, scheduler_output, exec_future))
|
||||
if (
|
||||
model_executed
|
||||
and len(batch_queue) < self.batch_queue_size
|
||||
@@ -458,9 +446,14 @@ class EngineCore:
|
||||
return None, False
|
||||
|
||||
# Block until the next result is available.
|
||||
future, scheduler_output = batch_queue.pop()
|
||||
future, scheduler_output, exec_model_fut = batch_queue.pop()
|
||||
with self.log_error_detail(scheduler_output):
|
||||
model_output = future.result()
|
||||
if model_output is None:
|
||||
# None from sample_tokens() implies that the original execute_model()
|
||||
# call failed - raise that exception.
|
||||
exec_model_fut.result()
|
||||
raise RuntimeError("unexpected error")
|
||||
|
||||
# Before processing the model output, process any aborts that happened
|
||||
# during the model execution.
|
||||
@@ -479,7 +472,7 @@ class EngineCore:
|
||||
deferred_scheduler_output
|
||||
)
|
||||
future = self.model_executor.sample_tokens(grammar_output, non_block=True)
|
||||
batch_queue.appendleft((future, deferred_scheduler_output))
|
||||
batch_queue.appendleft((future, deferred_scheduler_output, exec_future))
|
||||
|
||||
return engine_core_outputs, model_executed
|
||||
|
||||
|
||||
Reference in New Issue
Block a user