[Model Runner V2] Minor CPU optimizations (#34856)
Signed-off-by: Nick Hill <nickhill123@gmail.com>
@@ -513,8 +513,8 @@ class MessageQueue:
         assert self._is_local_reader, "Only readers can acquire read"
         start_time = time.monotonic()
         n_warning = 1
-        while True:
-            with self.buffer.get_metadata(self.current_idx) as metadata_buffer:
+        with self.buffer.get_metadata(self.current_idx) as metadata_buffer:
+            while True:
                 # Memory fence ensures we see the latest writes from the writer.
                 # Without this, we may read stale flags from our CPU cache
                 # and spin indefinitely even though writer has updated them.
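Note: the hunk above swaps the nesting so the shared-memory metadata buffer is acquired once, outside the spin loop, instead of re-entering the context manager on every poll. A minimal sketch of the shape of the saving (the get_metadata() stand-in below is hypothetical and only models a small per-entry cost; timings are illustrative):

    import contextlib
    import time

    @contextlib.contextmanager
    def get_metadata():
        # Stand-in for self.buffer.get_metadata(idx): a small but nonzero
        # __enter__/__exit__ cost, e.g. constructing a memoryview.
        yield memoryview(b"\x00")

    N = 100_000

    t0 = time.perf_counter()
    for _ in range(N):                # old shape: enter/exit on every poll
        with get_metadata() as md:
            _ = md[0]
    t1 = time.perf_counter()

    with get_metadata() as md:        # new shape: acquire once, then spin
        for _ in range(N):
            _ = md[0]
    t2 = time.perf_counter()

    print(f"per-poll: {t1 - t0:.3f}s  hoisted: {t2 - t1:.3f}s")
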
@@ -1,5 +1,6 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+import contextlib
 
 import numpy as np
 import torch
@@ -14,6 +15,7 @@ class AsyncOutput(AsyncModelRunnerOutput):
         model_runner_output: ModelRunnerOutput,
         sampler_output: SamplerOutput,
         num_sampled_tokens: torch.Tensor,
+        main_stream: torch.cuda.Stream,
         copy_stream: torch.cuda.Stream,
         copy_event: torch.cuda.Event,
     ):
@@ -25,9 +27,8 @@ class AsyncOutput(AsyncModelRunnerOutput):
         self.num_sampled_tokens = num_sampled_tokens
         self.copy_event = copy_event
 
-        default_stream = torch.cuda.current_stream()
-        with torch.cuda.stream(copy_stream):
-            copy_stream.wait_stream(default_stream)
+        with stream(copy_stream, main_stream):
+            copy_stream.wait_stream(main_stream)
 
             self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)
             self.logprobs_tensors: LogprobsTensors | None = None
@@ -71,3 +72,15 @@ class AsyncOutput(AsyncModelRunnerOutput):
 
 def async_copy_to_np(x: torch.Tensor) -> np.ndarray:
     return x.to("cpu", non_blocking=True).numpy()
+
+
+@contextlib.contextmanager
+def stream(to_stream: torch.cuda.Stream, from_stream: torch.cuda.Stream):
+    """Lightweight version of torch.cuda.stream() context manager which
+    avoids current_stream and device lookups.
+    """
+    try:
+        torch.cuda.set_stream(to_stream)
+        yield
+    finally:
+        torch.cuda.set_stream(from_stream)
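The new stream() helper trades the generality of torch.cuda.stream() for speed: the stock context manager looks up the current stream (and handles cross-device cases) on entry and exit, while here the caller supplies the stream to restore explicitly. A usage sketch, assuming a CUDA device is available; the explicit try/finally is equivalent to what the helper does internally:

    import torch

    main = torch.cuda.current_stream()   # looked up once, then reused
    copy = torch.cuda.Stream()

    # Stock context manager: current_stream()/device lookups per use.
    with torch.cuda.stream(copy):
        copy.wait_stream(main)

    # Lightweight variant, as in the helper added by this commit.
    torch.cuda.set_stream(copy)
    try:
        copy.wait_stream(main)           # order copy work after main's
    finally:
        torch.cuda.set_stream(main)
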
@@ -22,7 +22,6 @@ def async_copy_to_gpu(
     if isinstance(x, np.ndarray):
         x = torch.from_numpy(x)
     assert x.is_cpu
-    assert not x.is_pinned()
 
     if out is None:
         assert device is not None
@@ -30,6 +29,8 @@ def async_copy_to_gpu(
 
     # CPU-to-CPU copy
    tmp = x.pin_memory()
+    assert tmp is not x
+
     # CPU-to-GPU copy
     return out.copy_(tmp, non_blocking=True)
 
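The assert swap appears to guard the same invariant at lower cost: Tensor.pin_memory() returns a new page-locked staging copy when the input is pageable, so checking tmp is not x after the fact replaces the removed up-front assert not x.is_pinned() without an extra is_pinned() call on the hot path. A small check of those semantics (a CUDA-capable build is assumed, since pinning requires one):

    import torch

    x = torch.ones(4)           # ordinary pageable CPU tensor
    tmp = x.pin_memory()        # copies into page-locked memory
    assert tmp.is_pinned() and not x.is_pinned()
    assert tmp is not x         # staging buffer is a distinct tensor

    # For an already-pinned input, pin_memory() can return the tensor
    # itself -- the aliasing case the new assert is there to catch.
    y = torch.ones(4).pin_memory()
    print(y.pin_memory() is y)
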
@@ -75,11 +76,8 @@ class UvaBufferPool:
         out: torch.Tensor | None = None,
     ) -> torch.Tensor:
         uva = self.copy_to_uva(x)
-        if out is None:
-            # CPU-to-GPU copy
-            return uva.clone()
-
-        # CPU-to-GPU copy
-        return out.copy_(uva, non_blocking=True)
+        # CPU-to-GPU copy
+        return uva.clone() if out is None else out.copy_(uva, non_blocking=True)
 
 
 class UvaBackedTensor:
@@ -1,5 +1,6 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+import functools
 import gc
 import time
 from copy import deepcopy
@@ -239,6 +240,11 @@ class GPUModelRunner(LoRAModelRunnerMixin):
     def get_model(self) -> nn.Module:
         return self.model
 
+    @functools.cached_property
+    def main_stream(self) -> torch.cuda.Stream:
+        # Cache the default CUDA stream to avoid lookup overhead.
+        return torch.cuda.current_stream(self.device)
+
     def get_kv_cache_spec(self):
         return get_kv_cache_spec(self.vllm_config)
 
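functools.cached_property computes the value on first access and stores it in the instance __dict__, so every later self.main_stream read is a plain attribute lookup rather than a call into torch.cuda.current_stream(). A minimal sketch of that behavior with a hypothetical Runner class (a counter stands in for the CUDA call):

    import functools

    class Runner:
        lookups = 0

        @functools.cached_property
        def main_stream(self) -> str:
            # Stand-in for torch.cuda.current_stream(self.device).
            Runner.lookups += 1
            return "stream-0"

    r = Runner()
    assert r.main_stream == "stream-0"   # first access runs the body
    assert r.main_stream == "stream-0"   # later accesses hit __dict__
    assert Runner.lookups == 1
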
@@ -1065,6 +1071,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
             model_runner_output=model_runner_output,
             sampler_output=sampler_output,
             num_sampled_tokens=num_sampled,
+            main_stream=self.main_stream,
             copy_stream=self.output_copy_stream,
             copy_event=self.output_copy_event,
         )