[Refactor] Remove dead code in KV connector (#36424)
Signed-off-by: yewentao256 <zhyanwentao@126.com>
This commit is contained in:
@@ -50,7 +50,6 @@ from vllm.distributed.kv_transfer.kv_connector.v1.metrics import (
 from vllm.distributed.parallel_state import (
 get_tensor_model_parallel_rank,
 get_tensor_model_parallel_world_size,
-get_tp_group,
 )
 from vllm.forward_context import ForwardContext
 from vllm.logger import init_logger
@@ -564,7 +563,6 @@ class NixlConnectorScheduler:
 
 # Background thread for handling new handshake requests.
 self._nixl_handshake_listener_t: threading.Thread | None = None
-self._encoded_xfer_handshake_metadata: dict[int, Any] = {}
 self._stop_event = threading.Event()
 
 # Requests that need to start recv/send.
@@ -650,7 +648,6 @@ class NixlConnectorScheduler:
 tp_rank,
 str(len(encoded_data[tp_rank])),
 )
-self._encoded_xfer_handshake_metadata = encoded_data
 
 # Only start the listener when we have metadata to serve.
 if self._nixl_handshake_listener_t is None:
@@ -995,7 +992,7 @@ class NixlConnectorWorker:
 self.engine_id: EngineId = engine_id
 self.tp_rank = get_tensor_model_parallel_rank()
 self.world_size = get_tensor_model_parallel_world_size()
-self.tp_group = get_tp_group()
+
 self.num_blocks = kv_cache_config.num_blocks
 self.enable_permute_local_kv = False
 
@@ -1064,7 +1061,6 @@ class NixlConnectorWorker:
 # Number of NIXL regions. Currently one region per cache
 # (so 1 per layer for MLA, otherwise 2 per layer)
 self.num_regions = 0
-self.num_layers = 0
 
 # nixl_prepped_dlist_handle.
 self.src_xfer_handles_by_block_size: dict[int, int] = {}
@@ -1108,7 +1104,6 @@ class NixlConnectorWorker:
 
 self.block_size = vllm_config.cache_config.block_size
 self.model_config = vllm_config.model_config
-self.cache_config = vllm_config.cache_config
 
 self.use_mla = self.model_config.use_mla
 
@@ -1540,7 +1535,6 @@ class NixlConnectorWorker:
 
 self.kv_caches_base_addr[self.engine_id][self.tp_rank] = seen_base_addresses
 self.num_regions = len(caches_data)
-self.num_layers = len(xfer_buffers.keys())
 
 descs = self.nixl_wrapper.get_reg_descs(caches_data, self.nixl_memory_type)
 logger.debug("Registering descs: %s", caches_data)
|
|||||||
@@ -184,13 +184,11 @@ class Scheduler(SchedulerInterface):
 
 # Encoder-related.
 # Calculate encoder cache size if applicable
-self.supports_mm_inputs = mm_registry.supports_multimodal_inputs(
+supports_mm_inputs = mm_registry.supports_multimodal_inputs(
 vllm_config.model_config
 )
-self.mm_budget = mm_budget = (
-MultiModalBudget(vllm_config, mm_registry)
-if self.supports_mm_inputs
-else None
+mm_budget = (
+MultiModalBudget(vllm_config, mm_registry) if supports_mm_inputs else None
 )
 
 # NOTE: Text-only encoder-decoder models are implemented as
|
|||||||
@@ -148,7 +148,7 @@ class EngineCore:
 if self.scheduler.connector is not None: # type: ignore
 self.model_executor.init_kv_output_aggregator(self.scheduler.connector) # type: ignore
 
-self.mm_registry = mm_registry = MULTIMODAL_REGISTRY
+mm_registry = MULTIMODAL_REGISTRY
 self.mm_receiver_cache = mm_registry.engine_receiver_cache_from_config(
 vllm_config
 )
@@ -800,8 +800,6 @@ class EngineCoreProc(EngineCore):
 vllm_config,
 client_handshake_address,
 ) as addresses:
-self.client_count = len(addresses.outputs)
-
 # Set up data parallel environment.
 self.has_coordinator = addresses.coordinator_output is not None
 self.frontend_stats_publish_address = (
|
|||||||
Reference in New Issue
Block a user