diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 277877896..d5c08c851 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -179,7 +179,7 @@ class EngineCore: # to eliminate pipeline bubbles. self.batch_queue_size = self.model_executor.max_concurrent_batches self.batch_queue: ( - deque[tuple[Future[ModelRunnerOutput], SchedulerOutput]] | None + deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None ) = None if self.batch_queue_size > 1: logger.info("Batch queue is enabled with size %d", self.batch_queue_size) @@ -337,16 +337,6 @@ class EngineCore: ) raise err - def _log_err_callback(self, scheduler_output: SchedulerOutput): - """Log error details of a future that's not expected to return a result.""" - - def callback(f, sched_output=scheduler_output): - with self.log_error_detail(sched_output): - result = f.result() - assert result is None - - return callback - def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]: """Schedule, execute, and make output. @@ -423,8 +413,6 @@ class EngineCore: # No sampling required (no requests scheduled). future = cast(Future[ModelRunnerOutput], exec_future) else: - exec_future.add_done_callback(self._log_err_callback(scheduler_output)) - if not scheduler_output.pending_structured_output_tokens: # We aren't waiting for any tokens, get any grammar output # and sample immediately. @@ -441,7 +429,7 @@ class EngineCore: if not deferred_scheduler_output: # Add this step's future to the queue. - batch_queue.appendleft((future, scheduler_output)) + batch_queue.appendleft((future, scheduler_output, exec_future)) if ( model_executed and len(batch_queue) < self.batch_queue_size @@ -458,9 +446,14 @@ class EngineCore: return None, False # Block until the next result is available. - future, scheduler_output = batch_queue.pop() + future, scheduler_output, exec_model_fut = batch_queue.pop() with self.log_error_detail(scheduler_output): model_output = future.result() + if model_output is None: + # None from sample_tokens() implies that the original execute_model() + # call failed - raise that exception. + exec_model_fut.result() + raise RuntimeError("unexpected error") # Before processing the model output, process any aborts that happened # during the model execution. @@ -479,7 +472,7 @@ class EngineCore: deferred_scheduler_output ) future = self.model_executor.sample_tokens(grammar_output, non_block=True) - batch_queue.appendleft((future, deferred_scheduler_output)) + batch_queue.appendleft((future, deferred_scheduler_output, exec_future)) return engine_core_outputs, model_executed