[BugFix] Set CUDA_VISIBLE_DEVICES before spawning the subprocesses (#21211)

Signed-off-by: Yinghai Lu <yinghai@thinkingmachines.ai>
Signed-off-by: Nick Hill <nhill@redhat.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Co-authored-by: Nick Hill <nhill@redhat.com>
Co-authored-by: Rui Qiao <ruisearch42@gmail.com>
This commit is contained in:
Yinghai Lu
2025-07-23 21:44:04 -07:00
committed by GitHub
parent dc2f159f8a
commit 11ef7a611e
2 changed files with 69 additions and 26 deletions

View File

@@ -10,12 +10,14 @@ from enum import Enum, auto
from multiprocessing import Process, connection
from multiprocessing.process import BaseProcess
from typing import TYPE_CHECKING, Callable, Optional, Union
from unittest.mock import patch
import msgspec
import zmq
from vllm.config import CacheConfig, ParallelConfig, VllmConfig
from vllm.logger import init_logger
from vllm.platforms import current_platform
from vllm.ray.ray_env import get_env_vars_to_copy
from vllm.utils import get_mp_context, get_open_zmq_ipc_path, zmq_socket_ctx
from vllm.v1.engine.coordinator import DPCoordinator
@@ -105,10 +107,13 @@ class CoreEngineProcManager:
"client_handshake_address"] = client_handshake_address
self.processes: list[BaseProcess] = []
local_dp_ranks = []
for index in range(local_engine_count):
local_index = local_start_index + index
global_index = start_index + index
# Start EngineCore in background process.
local_dp_ranks.append(local_index)
self.processes.append(
context.Process(target=target_fn,
name=f"EngineCore_{global_index}",
@@ -118,9 +123,14 @@ class CoreEngineProcManager:
}))
self._finalizer = weakref.finalize(self, shutdown, self.processes)
data_parallel = vllm_config.parallel_config.data_parallel_size > 1
try:
for proc in self.processes:
proc.start()
for proc, local_dp_rank in zip(self.processes, local_dp_ranks):
with set_device_control_env_var(
vllm_config, local_dp_rank) if (
data_parallel) else contextlib.nullcontext():
proc.start()
finally:
# Kill other procs if not all are running.
if self.finished_procs():
@@ -145,6 +155,30 @@ class CoreEngineProcManager:
}
@contextlib.contextmanager
def set_device_control_env_var(vllm_config: VllmConfig,
local_dp_rank: int) -> Iterator[None]:
"""
Temporarily set CUDA_VISIBLE_DEVICES or equivalent
for engine subprocess.
"""
world_size = vllm_config.parallel_config.world_size
evar = current_platform.device_control_env_var
try:
value = ",".join(
str(current_platform.device_id_to_physical_device_id(i))
for i in range(local_dp_rank * world_size, (local_dp_rank + 1) *
world_size))
except IndexError as e:
raise Exception(f"Error setting {evar}: "
f"local range: [{local_dp_rank * world_size}, "
f"{(local_dp_rank + 1) * world_size}) "
"base value: "
f"\"{os.getenv(evar)}\"") from e
with patch.dict(os.environ, values=((evar, value), )):
yield
class CoreEngineActorManager:
"""
Utility class to handle creation, readiness, and shutdown
@@ -215,10 +249,9 @@ class CoreEngineActorManager:
self.placement_group_is_local = []
refs = []
for index in range(dp_size):
local_index = local_dp_ranks[index]
for index, local_index, pg in zip(range(dp_size), local_dp_ranks,
placement_groups):
dp_vllm_config = copy.deepcopy(vllm_config)
pg = placement_groups[index]
dp_vllm_config.parallel_config.placement_group = pg
local_client = index < local_engine_count
actor = ray.remote(DPEngineCoreActor).options(
@@ -264,7 +297,6 @@ class CoreEngineActorManager:
local_engine_count = \
vllm_config.parallel_config.data_parallel_size_local
nodes = list_nodes()
nodes = sorted(list_nodes(),
key=lambda node: node.node_ip != dp_master_ip)
assert nodes[0].node_ip == dp_master_ip, (