From 14561fabfd39030168a3365327d921ae5c49bb0c Mon Sep 17 00:00:00 2001 From: Wentao Ye <44945378+yewentao256@users.noreply.github.com> Date: Tue, 24 Feb 2026 07:13:11 -0500 Subject: [PATCH] [Perf] Optimize pooling model redundant copy, 1.8% throughput improvement (#35127) Signed-off-by: yewentao256 --- vllm/v1/worker/gpu_model_runner.py | 66 +++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index baef3fdc4..0013ec3d7 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -95,7 +95,6 @@ from vllm.sequence import IntermediateTensors from vllm.tasks import GenerationTask, PoolingTask, SupportedTask from vllm.tracing import instrument from vllm.utils import length_from_prompt_token_ids_or_embeds -from vllm.utils.jsontree import json_map_leaves from vllm.utils.math_utils import cdiv, round_up from vllm.utils.mem_utils import DeviceMemoryProfiler, format_gib from vllm.utils.nvtx_pytorch_hooks import PytHooks @@ -268,6 +267,51 @@ class AsyncGPUModelRunnerOutput(AsyncModelRunnerOutput): return output +def _copy_pooler_output_to_cpu( + raw_pooler_output: PoolerOutput, finished_mask: list[bool] +) -> list[torch.Tensor | None]: + num_reqs = len(finished_mask) + + if isinstance(raw_pooler_output, torch.Tensor): + if raw_pooler_output.shape[0] != num_reqs: + raise ValueError( + "Pooler output batch size does not match finished mask size: " + f"{raw_pooler_output.shape[0]} != {num_reqs}." + ) + + num_finished = sum(finished_mask) + if num_finished == 0: + return [None] * num_reqs + if num_finished == num_reqs: + return list(raw_pooler_output.to("cpu", non_blocking=True)) + + # partial finished + finished_indices = [i for i, include in enumerate(finished_mask) if include] + index_tensor = torch.tensor( + finished_indices, device=raw_pooler_output.device, dtype=torch.long + ) + finished_outputs = raw_pooler_output.index_select(0, index_tensor).to( + "cpu", non_blocking=True + ) + partial_pooler_output: list[torch.Tensor | None] = [None] * num_reqs + for i, out in zip(finished_indices, finished_outputs): + partial_pooler_output[i] = out + return partial_pooler_output + + assert isinstance(raw_pooler_output, list) + if len(raw_pooler_output) != num_reqs: + raise ValueError( + "Pooler output batch size does not match finished mask size: " + f"{len(raw_pooler_output)} != {num_reqs}." + ) + + pooler_output: list[torch.Tensor | None] = [None] * num_reqs + for i, (out, include) in enumerate(zip(raw_pooler_output, finished_mask)): + if include and out is not None: + pooler_output[i] = out.to("cpu", non_blocking=True) + return pooler_output + + class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput): def __init__( self, @@ -289,15 +333,11 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput): default_stream = torch.cuda.current_stream() with torch.cuda.stream(async_output_copy_stream): async_output_copy_stream.wait_stream(default_stream) - raw_pooler_output_cpu = json_map_leaves( - lambda x: None if x is None else x.to("cpu", non_blocking=True), - self._raw_pooler_output, + self._model_runner_output.pooler_output = _copy_pooler_output_to_cpu( + raw_pooler_output=self._raw_pooler_output, + finished_mask=finished_mask, ) self.async_copy_ready_event.record() - self._model_runner_output.pooler_output = [ - out if include else None - for out, include in zip(raw_pooler_output_cpu, finished_mask) - ] def get_output(self) -> ModelRunnerOutput: """Copy the device tensors to the host and return a ModelRunnerOutput. @@ -2705,14 +2745,10 @@ class GPUModelRunner( async_output_copy_stream=self.async_output_copy_stream, ) - raw_pooler_output = json_map_leaves( - lambda x: None if x is None else x.to("cpu", non_blocking=True), - raw_pooler_output, + model_runner_output.pooler_output = _copy_pooler_output_to_cpu( + raw_pooler_output=raw_pooler_output, + finished_mask=finished_mask, ) - model_runner_output.pooler_output = [ - out if include else None - for out, include in zip(raw_pooler_output, finished_mask) - ] self._sync_device() return model_runner_output