diff --git a/.buildkite/scripts/scheduled_integration_test/deepseek_v2_lite_prefetch_offload.sh b/.buildkite/scripts/scheduled_integration_test/deepseek_v2_lite_prefetch_offload.sh
new file mode 100755
index 000000000..dddf23f1f
--- /dev/null
+++ b/.buildkite/scripts/scheduled_integration_test/deepseek_v2_lite_prefetch_offload.sh
@@ -0,0 +1,57 @@
+#!/usr/bin/env bash
+set -euxo pipefail
+
+# Nightly e2e test for prefetch offloading with a MoE model.
+# Runs DeepSeek-V2-Lite with prefetch offloading of MoE expert weights
+# and validates GSM8K accuracy matches baseline (no offloading).
+#
+# args: [THRESHOLD] [NUM_QUESTIONS] [START_PORT]
+THRESHOLD=${1:-0.25}
+NUM_Q=${2:-1319}
+PORT=${3:-8030}
+OUT_DIR=${OUT_DIR:-/tmp/vllm-scheduled}
+mkdir -p "${OUT_DIR}"
+
+wait_for_server() {
+  local port=$1
+  timeout 600 bash -c '
+    until curl -sf "http://127.0.0.1:'"$port"'/health" > /dev/null; do
+      sleep 1
+    done'
+}
+
+MODEL="deepseek-ai/DeepSeek-V2-Lite"
+
+cleanup() {
+  if [[ -n "${SERVER_PID:-}" ]] && kill -0 "${SERVER_PID}" 2>/dev/null; then
+    kill "${SERVER_PID}" 2>/dev/null || true
+    for _ in {1..20}; do
+      kill -0 "${SERVER_PID}" 2>/dev/null || break
+      sleep 0.5
+    done
+    kill -9 "${SERVER_PID}" 2>/dev/null || true
+  fi
+}
+trap cleanup EXIT
+
+vllm serve "$MODEL" \
+  --max-model-len 2048 \
+  --offload-group-size 8 \
+  --offload-num-in-group 2 \
+  --offload-prefetch-step 1 \
+  --offload-params w13_weight w2_weight \
+  --port "$PORT" &
+SERVER_PID=$!
+wait_for_server "$PORT"
+
+TAG=$(echo "$MODEL" | tr '/: \\n' '_____')
+OUT="${OUT_DIR}/${TAG}_prefetch_offload.json"
+python3 tests/evals/gsm8k/gsm8k_eval.py --host http://127.0.0.1 --port "$PORT" --num-questions "${NUM_Q}" --save-results "${OUT}"
+python3 - <<PY
+import json
+acc = json.load(open("${OUT}"))["accuracy"]
+assert acc >= ${THRESHOLD}, f"${MODEL} prefetch_offload accuracy {acc}"
+PY
+
+cleanup
+SERVER_PID=
diff --git a/.buildkite/test_areas/e2e_integration.yaml b/.buildkite/test_areas/e2e_integration.yaml
index d95b73073..5b7f96bc7 100644
--- a/.buildkite/test_areas/e2e_integration.yaml
+++ b/.buildkite/test_areas/e2e_integration.yaml
@@ -28,3 +28,12 @@ steps:
     working_dir: "/vllm-workspace"
     commands:
       - bash .buildkite/scripts/scheduled_integration_test/qwen30b_a3b_fp8_block_ep_eplb.sh 0.8 200 8020 2 1
+
+- label: DeepSeek V2-Lite Prefetch Offload Accuracy (H100)
+  timeout_in_minutes: 60
+  device: h100
+  optional: true
+  num_devices: 1
+  working_dir: "/vllm-workspace"
+  commands:
+    - bash .buildkite/scripts/scheduled_integration_test/deepseek_v2_lite_prefetch_offload.sh 0.25 200 8030
diff --git a/tests/basic_correctness/test_prefetch_offload.py b/tests/basic_correctness/test_prefetch_offload.py
new file mode 100644
index 000000000..498887024
--- /dev/null
+++ b/tests/basic_correctness/test_prefetch_offload.py
@@ -0,0 +1,33 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""Test prefetch offloading correctness with Llama model."""
+
+from ..utils import compare_two_settings
+
+
+def test_prefetch_offload_llama():
+    """Test prefetch CPU offloading with Llama-3.2-1B-Instruct.
+
+    Compares outputs between:
+    1. Baseline (no offloading)
+    2. Prefetch offloading (group_size=8, num_in_group=2, prefetch_step=1)
+
+    This tests prefetching-based offloading on a dense model.
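+
+    compare_two_settings launches the server once per argument list and
+    asserts the two runs return matching API results, so any numeric
+    drift introduced by offloading surfaces as a test failure.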
+ """ + compare_two_settings( + "meta-llama/Llama-3.2-1B-Instruct", + [ + # Prefetch offloading configuration + "--offload-group-size", + "8", + "--offload-num-in-group", + "2", + "--offload-prefetch-step", + "1", + # Selective offloading: only MLP weights + "--offload-params", + "gate_up_proj", + "down_proj", + ], + [], # Baseline: no offloading + ) diff --git a/vllm/compilation/cuda_graph.py b/vllm/compilation/cuda_graph.py index 7ffa74d0d..7bada5e7c 100644 --- a/vllm/compilation/cuda_graph.py +++ b/vllm/compilation/cuda_graph.py @@ -17,6 +17,7 @@ from vllm.config import CUDAGraphMode, VllmConfig from vllm.distributed.device_communicators.pynccl_allocator import set_graph_pool_id from vllm.forward_context import BatchDescriptor, get_forward_context from vllm.logger import init_logger +from vllm.model_executor.offloader.base import get_offloader from vllm.platforms import current_platform from vllm.utils.torch_utils import current_stream, weak_ref_tensors @@ -265,6 +266,11 @@ class CUDAGraphWrapper: set_graph_pool_id(self.graph_pool) else: set_graph_pool_id(current_platform.graph_pool_handle()) + + # Sync offloader's copy stream before capture. + # Ensure any pre-capture prefetches from offloader are complete. + get_offloader().sync_prev_onload() + # mind-exploding: carefully manage the reference and memory. with torch.cuda.graph( cudagraph, @@ -273,6 +279,11 @@ class CUDAGraphWrapper: ): # `output` is managed by pytorch's cudagraph pool output = self.runnable(*args, **kwargs) + # Join offloader's copy stream after forward to avoid + # unjoined stream error. The last layer's start_prefetch + # forks copy_stream, but wait_prefetch only happens in + # the next forward pass. + get_offloader().join_after_forward() if self.cudagraph_options.weak_ref_output: # by converting it to weak ref, # the original `output` will immediately be released @@ -305,5 +316,8 @@ class CUDAGraphWrapper: f"got {new_input_addresses}" ) + # Sync offloader before replay - ensures any external dependencies + # from pre-capture prefetches are satisfied. + get_offloader().sync_prev_onload() entry.cudagraph.replay() return entry.output diff --git a/vllm/config/__init__.py b/vllm/config/__init__.py index 5bcf9865c..452fb0466 100644 --- a/vllm/config/__init__.py +++ b/vllm/config/__init__.py @@ -24,6 +24,12 @@ from vllm.config.model import ( ) from vllm.config.multimodal import MultiModalConfig from vllm.config.observability import ObservabilityConfig +from vllm.config.offload import ( + OffloadBackend, + OffloadConfig, + PrefetchOffloadConfig, + UVAOffloadConfig, +) from vllm.config.parallel import EPLBConfig, ParallelConfig from vllm.config.pooler import PoolerConfig from vllm.config.profiler import ProfilerConfig @@ -85,6 +91,11 @@ __all__ = [ "MultiModalConfig", # From vllm.config.observability "ObservabilityConfig", + # From vllm.config.offload + "OffloadBackend", + "OffloadConfig", + "PrefetchOffloadConfig", + "UVAOffloadConfig", # From vllm.config.parallel "EPLBConfig", "ParallelConfig", diff --git a/vllm/config/cache.py b/vllm/config/cache.py index daceaa6c2..39ceb3920 100644 --- a/vllm/config/cache.py +++ b/vllm/config/cache.py @@ -100,17 +100,15 @@ class CacheConfig: load a 13B model with BF16 weight, which requires at least 26GB GPU memory. Note that this requires fast CPU-GPU interconnect, as part of the model is loaded from CPU memory to GPU memory on the fly in each model forward pass. + + DEPRECATED: This field is deprecated and will be removed in v0.16. + Please use OffloadConfig.uva.cpu_offload_gb instead. 
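+
+    Migration example: the `--cpu-offload-gb 10` CLI flag keeps working,
+    but it is now parsed into `OffloadConfig.uva.cpu_offload_gb`.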
""" cpu_offload_params: set[str] = Field(default_factory=set) - """ The set of parameter name segments to target for CPU offloading. - Unmatched parameters are not offloaded. If this set is empty, parameters - are offloaded non-selectively until the memory limit defined by - `cpu_offload_gb` is reached. - Examples: - - For parameter name "mlp.experts.w2_weight": - - "experts" or "experts.w2_weight" will match. - - "expert" or "w2" will NOT match (must be exact segments). - This allows distinguishing parameters like "w2_weight" and "w2_weight_scale". + """The set of parameter name segments to target for CPU offloading. + + DEPRECATED: This field is deprecated and will be removed in v0.16. + Please use OffloadConfig.uva.cpu_offload_params instead. """ calculate_kv_scales: bool = False """This enables dynamic calculation of `k_scale` and `v_scale` when diff --git a/vllm/config/offload.py b/vllm/config/offload.py new file mode 100644 index 000000000..ad65e8acf --- /dev/null +++ b/vllm/config/offload.py @@ -0,0 +1,153 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Configuration for model weight offloading.""" + +import warnings +from typing import Literal + +from pydantic import Field, model_validator + +from vllm.config.utils import config + +OffloadBackend = Literal["auto", "uva", "prefetch"] + + +@config +class UVAOffloadConfig: + """Configuration for UVA (Unified Virtual Addressing) CPU offloading. + + Uses zero-copy access from CPU-pinned memory. Simple but requires + fast CPU-GPU interconnect. + """ + + cpu_offload_gb: float = Field(default=0, ge=0) + """The space in GiB to offload to CPU, per GPU. Default is 0, which means + no offloading. Intuitively, this argument can be seen as a virtual way to + increase the GPU memory size. For example, if you have one 24 GB GPU and + set this to 10, virtually you can think of it as a 34 GB GPU. Then you can + load a 13B model with BF16 weight, which requires at least 26GB GPU memory. + Note that this requires fast CPU-GPU interconnect, as part of the model is + loaded from CPU memory to GPU memory on the fly in each model forward pass. + This uses UVA (Unified Virtual Addressing) for zero-copy access. + """ + + cpu_offload_params: set[str] = Field(default_factory=set) + """The set of parameter name segments to target for CPU offloading. + Unmatched parameters are not offloaded. If this set is empty, parameters + are offloaded non-selectively until the memory limit defined by + `cpu_offload_gb` is reached. + Examples: + - For parameter name "mlp.experts.w2_weight": + - "experts" or "experts.w2_weight" will match. + - "expert" or "w2" will NOT match (must be exact segments). + This allows distinguishing parameters like "w2_weight" and "w2_weight_scale". + """ + + +@config +class PrefetchOffloadConfig: + """Configuration for prefetch-based CPU offloading. + + Groups layers and uses async H2D prefetch to hide transfer latency. + """ + + offload_group_size: int = Field(default=0, ge=0) + """Group every N layers together. Offload last `offload_num_in_group` + layers of each group. Default is 0 (disabled). + Example: group_size=8, num_in_group=2 offloads layers 6,7,14,15,22,23,... + Unlike cpu_offload_gb, this uses explicit async prefetching to hide transfer + latency. + """ + + offload_num_in_group: int = Field(default=1, ge=1) + """Number of layers to offload per group. + Must be <= offload_group_size. 
Default is 1.""" + + offload_prefetch_step: int = Field(default=1, ge=0) + """Number of layers to prefetch ahead. + Higher values hide more latency but use more GPU memory. Default is 1.""" + + offload_params: set[str] = Field(default_factory=set) + """The set of parameter name segments to target for prefetch offloading. + Unmatched parameters are not offloaded. If this set is empty, ALL + parameters of each offloaded layer are offloaded. + Uses segment matching: "w13_weight" matches "mlp.experts.w13_weight" + but not "mlp.experts.w13_weight_scale". + """ + + +@config +class OffloadConfig: + """Configuration for model weight offloading to reduce GPU memory usage.""" + + offload_backend: OffloadBackend = "auto" + """The backend for weight offloading. Options: + - "auto": Selects based on which sub-config has non-default values + (prefetch if offload_group_size > 0, uva if cpu_offload_gb > 0). + - "uva": UVA (Unified Virtual Addressing) zero-copy offloading. + - "prefetch": Async prefetch with group-based layer offloading. + """ + + uva: UVAOffloadConfig = Field(default_factory=UVAOffloadConfig) + """Parameters for UVA offloading backend.""" + + prefetch: PrefetchOffloadConfig = Field(default_factory=PrefetchOffloadConfig) + """Parameters for prefetch offloading backend.""" + + @model_validator(mode="after") + def validate_offload_config(self) -> "OffloadConfig": + """Validate offload configuration constraints.""" + if self.offload_backend == "prefetch" or self.prefetch.offload_group_size > 0: + if self.prefetch.offload_num_in_group > self.prefetch.offload_group_size: + raise ValueError( + f"offload_num_in_group ({self.prefetch.offload_num_in_group})" + f" must be <= offload_group_size" + f" ({self.prefetch.offload_group_size})" + ) + if self.prefetch.offload_prefetch_step < 1: + raise ValueError( + f"offload_prefetch_step" + f" ({self.prefetch.offload_prefetch_step})" + f" must be >= 1 when prefetch offloading is enabled" + f" (offload_group_size > 0)" + ) + + # Warn if both backends have non-default values + uva_active = self.uva.cpu_offload_gb > 0 + prefetch_active = self.prefetch.offload_group_size > 0 + if self.offload_backend == "uva" and prefetch_active: + warnings.warn( + "Prefetch offload fields are set but offload_backend='uva'. " + "Prefetch settings will be ignored.", + stacklevel=2, + ) + elif self.offload_backend == "prefetch" and uva_active: + warnings.warn( + "UVA offload fields are set but offload_backend='prefetch'. " + "UVA settings will be ignored.", + stacklevel=2, + ) + elif self.offload_backend == "auto" and uva_active and prefetch_active: + warnings.warn( + "Both UVA and prefetch offload fields are set with " + "offload_backend='auto'. Prefetch backend will be selected. " + "Set offload_backend explicitly to suppress this warning.", + stacklevel=2, + ) + return self + + def compute_hash(self) -> str: + """ + Provide a hash that uniquely identifies all the offload configs. + + All fields are included because PrefetchOffloader patches module + forwards and inserts custom ops (wait_prefetch, start_prefetch) + into the computation graph. Changing any offload setting can + alter which layers are hooked and how prefetch indices are + computed, so the compilation cache must distinguish them. 
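+
+        For example, two configs differing only in
+        prefetch.offload_prefetch_step hash differently, so a compiled
+        graph whose prefetch indices assume one step size is never
+        reused for another.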
+ """ + from vllm.config.utils import get_hash_factors, hash_factors + + factors = get_hash_factors(self, ignored_factors=set()) + hash_str = hash_factors(factors) + return hash_str diff --git a/vllm/config/vllm.py b/vllm/config/vllm.py index d7deadd50..33d486263 100644 --- a/vllm/config/vllm.py +++ b/vllm/config/vllm.py @@ -37,6 +37,7 @@ from .load import LoadConfig from .lora import LoRAConfig from .model import ModelConfig from .observability import ObservabilityConfig +from .offload import OffloadConfig from .parallel import ParallelConfig from .profiler import ProfilerConfig from .scheduler import SchedulerConfig @@ -259,6 +260,8 @@ class VllmConfig: """Device configuration.""" load_config: LoadConfig = Field(default_factory=LoadConfig) """Load configuration.""" + offload_config: OffloadConfig = Field(default_factory=OffloadConfig) + """Model weight offloading configuration.""" attention_config: AttentionConfig = Field(default_factory=AttentionConfig) """Attention configuration.""" kernel_config: KernelConfig = Field(default_factory=KernelConfig) @@ -361,6 +364,10 @@ class VllmConfig: vllm_factors.append(self.load_config.compute_hash()) else: vllm_factors.append("None") + if self.offload_config: + vllm_factors.append(self.offload_config.compute_hash()) + else: + vllm_factors.append("None") if self.attention_config: vllm_factors.append(self.attention_config.compute_hash()) else: diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index a962baba2..15a662ba2 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -48,12 +48,15 @@ from vllm.config import ( ModelConfig, MultiModalConfig, ObservabilityConfig, + OffloadConfig, ParallelConfig, PoolerConfig, + PrefetchOffloadConfig, ProfilerConfig, SchedulerConfig, SpeculativeConfig, StructuredOutputsConfig, + UVAOffloadConfig, VllmConfig, WeightTransferConfig, get_attr_docs, @@ -439,8 +442,13 @@ class EngineArgs: disable_sliding_window: bool = ModelConfig.disable_sliding_window disable_cascade_attn: bool = ModelConfig.disable_cascade_attn swap_space: float = CacheConfig.swap_space - cpu_offload_gb: float = CacheConfig.cpu_offload_gb - cpu_offload_params: set[str] = get_field(CacheConfig, "cpu_offload_params") + offload_backend: str = OffloadConfig.offload_backend + cpu_offload_gb: float = UVAOffloadConfig.cpu_offload_gb + cpu_offload_params: set[str] = get_field(UVAOffloadConfig, "cpu_offload_params") + offload_group_size: int = PrefetchOffloadConfig.offload_group_size + offload_num_in_group: int = PrefetchOffloadConfig.offload_num_in_group + offload_prefetch_step: int = PrefetchOffloadConfig.offload_prefetch_step + offload_params: set[str] = get_field(PrefetchOffloadConfig, "offload_params") gpu_memory_utilization: float = CacheConfig.gpu_memory_utilization kv_cache_memory_bytes: int | None = CacheConfig.kv_cache_memory_bytes max_num_batched_tokens: int | None = None @@ -948,10 +956,6 @@ class EngineArgs: cache_group.add_argument( "--prefix-caching-hash-algo", **cache_kwargs["prefix_caching_hash_algo"] ) - cache_group.add_argument("--cpu-offload-gb", **cache_kwargs["cpu_offload_gb"]) - cache_group.add_argument( - "--cpu-offload-params", **cache_kwargs["cpu_offload_params"] - ) cache_group.add_argument( "--calculate-kv-scales", **cache_kwargs["calculate_kv_scales"] ) @@ -977,6 +981,37 @@ class EngineArgs: "--kv-offloading-backend", **cache_kwargs["kv_offloading_backend"] ) + # Model weight offload related configs + offload_kwargs = get_kwargs(OffloadConfig) + uva_kwargs = get_kwargs(UVAOffloadConfig) + 
prefetch_kwargs = get_kwargs(PrefetchOffloadConfig) + offload_group = parser.add_argument_group( + title="OffloadConfig", + description=OffloadConfig.__doc__, + ) + offload_group.add_argument( + "--offload-backend", **offload_kwargs["offload_backend"] + ) + offload_group.add_argument("--cpu-offload-gb", **uva_kwargs["cpu_offload_gb"]) + offload_group.add_argument( + "--cpu-offload-params", **uva_kwargs["cpu_offload_params"] + ) + offload_group.add_argument( + "--offload-group-size", + **prefetch_kwargs["offload_group_size"], + ) + offload_group.add_argument( + "--offload-num-in-group", + **prefetch_kwargs["offload_num_in_group"], + ) + offload_group.add_argument( + "--offload-prefetch-step", + **prefetch_kwargs["offload_prefetch_step"], + ) + offload_group.add_argument( + "--offload-params", **prefetch_kwargs["offload_params"] + ) + # Multimodal related configs multimodal_kwargs = get_kwargs(MultiModalConfig) multimodal_group = parser.add_argument_group( @@ -1466,8 +1501,6 @@ class EngineArgs: sliding_window=sliding_window, enable_prefix_caching=self.enable_prefix_caching, prefix_caching_hash_algo=self.prefix_caching_hash_algo, - cpu_offload_gb=self.cpu_offload_gb, - cpu_offload_params=self.cpu_offload_params, calculate_kv_scales=self.calculate_kv_scales, kv_sharing_fast_prefill=self.kv_sharing_fast_prefill, mamba_cache_dtype=self.mamba_cache_dtype, @@ -1825,6 +1858,21 @@ class EngineArgs: compilation_config.max_cudagraph_capture_size = ( self.max_cudagraph_capture_size ) + + offload_config = OffloadConfig( + offload_backend=self.offload_backend, + uva=UVAOffloadConfig( + cpu_offload_gb=self.cpu_offload_gb, + cpu_offload_params=self.cpu_offload_params, + ), + prefetch=PrefetchOffloadConfig( + offload_group_size=self.offload_group_size, + offload_num_in_group=self.offload_num_in_group, + offload_prefetch_step=self.offload_prefetch_step, + offload_params=self.offload_params, + ), + ) + config = VllmConfig( model_config=model_config, cache_config=cache_config, @@ -1832,6 +1880,7 @@ class EngineArgs: scheduler_config=scheduler_config, device_config=device_config, load_config=load_config, + offload_config=offload_config, attention_config=attention_config, kernel_config=kernel_config, lora_config=lora_config, diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index 2d925d0a9..ee78d4d48 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -170,6 +170,19 @@ class LLM: the model weights. This virtually increases the GPU memory space you can use to hold the model weights, at the cost of CPU-GPU data transfer for every forward pass. + offload_group_size: Prefetch offloading: Group every N layers + together. Offload last `offload_num_in_group` layers of each group. + Default is 0 (disabled). + offload_num_in_group: Prefetch offloading: Number of layers to + offload per group. Default is 1. + offload_prefetch_step: Prefetch offloading: Number of layers to + prefetch ahead. Higher values hide more latency but use more GPU + memory. Default is 1. + offload_params: Prefetch offloading: Set of parameter name segments + to selectively offload. Only parameters whose names contain one of + these segments will be offloaded (e.g., {"gate_up_proj", "down_proj"} + for MLP weights, or {"w13_weight", "w2_weight"} for MoE expert + weights). If None or empty, all parameters are offloaded. enforce_eager: Whether to enforce eager execution. If True, we will disable CUDA graph and always execute the model in eager mode. If False, we will use CUDA graph and eager execution in hybrid. 
@@ -224,6 +237,10 @@ class LLM: gpu_memory_utilization: float = 0.9, swap_space: float = 4, cpu_offload_gb: float = 0, + offload_group_size: int = 0, + offload_num_in_group: int = 1, + offload_prefetch_step: int = 1, + offload_params: set[str] | None = None, enforce_eager: bool = False, enable_return_routed_experts: bool = False, disable_custom_all_reduce: bool = False, @@ -333,6 +350,10 @@ class LLM: kv_cache_memory_bytes=kv_cache_memory_bytes, swap_space=swap_space, cpu_offload_gb=cpu_offload_gb, + offload_group_size=offload_group_size, + offload_num_in_group=offload_num_in_group, + offload_prefetch_step=offload_prefetch_step, + offload_params=offload_params or set(), enforce_eager=enforce_eager, enable_return_routed_experts=enable_return_routed_experts, disable_custom_all_reduce=disable_custom_all_reduce, diff --git a/vllm/model_executor/models/utils.py b/vllm/model_executor/models/utils.py index 658742489..c55693bcf 100644 --- a/vllm/model_executor/models/utils.py +++ b/vllm/model_executor/models/utils.py @@ -9,11 +9,9 @@ from typing import Any, Literal, Protocol, overload import torch import torch.nn as nn -from torch.func import functional_call from torch.nn.modules.module import register_module_module_registration_hook from transformers import PretrainedConfig -import vllm.envs as envs from vllm.config import VllmConfig from vllm.distributed import ( get_tensor_model_parallel_rank, @@ -31,14 +29,11 @@ from vllm.model_executor.models.interfaces import supports_any_eagle from vllm.multimodal import NestedTensors from vllm.sequence import IntermediateTensors from vllm.utils.math_utils import cdiv -from vllm.utils.mem_utils import format_gib from vllm.utils.platform_utils import ( is_pin_memory_available, - is_uva_available, ) from vllm.utils.torch_utils import ( direct_register_custom_op, - get_accelerator_view_from_cpu_tensor, ) logger = init_logger(__name__) @@ -612,98 +607,6 @@ class PPMissingLayer(torch.nn.Identity): return args[0] if args else next(iter(kwargs.values())) -_CPU_OFFLOAD_BYTES = 0 -_CPU_OFFLOAD_MAX_BYTES = 0 -_CPU_OFFLOAD_PARAMS = set() - - -def set_cpu_offload_max_bytes(max_bytes: int) -> None: - global _CPU_OFFLOAD_MAX_BYTES, _CPU_OFFLOAD_BYTES - _CPU_OFFLOAD_BYTES = 0 - _CPU_OFFLOAD_MAX_BYTES = max_bytes - - -def set_cpu_offload_params(params: set[str]) -> None: - global _CPU_OFFLOAD_PARAMS - _CPU_OFFLOAD_PARAMS = params - - -def maybe_offload_to_cpu(module: torch.nn.Module) -> torch.nn.Module: - if (params := next(module.parameters(), None)) is None: - return module - - device = params.device - - if device == torch.device("cpu"): - return module - - global _CPU_OFFLOAD_MAX_BYTES, _CPU_OFFLOAD_BYTES - if _CPU_OFFLOAD_BYTES >= _CPU_OFFLOAD_MAX_BYTES: - return module - - pin_memory = ( - is_pin_memory_available() and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_PIN_MEMORY - ) - uva_offloading = is_uva_available() and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_UVA - - # offload parameters to CPU - # use pin_memory if possible, which helps cudagraph capture speed - offloaded_parameters = False - for name, p in module.named_parameters(): - if _CPU_OFFLOAD_BYTES >= _CPU_OFFLOAD_MAX_BYTES: - # we use per-parameter offloading - # one module might have some parameters offloaded and some not - break - - if _CPU_OFFLOAD_PARAMS: - # Check if parameter belongs to the offloading set - # Add dots here to ensure we match full segments only - # e.g., "experts.w2_weight" matches "mlp.experts.w2_weight" but not - # "mlp.experts.w2_weight_scale" - should_offload = any( - f".{param}." 
in f".{name}." for param in _CPU_OFFLOAD_PARAMS - ) - if not should_offload: - continue - - cpu_data = p.data.to(device="cpu") - if pin_memory: - cpu_data = cpu_data.pin_memory() - - if not uva_offloading: - p.data = cpu_data - else: - p.data = get_accelerator_view_from_cpu_tensor(cpu_data) - p._vllm_is_uva_offloaded = True - - _CPU_OFFLOAD_BYTES += p.data.numel() * p.data.element_size() - offloaded_parameters = True - - if offloaded_parameters and not uva_offloading: - original_forward = module.forward - - def forward(*args, **kwargs): - module.forward = original_forward - device_state = { - # here we blindly call `to(device)` - # if the parameter is already on the device, it will be a no-op - k: v.to(device, non_blocking=True) - for k, v in module.state_dict().items() - } - - # set `tie_weights=False` as tied weights in original model - # become untied when calling .to(device) individually - output = functional_call( - module, device_state, args=args, kwargs=kwargs, tie_weights=False - ) - module.forward = forward - return output - - module.forward = forward - - return module - - def make_layers( num_hidden_layers: int, layer_fn: LayerFn, @@ -711,25 +614,31 @@ def make_layers( ) -> tuple[int, int, torch.nn.ModuleList]: """Make a list of layers with the given layer function, taking pipeline parallelism into account. + + Args: + num_hidden_layers: Total number of hidden layers in the model. + layer_fn: Function to create a layer given its index. + prefix: Prefix for layer names. + + Returns: + Tuple of (start_layer, end_layer, modules). """ from vllm.distributed.parallel_state import get_pp_group from vllm.distributed.utils import get_pp_indices + from vllm.model_executor.offloader import get_offloader start_layer, end_layer = get_pp_indices( num_hidden_layers, get_pp_group().rank_in_group, get_pp_group().world_size ) + modules = torch.nn.ModuleList( [PPMissingLayer() for _ in range(start_layer)] - + [ - maybe_offload_to_cpu(layer_fn(prefix=f"{prefix}.{idx}")) - for idx in range(start_layer, end_layer) - ] + + get_offloader().wrap_modules( + layer_fn(prefix=f"{prefix}.{idx}") for idx in range(start_layer, end_layer) + ) + [PPMissingLayer() for _ in range(end_layer, num_hidden_layers)] ) - if _CPU_OFFLOAD_MAX_BYTES > 0: - logger.info( - "Total CPU offloaded parameters: %s GBs", format_gib(_CPU_OFFLOAD_BYTES) - ) + return start_layer, end_layer, modules diff --git a/vllm/model_executor/offloader/__init__.py b/vllm/model_executor/offloader/__init__.py new file mode 100644 index 000000000..a6522ff7c --- /dev/null +++ b/vllm/model_executor/offloader/__init__.py @@ -0,0 +1,23 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Model parameter offloading infrastructure.""" + +from vllm.model_executor.offloader.base import ( + BaseOffloader, + NoopOffloader, + create_offloader, + get_offloader, + set_offloader, +) +from vllm.model_executor.offloader.prefetch import PrefetchOffloader +from vllm.model_executor.offloader.uva import UVAOffloader + +__all__ = [ + "BaseOffloader", + "NoopOffloader", + "UVAOffloader", + "PrefetchOffloader", + "create_offloader", + "get_offloader", + "set_offloader", +] diff --git a/vllm/model_executor/offloader/base.py b/vllm/model_executor/offloader/base.py new file mode 100644 index 000000000..7c61b318b --- /dev/null +++ b/vllm/model_executor/offloader/base.py @@ -0,0 +1,145 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +# Adapted from +# 
https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/utils/offloader.py +"""Base classes for model parameter offloading.""" + +from abc import ABC, abstractmethod +from collections.abc import Generator +from typing import TYPE_CHECKING + +import torch.nn as nn + +from vllm.logger import init_logger + +if TYPE_CHECKING: + from vllm.config import OffloadConfig + +logger = init_logger(__name__) + + +""" +class relation: + +BaseOffloader (ABC) + * implemented by: UVAOffloader + * implemented by: PrefetchOffloader + * uses: _ModuleOffloader + * uses: _BaseParamOffloader (ABC) + * implemented by: _CpuParamOffloader +""" + + +class BaseOffloader(ABC): + """Base class for model parameter offloading strategies. + + Offloaders control how model parameters are stored and loaded during + inference. Different strategies trade memory for compute/transfer time. + """ + + @abstractmethod + def wrap_modules( + self, + modules_generator: Generator[nn.Module, None, None], + ) -> list[nn.Module]: + """Wrap modules with offloading logic. + + Args: + modules_generator: Generator yielding modules to potentially offload. + + Returns: + List of modules, potentially with offloading hooks installed. + """ + pass + + def post_init(self): + """Called after model construction completes. + + Offloaders can use this to: + - Finalize parameter storage + - Start initial prefetching + - Allocate shared resources + """ + return + + def sync_prev_onload(self) -> None: # noqa: B027 + """Sync previous onload operations. Override in subclasses.""" + pass + + def join_after_forward(self) -> None: # noqa: B027 + """Join streams after forward. Override in subclasses.""" + pass + + def _wait_for_layer(self, layer_idx: int) -> None: # noqa: B027 + """Wait for layer prefetch. Override in subclasses.""" + pass + + def _start_prefetch(self, layer_idx: int) -> None: # noqa: B027 + """Start layer prefetch. Override in subclasses.""" + pass + + +class NoopOffloader(BaseOffloader): + """No-op offloader that returns modules as-is without any offloading.""" + + def wrap_modules( + self, + modules_generator: Generator[nn.Module, None, None], + ) -> list[nn.Module]: + """Return modules unchanged.""" + return list(modules_generator) + + +# Global singleton offloader instance (defaults to no-op). +_instance: BaseOffloader = NoopOffloader() + + +def get_offloader() -> BaseOffloader: + """Get the global offloader instance.""" + return _instance + + +def set_offloader(instance: BaseOffloader) -> None: + """Set the global offloader instance.""" + global _instance + _instance = instance + logger.info("Offloader set to %s", type(instance).__name__) + + +def create_offloader(offload_config: "OffloadConfig") -> BaseOffloader: + """Create an offloader based on the offload configuration. + + Uses the explicit ``offload_backend`` selector. When set to ``"auto"``, + selects prefetch if ``offload_group_size > 0``, UVA if + ``cpu_offload_gb > 0``, otherwise noop. 
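+
+    Illustrative resolution (matches the code below):
+
+    - prefetch.offload_group_size=8 with backend "auto" -> PrefetchOffloader
+    - uva.cpu_offload_gb=10 with backend "auto" -> UVAOffloader
+    - all defaults -> NoopOffloader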
+ """ + from vllm.model_executor.offloader.prefetch import PrefetchOffloader + from vllm.model_executor.offloader.uva import UVAOffloader + + backend = offload_config.offload_backend + uva = offload_config.uva + prefetch = offload_config.prefetch + + if backend == "auto": + if prefetch.offload_group_size > 0: + backend = "prefetch" + elif uva.cpu_offload_gb > 0: + backend = "uva" + else: + return NoopOffloader() + + if backend == "prefetch": + return PrefetchOffloader( + group_size=prefetch.offload_group_size, + num_in_group=prefetch.offload_num_in_group, + prefetch_step=prefetch.offload_prefetch_step, + offload_params=prefetch.offload_params, + mode="cpu", + ) + elif backend == "uva": + return UVAOffloader( + cpu_offload_max_bytes=int(uva.cpu_offload_gb * 1024**3), + cpu_offload_params=uva.cpu_offload_params, + ) + else: + return NoopOffloader() diff --git a/vllm/model_executor/offloader/prefetch.py b/vllm/model_executor/offloader/prefetch.py new file mode 100644 index 000000000..b43cb8b7d --- /dev/null +++ b/vllm/model_executor/offloader/prefetch.py @@ -0,0 +1,704 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +# Adapted from +# https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/utils/offloader.py +"""Prefetch-based CPU offloading with async prefetching. + +Uses static buffers and event-based stream forking for torch.compile + +CUDA graph compatibility. Events allow the copy stream to join CUDA +graph captures, ensuring H2D copies are properly captured. +""" + +from abc import ABC, abstractmethod +from collections.abc import Generator +from dataclasses import dataclass +from typing import Any + +import torch +import torch.nn as nn + +# Import prefetch_ops to register custom ops at module load time +import vllm.model_executor.offloader.prefetch_ops # noqa: F401 +from vllm.logger import init_logger +from vllm.model_executor.offloader.base import BaseOffloader +from vllm.utils.platform_utils import is_pin_memory_available + +logger = init_logger(__name__) + + +@dataclass +class ParamInfo: + """Metadata about an offloaded parameter.""" + + name: str + shape: tuple[int, ...] + stride: tuple[int, ...] + dtype: torch.dtype + + @property + def key(self) -> tuple[str, tuple[int, ...], tuple[int, ...], torch.dtype]: + """Unique key for buffer pool grouping. + + Includes parameter name to prevent different parameters with the same + shape from sharing buffers within the same layer. Parameters with the + same name across different layers will share buffers (via slots). + + Includes stride because parameters with same shape but different + strides need separate buffers to preserve memory layout. + """ + return (self.name, self.shape, self.stride, self.dtype) + + @property + def num_bytes(self) -> int: + """Size in bytes.""" + numel = 1 + for dim in self.shape: + numel *= dim + return numel * torch.finfo(self.dtype).bits // 8 + + +class StaticBufferPool: + """Pre-allocated GPU buffer pool for offloaded parameters. + + Allocates slot_capacity copies of each unique parameter + (name, shape, stride, dtype), allowing for double/triple buffering + during prefetch. + + Buffer slots are reused circularly: layer N uses slot (N % slot_capacity). + + The key includes parameter name to prevent different parameters within + the same layer from sharing buffers. Parameters with the same name + across different layers share buffers via the slot mechanism. 
+ """ + + def __init__( + self, + param_infos: list[ParamInfo], + slot_capacity: int, + device: torch.device, + ): + self.slot_capacity = slot_capacity + self.total_bytes = 0 + self._device = device + + # Group by (shape, stride, dtype) - only allocate unique combinations + unique_params: dict[tuple, ParamInfo] = {} + for info in param_infos: + if info.key not in unique_params: + unique_params[info.key] = info + + # Allocate buffers: key -> list of tensors (one per slot) + self._buffers: dict[tuple, list[torch.Tensor]] = {} + for key, info in unique_params.items(): + slot_tensors = [] + for _ in range(slot_capacity): + # Use empty_strided to preserve parameter's memory layout + buf = torch.empty_strided( + size=info.shape, + stride=info.stride, + dtype=info.dtype, + device=device, + ) + slot_tensors.append(buf) + self.total_bytes += info.num_bytes + self._buffers[key] = slot_tensors + + logger.debug( + "[StaticBufferPool] Allocated %d unique (name, shape, stride, dtype), " + "%d slots each, total %.4f GB", + len(unique_params), + slot_capacity, + self.total_bytes / 1e9, + ) + + def get_buffer( + self, + name: str, + shape: tuple[int, ...], + stride: tuple[int, ...], + dtype: torch.dtype, + slot_idx: int, + ) -> torch.Tensor: + """Get a static buffer for the given name/shape/stride/dtype/slot.""" + key = (name, shape, stride, dtype) + return self._buffers[key][slot_idx % self.slot_capacity] + + +class PrefetchOffloader(BaseOffloader): + """Prefetching-based offloader with group-based layer selection. + + Groups layers and uses async H2D prefetch to hide transfer latency. + Uses static buffers and stream synchronization for torch.compile and + CUDA graph compatibility. + + Args: + group_size: Group every N layers together. + num_in_group: Offload this many layers per group (last N of each group). + prefetch_step: Number of layers to prefetch ahead. + mode: Offload mode ("cpu" is currently supported). + """ + + def __init__( + self, + group_size: int, + num_in_group: int, + prefetch_step: int, + offload_params: set[str] | None = None, + mode: str = "cpu", + ): + self.group_size = group_size + self.num_in_group = num_in_group + self.prefetch_step = prefetch_step + self.offload_params = offload_params or set() + self.mode = mode + + # Copy stream for async H2D transfers + self.copy_stream = torch.cuda.Stream() + + # Module offloaders and buffer pool (populated in wrap_modules/post_init) + self.module_offloaders: list[_ModuleOffloader] = [] + self.buffer_pool: StaticBufferPool | None = None + self.total_offloaded_bytes = 0 + + def wrap_modules( + self, + modules_generator: Generator[nn.Module, None, None], + ) -> list[nn.Module]: + """Wrap modules with prefetch offloading logic.""" + assert len(self.module_offloaders) == 0, ( + "wrap_modules should only be called once" + ) + + all_modules = [] + offload_modules = [] + + for module_index, module in enumerate(modules_generator): + all_modules.append(module) + + # Select layers to offload based on group pattern + # Offload last num_in_group layers of each group_size + if module_index % self.group_size >= self.group_size - self.num_in_group: + if self.offload_params: + whitelist = [ + name + for name, _ in module.named_parameters() + if any(f".{p}." in f".{name}." 
for p in self.offload_params) + ] + else: + whitelist = [name for name, _ in module.named_parameters()] + + if not whitelist: + continue # skip layers with no matching params + + offload_modules.append(module) + self.module_offloaders.append( + _ModuleOffloader( + mode=self.mode, + module=module, + copy_stream=self.copy_stream, + whitelist_param_names=whitelist, + layer_idx=len(self.module_offloaders), + ) + ) + + for index, module in enumerate(offload_modules): + self._hook_module_forward(index, module) + + return all_modules + + def _hook_module_forward(self, index: int, module: nn.Module): + """Hook module's forward with torch.compile-compatible sync.""" + original_forward = module.forward + + def forward(*args, **kwargs): + # Temporarily restore original forward to avoid recursion + module.forward = original_forward + + # Wait for this layer's prefetch to complete + # mutates_args on input_tensor creates data dependency for torch.compile + input_tensor = args[0] if args else kwargs.get("hidden_states") + torch.ops.vllm.wait_prefetch(input_tensor, index) + + # No parameter swapping needed - parameters already point to + # GPU static buffers (set in assign_static_buffer) + output = original_forward(*args, **kwargs) + + # Start prefetch for next layer (circular) + # mutates_args on output_tensor creates ordering dependency + next_index = (index + self.prefetch_step) % len(self.module_offloaders) + # Handle tuple output (e.g., (hidden_states, residual)) + if isinstance(output, tuple): + torch.ops.vllm.start_prefetch(output[0], next_index) + else: + torch.ops.vllm.start_prefetch(output, next_index) + + # No explicit offload needed - static buffers are reused implicitly + + # Restore hooked forward + module.forward = forward + return output + + module.forward = forward + + def _wait_for_layer(self, layer_idx: int): + """Called by custom op - wait for copy to complete. + + Synchronization strategy: + - During CUDA graph capture: use event-based wait (graph-compatible) + - Outside capture (warmup/eager): use wait_stream (more robust) + + During capture, we skip wait for pre-capture prefetches because: + 1. sync_before_graph_capture() ensures pre-capture work is complete + 2. We can't wait on pre-capture events during capture (isolation error) + """ + offloader = self.module_offloaders[layer_idx] + + if torch.cuda.is_current_stream_capturing(): + # During capture, skip wait for pre-capture prefetches. + # sync_before_graph_capture() ensures pre-capture work is complete. + if not offloader._prefetch_in_capture: + return + # Event-based wait for in-capture prefetches (graph-compatible) + torch.cuda.current_stream().wait_event(offloader._copy_done_event) + # Mark that this prefetch has been waited on (joined). + offloader._prefetch_in_capture = False + else: + if offloader._event_valid_for_eager: + # Use per-layer event to only wait for THIS layer's copy, + # allowing other layers' prefetches to run concurrently. + torch.cuda.current_stream().wait_event(offloader._copy_done_event) + else: + # Event not usable (unrecorded or recorded during capture). + # Fall back to wait_stream to drain all copy_stream work. + torch.cuda.current_stream().wait_stream(self.copy_stream) + + def sync_prev_onload(self): + """Sync previous onload operations. + + Ensures any H2D copies in flight on copy_stream complete before + the compute stream continues. Call this before CUDA graph + capture/replay or when synchronization is needed. 
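+
+        In this patch it is invoked from CUDAGraphWrapper immediately
+        before both graph capture and graph replay (see the
+        vllm/compilation/cuda_graph.py hunk above).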
+ """ + torch.cuda.current_stream().wait_stream(self.copy_stream) + + def _start_prefetch(self, layer_idx: int): + """Called by custom op - start async copy to static buffer.""" + offloader = self.module_offloaders[layer_idx] + offloader.start_onload_to_static() + + def join_after_forward(self): + """Join copy_stream after model forward completes. + + Call this after the model forward pass but before CUDA graph capture + ends. This ensures copy_stream is rejoined for any prefetches started + during the forward pass. + + We join ALL layers that have _prefetch_in_capture=True, meaning their + prefetch was started during capture but not yet waited on (joined). + This handles both full and piecewise cudagraph modes correctly: + - Full mode: joins layers 0..prefetch_step-1 (prefetched by last layers) + - Piecewise mode: joins only layers prefetched by THIS subgraph's layers + """ + if not self.module_offloaders: + return + # Join all layers whose prefetch was started in capture but not waited on + for offloader in self.module_offloaders: + if offloader._prefetch_in_capture: + torch.cuda.current_stream().wait_event(offloader._copy_done_event) + offloader._prefetch_in_capture = False + + def post_init(self): + """Allocate static buffer pool and start initial prefetches. + + Note: Parameters have already been offloaded to CPU during wrap_modules() + (in _CpuParamOffloader.__init__), so GPU memory is available for the + static buffer pool. + """ + # Sync CPU storage with current param.data BEFORE collecting param info. + # This is needed because process_weights_after_loading may have: + # 1. Transformed weights (quantization, transpose, etc.) + # 2. Created new CPU tensors via device_loading_context + # Our _cpu_storage would be stale otherwise. + for offloader in self.module_offloaders: + offloader.sync_cpu_storage() + + # Collect parameter info (now using synced CPU storage) + param_infos: list[ParamInfo] = [] + device: torch.device | None = None + + for offloader in self.module_offloaders: + param_infos.extend(offloader.get_param_infos()) + if device is None: + device = offloader.device + + if device is None: + # No modules to offload + return + + # Allocate static buffer pool + self.buffer_pool = StaticBufferPool( + param_infos=param_infos, + slot_capacity=self.prefetch_step, + device=device, + ) + + # Assign buffer slots and point parameters to GPU buffers + for idx, offloader in enumerate(self.module_offloaders): + slot_idx = idx % self.prefetch_step + offloader.assign_buffer_slot(self.buffer_pool, slot_idx) + + # Collect offloaded bytes + for offloader in self.module_offloaders: + offloader.post_init() + self.total_offloaded_bytes += offloader.offloaded_bytes + + logger.info_once( + f"[PrefetchOffloader] Initialized {len(self.module_offloaders)} modules. " + f"Total GPU memory saved: {self.total_offloaded_bytes / 1e9:.4f} GB, " + f"Static buffer pool: {self.buffer_pool.total_bytes / 1e9:.4f} GB " + f"(group_size={self.group_size}, num_in_group={self.num_in_group}, " + f"prefetch_step={self.prefetch_step}, mode={self.mode})" + ) + + # Start initial prefetches + for i in range(min(self.prefetch_step, len(self.module_offloaders))): + self.module_offloaders[i].start_onload_to_static() + + +class _ModuleOffloader: + """Manages offloading for a single module. + + Uses static buffers from a shared pool instead of dynamic allocation. 
+ """ + + def __init__( + self, + mode: str, + module: nn.Module, + copy_stream: torch.cuda.Stream, + whitelist_param_names: list[str], + layer_idx: int, + ): + self.mode = mode + self.module = module + self.device = next(module.parameters()).device + self.copy_stream = copy_stream + self.layer_idx = layer_idx + self.offloaded_bytes = 0 + + # Event to signal when H2D copy to static buffer is complete. + # Used for per-layer synchronization (both eager and capture modes). + self._copy_done_event = torch.cuda.Event() + + # Track whether _copy_done_event is valid for eager-mode wait_event. + # False when: (1) never recorded, or (2) last recorded during a + # cudagraph capture (events become invalid after capture ends). + # In these cases we fall back to wait_stream. + self._event_valid_for_eager = False + + # Track if last prefetch was started during CUDA graph capture. + # Used to skip wait_event during capture for pre-capture prefetches. + self._prefetch_in_capture = False + + assert self.device != torch.device("cpu"), ( + "Module parameters should not already be on CPU " + "(offloader handles CPU placement)" + ) + + # Buffer pool and slot (assigned in assign_buffer_slot) + self._buffer_pool: StaticBufferPool | None = None + self._buffer_slot_idx: int = 0 + + param_dict = dict(self.module.named_parameters()) + assert all(name in param_dict for name in whitelist_param_names), ( + f"Whitelist params {whitelist_param_names} not found in module params " + f"{list(param_dict.keys())}" + ) + + self._param_offloaders = { + name: _BaseParamOffloader.create(mode, module=module, param_name=name) + for name in whitelist_param_names + } + + def post_init(self): + """Collect total offloaded bytes (offloading already done in __init__).""" + for param_offloader in self._param_offloaders.values(): + param_offloader.post_init() + self.offloaded_bytes += param_offloader.offloaded_bytes + + def sync_cpu_storage(self): + """Sync CPU storage with current param.data. + + Called after process_weights_after_loading to ensure _cpu_storage + contains the final processed weights, not stale pre-loading data. + """ + for param_offloader in self._param_offloaders.values(): + param_offloader.sync_cpu_storage() + + def get_param_infos(self) -> list[ParamInfo]: + """Get parameter metadata for buffer pool allocation. + + Note: sync_cpu_storage() must be called before this method to ensure + _cpu_storage reflects the final processed weights (after quantization). + """ + infos = [] + for name, offloader in self._param_offloaders.items(): + cpu_storage = offloader._cpu_storage + assert cpu_storage is not None, "CPU storage not initialized" + infos.append( + ParamInfo( + name=name, + shape=tuple(cpu_storage.shape), + stride=tuple(cpu_storage.stride()), + dtype=cpu_storage.dtype, + ) + ) + return infos + + def assign_buffer_slot(self, pool: StaticBufferPool, slot_idx: int): + """Assign this module to a buffer slot in the pool. + + Also assigns static GPU buffers to each parameter offloader, + which moves the parameter data to point to the GPU buffer. 
+ """ + self._buffer_pool = pool + self._buffer_slot_idx = slot_idx + + # Assign static buffers to parameters + # Use CPU storage shape/stride/dtype since param.data is now empty + for name, offloader in self._param_offloaders.items(): + cpu_storage = offloader._cpu_storage + assert cpu_storage is not None, "CPU storage not initialized" + buffer = pool.get_buffer( + name=name, + shape=tuple(cpu_storage.shape), + stride=tuple(cpu_storage.stride()), + dtype=cpu_storage.dtype, + slot_idx=slot_idx, + ) + offloader.assign_static_buffer(buffer) + + def start_onload_to_static(self): + """Start async copy from CPU storage to GPU buffer. + + Uses event-based forking to join copy_stream to CUDA graph capture. + This ensures H2D copies are properly captured when recording a graph. + + IMPORTANT: We must wait for the compute stream before copying, because + the previous layer's forward may still be using the buffer (GPU ops are + async). Without this sync, we could overwrite the buffer while it's + being read. + """ + assert self._buffer_pool is not None, "Buffer pool not assigned" + + # Track if this prefetch is being captured (for _wait_for_layer logic) + self._prefetch_in_capture = torch.cuda.is_current_stream_capturing() + + # Fork: record event on compute stream, copy_stream waits on it + # This joins copy_stream to any active CUDA graph capture + fork_event = torch.cuda.Event() + torch.cuda.current_stream().record_event(fork_event) + self.copy_stream.wait_event(fork_event) + + with torch.cuda.stream(self.copy_stream): + for name, offloader in self._param_offloaders.items(): + cpu_storage = offloader._cpu_storage + gpu_buffer = offloader._gpu_buffer + assert cpu_storage is not None, "CPU storage not initialized" + assert gpu_buffer is not None, "GPU buffer not assigned" + assert not is_pin_memory_available() or cpu_storage.is_pinned(), ( + f"CPU storage for {name} is not pinned! " + "non_blocking=True H2D copy from non-pinned memory " + "causes stream synchronization that breaks " + "event-based fork synchronization." + ) + gpu_buffer.copy_(cpu_storage, non_blocking=True) + + # Record completion event for _wait_for_layer to use + self._copy_done_event.record(self.copy_stream) + # Event is only valid for eager wait_event if recorded outside capture. + # Events recorded during capture become invalid after capture ends. + self._event_valid_for_eager = not torch.cuda.is_current_stream_capturing() + + +class _BaseParamOffloader(ABC): + """Base class for parameter offloading strategies.""" + + # CPU storage for offloaded parameters (set by subclasses) + _cpu_storage: torch.Tensor | None + # GPU buffer reference (set by subclasses when using static buffers) + _gpu_buffer: torch.Tensor | None + + @staticmethod + def create(mode: str, **kwargs) -> "_BaseParamOffloader": + """Factory method to create appropriate offloader for mode.""" + if mode == "cpu": + return _CpuParamOffloader(**kwargs) + else: + raise ValueError(f"Unknown offload mode: {mode}") + + def __init__(self, module: nn.Module, param_name: str): + self._module = module + self._param_name = param_name + self.offloaded_bytes = 0 + self._cpu_storage = None + self._gpu_buffer = None + + @property + def _param(self) -> nn.Parameter: + """Get the parameter being offloaded. + + Supports dotted names (e.g. 'self_attn.qkv_proj.weight') by + traversing the module hierarchy. 
+ """ + obj: Any = self._module + for attr in self._param_name.split("."): + obj = getattr(obj, attr) + return obj + + def post_init(self): + """Initialize offloading (move parameter to storage).""" + return + + @abstractmethod + def sync_cpu_storage(self) -> None: + """Sync CPU storage with current param.data. + + Called after process_weights_after_loading to update _cpu_storage + with the final processed weights. + """ + pass + + @abstractmethod + def assign_static_buffer(self, gpu_buffer: torch.Tensor) -> None: + """Point parameter data to GPU static buffer.""" + pass + + +class _CpuParamOffloader(_BaseParamOffloader): + """Offload parameter to pinned CPU memory. + + Uses GPU static buffers as the actual parameter, with CPU storage + kept separately. This ensures torch.compile sees GPU tensors at trace time. + + The offloading happens in two phases: + 1. __init__() - copies GPU data to CPU, frees GPU memory immediately + 2. assign_static_buffer() - points param.data to GPU static buffer + """ + + def __init__(self, module: nn.Module, param_name: str): + super().__init__(module, param_name) + self._cpu_storage: torch.Tensor | None = None + self._gpu_buffer: torch.Tensor | None = None # Store reference to GPU buffer + + # Offload to CPU immediately to free GPU memory during model loading + self._offload_to_cpu_internal() + + def _offload_to_cpu_internal(self): + """Copy parameter data to pinned CPU storage and free GPU memory. + + This replaces param.data with CPU storage, allowing weight loading + to continue writing to CPU memory. GPU memory is freed when the + original GPU tensor is garbage collected. + """ + param = self._param + pin_memory = is_pin_memory_available() + + # Create pinned CPU storage and copy current GPU data + self._cpu_storage = torch.empty_strided( + size=param.data.size(), + stride=param.data.stride(), + dtype=param.data.dtype, + layout=param.data.layout, + device="cpu", + pin_memory=pin_memory, + ) + self._cpu_storage.copy_(param.data) + + self.offloaded_bytes = ( + self._cpu_storage.numel() * self._cpu_storage.element_size() + ) + + # Point param.data to CPU storage - this allows weight loading to work + # and frees GPU memory when the original GPU tensor is garbage collected + param.data = self._cpu_storage + + def _update_cpu_storage_from_param(self) -> None: + """Update _cpu_storage from current param.data, ensuring pinned memory. + + After process_weights_after_loading, device_loading_context creates + non-pinned CPU tensors via `p.data = p.data.to("cpu")`. Using + non-pinned memory with `copy_(src, non_blocking=True)` causes CUDA to + perform a stream synchronization before the copy, breaking the + event-based fork synchronization and potentially allowing the copy + to overwrite the GPU buffer while the compute stream still reads it. + + This method ensures _cpu_storage always uses pinned memory when + available, re-pinning if necessary. 
+ """ + param = self._param + + if param.data.device.type == "cpu": + if is_pin_memory_available() and not param.data.is_pinned(): + pinned = torch.empty_strided( + size=param.data.size(), + stride=param.data.stride(), + dtype=param.data.dtype, + layout=param.data.layout, + device="cpu", + pin_memory=True, + ) + pinned.copy_(param.data) + self._cpu_storage = pinned + else: + self._cpu_storage = param.data + else: + # param.data is on GPU - copy to existing CPU storage + assert self._cpu_storage is not None + self._cpu_storage.copy_(param.data) + + def assign_static_buffer(self, gpu_buffer: torch.Tensor) -> None: + """Point parameter data to GPU static buffer. + + This is called after weight loading AND process_weights_after_loading + complete. At this point: + - param.data may have been replaced by device_loading_context + (which creates new CPU tensors after quantization processing) + - We need to update _cpu_storage to point to current param.data + so that prefetch copies the processed weights, not stale data + - Then point param.data to the GPU buffer for torch.compile + """ + assert self._cpu_storage is not None, ( + "_offload_to_cpu_internal() must be called before assign_static_buffer()" + ) + + # Get current parameter (may have been replaced by + # process_weights_after_loading) + param = self._param + + # Update _cpu_storage to current param.data. This is critical because: + # 1. process_weights_after_loading may transform weights (quantization) + # 2. device_loading_context creates NEW CPU tensors when moving back + # 3. Our old _cpu_storage would have pre-processed or stale data + self._update_cpu_storage_from_param() + + # Store reference to GPU buffer for use in start_onload + self._gpu_buffer = gpu_buffer + + # Point parameter to static GPU buffer - this is what torch.compile sees + param.data = gpu_buffer + + def sync_cpu_storage(self) -> None: + """Sync CPU storage with current param.data. + + Called after process_weights_after_loading to update _cpu_storage + with the final processed weights. This is critical because: + 1. process_weights_after_loading may transform weights (quantization) + 2. device_loading_context creates NEW CPU tensors when moving back + 3. Our old _cpu_storage would have pre-processed or stale data + """ + self._update_cpu_storage_from_param() + + def post_init(self): + """No-op: offloading done in offload_to_cpu/assign_static_buffer.""" + pass diff --git a/vllm/model_executor/offloader/prefetch_ops.py b/vllm/model_executor/offloader/prefetch_ops.py new file mode 100644 index 000000000..d1f59b67b --- /dev/null +++ b/vllm/model_executor/offloader/prefetch_ops.py @@ -0,0 +1,94 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Custom ops for prefetch offloader torch.compile + CUDA graph compatibility. + +These ops use mutates_args to create data dependencies that prevent +the compiler from reordering prefetch/sync operations. +""" + +from __future__ import annotations + +import torch + +from vllm.model_executor.offloader.base import get_offloader +from vllm.utils.torch_utils import direct_register_custom_op + +# --- wait_prefetch op --- + + +def _wait_prefetch_impl( + input_tensor: torch.Tensor, + layer_idx: int, +) -> None: + """Wait for prefetch of layer_idx to complete. + + Synchronizes the compute stream with the copy stream to ensure + the prefetched weights are ready for use. 
+ + Args: + input_tensor: Input to the layer (e.g., hidden_states) - declared + as mutated to create data dependency for torch.compile. + layer_idx: Index of the layer to wait for. + """ + get_offloader()._wait_for_layer(layer_idx) + + +def _wait_prefetch_fake( + input_tensor: torch.Tensor, + layer_idx: int, +) -> None: + """Fake implementation for torch.compile tracing.""" + return + + +# --- start_prefetch op --- + + +def _start_prefetch_impl( + output_tensor: torch.Tensor, + layer_idx: int, +) -> None: + """Start async prefetch of layer_idx weights. + + Initiates H2D copy on the copy stream for the specified layer. + + Args: + output_tensor: Output from forward - declared as mutated to + prevent torch.compile from reordering this op before the + computation that produces output_tensor. + layer_idx: Index of the layer to prefetch. + """ + get_offloader()._start_prefetch(layer_idx) + + +def _start_prefetch_fake( + output_tensor: torch.Tensor, + layer_idx: int, +) -> None: + """Fake implementation for torch.compile tracing.""" + return + + +def register_prefetch_offloader_ops() -> None: + """Register custom ops for prefetch offloader. + + Must be called before the ops are used. This is typically done + at module import time. + """ + direct_register_custom_op( + op_name="wait_prefetch", + op_func=_wait_prefetch_impl, + mutates_args=["input_tensor"], + fake_impl=_wait_prefetch_fake, + ) + + direct_register_custom_op( + op_name="start_prefetch", + op_func=_start_prefetch_impl, + mutates_args=["output_tensor"], + fake_impl=_start_prefetch_fake, + ) + + +# Register ops at module import time +register_prefetch_offloader_ops() diff --git a/vllm/model_executor/offloader/uva.py b/vllm/model_executor/offloader/uva.py new file mode 100644 index 000000000..c524e43cd --- /dev/null +++ b/vllm/model_executor/offloader/uva.py @@ -0,0 +1,140 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""UVA-based CPU offloading using Unified Virtual Addressing.""" + +from collections.abc import Generator + +import torch +import torch.nn as nn +from torch.func import functional_call + +import vllm.envs as envs +from vllm.logger import init_logger +from vllm.model_executor.offloader.base import BaseOffloader +from vllm.utils.mem_utils import format_gib +from vllm.utils.platform_utils import is_pin_memory_available, is_uva_available +from vllm.utils.torch_utils import get_accelerator_view_from_cpu_tensor + +logger = init_logger(__name__) + + +class UVAOffloader(BaseOffloader): + """Offloader using Unified Virtual Addressing (UVA) for zero-copy access. + + This offloader moves parameters to pinned CPU memory and creates CUDA views + using UVA. The GPU can then directly access the CPU memory without explicit + transfers, at the cost of PCIe bandwidth (slower than GPU memory). + + When UVA is disabled via env var, falls back to a functional_call-based + approach that moves parameters on-demand. + + Args: + cpu_offload_max_bytes: Maximum bytes to offload to CPU. + cpu_offload_params: Set of parameter name segments to selectively + offload. If empty, all parameters are eligible up to the byte limit. 
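+
+    Example (hedged sketch; the byte budget and parameter names are
+    illustrative):
+
+        offloader = UVAOffloader(
+            cpu_offload_max_bytes=4 * 1024**3,  # offload up to 4 GiB
+            cpu_offload_params={"gate_up_proj", "down_proj"},
+        )
+        model.layers = offloader.wrap_modules(m for m in model.layers)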
+ """ + + def __init__( + self, + cpu_offload_max_bytes: int, + cpu_offload_params: set[str] | None = None, + ): + self.cpu_offload_max_bytes = cpu_offload_max_bytes + self.cpu_offload_bytes = 0 + self.cpu_offload_params = cpu_offload_params or set() + + self.pin_memory = ( + is_pin_memory_available() + and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_PIN_MEMORY + ) + self.uva_offloading = ( + is_uva_available() and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_UVA + ) + + def wrap_modules( + self, + modules_generator: Generator[nn.Module, None, None], + ) -> list[nn.Module]: + """Wrap modules with UVA offloading.""" + modules = [self._maybe_offload_to_cpu(module) for module in modules_generator] + if self.cpu_offload_bytes > 0: + logger.info( + "Total CPU offloaded parameters: %s", + format_gib(self.cpu_offload_bytes), + ) + return modules + + def _maybe_offload_to_cpu(self, module: nn.Module) -> nn.Module: + """Offload module parameters to CPU using UVA if budget allows.""" + if (params := next(module.parameters(), None)) is None: + return module + + device = params.device + + if device == torch.device("cpu"): + return module + + if self.cpu_offload_bytes >= self.cpu_offload_max_bytes: + return module + + # offload parameters to CPU + # use pin_memory if possible, which helps cudagraph capture speed + offloaded_parameters = False + for name, p in module.named_parameters(): + if self.cpu_offload_bytes >= self.cpu_offload_max_bytes: + # we use per-parameter offloading + # one module might have some parameters offloaded and some not + break + + if self.cpu_offload_params: + # Check if parameter belongs to the offloading set + # Add dots here to ensure we match full segments only + # e.g., "experts.w2_weight" matches "mlp.experts.w2_weight" + # but not "mlp.experts.w2_weight_scale" + should_offload = any( + f".{param}." in f".{name}." 
for param in self.cpu_offload_params + ) + if not should_offload: + continue + + cpu_data = p.data.to(device="cpu") + if self.pin_memory: + cpu_data = cpu_data.pin_memory() + + if not self.uva_offloading: + p.data = cpu_data + else: + p.data = get_accelerator_view_from_cpu_tensor(cpu_data) + p._vllm_is_uva_offloaded = True + + self.cpu_offload_bytes += p.data.numel() * p.data.element_size() + offloaded_parameters = True + + if offloaded_parameters and not self.uva_offloading: + original_forward = module.forward + + def forward(*args, **kwargs): + module.forward = original_forward + device_state = { + # here we blindly call `to(device)` + # if the parameter is already on the device, + # it will be a no-op + k: v.to(device, non_blocking=True) + for k, v in module.state_dict().items() + } + + # set `tie_weights=False` as tied weights in original model + # become untied when calling .to(device) individually + output = functional_call( + module, + device_state, + args=args, + kwargs=kwargs, + tie_weights=False, + ) + module.forward = forward + return output + + module.forward = forward + + return module diff --git a/vllm/v1/worker/gpu/cudagraph_utils.py b/vllm/v1/worker/gpu/cudagraph_utils.py index 5665937a0..d70a4c7ab 100644 --- a/vllm/v1/worker/gpu/cudagraph_utils.py +++ b/vllm/v1/worker/gpu/cudagraph_utils.py @@ -12,6 +12,7 @@ from vllm.config import VllmConfig from vllm.config.compilation import CUDAGraphMode from vllm.distributed.parallel_state import graph_capture, is_global_first_rank from vllm.forward_context import BatchDescriptor, set_forward_context +from vllm.model_executor.offloader.base import get_offloader from vllm.utils.math_utils import cdiv from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.worker.gpu.attn_utils import ( @@ -189,6 +190,11 @@ class CudaGraphManager: # Capture the graph. assert num_tokens not in self.graphs graph = torch.cuda.CUDAGraph() + + # Sync offloader's copy stream before capture. + # Ensure any pre-capture prefetches from offloader are complete. + get_offloader().sync_prev_onload() + with ( set_forward_context( attn_metadata=attn_metadata, @@ -205,6 +211,11 @@ class CudaGraphManager: positions=positions, inputs_embeds=inputs_embeds, ) + # Join offloader's copy stream after forward to avoid unjoined + # stream error. The last layer's start_prefetch forks copy_stream, + # but wait_prefetch only happens in the next forward pass. + get_offloader().join_after_forward() + if self.use_aux_hidden_state_outputs: hidden_states, aux_hidden_states = model_output else: @@ -329,6 +340,13 @@ class CudaGraphManager: self, num_tokens: int ) -> torch.Tensor | tuple[torch.Tensor, list[torch.Tensor]]: assert num_tokens in self.graphs, f"No cudagraph for {num_tokens} tokens" + # Sync offloader before replay - needed when transitioning from + # eager/piecewise to full cudagraph (e.g., prefill → decode). + # The previous eager iteration's start_prefetch may have queued + # H2D copies on copy_stream that the graph's captured events + # cannot see. Without this, replay could overwrite static buffers + # while those copies are still in flight. 
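+        # Rough timeline being guarded against (illustrative):
+        #   copy_stream: H2D copy of the next layer's weights still in flight
+        #   replay:      captured kernels read/overwrite those static buffers
+        # sync_prev_onload() orders the replay after the pending copies.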
+ get_offloader().sync_prev_onload() self.graphs[num_tokens].replay() assert self.hidden_states is not None hidden_states = self.hidden_states[:num_tokens] diff --git a/vllm/v1/worker/gpu/spec_decode/eagle/cudagraph.py b/vllm/v1/worker/gpu/spec_decode/eagle/cudagraph.py index c489a172c..eda8c37d5 100644 --- a/vllm/v1/worker/gpu/spec_decode/eagle/cudagraph.py +++ b/vllm/v1/worker/gpu/spec_decode/eagle/cudagraph.py @@ -7,6 +7,7 @@ import torch from vllm.config import VllmConfig from vllm.config.compilation import CUDAGraphMode +from vllm.model_executor.offloader.base import get_offloader from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.worker.gpu.block_table import BlockTables from vllm.v1.worker.gpu.cudagraph_utils import ( @@ -115,6 +116,11 @@ class EagleCudaGraphManager: ) -> None: assert num_tokens not in self.graphs graph = torch.cuda.CUDAGraph() + + # Sync offloader's copy stream before capture. + # Ensure any pre-capture prefetches from offloader are complete. + get_offloader().sync_prev_onload() + with torch.cuda.graph(graph, self.pool): generate_fn( num_reqs, @@ -124,6 +130,10 @@ class EagleCudaGraphManager: num_tokens_across_dp, CUDAGraphMode.NONE, ) + # Join offloader's copy stream after forward to avoid unjoined + # stream error. The last layer's start_prefetch forks copy_stream, + # but wait_prefetch only happens in the next forward pass. + get_offloader().join_after_forward() self.graphs[num_tokens] = graph def _capture_piecewise_graph( @@ -171,4 +181,11 @@ class EagleCudaGraphManager: def run_fullgraph(self, num_tokens: int) -> None: assert num_tokens in self.graphs + # Sync offloader before replay - needed when transitioning from + # eager/piecewise to full cudagraph (e.g., prefill → decode). + # The previous eager iteration's start_prefetch may have queued + # H2D copies on copy_stream that the graph's captured events + # cannot see. Without this, replay could overwrite static buffers + # while those copies are still in flight. 
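+        # The drafter shares the process-wide offloader singleton with the
+        # target model, so its full-graph replay must observe the same
+        # in-flight copies before touching the static weight buffers.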
+ get_offloader().sync_prev_onload() self.graphs[num_tokens].replay() diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index f711d1d79..d82b83b8c 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -81,6 +81,11 @@ from vllm.model_executor.models.interfaces_base import ( is_pooling_model, is_text_generation_model, ) +from vllm.model_executor.offloader import ( + create_offloader, + get_offloader, + set_offloader, +) from vllm.multimodal import MULTIMODAL_REGISTRY from vllm.multimodal.encoder_budget import MultiModalBudget from vllm.multimodal.inputs import ( @@ -378,6 +383,7 @@ class GPUModelRunner( self.vllm_config = vllm_config self.model_config = vllm_config.model_config self.cache_config = vllm_config.cache_config + self.offload_config = vllm_config.offload_config self.compilation_config = vllm_config.compilation_config self.lora_config = vllm_config.lora_config self.load_config = vllm_config.load_config @@ -386,14 +392,6 @@ class GPUModelRunner( self.speculative_config = vllm_config.speculative_config self.observability_config = vllm_config.observability_config - from vllm.model_executor.models.utils import ( - set_cpu_offload_max_bytes, - set_cpu_offload_params, - ) - - set_cpu_offload_max_bytes(int(self.cache_config.cpu_offload_gb * 1024**3)) - set_cpu_offload_params(self.cache_config.cpu_offload_params) - model_config = self.model_config cache_config = self.cache_config scheduler_config = self.scheduler_config @@ -749,6 +747,10 @@ class GPUModelRunner( pin_memory=self.pin_memory, ) + # Model weight offloader + # Make sure this is called before any get_offloader call + set_offloader(create_offloader(self.offload_config)) + # Ephemeral state transferred between execute_model() and sample_tokens(). self.execute_model_state: ExecuteModelState | None = None self.kv_connector_output: KVConnectorOutput | None = None @@ -4342,6 +4344,8 @@ class GPUModelRunner( self.model, self.vllm_config, CUDAGraphMode.NONE, self.device ) + get_offloader().post_init() + def _get_eagle3_aux_layers_from_config(self) -> tuple[int, ...] | None: """Extract Eagle3 auxiliary layer indices from speculative config. @@ -5780,7 +5784,7 @@ class GPUModelRunner( if block_sizes != [self.cache_config.block_size] or kernel_block_sizes != [ self.cache_config.block_size ]: - assert self.cache_config.cpu_offload_gb == 0, ( + assert self.offload_config.uva.cpu_offload_gb == 0, ( "Cannot re-initialize the input batch when CPU weight " "offloading is enabled. See https://github.com/vllm-project/vllm/pull/18298 " # noqa: E501 "for more details." diff --git a/vllm/v1/worker/gpu_ubatch_wrapper.py b/vllm/v1/worker/gpu_ubatch_wrapper.py index edbf797b1..45ba1bef9 100644 --- a/vllm/v1/worker/gpu_ubatch_wrapper.py +++ b/vllm/v1/worker/gpu_ubatch_wrapper.py @@ -20,6 +20,7 @@ from vllm.forward_context import ( override_forward_context, ) from vllm.logger import init_logger +from vllm.model_executor.offloader.base import get_offloader from vllm.platforms import current_platform from vllm.sequence import IntermediateTensors from vllm.utils.import_utils import has_deep_gemm @@ -239,6 +240,11 @@ class UBatchWrapper: set_graph_pool_id(self.graph_pool) else: set_graph_pool_id(current_platform.graph_pool_handle()) + + # Sync offloader's copy stream before capture. + # Ensure any pre-capture prefetches from offloader are complete. 
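+            # Capture/replay pairing at a glance:
+            #   capture: sync_prev_onload() -> torch.cuda.graph(...) ->
+            #            forward -> join_after_forward()
+            #   replay:  sync_prev_onload() -> cudagraph.replay()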
+ get_offloader().sync_prev_onload() + with torch.cuda.graph( cudagraph_metadata.cudagraph, stream=compute_stream, @@ -250,6 +256,10 @@ class UBatchWrapper: sorted_results = [value for position, value in sorted(results)] result = torch.cat(sorted_results, dim=0) cudagraph_metadata.outputs = result + # Join offloader's copy stream after forward to avoid unjoined + # stream error. The last layer's start_prefetch forks copy_stream, + # but wait_prefetch only happens in the next forward pass. + get_offloader().join_after_forward() self.cudagraphs[num_tokens] = cudagraph_metadata return cudagraph_metadata.outputs @@ -461,6 +471,9 @@ class UBatchWrapper: and cudagraph_runtime_mode is CUDAGraphMode.FULL ): cudagraph_metadata = self.cudagraphs[num_tokens] + # Sync offloader before replay - ensures any external dependencies + # from pre-capture prefetches are satisfied. + get_offloader().sync_prev_onload() cudagraph_metadata.cudagraph.replay() return cudagraph_metadata.outputs else: