[KV Connector] Support using FlexKV as KV Cache Offloading option. (#34328)

Signed-off-by: phaedonsun <phaedonsun@tencent.com>
Co-authored-by: phaedonsun <phaedonsun@tencent.com>
This commit is contained in:
sfeiqiang
2026-03-12 15:46:20 +08:00
committed by GitHub
parent 00726c74c9
commit 8cb24d3aed
5 changed files with 725 additions and 0 deletions

View File

@@ -44,6 +44,12 @@ For NixlConnector, you may also specify one or multiple NIXL_Backend. Such as:
--kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "cpu_bytes_to_use": 1000000000}}'
```
- **FlexKVConnectorV1**: refer to [examples/offline_inference/prefix_caching_flexkv.py](../../examples/offline_inference/prefix_caching_flexkv.py) for the example usage of FlexKVConnectorV1. FlexKV is a distributed KV Store and multi-level cache management system for ultra-large-scale LLM inference.
```bash
--kv-transfer-config '{"kv_connector":"FlexKVConnectorV1","kv_role":"kv_both"}'
```
## Benchmarks
Please refer to [benchmarks/disagg_benchmarks](../../benchmarks/disagg_benchmarks) for disaggregated prefilling benchmarks.

View File

@@ -0,0 +1,221 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
This example shows how to use FlexKV with vLLM for prefix caching.
FlexKV is a distributed KV Store and multi-level cache management system for
ultra-large-scale LLM inference.
Requirements:
- Install FlexKV (https://github.com/taco-project/FlexKV):
1. git clone git@github.com:taco-project/FlexKV.git
2. cd FlexKV && bash build.sh
- Ensure FlexKV is compatible with your vLLM version.
Usage:
1. Run this script:
python examples/offline_inference/prefix_caching_flexkv.py \
--model /path/to/your/model
2. Arguments:
--model Path or name of the model (required)
--tp-size Tensor parallel size (default: 1)
--gpu-memory-util GPU memory utilization (default: 0.4)
3. The script will:
- Create a FlexKV configuration file.
- Set the FLEXKV_CONFIG_PATH environment variable.
- Run vLLM with FlexKVConnectorV1 enabled.
- Compare results between regular execution, vLLM's default prefix
caching, and FlexKV.
"""
import argparse
import json
import os
import time
from vllm import LLM, SamplingParams
from vllm.distributed import cleanup_dist_env_and_memory
# NOTE: This is just a running example. For benchmarking purpose,
# please see benchmarks/benchmark_prefix_caching.py
def parse_args():
parser = argparse.ArgumentParser(
description="Example of using FlexKV with vLLM for prefix caching."
)
parser.add_argument(
"--model",
type=str,
required=True,
help="Path or name of the model to use.",
)
parser.add_argument(
"--tp-size",
type=int,
default=1,
help="Tensor parallel size (default: 1).",
)
parser.add_argument(
"--gpu-memory-util",
type=float,
default=0.4,
help="GPU memory utilization fraction (default: 0.4).",
)
return parser.parse_args()
def main():
args = parse_args()
flexkv_config = {
"server_recv_port": f"ipc:///tmp/flexkv_test_{os.getpid()}",
"cache_config": {
"enable_cpu": True,
"num_cpu_blocks": 10240,
},
"num_log_interval_requests": 200,
}
flexkv_config_path = f"./flexkv_config_{os.getpid()}.json"
with open(flexkv_config_path, "w") as f:
json.dump(flexkv_config, f)
os.environ["FLEXKV_CONFIG_PATH"] = flexkv_config_path
try:
_run(args)
finally:
if os.path.exists(flexkv_config_path):
os.remove(flexkv_config_path)
def _run(args):
# Common prefix.
prefix = (
"You are an expert school principal, skilled in effectively managing "
"faculty and staff. Draft 10-15 questions for a potential first grade "
"Head Teacher for my K-12, all-girls', independent school that emphasizes "
"community, joyful discovery, and life-long learning. The candidate is "
"coming in for a first-round panel interview for a 8th grade Math "
"teaching role. They have 5 years of previous teaching experience "
"as an assistant teacher at a co-ed, public school with experience "
"in middle school math teaching. Based on these information, fulfill "
"the following paragraph: "
)
# Sample prompts.
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
generating_prompts = [prefix + prompt for prompt in prompts]
# Create a sampling params object.
sampling_params = SamplingParams(temperature=0.0)
kv_transfer_config = {
"kv_connector": "FlexKVConnectorV1",
"kv_role": "kv_both",
}
# Create an LLM without prefix caching as a baseline.
regular_llm = LLM(
model=args.model,
enable_prefix_caching=False,
gpu_memory_utilization=args.gpu_memory_util,
tensor_parallel_size=args.tp_size,
)
print("Results without `enable_prefix_caching`")
# ruff: noqa: E501
# Generate texts from the prompts. The output is a list of RequestOutput
# objects that contain the prompt, generated text, and other information.
outputs = regular_llm.generate(generating_prompts, sampling_params)
regular_generated_texts = []
# Print the outputs.
print("-" * 50)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
regular_generated_texts.append(generated_text)
print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}")
print("-" * 50)
# Destroy the LLM object and free up the GPU memory.
del regular_llm
cleanup_dist_env_and_memory()
# Create an LLM with prefix caching enabled.
prefix_cached_llm = LLM(
model=args.model,
enable_prefix_caching=True,
gpu_memory_utilization=args.gpu_memory_util,
tensor_parallel_size=args.tp_size,
kv_transfer_config=kv_transfer_config,
)
# Warmup so that the shared prompt's KV cache is computed.
prefix_cached_llm.generate(generating_prompts[0], sampling_params)
# wait for offload kv task finished.
time.sleep(2)
# Generate with prefix caching.
outputs = prefix_cached_llm.generate(generating_prompts, sampling_params)
print("Results with `enable_prefix_caching`")
cached_generated_texts = []
# Print the outputs. You should see the same outputs as before.
print("-" * 50)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
cached_generated_texts.append(generated_text)
print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}")
print("-" * 50)
# Compare the results and display the speedup
generated_same = all(
regular_generated_texts[i] == cached_generated_texts[i]
for i in range(len(prompts))
)
print(f"Generated answers are the same: {generated_same}")
# wait for offload kv task finished.
time.sleep(2)
# reset prefix cache to use flexkv
prefix_cached_llm.reset_prefix_cache()
# Generate with prefix caching.
outputs = prefix_cached_llm.generate(generating_prompts, sampling_params)
print("Results with `flexkv`")
flexkv_generated_texts = []
# Print the outputs. You should see the same outputs as before.
print("-" * 50)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
flexkv_generated_texts.append(generated_text)
print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}")
print("-" * 50)
# Compare the results and display the speedup
generated_same = all(
regular_generated_texts[i] == flexkv_generated_texts[i]
for i in range(len(prompts))
)
print(f"Generated answers are the same: {generated_same}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,232 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Unit tests for FlexKVConnectorV1.
These tests mock the ``flexkv`` package so they can run without a real FlexKV
installation. They verify:
1. That ``FlexKVConnectorV1`` raises a helpful ``ImportError`` when FlexKV is
not installed.
2. That all public methods are correctly delegated to the underlying
``FlexKVConnectorV1Impl``.
"""
import sys
import types
from unittest.mock import MagicMock, patch
import pytest
import torch
from vllm.config import KVTransferConfig, VllmConfig
from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorRole
from vllm.v1.kv_cache_interface import KVCacheConfig
from .utils import create_vllm_config
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_vllm_config(
kv_connector: str = "FlexKVConnectorV1",
kv_role: str = "kv_both",
) -> VllmConfig:
"""Return a minimal VllmConfig with a KVTransferConfig attached."""
vllm_config = create_vllm_config(block_size=16, max_num_batched_tokens=512)
vllm_config.kv_transfer_config = KVTransferConfig(
kv_connector=kv_connector,
kv_role=kv_role,
)
return vllm_config
def _make_kv_cache_config() -> KVCacheConfig:
return MagicMock(spec=KVCacheConfig)
def _make_flexkv_module(
impl_mock: MagicMock,
) -> tuple[types.ModuleType, types.ModuleType]:
"""Build a fake ``flexkv`` package hierarchy that returns *impl_mock*
when ``FlexKVConnectorV1Impl`` is instantiated."""
flexkv_mod = types.ModuleType("flexkv")
integration_mod = types.ModuleType("flexkv.integration")
vllm_mod = types.ModuleType("flexkv.integration.vllm")
adapter_mod = types.ModuleType("flexkv.integration.vllm.vllm_v1_adapter")
# Make FlexKVConnectorV1Impl() return our mock instance.
# The "# type: ignore" markers below are needed because ModuleType does
# not declare these attributes statically; they are set dynamically.
FlexKVConnectorV1ImplCls = MagicMock(return_value=impl_mock)
adapter_mod.FlexKVConnectorV1Impl = FlexKVConnectorV1ImplCls # type: ignore
flexkv_mod.integration = integration_mod # type: ignore
integration_mod.vllm = vllm_mod # type: ignore
vllm_mod.vllm_v1_adapter = adapter_mod # type: ignore
return flexkv_mod, adapter_mod
def _install_flexkv_mock(impl_mock: MagicMock):
"""Insert fake flexkv modules into sys.modules and return a context that
cleans them up afterwards."""
flexkv_mod, adapter_mod = _make_flexkv_module(impl_mock)
mods = {
"flexkv": flexkv_mod,
"flexkv.integration": flexkv_mod.integration,
"flexkv.integration.vllm": flexkv_mod.integration.vllm,
"flexkv.integration.vllm.vllm_v1_adapter": adapter_mod,
}
return patch.dict(sys.modules, mods)
def _build_connector(vllm_config: VllmConfig, impl_mock: MagicMock):
"""Instantiate FlexKVConnectorV1 with faked flexkv modules."""
from vllm.distributed.kv_transfer.kv_connector.v1.flexkv_connector import (
FlexKVConnectorV1,
)
with _install_flexkv_mock(impl_mock):
connector = FlexKVConnectorV1(
vllm_config=vllm_config,
role=KVConnectorRole.WORKER,
kv_cache_config=_make_kv_cache_config(),
)
return connector
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestFlexKVConnectorImportError:
"""FlexKVConnectorV1 should fail with a helpful message when flexkv is
absent."""
def test_import_error_message(self):
from vllm.distributed.kv_transfer.kv_connector.v1.flexkv_connector import (
FlexKVConnectorV1,
)
# Ensure flexkv is NOT in sys.modules
for key in list(sys.modules):
if key.startswith("flexkv"):
del sys.modules[key]
with pytest.raises(ImportError, match="(?i)flexkv") as exc_info:
FlexKVConnectorV1(
vllm_config=_make_vllm_config(),
role=KVConnectorRole.WORKER,
kv_cache_config=_make_kv_cache_config(),
)
assert "https://github.com/taco-project/FlexKV" in str(exc_info.value)
class TestFlexKVConnectorDelegation:
"""All public API methods should be forwarded to the impl."""
@pytest.fixture()
def connector_and_impl(self):
impl = MagicMock()
cfg = _make_vllm_config()
connector = _build_connector(cfg, impl)
return connector, impl
def test_shutdown(self, connector_and_impl):
connector, impl = connector_and_impl
connector.shutdown()
impl.shutdown.assert_called_once()
def test_start_load_kv(self, connector_and_impl):
connector, impl = connector_and_impl
ctx = MagicMock()
connector.start_load_kv(ctx, extra_arg="x")
impl.start_load_kv.assert_called_once_with(ctx, extra_arg="x")
def test_save_kv_layer(self, connector_and_impl):
connector, impl = connector_and_impl
kv_layer = torch.zeros(4, 4)
attn_meta = MagicMock()
connector.save_kv_layer("layer_0", kv_layer, attn_meta)
impl.save_kv_layer.assert_called_once_with("layer_0", kv_layer, attn_meta)
def test_wait_for_save(self, connector_and_impl):
connector, impl = connector_and_impl
connector.wait_for_save()
impl.wait_for_save.assert_called_once()
def test_get_finished(self, connector_and_impl):
connector, impl = connector_and_impl
impl.get_finished.return_value = ({"req1"}, None)
result = connector.get_finished({"req1"})
impl.get_finished.assert_called_once_with({"req1"})
assert result == ({"req1"}, None)
def test_register_kv_caches(self, connector_and_impl):
connector, impl = connector_and_impl
kv_caches = {"layer_0": torch.zeros(1)}
connector.register_kv_caches(kv_caches)
impl.register_kv_caches.assert_called_once_with(kv_caches)
def test_get_num_new_matched_tokens(self, connector_and_impl):
connector, impl = connector_and_impl
req = MagicMock()
impl.get_num_new_matched_tokens.return_value = (10, False)
result = connector.get_num_new_matched_tokens(req, 5)
impl.get_num_new_matched_tokens.assert_called_once_with(req, 5)
assert result == (10, False)
def test_update_state_after_alloc(self, connector_and_impl):
connector, impl = connector_and_impl
req = MagicMock()
blocks = MagicMock()
connector.update_state_after_alloc(req, blocks, 4)
impl.update_state_after_alloc.assert_called_once_with(req, blocks, 4)
def test_build_connector_meta(self, connector_and_impl):
connector, impl = connector_and_impl
sched_out = MagicMock()
connector.build_connector_meta(sched_out)
impl.build_connector_meta.assert_called_once_with(sched_out)
def test_update_connector_output(self, connector_and_impl):
connector, impl = connector_and_impl
out = MagicMock()
connector.update_connector_output(out)
impl.update_connector_output.assert_called_once_with(out)
def test_request_finished(self, connector_and_impl):
connector, impl = connector_and_impl
req = MagicMock()
impl.request_finished.return_value = (True, {"key": "val"})
result = connector.request_finished(req, [1, 2, 3])
impl.request_finished.assert_called_once_with(req, [1, 2, 3])
assert result == (True, {"key": "val"})
def test_take_events(self, connector_and_impl):
connector, impl = connector_and_impl
impl.take_events.return_value = iter([])
list(connector.take_events())
impl.take_events.assert_called_once()
def test_get_kv_connector_stats(self, connector_and_impl):
connector, impl = connector_and_impl
impl.get_kv_connector_stats.return_value = None
result = connector.get_kv_connector_stats()
impl.get_kv_connector_stats.assert_called_once()
assert result is None
def test_get_block_ids_with_load_errors(self, connector_and_impl):
connector, impl = connector_and_impl
impl.get_block_ids_with_load_errors.return_value = {7, 8}
result = connector.get_block_ids_with_load_errors()
assert result == {7, 8}
def test_wait_for_layer_load(self, connector_and_impl):
connector, impl = connector_and_impl
connector.wait_for_layer_load("layer_0")
impl.wait_for_layer_load.assert_called_once_with("layer_0")

View File

@@ -207,3 +207,9 @@ KVConnectorFactory.register_connector(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.mooncake_connector",
"MooncakeConnector",
)
KVConnectorFactory.register_connector(
"FlexKVConnectorV1",
"vllm.distributed.kv_transfer.kv_connector.v1.flexkv_connector",
"FlexKVConnectorV1",
)

View File

@@ -0,0 +1,260 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any
import torch
from vllm.config import VllmConfig
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
KVConnectorBase_V1,
KVConnectorMetadata,
KVConnectorRole,
)
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats
from vllm.logger import init_logger
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.outputs import KVConnectorOutput
if TYPE_CHECKING:
from vllm.distributed.kv_events import KVCacheEvent
from vllm.forward_context import ForwardContext
from vllm.v1.attention.backend import AttentionMetadata
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.request import Request
logger = init_logger(__name__)
# FlexKV is a distributed KV Store and multi-level cache management system for
# ultra-large-scale LLM inference.
# GitHub: https://github.com/taco-project/FlexKV
# Install: git clone git@github.com:taco-project/FlexKV.git \
# && cd FlexKV && bash build.sh
class FlexKVConnectorV1(KVConnectorBase_V1):
"""KV Connector that offloads KV cache to FlexKV.
FlexKV is a distributed KV Store and multi-level cache management system
designed for ultra-large-scale LLM inference. It supports offloading KV
cache to CPU memory, SSD, and remote storage.
Installation:
See https://github.com/taco-project/FlexKV for installation instructions.
Quick start::
git clone git@github.com:taco-project/FlexKV.git
cd FlexKV && bash build.sh
Configuration:
Pass ``kv_connector="FlexKVConnectorV1"`` via ``--kv-transfer-config``::
--kv-transfer-config \
'{"kv_connector":"FlexKVConnectorV1","kv_role":"kv_both"}'
"""
def __init__(
self,
vllm_config: "VllmConfig",
role: KVConnectorRole,
kv_cache_config: "KVCacheConfig",
):
super().__init__(
vllm_config=vllm_config, role=role, kv_cache_config=kv_cache_config
)
try:
from flexkv.integration.vllm.vllm_v1_adapter import FlexKVConnectorV1Impl
except ImportError as e:
raise ImportError(
"FlexKV is not installed. Please install it to use "
"FlexKVConnectorV1. See https://github.com/taco-project/FlexKV "
"for installation instructions."
) from e
self._flexkv_connector = FlexKVConnectorV1Impl(vllm_config, role)
def shutdown(self):
self._flexkv_connector.shutdown()
# ==============================
# Worker-side methods
# ==============================
def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
"""No-op for FlexKV (currently).
FlexKV manages all KV transfers on the **scheduler side** via
``build_connector_meta`` (which calls ``launch_tasks``) and
``update_connector_output`` (which polls ``query_finished_task``).
KV blocks are transferred directly between the FlexKV server and
vLLM's GPU memory without worker-side intervention during the
forward pass — similar to how NIXL operates.
These worker-side hooks are kept (rather than omitted) to satisfy
the ``KVConnectorBase_V1`` interface contract and to serve as
extension points for a future worker-side layer-pipelining path.
Args:
forward_context (ForwardContext): the forward context.
**kwargs (Any): additional arguments (unused).
"""
self._flexkv_connector.start_load_kv(forward_context, **kwargs)
def wait_for_layer_load(self, layer_name: str) -> None:
"""No-op for FlexKV (currently).
FlexKV manages all KV transfers on the scheduler side.
This hook is retained for ``KVConnectorBase_V1`` API compatibility.
Args:
layer_name: the name of the layer (unused).
"""
self._flexkv_connector.wait_for_layer_load(layer_name)
def save_kv_layer(
self,
layer_name: str,
kv_layer: torch.Tensor,
attn_metadata: "AttentionMetadata",
**kwargs,
) -> None:
"""No-op for FlexKV (currently).
FlexKV offloads KV cache asynchronously from the scheduler side
after a request finishes (see ``request_finished``). It does not
intercept individual layer tensors during the forward pass.
This hook is retained to satisfy ``KVConnectorBase_V1`` and as an
extension point for future per-layer async offload support.
Args:
layer_name (str): the name of the layer (unused).
kv_layer (torch.Tensor): the paged KV buffer (unused).
attn_metadata (AttentionMetadata): the attention metadata (unused).
**kwargs (Any): additional arguments (unused).
"""
self._flexkv_connector.save_kv_layer(
layer_name, kv_layer, attn_metadata, **kwargs
)
def wait_for_save(self):
"""No-op for FlexKV (currently).
KV offload tasks are tracked asynchronously by the scheduler
connector via ``request_finished`` / ``query_finished_task``.
There is no pending worker-side save to wait for at
forward-context exit.
Retained to satisfy ``KVConnectorBase_V1`` and as an extension
point for future worker-side save-completion signalling.
"""
self._flexkv_connector.wait_for_save()
def get_finished(
self, finished_req_ids: set[str]
) -> tuple[set[str] | None, set[str] | None]:
"""Notify worker-side connector of requests that have finished
generating tokens.
Returns:
Tuple of (sending/saving ids, recving/loading ids) for requests
that have finished asynchronous transfer. The finished saves/sends
req ids must belong to a set provided in a call to this method
(this call or a prior one).
"""
return self._flexkv_connector.get_finished(finished_req_ids)
def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
"""Initialize with the KV caches. Useful for pre-registering the
KV caches in the KVConnector (e.g. for NIXL).
Args:
kv_caches: dictionary of layer names to kv cache tensors.
"""
self._flexkv_connector.register_kv_caches(kv_caches)
# ==============================
# Scheduler-side methods
# ==============================
def get_num_new_matched_tokens(
self,
request: "Request",
num_computed_tokens: int,
) -> tuple[int, bool]:
"""Get the number of new tokens that can be loaded from the
external KV cache beyond ``num_computed_tokens``.
Args:
request (Request): the request object.
num_computed_tokens (int): the number of locally computed
tokens for this request.
Returns:
Tuple of (num_external_tokens, is_ready) where
num_external_tokens is the number of additional tokens that
can be loaded from the external KV cache.
"""
return self._flexkv_connector.get_num_new_matched_tokens(
request, num_computed_tokens
)
def update_state_after_alloc(
self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
):
"""Update KVConnector state after block allocation."""
self._flexkv_connector.update_state_after_alloc(
request, blocks, num_external_tokens
)
def build_connector_meta(
self, scheduler_output: SchedulerOutput
) -> KVConnectorMetadata:
"""Build the connector metadata for this step.
This function should NOT modify fields in the scheduler_output.
Also, calling this function will reset the state of the connector.
Args:
scheduler_output (SchedulerOutput): the scheduler output object.
"""
return self._flexkv_connector.build_connector_meta(scheduler_output)
def update_connector_output(self, connector_output: KVConnectorOutput):
"""Update KVConnector state from worker-side connectors output.
Args:
connector_output (KVConnectorOutput): the worker-side
connectors output.
"""
self._flexkv_connector.update_connector_output(connector_output)
def request_finished(
self,
request: "Request",
block_ids: list[int],
) -> tuple[bool, dict[str, Any] | None]:
"""Called when a request has finished, before its blocks are freed.
Returns:
Tuple of (async_save, kv_transfer_params) where async_save is
True if the request is being saved/sent asynchronously and blocks
should not be freed until the request_id is returned from
:meth:`get_finished`. kv_transfer_params is an optional dict of
KVTransferParams to be included in the request outputs.
"""
return self._flexkv_connector.request_finished(request, block_ids)
def take_events(self) -> Iterable["KVCacheEvent"]:
"""Collect buffered KV cache events.
Returns:
New KV cache events since the last call.
"""
return self._flexkv_connector.take_events()
def get_kv_connector_stats(self) -> KVConnectorStats | None:
"""Get the KV connector stats collected during the last interval."""
return self._flexkv_connector.get_kv_connector_stats()
def get_block_ids_with_load_errors(self) -> set[int]:
"""Get the block ids that have failed to load."""
return self._flexkv_connector.get_block_ids_with_load_errors()