OffloadingConnector: Add cpu_bytes_to_use configuration (#24498)
Signed-off-by: Or Ozeri <oro@il.ibm.com>
This commit is contained in:
@@ -37,10 +37,10 @@ For NixlConnector, you may also specify one or multiple NIXL_Backend. Such as:
|
||||
--kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both", "kv_buffer_device":"cuda", "kv_connector_extra_config":{"backends":["UCX", "GDS"]}}'
|
||||
```
|
||||
|
||||
- **OffloadingConnector**: enable offloading of KV data to CPU memory, customizing the CPU block size (in tokens) and number of blocks to allocate (per worker):
|
||||
- **OffloadingConnector**: enable offloading of KV data to CPU memory, customizing the CPU block size (in tokens) and total CPU memory bytes to allocate:
|
||||
|
||||
```bash
|
||||
--kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "num_cpu_blocks": 1000}}'
|
||||
--kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "cpu_bytes_to_use": 1000000000}}'
|
||||
```
|
||||
|
||||
## Benchmarks
|
||||
|
||||
@@ -15,7 +15,7 @@ pytestmark = pytest.mark.cpu_test
|
||||
[
|
||||
("native", 4.0, 1, 1, "OffloadingConnector", 4.0 * (1 << 30)),
|
||||
# bytes per rank: 8.0 GiB / (2 * 2) = 2.0 GiB
|
||||
("native", 8.0, 2, 2, "OffloadingConnector", 8.0 * (1 << 30) / 4),
|
||||
("native", 8.0, 2, 2, "OffloadingConnector", 8.0 * (1 << 30)),
|
||||
("lmcache", 4.0, 1, 1, "LMCacheConnectorV1", 4.0),
|
||||
# size per rank: 8.0 GiB / (2 * 2) = 2.0 GiB
|
||||
("lmcache", 8.0, 2, 2, "LMCacheConnectorV1", 2.0),
|
||||
@@ -54,8 +54,7 @@ def test_kv_connector(
|
||||
assert kv_transfer_config.kv_role == "kv_both"
|
||||
|
||||
if kv_offloading_backend == "native":
|
||||
assert kv_connector_extra_config["kv_bytes_per_rank"] == expected_bytes
|
||||
assert kv_connector_extra_config["num_cpu_blocks"] == 0
|
||||
assert kv_connector_extra_config["cpu_bytes_to_use"] == expected_bytes
|
||||
# Existing config should be preserved
|
||||
assert kv_connector_extra_config["existing_key"] == "existing_value"
|
||||
elif kv_offloading_backend == "lmcache":
|
||||
|
||||
@@ -26,6 +26,7 @@ from vllm.v1.core.kv_cache_utils import (
|
||||
init_none_hash,
|
||||
)
|
||||
from vllm.v1.core.sched.scheduler import Scheduler
|
||||
from vllm.v1.kv_cache_interface import KVCacheConfig
|
||||
from vllm.v1.kv_offload.abstract import (
|
||||
LoadStoreSpec,
|
||||
OffloadingEvent,
|
||||
@@ -93,8 +94,8 @@ class MockOffloadingHandler(OffloadingHandler):
|
||||
|
||||
|
||||
class MockOffloadingSpec(OffloadingSpec):
|
||||
def __init__(self, vllm_config: VllmConfig):
|
||||
super().__init__(vllm_config)
|
||||
def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
|
||||
super().__init__(vllm_config, kv_cache_config)
|
||||
|
||||
self.manager = MagicMock(spec=OffloadingManager)
|
||||
self.manager.lookup.return_value = 0
|
||||
|
||||
@@ -161,7 +161,7 @@ def test_cpu_offloading(cpu_block_size: int, attn_backend: str) -> None:
|
||||
kv_connector="OffloadingConnector",
|
||||
kv_role="kv_both",
|
||||
kv_connector_extra_config={
|
||||
"num_cpu_blocks": 1000,
|
||||
"cpu_bytes_to_use": 500 << 20,
|
||||
"block_size": cpu_block_size,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -516,12 +516,8 @@ class VllmConfig:
|
||||
|
||||
if kv_offloading_backend == "native":
|
||||
self.kv_transfer_config.kv_connector = "OffloadingConnector"
|
||||
kv_bytes_per_rank = kv_offloading_size * (1 << 30) / num_kv_ranks
|
||||
|
||||
# NOTE(ApostaC): the actual calculation for num_cpu_blocks should be
|
||||
# done after the model's KV cache is initialized
|
||||
self.kv_transfer_config.kv_connector_extra_config.update(
|
||||
{"kv_bytes_per_rank": kv_bytes_per_rank, "num_cpu_blocks": 0}
|
||||
{"cpu_bytes_to_use": kv_offloading_size * (1 << 30)}
|
||||
)
|
||||
elif kv_offloading_backend == "lmcache":
|
||||
self.kv_transfer_config.kv_connector = "LMCacheConnectorV1"
|
||||
|
||||
@@ -56,7 +56,7 @@ class OffloadingConnector(KVConnectorBase_V1):
|
||||
):
|
||||
super().__init__(vllm_config, role, kv_cache_config)
|
||||
|
||||
spec = OffloadingSpecFactory.create_spec(vllm_config)
|
||||
spec = OffloadingSpecFactory.create_spec(vllm_config, kv_cache_config)
|
||||
|
||||
self.connector_scheduler: OffloadingConnectorScheduler | None = None
|
||||
self.connector_worker: OffloadingConnectorWorker | None = None
|
||||
|
||||
@@ -7,6 +7,7 @@ import torch
|
||||
from vllm.config import VllmConfig
|
||||
from vllm.platforms import current_platform
|
||||
from vllm.v1.attention.backend import AttentionBackend
|
||||
from vllm.v1.kv_cache_interface import KVCacheConfig
|
||||
from vllm.v1.kv_offload.abstract import LoadStoreSpec, OffloadingManager
|
||||
from vllm.v1.kv_offload.arc_manager import ARCOffloadingManager
|
||||
from vllm.v1.kv_offload.backends.cpu import CPUBackend
|
||||
@@ -18,15 +19,37 @@ from vllm.v1.kv_offload.worker.worker import OffloadingHandler
|
||||
|
||||
|
||||
class CPUOffloadingSpec(OffloadingSpec):
|
||||
def __init__(self, vllm_config: VllmConfig):
|
||||
super().__init__(vllm_config)
|
||||
def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
|
||||
super().__init__(vllm_config, kv_cache_config)
|
||||
|
||||
num_cpu_blocks = self.extra_config.get("num_cpu_blocks")
|
||||
if not num_cpu_blocks:
|
||||
cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use")
|
||||
if not cpu_bytes_to_use:
|
||||
raise Exception(
|
||||
"num_cpu_blocks must be specified in kv_connector_extra_config"
|
||||
"cpu_bytes_to_use must be specified in kv_connector_extra_config"
|
||||
)
|
||||
self.num_cpu_blocks: int = num_cpu_blocks
|
||||
|
||||
# calculate kv_bytes_per_offloaded_block
|
||||
assert kv_cache_config is not None
|
||||
page_sizes = {
|
||||
kv_cache_group.kv_cache_spec.page_size_bytes
|
||||
for kv_cache_group in kv_cache_config.kv_cache_groups
|
||||
}
|
||||
assert len(page_sizes) == 1
|
||||
page_size_bytes = page_sizes.pop()
|
||||
kv_bytes_per_block = (
|
||||
page_size_bytes
|
||||
* len(kv_cache_config.kv_cache_tensors)
|
||||
* vllm_config.parallel_config.world_size
|
||||
)
|
||||
kv_bytes_per_offloaded_block = kv_bytes_per_block * (
|
||||
self.offloaded_block_size // self.gpu_block_size
|
||||
)
|
||||
|
||||
self.num_blocks = (
|
||||
int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
|
||||
if kv_bytes_per_offloaded_block > 0
|
||||
else 0
|
||||
)
|
||||
|
||||
# scheduler-side
|
||||
self._manager: OffloadingManager | None = None
|
||||
@@ -44,7 +67,7 @@ class CPUOffloadingSpec(OffloadingSpec):
|
||||
)
|
||||
|
||||
backend = CPUBackend(
|
||||
block_size=self.offloaded_block_size, num_blocks=self.num_cpu_blocks
|
||||
block_size=self.offloaded_block_size, num_blocks=self.num_blocks
|
||||
)
|
||||
|
||||
if self.eviction_policy == "lru":
|
||||
@@ -77,7 +100,7 @@ class CPUOffloadingSpec(OffloadingSpec):
|
||||
attn_backends=attn_backends,
|
||||
gpu_block_size=self.gpu_block_size,
|
||||
cpu_block_size=self.offloaded_block_size,
|
||||
num_cpu_blocks=self.num_cpu_blocks,
|
||||
num_cpu_blocks=self.num_blocks,
|
||||
gpu_caches=kv_caches,
|
||||
)
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from vllm.v1.kv_offload.spec import OffloadingSpec
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from vllm.config import VllmConfig
|
||||
from vllm.v1.kv_cache_interface import KVCacheConfig
|
||||
|
||||
logger = init_logger(__name__)
|
||||
|
||||
@@ -32,6 +33,7 @@ class OffloadingSpecFactory:
|
||||
def create_spec(
|
||||
cls,
|
||||
config: "VllmConfig",
|
||||
kv_cache_config: "KVCacheConfig | None",
|
||||
) -> OffloadingSpec:
|
||||
kv_transfer_config = config.kv_transfer_config
|
||||
assert kv_transfer_config is not None
|
||||
@@ -47,7 +49,7 @@ class OffloadingSpecFactory:
|
||||
spec_cls = getattr(spec_module, spec_name)
|
||||
assert issubclass(spec_cls, OffloadingSpec)
|
||||
logger.info("Creating offloading spec with name: %s", spec_name)
|
||||
return spec_cls(config)
|
||||
return spec_cls(config, kv_cache_config)
|
||||
|
||||
|
||||
# Register various specs here.
|
||||
|
||||
@@ -13,6 +13,7 @@ from vllm.v1.kv_offload.worker.worker import OffloadingHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from vllm.config import VllmConfig
|
||||
from vllm.v1.kv_cache_interface import KVCacheConfig
|
||||
|
||||
logger = init_logger(__name__)
|
||||
|
||||
@@ -20,12 +21,15 @@ logger = init_logger(__name__)
|
||||
class OffloadingSpec(ABC):
|
||||
"""Spec for an offloading connector"""
|
||||
|
||||
def __init__(self, vllm_config: "VllmConfig"):
|
||||
def __init__(
|
||||
self, vllm_config: "VllmConfig", kv_cache_config: "KVCacheConfig | None"
|
||||
):
|
||||
logger.warning(
|
||||
"Initializing OffloadingSpec. This API is experimental and "
|
||||
"subject to change in the future as we iterate the design."
|
||||
)
|
||||
self.vllm_config = vllm_config
|
||||
self.kv_cache_config = kv_cache_config
|
||||
|
||||
kv_transfer_config = vllm_config.kv_transfer_config
|
||||
assert kv_transfer_config is not None
|
||||
|
||||
Reference in New Issue
Block a user