diff --git a/docs/features/disagg_prefill.md b/docs/features/disagg_prefill.md index dc5e11ea2..7b8280c4d 100644 --- a/docs/features/disagg_prefill.md +++ b/docs/features/disagg_prefill.md @@ -37,10 +37,10 @@ For NixlConnector, you may also specify one or multiple NIXL_Backend. Such as: --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both", "kv_buffer_device":"cuda", "kv_connector_extra_config":{"backends":["UCX", "GDS"]}}' ``` -- **OffloadingConnector**: enable offloading of KV data to CPU memory, customizing the CPU block size (in tokens) and number of blocks to allocate (per worker): +- **OffloadingConnector**: enable offloading of KV data to CPU memory, customizing the CPU block size (in tokens) and total CPU memory bytes to allocate: ```bash - --kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "num_cpu_blocks": 1000}}' + --kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "cpu_bytes_to_use": 1000000000}}' ``` ## Benchmarks diff --git a/tests/v1/kv_connector/unit/test_config.py b/tests/v1/kv_connector/unit/test_config.py index 6cf86f3d5..74075f3ee 100644 --- a/tests/v1/kv_connector/unit/test_config.py +++ b/tests/v1/kv_connector/unit/test_config.py @@ -15,7 +15,7 @@ pytestmark = pytest.mark.cpu_test [ ("native", 4.0, 1, 1, "OffloadingConnector", 4.0 * (1 << 30)), # bytes per rank: 8.0 GiB / (2 * 2) = 2.0 GiB - ("native", 8.0, 2, 2, "OffloadingConnector", 8.0 * (1 << 30) / 4), + ("native", 8.0, 2, 2, "OffloadingConnector", 8.0 * (1 << 30)), ("lmcache", 4.0, 1, 1, "LMCacheConnectorV1", 4.0), # size per rank: 8.0 GiB / (2 * 2) = 2.0 GiB ("lmcache", 8.0, 2, 2, "LMCacheConnectorV1", 2.0), @@ -54,8 +54,7 @@ def test_kv_connector( assert kv_transfer_config.kv_role == "kv_both" if kv_offloading_backend == "native": - assert kv_connector_extra_config["kv_bytes_per_rank"] == expected_bytes - assert kv_connector_extra_config["num_cpu_blocks"] == 0 + assert kv_connector_extra_config["cpu_bytes_to_use"] == expected_bytes # Existing config should be preserved assert kv_connector_extra_config["existing_key"] == "existing_value" elif kv_offloading_backend == "lmcache": diff --git a/tests/v1/kv_connector/unit/test_offloading_connector.py b/tests/v1/kv_connector/unit/test_offloading_connector.py index 987c475c8..5c049301c 100644 --- a/tests/v1/kv_connector/unit/test_offloading_connector.py +++ b/tests/v1/kv_connector/unit/test_offloading_connector.py @@ -26,6 +26,7 @@ from vllm.v1.core.kv_cache_utils import ( init_none_hash, ) from vllm.v1.core.sched.scheduler import Scheduler +from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.kv_offload.abstract import ( LoadStoreSpec, OffloadingEvent, @@ -93,8 +94,8 @@ class MockOffloadingHandler(OffloadingHandler): class MockOffloadingSpec(OffloadingSpec): - def __init__(self, vllm_config: VllmConfig): - super().__init__(vllm_config) + def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig): + super().__init__(vllm_config, kv_cache_config) self.manager = MagicMock(spec=OffloadingManager) self.manager.lookup.return_value = 0 diff --git a/tests/v1/kv_offload/test_cpu_offloading.py b/tests/v1/kv_offload/test_cpu_offloading.py index 8aeeffcae..103675608 100644 --- a/tests/v1/kv_offload/test_cpu_offloading.py +++ b/tests/v1/kv_offload/test_cpu_offloading.py @@ -161,7 +161,7 @@ def test_cpu_offloading(cpu_block_size: int, attn_backend: str) -> None: kv_connector="OffloadingConnector", kv_role="kv_both", kv_connector_extra_config={ - "num_cpu_blocks": 1000, + "cpu_bytes_to_use": 500 << 20, "block_size": cpu_block_size, }, ) diff --git a/vllm/config/vllm.py b/vllm/config/vllm.py index 63bfd056b..f9318c8c6 100644 --- a/vllm/config/vllm.py +++ b/vllm/config/vllm.py @@ -516,12 +516,8 @@ class VllmConfig: if kv_offloading_backend == "native": self.kv_transfer_config.kv_connector = "OffloadingConnector" - kv_bytes_per_rank = kv_offloading_size * (1 << 30) / num_kv_ranks - - # NOTE(ApostaC): the actual calculation for num_cpu_blocks should be - # done after the model's KV cache is initialized self.kv_transfer_config.kv_connector_extra_config.update( - {"kv_bytes_per_rank": kv_bytes_per_rank, "num_cpu_blocks": 0} + {"cpu_bytes_to_use": kv_offloading_size * (1 << 30)} ) elif kv_offloading_backend == "lmcache": self.kv_transfer_config.kv_connector = "LMCacheConnectorV1" diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py index 75c40ec5b..707dce4d2 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py @@ -56,7 +56,7 @@ class OffloadingConnector(KVConnectorBase_V1): ): super().__init__(vllm_config, role, kv_cache_config) - spec = OffloadingSpecFactory.create_spec(vllm_config) + spec = OffloadingSpecFactory.create_spec(vllm_config, kv_cache_config) self.connector_scheduler: OffloadingConnectorScheduler | None = None self.connector_worker: OffloadingConnectorWorker | None = None diff --git a/vllm/v1/kv_offload/cpu.py b/vllm/v1/kv_offload/cpu.py index 061cf2267..d07ef8ad0 100644 --- a/vllm/v1/kv_offload/cpu.py +++ b/vllm/v1/kv_offload/cpu.py @@ -7,6 +7,7 @@ import torch from vllm.config import VllmConfig from vllm.platforms import current_platform from vllm.v1.attention.backend import AttentionBackend +from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.kv_offload.abstract import LoadStoreSpec, OffloadingManager from vllm.v1.kv_offload.arc_manager import ARCOffloadingManager from vllm.v1.kv_offload.backends.cpu import CPUBackend @@ -18,15 +19,37 @@ from vllm.v1.kv_offload.worker.worker import OffloadingHandler class CPUOffloadingSpec(OffloadingSpec): - def __init__(self, vllm_config: VllmConfig): - super().__init__(vllm_config) + def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig): + super().__init__(vllm_config, kv_cache_config) - num_cpu_blocks = self.extra_config.get("num_cpu_blocks") - if not num_cpu_blocks: + cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use") + if not cpu_bytes_to_use: raise Exception( - "num_cpu_blocks must be specified in kv_connector_extra_config" + "cpu_bytes_to_use must be specified in kv_connector_extra_config" ) - self.num_cpu_blocks: int = num_cpu_blocks + + # calculate kv_bytes_per_offloaded_block + assert kv_cache_config is not None + page_sizes = { + kv_cache_group.kv_cache_spec.page_size_bytes + for kv_cache_group in kv_cache_config.kv_cache_groups + } + assert len(page_sizes) == 1 + page_size_bytes = page_sizes.pop() + kv_bytes_per_block = ( + page_size_bytes + * len(kv_cache_config.kv_cache_tensors) + * vllm_config.parallel_config.world_size + ) + kv_bytes_per_offloaded_block = kv_bytes_per_block * ( + self.offloaded_block_size // self.gpu_block_size + ) + + self.num_blocks = ( + int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block + if kv_bytes_per_offloaded_block > 0 + else 0 + ) # scheduler-side self._manager: OffloadingManager | None = None @@ -44,7 +67,7 @@ class CPUOffloadingSpec(OffloadingSpec): ) backend = CPUBackend( - block_size=self.offloaded_block_size, num_blocks=self.num_cpu_blocks + block_size=self.offloaded_block_size, num_blocks=self.num_blocks ) if self.eviction_policy == "lru": @@ -77,7 +100,7 @@ class CPUOffloadingSpec(OffloadingSpec): attn_backends=attn_backends, gpu_block_size=self.gpu_block_size, cpu_block_size=self.offloaded_block_size, - num_cpu_blocks=self.num_cpu_blocks, + num_cpu_blocks=self.num_blocks, gpu_caches=kv_caches, ) diff --git a/vllm/v1/kv_offload/factory.py b/vllm/v1/kv_offload/factory.py index b4d40cb48..8fe018b89 100644 --- a/vllm/v1/kv_offload/factory.py +++ b/vllm/v1/kv_offload/factory.py @@ -9,6 +9,7 @@ from vllm.v1.kv_offload.spec import OffloadingSpec if TYPE_CHECKING: from vllm.config import VllmConfig + from vllm.v1.kv_cache_interface import KVCacheConfig logger = init_logger(__name__) @@ -32,6 +33,7 @@ class OffloadingSpecFactory: def create_spec( cls, config: "VllmConfig", + kv_cache_config: "KVCacheConfig | None", ) -> OffloadingSpec: kv_transfer_config = config.kv_transfer_config assert kv_transfer_config is not None @@ -47,7 +49,7 @@ class OffloadingSpecFactory: spec_cls = getattr(spec_module, spec_name) assert issubclass(spec_cls, OffloadingSpec) logger.info("Creating offloading spec with name: %s", spec_name) - return spec_cls(config) + return spec_cls(config, kv_cache_config) # Register various specs here. diff --git a/vllm/v1/kv_offload/spec.py b/vllm/v1/kv_offload/spec.py index 549a0fdbf..1d41ea71f 100644 --- a/vllm/v1/kv_offload/spec.py +++ b/vllm/v1/kv_offload/spec.py @@ -13,6 +13,7 @@ from vllm.v1.kv_offload.worker.worker import OffloadingHandler if TYPE_CHECKING: from vllm.config import VllmConfig + from vllm.v1.kv_cache_interface import KVCacheConfig logger = init_logger(__name__) @@ -20,12 +21,15 @@ logger = init_logger(__name__) class OffloadingSpec(ABC): """Spec for an offloading connector""" - def __init__(self, vllm_config: "VllmConfig"): + def __init__( + self, vllm_config: "VllmConfig", kv_cache_config: "KVCacheConfig | None" + ): logger.warning( "Initializing OffloadingSpec. This API is experimental and " "subject to change in the future as we iterate the design." ) self.vllm_config = vllm_config + self.kv_cache_config = kv_cache_config kv_transfer_config = vllm_config.kv_transfer_config assert kv_transfer_config is not None