diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py index 2afdac38c..5f14c733a 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py @@ -101,7 +101,11 @@ def extract_world_size_and_kv_rank( def create_scheduler_adapter( - server_url: str, zmq_context: zmq.Context, vllm_config: VllmConfig + server_url: str, + zmq_context: zmq.Context, + vllm_config: VllmConfig, + mq_timeout: float, + heartbeat_interval: float, ) -> LMCacheMPSchedulerAdapter: world_size, kv_rank = extract_world_size_and_kv_rank( vllm_config.parallel_config.world_size, @@ -123,12 +127,18 @@ def create_scheduler_adapter( world_size, kv_rank, vllm_config.cache_config.block_size, + mq_timeout=mq_timeout, + heartbeat_interval=heartbeat_interval, **kwargs, ) def create_worker_adapter( - server_url: str, zmq_context: zmq.Context, vllm_config: VllmConfig + server_url: str, + zmq_context: zmq.Context, + vllm_config: VllmConfig, + mq_timeout: float, + heartbeat_interval: float, ) -> LMCacheMPWorkerAdapter: world_size, kv_rank = extract_world_size_and_kv_rank( vllm_config.parallel_config.world_size, @@ -142,6 +152,8 @@ def create_worker_adapter( world_size, kv_rank, vllm_config.cache_config.block_size, + mq_timeout=mq_timeout, + heartbeat_interval=heartbeat_interval, ) @@ -413,6 +425,9 @@ class LMCacheMPConnector(KVConnectorBase_V1): Extra configs (kv_transfer_config.extra_config): - lmcache.mp.host: the host of the LMCache server. - lmcache.mp.port: the port of the LMCache server. + - lmcache.mp.mq_timeout: timeout (seconds) for message queue requests. + - lmcache.mp.heartbeat_interval: interval (seconds) between server + heartbeat pings. """ def __init__( @@ -430,17 +445,35 @@ class LMCacheMPConnector(KVConnectorBase_V1): server_port = vllm_config.kv_transfer_config.get_from_extra_config( "lmcache.mp.port", 5555 ) + mq_timeout = float( + vllm_config.kv_transfer_config.get_from_extra_config( + "lmcache.mp.mq_timeout", 300.0 + ) + ) + heartbeat_interval = float( + vllm_config.kv_transfer_config.get_from_extra_config( + "lmcache.mp.heartbeat_interval", 10.0 + ) + ) server_url = f"{server_host}:{server_port}" zmq_context = zmq.Context.instance() if self.role == KVConnectorRole.SCHEDULER: self.scheduler_adapter = create_scheduler_adapter( - server_url, zmq_context, vllm_config + server_url, + zmq_context, + vllm_config, + mq_timeout, + heartbeat_interval, ) self.request_trackers: dict[str, LMCacheMPRequestTracker] = {} elif self.role == KVConnectorRole.WORKER: self.worker_adapter = create_worker_adapter( - server_url, zmq_context, vllm_config + server_url, + zmq_context, + vllm_config, + mq_timeout, + heartbeat_interval, ) else: raise ValueError(f"Unknown KVConnectorRole: {self.role}") @@ -616,8 +649,7 @@ class LMCacheMPConnector(KVConnectorBase_V1): - Sync loading: failed blocks should be reported in the forward pass in which they are detected. """ - # TODO: add error tracking - return set() + return self.worker_adapter.get_block_ids_with_load_errors() def shutdown(self): """