diff --git a/docs/features/disagg_prefill.md b/docs/features/disagg_prefill.md index af5f77747..f7d3f9a70 100644 --- a/docs/features/disagg_prefill.md +++ b/docs/features/disagg_prefill.md @@ -44,6 +44,12 @@ For NixlConnector, you may also specify one or multiple NIXL_Backend. Such as: --kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "cpu_bytes_to_use": 1000000000}}' ``` +- **FlexKVConnectorV1**: refer to [examples/offline_inference/prefix_caching_flexkv.py](../../examples/offline_inference/prefix_caching_flexkv.py) for the example usage of FlexKVConnectorV1. FlexKV is a distributed KV Store and multi-level cache management system for ultra-large-scale LLM inference. + + ```bash + --kv-transfer-config '{"kv_connector":"FlexKVConnectorV1","kv_role":"kv_both"}' + ``` + ## Benchmarks Please refer to [benchmarks/disagg_benchmarks](../../benchmarks/disagg_benchmarks) for disaggregated prefilling benchmarks. diff --git a/examples/offline_inference/prefix_caching_flexkv.py b/examples/offline_inference/prefix_caching_flexkv.py new file mode 100644 index 000000000..f2ffb75ef --- /dev/null +++ b/examples/offline_inference/prefix_caching_flexkv.py @@ -0,0 +1,221 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +This example shows how to use FlexKV with vLLM for prefix caching. + +FlexKV is a distributed KV Store and multi-level cache management system for +ultra-large-scale LLM inference. + +Requirements: + - Install FlexKV (https://github.com/taco-project/FlexKV): + 1. git clone git@github.com:taco-project/FlexKV.git + 2. cd FlexKV && bash build.sh + - Ensure FlexKV is compatible with your vLLM version. + +Usage: + 1. Run this script: + python examples/offline_inference/prefix_caching_flexkv.py \ + --model /path/to/your/model + + 2. Arguments: + --model Path or name of the model (required) + --tp-size Tensor parallel size (default: 1) + --gpu-memory-util GPU memory utilization (default: 0.4) + + 3. The script will: + - Create a FlexKV configuration file. + - Set the FLEXKV_CONFIG_PATH environment variable. + - Run vLLM with FlexKVConnectorV1 enabled. + - Compare results between regular execution, vLLM's default prefix + caching, and FlexKV. +""" + +import argparse +import json +import os +import time + +from vllm import LLM, SamplingParams +from vllm.distributed import cleanup_dist_env_and_memory + +# NOTE: This is just a running example. For benchmarking purpose, +# please see benchmarks/benchmark_prefix_caching.py + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Example of using FlexKV with vLLM for prefix caching." + ) + parser.add_argument( + "--model", + type=str, + required=True, + help="Path or name of the model to use.", + ) + parser.add_argument( + "--tp-size", + type=int, + default=1, + help="Tensor parallel size (default: 1).", + ) + parser.add_argument( + "--gpu-memory-util", + type=float, + default=0.4, + help="GPU memory utilization fraction (default: 0.4).", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + + flexkv_config = { + "server_recv_port": f"ipc:///tmp/flexkv_test_{os.getpid()}", + "cache_config": { + "enable_cpu": True, + "num_cpu_blocks": 10240, + }, + "num_log_interval_requests": 200, + } + flexkv_config_path = f"./flexkv_config_{os.getpid()}.json" + with open(flexkv_config_path, "w") as f: + json.dump(flexkv_config, f) + os.environ["FLEXKV_CONFIG_PATH"] = flexkv_config_path + + try: + _run(args) + finally: + if os.path.exists(flexkv_config_path): + os.remove(flexkv_config_path) + + +def _run(args): + # Common prefix. + prefix = ( + "You are an expert school principal, skilled in effectively managing " + "faculty and staff. Draft 10-15 questions for a potential first grade " + "Head Teacher for my K-12, all-girls', independent school that emphasizes " + "community, joyful discovery, and life-long learning. The candidate is " + "coming in for a first-round panel interview for a 8th grade Math " + "teaching role. They have 5 years of previous teaching experience " + "as an assistant teacher at a co-ed, public school with experience " + "in middle school math teaching. Based on these information, fulfill " + "the following paragraph: " + ) + + # Sample prompts. + prompts = [ + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", + ] + + generating_prompts = [prefix + prompt for prompt in prompts] + + # Create a sampling params object. + sampling_params = SamplingParams(temperature=0.0) + + kv_transfer_config = { + "kv_connector": "FlexKVConnectorV1", + "kv_role": "kv_both", + } + + # Create an LLM without prefix caching as a baseline. + regular_llm = LLM( + model=args.model, + enable_prefix_caching=False, + gpu_memory_utilization=args.gpu_memory_util, + tensor_parallel_size=args.tp_size, + ) + + print("Results without `enable_prefix_caching`") + + # ruff: noqa: E501 + # Generate texts from the prompts. The output is a list of RequestOutput + # objects that contain the prompt, generated text, and other information. + outputs = regular_llm.generate(generating_prompts, sampling_params) + + regular_generated_texts = [] + # Print the outputs. + print("-" * 50) + for output in outputs: + prompt = output.prompt + generated_text = output.outputs[0].text + regular_generated_texts.append(generated_text) + print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}") + print("-" * 50) + + # Destroy the LLM object and free up the GPU memory. + del regular_llm + cleanup_dist_env_and_memory() + + # Create an LLM with prefix caching enabled. + prefix_cached_llm = LLM( + model=args.model, + enable_prefix_caching=True, + gpu_memory_utilization=args.gpu_memory_util, + tensor_parallel_size=args.tp_size, + kv_transfer_config=kv_transfer_config, + ) + + # Warmup so that the shared prompt's KV cache is computed. + prefix_cached_llm.generate(generating_prompts[0], sampling_params) + + # wait for offload kv task finished. + time.sleep(2) + + # Generate with prefix caching. + outputs = prefix_cached_llm.generate(generating_prompts, sampling_params) + + print("Results with `enable_prefix_caching`") + + cached_generated_texts = [] + # Print the outputs. You should see the same outputs as before. + print("-" * 50) + for output in outputs: + prompt = output.prompt + generated_text = output.outputs[0].text + cached_generated_texts.append(generated_text) + print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}") + print("-" * 50) + + # Compare the results and display the speedup + generated_same = all( + regular_generated_texts[i] == cached_generated_texts[i] + for i in range(len(prompts)) + ) + print(f"Generated answers are the same: {generated_same}") + + # wait for offload kv task finished. + time.sleep(2) + + # reset prefix cache to use flexkv + prefix_cached_llm.reset_prefix_cache() + + # Generate with prefix caching. + outputs = prefix_cached_llm.generate(generating_prompts, sampling_params) + + print("Results with `flexkv`") + + flexkv_generated_texts = [] + # Print the outputs. You should see the same outputs as before. + print("-" * 50) + for output in outputs: + prompt = output.prompt + generated_text = output.outputs[0].text + flexkv_generated_texts.append(generated_text) + print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}") + print("-" * 50) + + # Compare the results and display the speedup + generated_same = all( + regular_generated_texts[i] == flexkv_generated_texts[i] + for i in range(len(prompts)) + ) + print(f"Generated answers are the same: {generated_same}") + + +if __name__ == "__main__": + main() diff --git a/tests/v1/kv_connector/unit/test_flexkv_connector.py b/tests/v1/kv_connector/unit/test_flexkv_connector.py new file mode 100644 index 000000000..8cb573663 --- /dev/null +++ b/tests/v1/kv_connector/unit/test_flexkv_connector.py @@ -0,0 +1,232 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Unit tests for FlexKVConnectorV1. + +These tests mock the ``flexkv`` package so they can run without a real FlexKV +installation. They verify: + +1. That ``FlexKVConnectorV1`` raises a helpful ``ImportError`` when FlexKV is + not installed. +2. That all public methods are correctly delegated to the underlying + ``FlexKVConnectorV1Impl``. +""" + +import sys +import types +from unittest.mock import MagicMock, patch + +import pytest +import torch + +from vllm.config import KVTransferConfig, VllmConfig +from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorRole +from vllm.v1.kv_cache_interface import KVCacheConfig + +from .utils import create_vllm_config + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_vllm_config( + kv_connector: str = "FlexKVConnectorV1", + kv_role: str = "kv_both", +) -> VllmConfig: + """Return a minimal VllmConfig with a KVTransferConfig attached.""" + vllm_config = create_vllm_config(block_size=16, max_num_batched_tokens=512) + vllm_config.kv_transfer_config = KVTransferConfig( + kv_connector=kv_connector, + kv_role=kv_role, + ) + return vllm_config + + +def _make_kv_cache_config() -> KVCacheConfig: + return MagicMock(spec=KVCacheConfig) + + +def _make_flexkv_module( + impl_mock: MagicMock, +) -> tuple[types.ModuleType, types.ModuleType]: + """Build a fake ``flexkv`` package hierarchy that returns *impl_mock* + when ``FlexKVConnectorV1Impl`` is instantiated.""" + flexkv_mod = types.ModuleType("flexkv") + integration_mod = types.ModuleType("flexkv.integration") + vllm_mod = types.ModuleType("flexkv.integration.vllm") + adapter_mod = types.ModuleType("flexkv.integration.vllm.vllm_v1_adapter") + + # Make FlexKVConnectorV1Impl() return our mock instance. + # The "# type: ignore" markers below are needed because ModuleType does + # not declare these attributes statically; they are set dynamically. + FlexKVConnectorV1ImplCls = MagicMock(return_value=impl_mock) + adapter_mod.FlexKVConnectorV1Impl = FlexKVConnectorV1ImplCls # type: ignore + + flexkv_mod.integration = integration_mod # type: ignore + integration_mod.vllm = vllm_mod # type: ignore + vllm_mod.vllm_v1_adapter = adapter_mod # type: ignore + + return flexkv_mod, adapter_mod + + +def _install_flexkv_mock(impl_mock: MagicMock): + """Insert fake flexkv modules into sys.modules and return a context that + cleans them up afterwards.""" + flexkv_mod, adapter_mod = _make_flexkv_module(impl_mock) + mods = { + "flexkv": flexkv_mod, + "flexkv.integration": flexkv_mod.integration, + "flexkv.integration.vllm": flexkv_mod.integration.vllm, + "flexkv.integration.vllm.vllm_v1_adapter": adapter_mod, + } + return patch.dict(sys.modules, mods) + + +def _build_connector(vllm_config: VllmConfig, impl_mock: MagicMock): + """Instantiate FlexKVConnectorV1 with faked flexkv modules.""" + from vllm.distributed.kv_transfer.kv_connector.v1.flexkv_connector import ( + FlexKVConnectorV1, + ) + + with _install_flexkv_mock(impl_mock): + connector = FlexKVConnectorV1( + vllm_config=vllm_config, + role=KVConnectorRole.WORKER, + kv_cache_config=_make_kv_cache_config(), + ) + return connector + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestFlexKVConnectorImportError: + """FlexKVConnectorV1 should fail with a helpful message when flexkv is + absent.""" + + def test_import_error_message(self): + from vllm.distributed.kv_transfer.kv_connector.v1.flexkv_connector import ( + FlexKVConnectorV1, + ) + + # Ensure flexkv is NOT in sys.modules + for key in list(sys.modules): + if key.startswith("flexkv"): + del sys.modules[key] + + with pytest.raises(ImportError, match="(?i)flexkv") as exc_info: + FlexKVConnectorV1( + vllm_config=_make_vllm_config(), + role=KVConnectorRole.WORKER, + kv_cache_config=_make_kv_cache_config(), + ) + + assert "https://github.com/taco-project/FlexKV" in str(exc_info.value) + + +class TestFlexKVConnectorDelegation: + """All public API methods should be forwarded to the impl.""" + + @pytest.fixture() + def connector_and_impl(self): + impl = MagicMock() + cfg = _make_vllm_config() + connector = _build_connector(cfg, impl) + return connector, impl + + def test_shutdown(self, connector_and_impl): + connector, impl = connector_and_impl + connector.shutdown() + impl.shutdown.assert_called_once() + + def test_start_load_kv(self, connector_and_impl): + connector, impl = connector_and_impl + ctx = MagicMock() + connector.start_load_kv(ctx, extra_arg="x") + impl.start_load_kv.assert_called_once_with(ctx, extra_arg="x") + + def test_save_kv_layer(self, connector_and_impl): + connector, impl = connector_and_impl + kv_layer = torch.zeros(4, 4) + attn_meta = MagicMock() + connector.save_kv_layer("layer_0", kv_layer, attn_meta) + impl.save_kv_layer.assert_called_once_with("layer_0", kv_layer, attn_meta) + + def test_wait_for_save(self, connector_and_impl): + connector, impl = connector_and_impl + connector.wait_for_save() + impl.wait_for_save.assert_called_once() + + def test_get_finished(self, connector_and_impl): + connector, impl = connector_and_impl + impl.get_finished.return_value = ({"req1"}, None) + result = connector.get_finished({"req1"}) + impl.get_finished.assert_called_once_with({"req1"}) + assert result == ({"req1"}, None) + + def test_register_kv_caches(self, connector_and_impl): + connector, impl = connector_and_impl + kv_caches = {"layer_0": torch.zeros(1)} + connector.register_kv_caches(kv_caches) + impl.register_kv_caches.assert_called_once_with(kv_caches) + + def test_get_num_new_matched_tokens(self, connector_and_impl): + connector, impl = connector_and_impl + req = MagicMock() + impl.get_num_new_matched_tokens.return_value = (10, False) + result = connector.get_num_new_matched_tokens(req, 5) + impl.get_num_new_matched_tokens.assert_called_once_with(req, 5) + assert result == (10, False) + + def test_update_state_after_alloc(self, connector_and_impl): + connector, impl = connector_and_impl + req = MagicMock() + blocks = MagicMock() + connector.update_state_after_alloc(req, blocks, 4) + impl.update_state_after_alloc.assert_called_once_with(req, blocks, 4) + + def test_build_connector_meta(self, connector_and_impl): + connector, impl = connector_and_impl + sched_out = MagicMock() + connector.build_connector_meta(sched_out) + impl.build_connector_meta.assert_called_once_with(sched_out) + + def test_update_connector_output(self, connector_and_impl): + connector, impl = connector_and_impl + out = MagicMock() + connector.update_connector_output(out) + impl.update_connector_output.assert_called_once_with(out) + + def test_request_finished(self, connector_and_impl): + connector, impl = connector_and_impl + req = MagicMock() + impl.request_finished.return_value = (True, {"key": "val"}) + result = connector.request_finished(req, [1, 2, 3]) + impl.request_finished.assert_called_once_with(req, [1, 2, 3]) + assert result == (True, {"key": "val"}) + + def test_take_events(self, connector_and_impl): + connector, impl = connector_and_impl + impl.take_events.return_value = iter([]) + list(connector.take_events()) + impl.take_events.assert_called_once() + + def test_get_kv_connector_stats(self, connector_and_impl): + connector, impl = connector_and_impl + impl.get_kv_connector_stats.return_value = None + result = connector.get_kv_connector_stats() + impl.get_kv_connector_stats.assert_called_once() + assert result is None + + def test_get_block_ids_with_load_errors(self, connector_and_impl): + connector, impl = connector_and_impl + impl.get_block_ids_with_load_errors.return_value = {7, 8} + result = connector.get_block_ids_with_load_errors() + assert result == {7, 8} + + def test_wait_for_layer_load(self, connector_and_impl): + connector, impl = connector_and_impl + connector.wait_for_layer_load("layer_0") + impl.wait_for_layer_load.assert_called_once_with("layer_0") diff --git a/vllm/distributed/kv_transfer/kv_connector/factory.py b/vllm/distributed/kv_transfer/kv_connector/factory.py index d5a40fc63..b677c5885 100644 --- a/vllm/distributed/kv_transfer/kv_connector/factory.py +++ b/vllm/distributed/kv_transfer/kv_connector/factory.py @@ -207,3 +207,9 @@ KVConnectorFactory.register_connector( "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.mooncake_connector", "MooncakeConnector", ) + +KVConnectorFactory.register_connector( + "FlexKVConnectorV1", + "vllm.distributed.kv_transfer.kv_connector.v1.flexkv_connector", + "FlexKVConnectorV1", +) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/flexkv_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/flexkv_connector.py new file mode 100644 index 000000000..556cba963 --- /dev/null +++ b/vllm/distributed/kv_transfer/kv_connector/v1/flexkv_connector.py @@ -0,0 +1,260 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from collections.abc import Iterable +from typing import TYPE_CHECKING, Any + +import torch + +from vllm.config import VllmConfig +from vllm.distributed.kv_transfer.kv_connector.v1.base import ( + KVConnectorBase_V1, + KVConnectorMetadata, + KVConnectorRole, +) +from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats +from vllm.logger import init_logger +from vllm.v1.core.sched.output import SchedulerOutput +from vllm.v1.outputs import KVConnectorOutput + +if TYPE_CHECKING: + from vllm.distributed.kv_events import KVCacheEvent + from vllm.forward_context import ForwardContext + from vllm.v1.attention.backend import AttentionMetadata + from vllm.v1.core.kv_cache_manager import KVCacheBlocks + from vllm.v1.kv_cache_interface import KVCacheConfig + from vllm.v1.request import Request + +logger = init_logger(__name__) + + +# FlexKV is a distributed KV Store and multi-level cache management system for +# ultra-large-scale LLM inference. +# GitHub: https://github.com/taco-project/FlexKV +# Install: git clone git@github.com:taco-project/FlexKV.git \ +# && cd FlexKV && bash build.sh +class FlexKVConnectorV1(KVConnectorBase_V1): + """KV Connector that offloads KV cache to FlexKV. + + FlexKV is a distributed KV Store and multi-level cache management system + designed for ultra-large-scale LLM inference. It supports offloading KV + cache to CPU memory, SSD, and remote storage. + + Installation: + See https://github.com/taco-project/FlexKV for installation instructions. + Quick start:: + + git clone git@github.com:taco-project/FlexKV.git + cd FlexKV && bash build.sh + + Configuration: + Pass ``kv_connector="FlexKVConnectorV1"`` via ``--kv-transfer-config``:: + + --kv-transfer-config \ + '{"kv_connector":"FlexKVConnectorV1","kv_role":"kv_both"}' + """ + + def __init__( + self, + vllm_config: "VllmConfig", + role: KVConnectorRole, + kv_cache_config: "KVCacheConfig", + ): + super().__init__( + vllm_config=vllm_config, role=role, kv_cache_config=kv_cache_config + ) + try: + from flexkv.integration.vllm.vllm_v1_adapter import FlexKVConnectorV1Impl + except ImportError as e: + raise ImportError( + "FlexKV is not installed. Please install it to use " + "FlexKVConnectorV1. See https://github.com/taco-project/FlexKV " + "for installation instructions." + ) from e + + self._flexkv_connector = FlexKVConnectorV1Impl(vllm_config, role) + + def shutdown(self): + self._flexkv_connector.shutdown() + + # ============================== + # Worker-side methods + # ============================== + def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None: + """No-op for FlexKV (currently). + + FlexKV manages all KV transfers on the **scheduler side** via + ``build_connector_meta`` (which calls ``launch_tasks``) and + ``update_connector_output`` (which polls ``query_finished_task``). + KV blocks are transferred directly between the FlexKV server and + vLLM's GPU memory without worker-side intervention during the + forward pass — similar to how NIXL operates. + + These worker-side hooks are kept (rather than omitted) to satisfy + the ``KVConnectorBase_V1`` interface contract and to serve as + extension points for a future worker-side layer-pipelining path. + + Args: + forward_context (ForwardContext): the forward context. + **kwargs (Any): additional arguments (unused). + """ + self._flexkv_connector.start_load_kv(forward_context, **kwargs) + + def wait_for_layer_load(self, layer_name: str) -> None: + """No-op for FlexKV (currently). + + FlexKV manages all KV transfers on the scheduler side. + This hook is retained for ``KVConnectorBase_V1`` API compatibility. + + Args: + layer_name: the name of the layer (unused). + """ + self._flexkv_connector.wait_for_layer_load(layer_name) + + def save_kv_layer( + self, + layer_name: str, + kv_layer: torch.Tensor, + attn_metadata: "AttentionMetadata", + **kwargs, + ) -> None: + """No-op for FlexKV (currently). + + FlexKV offloads KV cache asynchronously from the scheduler side + after a request finishes (see ``request_finished``). It does not + intercept individual layer tensors during the forward pass. + + This hook is retained to satisfy ``KVConnectorBase_V1`` and as an + extension point for future per-layer async offload support. + + Args: + layer_name (str): the name of the layer (unused). + kv_layer (torch.Tensor): the paged KV buffer (unused). + attn_metadata (AttentionMetadata): the attention metadata (unused). + **kwargs (Any): additional arguments (unused). + """ + self._flexkv_connector.save_kv_layer( + layer_name, kv_layer, attn_metadata, **kwargs + ) + + def wait_for_save(self): + """No-op for FlexKV (currently). + + KV offload tasks are tracked asynchronously by the scheduler + connector via ``request_finished`` / ``query_finished_task``. + There is no pending worker-side save to wait for at + forward-context exit. + + Retained to satisfy ``KVConnectorBase_V1`` and as an extension + point for future worker-side save-completion signalling. + """ + self._flexkv_connector.wait_for_save() + + def get_finished( + self, finished_req_ids: set[str] + ) -> tuple[set[str] | None, set[str] | None]: + """Notify worker-side connector of requests that have finished + generating tokens. + + Returns: + Tuple of (sending/saving ids, recving/loading ids) for requests + that have finished asynchronous transfer. The finished saves/sends + req ids must belong to a set provided in a call to this method + (this call or a prior one). + """ + return self._flexkv_connector.get_finished(finished_req_ids) + + def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]): + """Initialize with the KV caches. Useful for pre-registering the + KV caches in the KVConnector (e.g. for NIXL). + + Args: + kv_caches: dictionary of layer names to kv cache tensors. + """ + self._flexkv_connector.register_kv_caches(kv_caches) + + # ============================== + # Scheduler-side methods + # ============================== + def get_num_new_matched_tokens( + self, + request: "Request", + num_computed_tokens: int, + ) -> tuple[int, bool]: + """Get the number of new tokens that can be loaded from the + external KV cache beyond ``num_computed_tokens``. + + Args: + request (Request): the request object. + num_computed_tokens (int): the number of locally computed + tokens for this request. + + Returns: + Tuple of (num_external_tokens, is_ready) where + num_external_tokens is the number of additional tokens that + can be loaded from the external KV cache. + """ + return self._flexkv_connector.get_num_new_matched_tokens( + request, num_computed_tokens + ) + + def update_state_after_alloc( + self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int + ): + """Update KVConnector state after block allocation.""" + self._flexkv_connector.update_state_after_alloc( + request, blocks, num_external_tokens + ) + + def build_connector_meta( + self, scheduler_output: SchedulerOutput + ) -> KVConnectorMetadata: + """Build the connector metadata for this step. + + This function should NOT modify fields in the scheduler_output. + Also, calling this function will reset the state of the connector. + + Args: + scheduler_output (SchedulerOutput): the scheduler output object. + """ + return self._flexkv_connector.build_connector_meta(scheduler_output) + + def update_connector_output(self, connector_output: KVConnectorOutput): + """Update KVConnector state from worker-side connectors output. + + Args: + connector_output (KVConnectorOutput): the worker-side + connectors output. + """ + self._flexkv_connector.update_connector_output(connector_output) + + def request_finished( + self, + request: "Request", + block_ids: list[int], + ) -> tuple[bool, dict[str, Any] | None]: + """Called when a request has finished, before its blocks are freed. + + Returns: + Tuple of (async_save, kv_transfer_params) where async_save is + True if the request is being saved/sent asynchronously and blocks + should not be freed until the request_id is returned from + :meth:`get_finished`. kv_transfer_params is an optional dict of + KVTransferParams to be included in the request outputs. + """ + return self._flexkv_connector.request_finished(request, block_ids) + + def take_events(self) -> Iterable["KVCacheEvent"]: + """Collect buffered KV cache events. + + Returns: + New KV cache events since the last call. + """ + return self._flexkv_connector.take_events() + + def get_kv_connector_stats(self) -> KVConnectorStats | None: + """Get the KV connector stats collected during the last interval.""" + return self._flexkv_connector.get_kv_connector_stats() + + def get_block_ids_with_load_errors(self) -> set[int]: + """Get the block ids that have failed to load.""" + return self._flexkv_connector.get_block_ids_with_load_errors()