diff --git a/tests/v1/engine/test_engine_core_client.py b/tests/v1/engine/test_engine_core_client.py index d5a904d18..ce0d70cc9 100644 --- a/tests/v1/engine/test_engine_core_client.py +++ b/tests/v1/engine/test_engine_core_client.py @@ -143,6 +143,7 @@ def test_mp_client_uses_env_timeout(monkeypatch: pytest.MonkeyPatch): data_parallel_rank_local=None, data_parallel_hybrid_lb=False, data_parallel_external_lb=False, + local_engines_only=False, ) vllm_config = SimpleNamespace(parallel_config=parallel_config) diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index 363078aef..26b913b47 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -361,6 +361,14 @@ class ParallelConfig: def num_ubatches(self) -> int: return 2 if self.enable_dbo else self.ubatch_size + @property + def local_engines_only(self) -> bool: + """ + Client manages local+remote EngineCores in pure internal LB case. + Client manages local EngineCores in hybrid and external LB case. + """ + return self.data_parallel_external_lb or self.data_parallel_hybrid_lb + def get_next_dp_init_port(self) -> int: """ We might need to initialize process groups in multiple diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index 77c7253ae..f06a93913 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -190,9 +190,7 @@ def run_multi_api_server(args: argparse.Namespace): parallel_config = vllm_config.parallel_config dp_rank = parallel_config.data_parallel_rank - external_dp_lb = parallel_config.data_parallel_external_lb - hybrid_dp_lb = parallel_config.data_parallel_hybrid_lb - assert external_dp_lb or hybrid_dp_lb or dp_rank == 0 + assert parallel_config.local_engines_only or dp_rank == 0 api_server_manager: APIServerProcessManager | None = None @@ -218,7 +216,7 @@ def run_multi_api_server(args: argparse.Namespace): # (after the launcher context manager exits), # since we get the front-end stats update address from the coordinator # via the handshake with the local engine. - if dp_rank == 0 or not (external_dp_lb or hybrid_dp_lb): + if dp_rank == 0 or not parallel_config.local_engines_only: # Start API servers using the manager. api_server_manager = APIServerProcessManager(**api_server_manager_kwargs) diff --git a/vllm/v1/engine/coordinator.py b/vllm/v1/engine/coordinator.py index c2a9fe7c0..672d536a5 100644 --- a/vllm/v1/engine/coordinator.py +++ b/vllm/v1/engine/coordinator.py @@ -62,12 +62,10 @@ class DPCoordinator: assert dp_size > 1, "Coordinator only used for data parallel" host = parallel_config.data_parallel_master_ip - external_lb = parallel_config.data_parallel_external_lb - hybrid_lb = parallel_config.data_parallel_hybrid_lb # Assume coordinator is colocated with front-end procs when not in # either external or hybrid DP LB mode. - local_only = not (external_lb or hybrid_lb) + local_only = not parallel_config.local_engines_only front_publish_address = get_engine_client_zmq_addr( local_only=local_only, host=host ) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 905d8df4d..c9a1d53c8 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -507,12 +507,7 @@ class MPClient(EngineCoreClient): offline_mode = parallel_config.data_parallel_rank_local is not None # Client manages local+remote EngineCores in pure internal LB case. # Client manages local EngineCores in hybrid and external LB case. - local_engines_only = ( - parallel_config.data_parallel_hybrid_lb - or parallel_config.data_parallel_external_lb - ) - - num_ranks = dp_local_size if local_engines_only else dp_size + num_ranks = dp_local_size if parallel_config.local_engines_only else dp_size self.engine_ranks_managed = ( [dp_rank] if offline_mode else list(range(dp_rank, dp_rank + num_ranks)) ) diff --git a/vllm/v1/engine/input_processor.py b/vllm/v1/engine/input_processor.py index a6ab9164c..6404cb67e 100644 --- a/vllm/v1/engine/input_processor.py +++ b/vllm/v1/engine/input_processor.py @@ -458,13 +458,14 @@ class InputProcessor: self._validate_lora(lora_request) self._validate_params(params) - data_parallel_size = self.vllm_config.parallel_config.data_parallel_size - if data_parallel_rank is not None and not ( - 0 <= data_parallel_rank < data_parallel_size - ): + parallel_config = self.vllm_config.parallel_config + dp_size = parallel_config.data_parallel_size + dp_local_size = parallel_config.data_parallel_size_local + num_ranks = dp_local_size if parallel_config.local_engines_only else dp_size + if data_parallel_rank is not None and not (0 <= data_parallel_rank < num_ranks): raise ValueError( f"data_parallel_rank {data_parallel_rank} " - f"is out of range [0, {data_parallel_size})." + f"is out of range [0, {num_ranks})." ) if arrival_time is None: diff --git a/vllm/v1/engine/utils.py b/vllm/v1/engine/utils.py index 66212ed7c..5db3a5326 100644 --- a/vllm/v1/engine/utils.py +++ b/vllm/v1/engine/utils.py @@ -787,10 +787,7 @@ def launch_core_engines( local_start_index = parallel_config.data_parallel_rank_local dp_rank = parallel_config.data_parallel_rank host = parallel_config.data_parallel_master_ip - local_engines_only = ( - parallel_config.data_parallel_hybrid_lb - or parallel_config.data_parallel_external_lb - ) + local_engines_only = parallel_config.local_engines_only # In offline mode there is an LLM instance per DP rank and # one core engine per LLM, see