[Perf] Optimize pooling model redundant copy, 1.8% throughput improvement (#35127)

Signed-off-by: yewentao256 <zhyanwentao@126.com>
This commit is contained in:
Wentao Ye
2026-02-24 07:13:11 -05:00
committed by GitHub
parent c77f3e1207
commit 14561fabfd

View File

@@ -95,7 +95,6 @@ from vllm.sequence import IntermediateTensors
from vllm.tasks import GenerationTask, PoolingTask, SupportedTask
from vllm.tracing import instrument
from vllm.utils import length_from_prompt_token_ids_or_embeds
from vllm.utils.jsontree import json_map_leaves
from vllm.utils.math_utils import cdiv, round_up
from vllm.utils.mem_utils import DeviceMemoryProfiler, format_gib
from vllm.utils.nvtx_pytorch_hooks import PytHooks
@@ -268,6 +267,51 @@ class AsyncGPUModelRunnerOutput(AsyncModelRunnerOutput):
return output
def _copy_pooler_output_to_cpu(
raw_pooler_output: PoolerOutput, finished_mask: list[bool]
) -> list[torch.Tensor | None]:
num_reqs = len(finished_mask)
if isinstance(raw_pooler_output, torch.Tensor):
if raw_pooler_output.shape[0] != num_reqs:
raise ValueError(
"Pooler output batch size does not match finished mask size: "
f"{raw_pooler_output.shape[0]} != {num_reqs}."
)
num_finished = sum(finished_mask)
if num_finished == 0:
return [None] * num_reqs
if num_finished == num_reqs:
return list(raw_pooler_output.to("cpu", non_blocking=True))
# partial finished
finished_indices = [i for i, include in enumerate(finished_mask) if include]
index_tensor = torch.tensor(
finished_indices, device=raw_pooler_output.device, dtype=torch.long
)
finished_outputs = raw_pooler_output.index_select(0, index_tensor).to(
"cpu", non_blocking=True
)
partial_pooler_output: list[torch.Tensor | None] = [None] * num_reqs
for i, out in zip(finished_indices, finished_outputs):
partial_pooler_output[i] = out
return partial_pooler_output
assert isinstance(raw_pooler_output, list)
if len(raw_pooler_output) != num_reqs:
raise ValueError(
"Pooler output batch size does not match finished mask size: "
f"{len(raw_pooler_output)} != {num_reqs}."
)
pooler_output: list[torch.Tensor | None] = [None] * num_reqs
for i, (out, include) in enumerate(zip(raw_pooler_output, finished_mask)):
if include and out is not None:
pooler_output[i] = out.to("cpu", non_blocking=True)
return pooler_output
class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput):
def __init__(
self,
@@ -289,15 +333,11 @@ class AsyncGPUPoolingModelRunnerOutput(AsyncModelRunnerOutput):
default_stream = torch.cuda.current_stream()
with torch.cuda.stream(async_output_copy_stream):
async_output_copy_stream.wait_stream(default_stream)
raw_pooler_output_cpu = json_map_leaves(
lambda x: None if x is None else x.to("cpu", non_blocking=True),
self._raw_pooler_output,
self._model_runner_output.pooler_output = _copy_pooler_output_to_cpu(
raw_pooler_output=self._raw_pooler_output,
finished_mask=finished_mask,
)
self.async_copy_ready_event.record()
self._model_runner_output.pooler_output = [
out if include else None
for out, include in zip(raw_pooler_output_cpu, finished_mask)
]
def get_output(self) -> ModelRunnerOutput:
"""Copy the device tensors to the host and return a ModelRunnerOutput.
@@ -2705,14 +2745,10 @@ class GPUModelRunner(
async_output_copy_stream=self.async_output_copy_stream,
)
raw_pooler_output = json_map_leaves(
lambda x: None if x is None else x.to("cpu", non_blocking=True),
raw_pooler_output,
model_runner_output.pooler_output = _copy_pooler_output_to_cpu(
raw_pooler_output=raw_pooler_output,
finished_mask=finished_mask,
)
model_runner_output.pooler_output = [
out if include else None
for out, include in zip(raw_pooler_output, finished_mask)
]
self._sync_device()
return model_runner_output