diff --git a/.buildkite/test_areas/distributed.yaml b/.buildkite/test_areas/distributed.yaml index cfa9b848e..92648c2f2 100644 --- a/.buildkite/test_areas/distributed.yaml +++ b/.buildkite/test_areas/distributed.yaml @@ -224,6 +224,20 @@ steps: commands: - ./.buildkite/scripts/run-multi-node-test.sh /vllm-workspace/tests 2 2 $IMAGE_TAG "VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py | grep 'Same node test passed' && NUM_NODES=2 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_node_count.py | grep 'Node count test passed' && python3 ../examples/offline_inference/data_parallel.py -dp=2 -tp=1 --dp-num-nodes=2 --dp-node-rank=0 --dp-master-addr=192.168.10.10 --dp-master-port=12345 --enforce-eager --trust-remote-code && VLLM_MULTI_NODE=1 pytest -v -s distributed/test_multi_node_assignment.py && VLLM_MULTI_NODE=1 pytest -v -s distributed/test_pipeline_parallel.py" "VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py | grep 'Same node test passed' && NUM_NODES=2 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_node_count.py | grep 'Node count test passed' && python3 ../examples/offline_inference/data_parallel.py -dp=2 -tp=1 --dp-num-nodes=2 --dp-node-rank=1 --dp-master-addr=192.168.10.10 --dp-master-port=12345 --enforce-eager --trust-remote-code" +- label: MessageQueue TCP Multi-Node (2 GPUs) + timeout_in_minutes: 10 + working_dir: "/vllm-workspace/tests" + num_devices: 1 + num_nodes: 2 + no_plugin: true + optional: true + source_file_dependencies: + - vllm/distributed/device_communicators/shm_broadcast.py + - vllm/distributed/parallel_state.py + - tests/distributed/test_mq_tcp_multinode.py + commands: + - ./.buildkite/scripts/run-multi-node-test.sh /vllm-workspace/tests 2 1 
$IMAGE_TAG "torchrun --nnodes 2 --nproc-per-node=1 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_mq_tcp_multinode.py" "torchrun --nnodes 2 --nproc-per-node=1 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_mq_tcp_multinode.py" + - label: Distributed NixlConnector PD accuracy (4 GPUs) timeout_in_minutes: 30 working_dir: "/vllm-workspace/tests" @@ -294,3 +308,23 @@ steps: commands: - pytest -v -s distributed/test_pp_cudagraph.py - pytest -v -s distributed/test_pipeline_parallel.py + +- label: RayExecutorV2 (4 GPUs) + timeout_in_minutes: 60 + working_dir: "/vllm-workspace/tests" + num_devices: 4 + source_file_dependencies: + - vllm/v1/executor/ray_executor_v2.py + - vllm/v1/executor/abstract.py + - vllm/v1/executor/multiproc_executor.py + - tests/distributed/test_ray_v2_executor.py + - tests/distributed/test_ray_v2_executor_e2e.py + - tests/distributed/test_pipeline_parallel.py + - tests/basic_correctness/test_basic_correctness.py + commands: + - export VLLM_USE_RAY_V2_EXECUTOR_BACKEND=1 + - export NCCL_CUMEM_HOST_ENABLE=0 + - pytest -v -s distributed/test_ray_v2_executor.py + - pytest -v -s distributed/test_ray_v2_executor_e2e.py + - pytest -v -s distributed/test_pipeline_parallel.py -k "ray" + - TARGET_TEST_SUITE=L4 pytest -v -s basic_correctness/test_basic_correctness.py -k "ray" diff --git a/tests/distributed/conftest.py b/tests/distributed/conftest.py index 9c146a332..da661c5e1 100644 --- a/tests/distributed/conftest.py +++ b/tests/distributed/conftest.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import os import random import msgspec @@ -166,3 +167,31 @@ class MockSubscriber: self.sub.close() for replay in self.replay_sockets: replay.close() + + +@pytest.fixture +def enable_ray_v2_backend(): + """Set env vars for the Ray V2 executor backend and shut down Ray + between tests.""" + import ray + + saved = { + "VLLM_USE_RAY_V2_EXECUTOR_BACKEND": 
os.environ.get( + "VLLM_USE_RAY_V2_EXECUTOR_BACKEND" + ), + "VLLM_ENABLE_V1_MULTIPROCESSING": os.environ.get( + "VLLM_ENABLE_V1_MULTIPROCESSING" + ), + } + os.environ["VLLM_USE_RAY_V2_EXECUTOR_BACKEND"] = "1" + os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0" + if ray.is_initialized(): + ray.shutdown() + try: + yield + finally: + if ray.is_initialized(): + ray.shutdown() + os.environ.update({k: v for k, v in saved.items() if v is not None}) + for key in (k for k, v in saved.items() if v is None): + os.environ.pop(key, None) diff --git a/tests/distributed/test_mq_tcp_multinode.py b/tests/distributed/test_mq_tcp_multinode.py new file mode 100644 index 000000000..135ef11d7 --- /dev/null +++ b/tests/distributed/test_mq_tcp_multinode.py @@ -0,0 +1,119 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Multi-node integration test for MessageQueue TCP fallback. + +Verifies that when writer and readers span separate nodes (Docker containers +with isolated /dev/shm), `create_from_process_group` correctly detects +cross-node ranks via `in_the_same_node_as()` and falls back to ZMQ TCP +transport — and that data actually arrives. 
+""" + +import numpy as np +import torch.distributed as dist + +from vllm.distributed.device_communicators.shm_broadcast import MessageQueue +from vllm.distributed.parallel_state import in_the_same_node_as + + +def main(): + dist.init_process_group(backend="gloo") + + rank = dist.get_rank() + world_size = dist.get_world_size() + assert world_size >= 2, ( + f"Need at least 2 ranks across nodes, got world_size={world_size}" + ) + + # Verify that in_the_same_node_as detects cross-node correctly + status = in_the_same_node_as(dist.group.WORLD, source_rank=0) + local_count = sum(status) + print( + f"[Rank {rank}] in_the_same_node_as(source=0): {status} " + f"(local={local_count}/{world_size})" + ) + # With 2 Docker containers (1 proc each), rank 0 and rank 1 + # should be on different nodes. + assert local_count < world_size, ( + f"Expected cross-node ranks but all {world_size} ranks appear local." + ) + + # Create MessageQueue + writer_rank = 0 + mq = MessageQueue.create_from_process_group( + dist.group.WORLD, + max_chunk_bytes=1024 * 1024, # 1 MiB + max_chunks=10, + writer_rank=writer_rank, + ) + + # Verify the transport path selection + if rank == writer_rank: + print( + f"[Rank {rank}] Writer: n_local_reader={mq.n_local_reader}, " + f"n_remote_reader={mq.n_remote_reader}" + ) + assert mq.n_remote_reader > 0, ( + "Writer should have at least 1 remote (TCP) reader in a multi-node setup." + ) + else: + if status[rank]: + assert mq._is_local_reader, ( + f"Rank {rank} is on the same node as writer but is not a local reader." + ) + print(f"[Rank {rank}] Reader: local (shared memory)") + else: + assert mq._is_remote_reader, ( + f"Rank {rank} is on a different node but is not a remote (TCP) reader." 
+ ) + print(f"[Rank {rank}] Reader: remote (TCP)") + + # Test data transfer: simple objects + dist.barrier() + if rank == writer_rank: + mq.enqueue("hello_from_node0") + else: + msg = mq.dequeue(timeout=10) + assert msg == "hello_from_node0" + dist.barrier() + print(f"[Rank {rank}] Simple object test passed") + + # Test data transfer: numpy arrays + np.random.seed(42) + arrays = [ + np.random.randint(0, 100, size=np.random.randint(100, 5000)) for _ in range(100) + ] + + dist.barrier() + if rank == writer_rank: + for arr in arrays: + mq.enqueue(arr) + else: + for i, expected in enumerate(arrays): + received = mq.dequeue(timeout=10) + assert np.array_equal(expected, received), ( + f"Array mismatch at index {i}: " + f"expected shape {expected.shape}, got shape {received.shape}" + ) + dist.barrier() + print(f"[Rank {rank}] Numpy array test passed") + + # Test data transfer: large payload (> max_chunk_bytes) + dist.barrier() + big_array = np.zeros(200_000, dtype=np.int64) # ~1.6 MiB > 1 MiB chunk + if rank == writer_rank: + mq.enqueue(big_array) + else: + received = mq.dequeue(timeout=10) + assert np.array_equal(big_array, received) + dist.barrier() + print(f"[Rank {rank}] Large payload test passed") + + # Done -- cleanup + dist.barrier() + print(f"[Rank {rank}] All MessageQueue TCP multi-node tests passed!") + dist.destroy_process_group() + + +if __name__ == "__main__": + main() diff --git a/tests/distributed/test_ray_v2_executor.py b/tests/distributed/test_ray_v2_executor.py new file mode 100644 index 000000000..5daec22df --- /dev/null +++ b/tests/distributed/test_ray_v2_executor.py @@ -0,0 +1,345 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +""" +Integration tests for RayExecutorV2 at the executor level. +Validates executor initialization, placement group support, RPC calls, +and distributed execution with various TP/PP configurations. 
+""" + +import gc +import threading +from unittest.mock import patch + +import pytest +import ray + +from vllm import LLM +from vllm.config import VllmConfig +from vllm.engine.arg_utils import EngineArgs +from vllm.v1.executor.ray_executor_v2 import RayExecutorV2 + +pytestmark = pytest.mark.usefixtures("enable_ray_v2_backend") + +MODEL = "facebook/opt-125m" + + +def create_vllm_config( + tensor_parallel_size: int = 1, + pipeline_parallel_size: int = 1, + max_model_len: int = 256, + gpu_memory_utilization: float = 0.3, + placement_group=None, +) -> VllmConfig: + engine_args = EngineArgs( + model=MODEL, + tensor_parallel_size=tensor_parallel_size, + pipeline_parallel_size=pipeline_parallel_size, + max_model_len=max_model_len, + gpu_memory_utilization=gpu_memory_utilization, + distributed_executor_backend="ray", + enforce_eager=True, + ) + vllm_config = engine_args.create_engine_config() + + if placement_group is not None: + vllm_config.parallel_config.placement_group = placement_group + + return vllm_config + + +def ensure_ray_initialized(): + if not ray.is_initialized(): + ray.init(ignore_reinit_error=True) + + +@pytest.fixture +def create_placement_group(request): + ensure_ray_initialized() + num_gpus = request.param + bundles = [{"GPU": 1, "CPU": 1} for _ in range(num_gpus)] + pg = ray.util.placement_group(bundles, strategy="PACK") + ray.get(pg.ready()) + yield pg + ray.util.remove_placement_group(pg) + + +@pytest.fixture +def executor(request): + """Create a RayExecutorV2 and shut it down after the test.""" + executor = RayExecutorV2(vllm_config=request.param) + yield executor + executor.shutdown() + + +def assert_executor(executor, tp_size, pp_size): + """Common assertions for executor initialization tests.""" + world_size = tp_size * pp_size + expected_output_rank = (pp_size - 1) * tp_size + + assert executor.world_size == world_size + assert len(executor.ray_worker_handles) == world_size + assert len(executor.response_mqs) == world_size + assert 
executor._get_output_rank() == expected_output_rank + + if pp_size > 1: + assert executor.max_concurrent_batches == pp_size + + executor.check_health() + assert not executor.is_failed + + ranks = sorted(h.rank for h in executor.ray_worker_handles) + assert ranks == list(range(world_size)) + + for handle in executor.ray_worker_handles: + assert handle.node_id is not None + + +@pytest.mark.parametrize("tp_size, pp_size", [(1, 1), (2, 1), (4, 1), (2, 2)]) +def test_ray_v2_executor(tp_size, pp_size): + """Validate RayExecutorV2 with various TP/PP configs.""" + vllm_config = create_vllm_config( + tensor_parallel_size=tp_size, + pipeline_parallel_size=pp_size, + ) + executor = RayExecutorV2(vllm_config=vllm_config) + try: + assert_executor(executor, tp_size, pp_size) + finally: + executor.shutdown() + + +@pytest.mark.parametrize( + "tp_size, pp_size, create_placement_group", + [(2, 1, 2), (4, 1, 4), (2, 2, 4)], + indirect=["create_placement_group"], +) +def test_ray_v2_executor_pg(tp_size, pp_size, create_placement_group): + """Validate RayExecutorV2 with various TP/PP configs using external PG.""" + vllm_config = create_vllm_config( + tensor_parallel_size=tp_size, + pipeline_parallel_size=pp_size, + placement_group=create_placement_group, + ) + executor = RayExecutorV2(vllm_config=vllm_config) + try: + assert_executor(executor, tp_size, pp_size) + finally: + executor.shutdown() + + +@pytest.mark.parametrize( + "executor", + [create_vllm_config(tensor_parallel_size=2)], + indirect=True, +) +def test_ray_v2_executor_failure_callback(executor): + """Validate failure callback registration.""" + callback_invoked = False + + def test_callback(): + nonlocal callback_invoked + callback_invoked = True + + executor.register_failure_callback(test_callback) + assert not callback_invoked + + executor.is_failed = True + executor.register_failure_callback(test_callback) + assert callback_invoked + + +@pytest.mark.parametrize( + "executor", + 
[create_vllm_config(tensor_parallel_size=2)], + indirect=True, +) +def test_ray_v2_executor_collective_rpc(executor): + """Validate collective RPC calls through MessageQueue.""" + executor.check_health() + assert not executor.is_failed + assert executor.rpc_broadcast_mq is not None + + +@pytest.mark.parametrize( + "executor", + [create_vllm_config(tensor_parallel_size=2)], + indirect=True, +) +def test_ray_v2_executor_driver_node_rank_0(executor): + """Validate that driver node workers get the lowest ranks.""" + driver_node = ray.get_runtime_context().get_node_id() + + for handle in executor.ray_worker_handles: + assert handle.node_id == driver_node + + rank0_handle = next(h for h in executor.ray_worker_handles if h.rank == 0) + assert rank0_handle.node_id == driver_node + + +@pytest.mark.parametrize( + "executor", + [create_vllm_config(tensor_parallel_size=2)], + indirect=True, +) +def test_ray_v2_executor_worker_death(executor): + """Validate executor detects worker death via ray.wait().""" + callback_event = threading.Event() + + def on_failure(): + callback_event.set() + + executor.register_failure_callback(on_failure) + assert not executor.is_failed + + # Kill one worker actor externally + victim = executor.ray_worker_handles[1].actor + ray.kill(victim, no_restart=True) + + # Monitor thread should detect the death and invoke callback + assert callback_event.wait(timeout=30) + assert executor.is_failed + assert executor.shutting_down + + +def test_ray_v2_executor_shutdown(): + """Validate graceful shutdown: ray.kill() terminates all worker actors.""" + executor = RayExecutorV2(vllm_config=create_vllm_config(tensor_parallel_size=2)) + assert executor.rpc_broadcast_mq is not None + assert len(executor.response_mqs) == executor.world_size + + actors = [h.actor for h in executor.ray_worker_handles] + executor.shutdown() + + for actor in actors: + with pytest.raises(ray.exceptions.RayActorError): + ray.get(actor.wait_for_init.remote(), timeout=5) + + assert 
executor.rpc_broadcast_mq is None + assert len(executor.response_mqs) == 0 + + +@pytest.mark.parametrize( + "executor", + [create_vllm_config(tensor_parallel_size=2)], + indirect=True, +) +def test_ray_v2_run_refs_stored_for_monitoring(executor): + """Validate worker handles store run_ref for monitoring.""" + for handle in executor.ray_worker_handles: + assert handle.run_ref is not None + ready, _ = ray.wait([handle.run_ref], timeout=0) + assert len(ready) == 0, "run_ref should be pending" + + +@pytest.mark.parametrize("tp_size, pp_size", [(2, 1), (2, 2)]) +def test_ray_v2_single_node_generation(tp_size, pp_size): + """End-to-end LLM generation with RayExecutorV2.""" + + llm = LLM( + model=MODEL, + tensor_parallel_size=tp_size, + pipeline_parallel_size=pp_size, + distributed_executor_backend="ray", + enforce_eager=True, + max_model_len=256, + gpu_memory_utilization=0.3, + ) + try: + prompts = [ + "Hello, my name is", + "The capital of France is", + "The future of AI is", + ] + outputs = llm.generate(prompts) + + assert len(outputs) == len(prompts) + for output in outputs: + assert len(output.outputs) > 0 + assert len(output.outputs[0].text) > 0 + finally: + llm.llm_engine.model_executor.shutdown() + del llm + gc.collect() + + +@pytest.mark.parametrize( + "bundle_indices, expected_bundle_ids, create_placement_group", + [("2,3", [2, 3], 4), ("3,2", [3, 2], 4)], + indirect=["create_placement_group"], +) +def test_ray_v2_bundle_indices_env( + bundle_indices, expected_bundle_ids, create_placement_group, monkeypatch +): + """Validate explicit VLLM_RAY_BUNDLE_INDICES bundle placement.""" + monkeypatch.setenv("VLLM_RAY_BUNDLE_INDICES", bundle_indices) + vllm_config = create_vllm_config( + tensor_parallel_size=2, + placement_group=create_placement_group, + ) + executor = RayExecutorV2(vllm_config=vllm_config) + try: + actual = [ + h.bundle_id_idx + for h in sorted(executor.ray_worker_handles, key=lambda h: h.rank) + ] + assert actual == expected_bundle_ids + 
assert_executor(executor, tp_size=2, pp_size=1) + finally: + executor.shutdown() + + +@pytest.mark.parametrize( + "bundle_indices, expected_error, create_placement_group", + [ + ("1,1", "cannot have duplicate values,", 4), + ("0,1,2", "must have the same size", 4), + ], + indirect=["create_placement_group"], +) +def test_ray_v2_invalid_bundle_indices( + bundle_indices, expected_error, create_placement_group, monkeypatch +): + """Validate invalid bundle indices are rejected.""" + monkeypatch.setenv("VLLM_RAY_BUNDLE_INDICES", bundle_indices) + vllm_config = create_vllm_config( + tensor_parallel_size=2, placement_group=create_placement_group + ) + with pytest.raises(AssertionError, match=expected_error): + RayExecutorV2(vllm_config=vllm_config) + + +@pytest.mark.parametrize("tp_size, pp_size", [(2, 1), (2, 2)]) +def test_ray_v2_single_node_generation_with_pg(tp_size, pp_size): + """E2E LLM generation with a user-provided placement group.""" + ensure_ray_initialized() + bundles = [{"GPU": 1, "CPU": 1} for _ in range(tp_size * pp_size)] + pg = ray.util.placement_group(bundles, strategy="PACK") + ray.get(pg.ready()) + + try: + with patch.object(ray.util, "get_current_placement_group", return_value=pg): + llm = LLM( + model=MODEL, + tensor_parallel_size=tp_size, + pipeline_parallel_size=pp_size, + distributed_executor_backend="ray", + enforce_eager=True, + max_model_len=256, + gpu_memory_utilization=0.3, + ) + prompts = [ + "Hello, my name is", + "The capital of France is", + "The future of AI is", + ] + outputs = llm.generate(prompts) + + assert len(outputs) == len(prompts) + for output in outputs: + assert len(output.outputs) > 0 + assert len(output.outputs[0].text) > 0 + finally: + llm.llm_engine.model_executor.shutdown() + del llm + gc.collect() diff --git a/tests/distributed/test_ray_v2_executor_e2e.py b/tests/distributed/test_ray_v2_executor_e2e.py new file mode 100644 index 000000000..fb5830132 --- /dev/null +++ b/tests/distributed/test_ray_v2_executor_e2e.py @@ 
-0,0 +1,209 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Orchestration-level integration tests for RayExecutorV2. +""" + +import gc +import os +import pathlib + +import pytest +import ray + +pytestmark = pytest.mark.usefixtures("enable_ray_v2_backend") + +MODEL = "facebook/opt-125m" + + +def _get_env_var(worker, name): + return os.environ.get(name) + + +def _ray_init(): + """Start Ray with the project root on workers' PYTHONPATH. + + Without this, workers cannot unpickle actor classes defined in the + ``tests`` package, causing FunctionActorManager to fall back to + TemporaryActor which drops async method signatures.""" + project_root = str(pathlib.Path(__file__).resolve().parents[2]) + ray.init( + ignore_reinit_error=True, + runtime_env={"env_vars": {"PYTHONPATH": project_root}}, + ) + + +@pytest.fixture +def ray_init(): + _ray_init() + + +class _AsyncLLMActor: + def start(self, pg, bundle_indices=None, ray_runtime_env=None): + os.environ["VLLM_USE_RAY_V2_EXECUTOR_BACKEND"] = "1" + # Needed so collective_rpc can pickle _get_env_var over the + # AsyncLLM -> EngineCore ZMQ boundary. 
+ os.environ["VLLM_ALLOW_INSECURE_SERIALIZATION"] = "1" + if bundle_indices is not None: + os.environ["VLLM_RAY_BUNDLE_INDICES"] = bundle_indices + else: + os.environ.pop("VLLM_RAY_BUNDLE_INDICES", None) + + from vllm.engine.arg_utils import AsyncEngineArgs + from vllm.v1.engine.async_llm import AsyncLLM + from vllm.v1.executor.abstract import Executor + + engine_args = AsyncEngineArgs( + model=MODEL, + tensor_parallel_size=2, + distributed_executor_backend="ray", + enforce_eager=True, + max_model_len=256, + gpu_memory_utilization=0.8, + ) + vllm_config = engine_args.create_engine_config() + vllm_config.parallel_config.placement_group = pg + if ray_runtime_env is not None: + vllm_config.parallel_config.ray_runtime_env = ray_runtime_env + + executor_class = Executor.get_class(vllm_config) + self.engine = AsyncLLM( + vllm_config=vllm_config, + executor_class=executor_class, + log_stats=False, + log_requests=False, + ) + + async def generate(self, prompt): + from vllm.sampling_params import SamplingParams + + params = SamplingParams(max_tokens=16) + result = None + async for output in self.engine.generate( + prompt, params, request_id="test_request_id" + ): + result = output + assert result is not None + return result.outputs[0].text + + async def generate_and_get_worker_envs(self, prompt, env_names): + from vllm.sampling_params import SamplingParams + + params = SamplingParams(max_tokens=16) + result = None + async for output in self.engine.generate( + prompt, params, request_id="test_request_id" + ): + result = output + assert result is not None + text = result.outputs[0].text + + env_results = {} + for name in env_names: + vals = await self.engine.collective_rpc( + _get_env_var, timeout=10, args=(name,) + ) + env_results[name] = vals + return text, env_results + + def shutdown(self): + if engine := getattr(self, "engine", None): + engine.shutdown() + del self.engine + gc.collect() + + +AsyncLLMActor = ray.remote(num_cpus=0, max_concurrency=1)(_AsyncLLMActor) + + 
+def test_multi_replicas(ray_init): + pg1 = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK") + pg2 = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK") + ray.get([pg1.ready(), pg2.ready()]) + + actor1 = AsyncLLMActor.remote() + actor2 = AsyncLLMActor.remote() + + ray.get(actor1.start.remote(pg1)) + ray.get(actor2.start.remote(pg2)) + + out1, out2 = ray.get( + [ + actor1.generate.remote("Hello world"), + actor2.generate.remote("Hello world"), + ] + ) + assert len(out1) > 0 + assert len(out2) > 0 + + +def test_multi_replicas_with_bundle_indices(ray_init): + pg = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 4, strategy="PACK") + ray.get(pg.ready()) + + actor1 = AsyncLLMActor.remote() + actor2 = AsyncLLMActor.remote() + + ray.get(actor1.start.remote(pg, bundle_indices="2,1")) + ray.get(actor2.start.remote(pg, bundle_indices="0,3")) + + out1, out2 = ray.get( + [ + actor1.generate.remote("Hello world"), + actor2.generate.remote("Hello world"), + ] + ) + assert len(out1) > 0 + assert len(out2) > 0 + + +def test_env_var_and_runtime_env_propagation(): + """ + Verify env vars (NCCL_, HF_) and parallel_config.ray_runtime_env + propagate to RayWorkerProc actors. + """ + sentinel_vars = { + "NCCL_DEBUG": "INFO", + "HF_TOKEN": "test_sentinel_token", + } + for k, v in sentinel_vars.items(): + os.environ[k] = v + + try: + # Called directly (not via the ray_init fixture) because sentinel + # env vars must be in os.environ before ray.init() so that Ray + # worker processes inherit them. + _ray_init() + + pg = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK") + ray.get(pg.ready()) + + # Include the project root so that RayWorkerProc actors can + # unpickle _get_env_var. 
+ project_root = str(pathlib.Path(__file__).resolve().parents[2]) + ray_runtime_env = { + "env_vars": { + "RAY_RUNTIME_ENV_TEST": "ray_runtime_env", + "PYTHONPATH": project_root, + }, + } + + actor = AsyncLLMActor.remote() + ray.get(actor.start.remote(pg, ray_runtime_env=ray_runtime_env)) + + all_env_names = list(sentinel_vars) + ["RAY_RUNTIME_ENV_TEST"] + text, env_results = ray.get( + actor.generate_and_get_worker_envs.remote("Hello world", all_env_names) + ) + assert len(text) > 0 + + for name, expected in sentinel_vars.items(): + for val in env_results[name]: + assert val == expected + + for val in env_results["RAY_RUNTIME_ENV_TEST"]: + assert val == "ray_runtime_env" + + finally: + for k in sentinel_vars: + os.environ.pop(k, None) diff --git a/tests/test_ray_env_utils.py b/tests/test_ray_env_utils.py new file mode 100644 index 000000000..d311de41b --- /dev/null +++ b/tests/test_ray_env_utils.py @@ -0,0 +1,51 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Tests for vllm.v1.executor.ray_env_utils.""" + +import os +from unittest.mock import patch + +from vllm.v1.executor.ray_env_utils import get_driver_env_vars + +WORKER_VARS: set[str] = { + "CUDA_VISIBLE_DEVICES", + "LOCAL_RANK", +} + + +class TestDefaultPropagation: + """All env vars are propagated unless explicitly excluded.""" + + @patch.dict(os.environ, {"NCCL_DEBUG": "INFO"}, clear=False) + def test_nccl_prefix(self): + assert get_driver_env_vars(WORKER_VARS)["NCCL_DEBUG"] == "INFO" + + @patch.dict(os.environ, {"HF_TOKEN": "secret"}, clear=False) + def test_hf_token(self): + assert "HF_TOKEN" in get_driver_env_vars(WORKER_VARS) + + @patch.dict(os.environ, {"LMCACHE_LOCAL_CPU": "True"}, clear=False) + def test_lmcache_prefix(self): + assert "LMCACHE_LOCAL_CPU" in get_driver_env_vars(WORKER_VARS) + + @patch.dict(os.environ, {"PYTHONHASHSEED": "42"}, clear=False) + def test_pythonhashseed(self): + assert 
get_driver_env_vars(WORKER_VARS)["PYTHONHASHSEED"] == "42" + + @patch.dict(os.environ, {"MYLIB_FOO": "bar"}, clear=False) + def test_arbitrary_var_propagated(self): + assert get_driver_env_vars(WORKER_VARS)["MYLIB_FOO"] == "bar" + + +class TestExclusion: + @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1"}, clear=False) + def test_worker_specific_excluded(self): + assert "CUDA_VISIBLE_DEVICES" not in get_driver_env_vars(WORKER_VARS) + + @patch.dict(os.environ, {"LMCACHE_LOCAL_CPU": "True"}, clear=False) + @patch( + "vllm.v1.executor.ray_env_utils.RAY_NON_CARRY_OVER_ENV_VARS", + {"LMCACHE_LOCAL_CPU"}, + ) + def test_non_carry_over_blacklist(self): + assert "LMCACHE_LOCAL_CPU" not in get_driver_env_vars(WORKER_VARS) diff --git a/tests/utils_/test_ray_utils.py b/tests/utils_/test_ray_utils.py new file mode 100644 index 000000000..0872ae941 --- /dev/null +++ b/tests/utils_/test_ray_utils.py @@ -0,0 +1,100 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +from unittest.mock import MagicMock, patch + +import pytest + +from vllm.v1.executor.ray_utils import get_bundles_sorted_by_node + +NODE_A = "node_a" +NODE_B = "node_b" +NODE_C = "node_c" + +IP_A = "10.0.0.1" +IP_B = "10.0.0.2" +IP_C = "10.0.0.3" + +NODE_ID_TO_IP = {NODE_A: IP_A, NODE_B: IP_B, NODE_C: IP_C} + +MOCK_RAY_NODES = [ + {"NodeID": NODE_A, "NodeManagerAddress": IP_A, "Alive": True}, + {"NodeID": NODE_B, "NodeManagerAddress": IP_B, "Alive": True}, + {"NodeID": NODE_C, "NodeManagerAddress": IP_C, "Alive": True}, +] + + +@pytest.mark.parametrize( + "bundles_to_node_id,bundle_specs,expected", + [ + pytest.param( + {0: NODE_C, 1: NODE_A, 2: NODE_B, 3: NODE_C, 4: NODE_A, 5: NODE_B}, + [{"GPU": 1}] * 6, + [ + (1, NODE_A, IP_A), + (4, NODE_A, IP_A), + (2, NODE_B, IP_B), + (5, NODE_B, IP_B), + (0, NODE_C, IP_C), + (3, NODE_C, IP_C), + ], + ), + pytest.param( + {0: NODE_B, 1: NODE_B, 2: NODE_A, 3: NODE_A}, + [{"GPU": 1}] * 4, + [ + (2, NODE_A, IP_A), 
+ (3, NODE_A, IP_A), + (0, NODE_B, IP_B), + (1, NODE_B, IP_B), + ], + ), + pytest.param( + {0: NODE_C, 1: NODE_B, 2: NODE_C, 3: NODE_B}, + [{"GPU": 1}] * 4, + [ + (1, NODE_B, IP_B), + (3, NODE_B, IP_B), + (0, NODE_C, IP_C), + (2, NODE_C, IP_C), + ], + ), + pytest.param( + {0: NODE_A, 1: NODE_A, 2: NODE_A}, + [{"GPU": 1}] * 3, + [(0, NODE_A, IP_A), (1, NODE_A, IP_A), (2, NODE_A, IP_A)], + ), + pytest.param( + {}, + [], + [], + ), + pytest.param( + {0: NODE_A, 1: NODE_B, 2: NODE_A}, + [{"CPU": 1}, {"GPU": 1}, {"GPU": 1}], + [(2, NODE_A, IP_A), (1, NODE_B, IP_B)], + ), + ], +) +def test_get_bundles_sorted_by_node(bundles_to_node_id, bundle_specs, expected): + mock_pg = MagicMock() + mock_pg.bundle_specs = bundle_specs + + mock_ctx = MagicMock() + mock_ctx.get_node_id.return_value = NODE_A + + with ( + patch( + "vllm.v1.executor.ray_utils.placement_group_table", + return_value={"bundles_to_node_id": bundles_to_node_id}, + ), + patch("vllm.v1.executor.ray_utils.ray") as mock_ray, + patch("vllm.v1.executor.ray_utils.current_platform") as mock_platform, + ): + mock_ray.get_runtime_context.return_value = mock_ctx + mock_ray.nodes.return_value = MOCK_RAY_NODES + mock_platform.ray_device_key = "GPU" + + result = get_bundles_sorted_by_node(mock_pg) + + assert result == expected diff --git a/vllm/envs.py b/vllm/envs.py index ec8d66314..0d68b0f97 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -59,6 +59,7 @@ if TYPE_CHECKING: VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: Literal["auto", "nccl", "shm"] = "auto" VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM: bool = False VLLM_USE_RAY_WRAPPED_PP_COMM: bool = True + VLLM_USE_RAY_V2_EXECUTOR_BACKEND: bool = False VLLM_XLA_USE_SPMD: bool = False VLLM_WORKER_MULTIPROC_METHOD: Literal["fork", "spawn"] = "fork" VLLM_ASSETS_CACHE: str = os.path.join(VLLM_CACHE_ROOT, "assets") @@ -753,6 +754,12 @@ environment_variables: dict[str, Callable[[], Any]] = { "VLLM_USE_RAY_WRAPPED_PP_COMM": lambda: bool( int(os.getenv("VLLM_USE_RAY_WRAPPED_PP_COMM", "1")) ), 
+ # When True and distributed_executor_backend="ray", use RayExecutorV2 + # (MQ-based) instead of RayDistributedExecutor (compiled-graph backend). + # TODO (jeffreywang): Enabled by default in vLLM 0.20.0. + "VLLM_USE_RAY_V2_EXECUTOR_BACKEND": lambda: bool( + int(os.getenv("VLLM_USE_RAY_V2_EXECUTOR_BACKEND", "0")) + ), # Use dedicated multiprocess context for workers. # Both spawn and fork work "VLLM_WORKER_MULTIPROC_METHOD": env_with_choices( diff --git a/vllm/v1/executor/abstract.py b/vllm/v1/executor/abstract.py index 2c3538d9a..ae5af0d6f 100644 --- a/vllm/v1/executor/abstract.py +++ b/vllm/v1/executor/abstract.py @@ -7,6 +7,7 @@ from concurrent.futures import Future from functools import cached_property from typing import TYPE_CHECKING, Literal, TypeVar, overload +import vllm.envs as envs from vllm.config import VllmConfig from vllm.distributed.kv_transfer.kv_connector.utils import KVOutputAggregator from vllm.distributed.kv_transfer.kv_connector.v1.base import ( @@ -57,9 +58,14 @@ class Executor(ABC): ) executor_class = distributed_executor_backend elif distributed_executor_backend == "ray": - from vllm.v1.executor.ray_executor import RayDistributedExecutor + if envs.VLLM_USE_RAY_V2_EXECUTOR_BACKEND: + from vllm.v1.executor.ray_executor_v2 import RayExecutorV2 - executor_class = RayDistributedExecutor + executor_class = RayExecutorV2 + else: + from vllm.v1.executor.ray_executor import RayDistributedExecutor + + executor_class = RayDistributedExecutor elif distributed_executor_backend == "mp": from vllm.v1.executor.multiproc_executor import MultiprocExecutor diff --git a/vllm/v1/executor/ray_env_utils.py b/vllm/v1/executor/ray_env_utils.py new file mode 100644 index 000000000..6ce12b8ca --- /dev/null +++ b/vllm/v1/executor/ray_env_utils.py @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import os + +from vllm.ray.ray_env import RAY_NON_CARRY_OVER_ENV_VARS + + +def 
get_driver_env_vars( + worker_specific_vars: set[str], +) -> dict[str, str]: + """Return driver env vars to propagate to Ray workers. + + Returns everything from ``os.environ`` except ``worker_specific_vars`` + and user-configured exclusions (``RAY_NON_CARRY_OVER_ENV_VARS``). + """ + exclude_vars = worker_specific_vars | RAY_NON_CARRY_OVER_ENV_VARS + + return {key: value for key, value in os.environ.items() if key not in exclude_vars} diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index c4e5e7bc6..1dda2c294 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -23,6 +23,7 @@ from vllm.v1.core.sched.output import GrammarOutput, SchedulerOutput from vllm.v1.engine import ReconfigureDistributedRequest, ReconfigureRankType from vllm.v1.executor.abstract import Executor from vllm.v1.executor.ray_utils import ( + WORKER_SPECIFIC_ENV_VARS, FutureWrapper, RayWorkerWrapper, initialize_ray_cluster, @@ -62,17 +63,6 @@ class RayWorkerMetaData: class RayDistributedExecutor(Executor): """Ray-based distributed executor""" - # These env vars are worker-specific, therefore are NOT copied - # from the driver to the workers - WORKER_SPECIFIC_ENV_VARS = { - "VLLM_HOST_IP", - "VLLM_HOST_PORT", - "LOCAL_RANK", - "CUDA_VISIBLE_DEVICES", - "HIP_VISIBLE_DEVICES", - "ROCR_VISIBLE_DEVICES", - } - uses_ray: bool = True supports_pp: bool = True @@ -335,7 +325,7 @@ class RayDistributedExecutor(Executor): # Environment variables to copy from driver to workers env_vars_to_copy = get_env_vars_to_copy( - exclude_vars=self.WORKER_SPECIFIC_ENV_VARS, + exclude_vars=WORKER_SPECIFIC_ENV_VARS, additional_vars=set(current_platform.additional_env_vars), destination="workers", ) diff --git a/vllm/v1/executor/ray_executor_v2.py b/vllm/v1/executor/ray_executor_v2.py new file mode 100644 index 000000000..255bcb5fc --- /dev/null +++ b/vllm/v1/executor/ray_executor_v2.py @@ -0,0 +1,524 @@ +# SPDX-License-Identifier: Apache-2.0 +# 
SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import copy +import os +import threading +import weakref +from collections import defaultdict, deque +from dataclasses import dataclass +from typing import Any + +import vllm.envs as envs +from vllm.config import VllmConfig +from vllm.distributed.device_communicators.shm_broadcast import ( + Handle, + MessageQueue, +) +from vllm.logger import init_logger +from vllm.platforms import current_platform +from vllm.utils.network_utils import ( + get_distributed_init_method, + get_open_port, +) +from vllm.v1.executor.multiproc_executor import ( + FutureWrapper, + MultiprocExecutor, + WorkerProc, +) +from vllm.v1.executor.ray_env_utils import get_driver_env_vars +from vllm.v1.executor.ray_utils import ( + WORKER_SPECIFIC_ENV_VARS, + build_actor_name, + get_bundles_for_indices, + get_bundles_sorted_by_node, + initialize_ray_cluster, + ray, +) + +if ray is not None: + from ray.actor import ActorHandle + from ray.types import ObjectRef + from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy +else: + ActorHandle = None + +logger = init_logger(__name__) + + +@dataclass +class RayWorkerHandle: + """Handle for a Ray worker actor, compatible with MultiprocExecutor.""" + + actor: ActorHandle + """Ray worker actor""" + + rank: int + """Rank of the worker""" + + local_rank: int + """Local rank of the worker""" + + node_id: str + """Node ID of the worker""" + + bundle_id_idx: int = -1 + """Placement group bundle index for the worker""" + + run_ref: ObjectRef | None = None + """run() ObjectRef used as a sentinel for health monitoring""" + + def run(self): + """Start the worker's busy loop""" + self.run_ref = self.actor.run.remote() + + +class RayWorkerProc(WorkerProc): + """Worker process that runs inside a Ray actor. + + Initialization is split into two phases: + 1. __init__: lightweight setup, stores init args (no device/model init) + 2. 
initialize_worker: called after GPU IDs are discovered, completes + the full WorkerProc initialization with the correct local_rank and + CUDA_VISIBLE_DEVICES. + + CUDA_VISIBLE_DEVICES setup flow: + + 1. RayExecutorV2 enables RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES so Ray does + not set CUDA_VISIBLE_DEVICES on RayWorkerProc actors at creation time. + 2. Each actor is scheduled with a placement group and bundle index; Ray resolves + the physical GPU ID for that bundle at placement time. + 3. After placement, the worker discovers that GPU ID and sets + CUDA_VISIBLE_DEVICES before finishing WorkerProc initialization. + + There is no workaround for this unset-and-reset sequence when the placement group + is externally managed: scheduling must complete before CUDA_VISIBLE_DEVICES can + match the GPU tied to the worker's bundle. + + This sequence allows multiple vLLM instances to coexist on the same node: + each instance is unaware which physical devices others hold, and the + externally managed placement group avoids CUDA_VISIBLE_DEVICES conflicts + by binding workers to specific placement group bundles. + """ + + def __init__( + self, + vllm_config: VllmConfig, + rank: int, + distributed_init_method: str, + input_shm_handle: Handle, + is_driver_worker: bool, + is_driver_node: bool = False, + ): + # Defer WorkerProc.__init__ until GPU IDs are known. + self._is_driver_node = is_driver_node + self._init_kwargs = dict( + vllm_config=vllm_config, + rank=rank, + distributed_init_method=distributed_init_method, + input_shm_handle=input_shm_handle, + shared_worker_lock=None, + is_driver_worker=is_driver_worker, + ) + + def get_node_and_gpu_ids(self) -> tuple[str, list[int]]: + """Return (node_id, gpu_ids) assigned to this actor by Ray.""" + node_id = ray.get_runtime_context().get_node_id() + device_key = current_platform.ray_device_key + if not device_key: + raise RuntimeError( + f"current platform {current_platform.device_name} does not support ray." 
+ ) + gpu_ids = ray.get_runtime_context().get_accelerator_ids()[device_key] + return node_id, [int(x) for x in gpu_ids] + + def initialize_worker( + self, + local_rank: int, + env_vars: dict[str, str], + driver_env_vars: dict[str, str] | None = None, + ) -> None: + """Complete initialization after GPU assignment is known. + + *driver_env_vars* are applied with ``setdefault`` — they fill + in missing vars but never overwrite node-local values. + *env_vars* (e.g. CUDA_VISIBLE_DEVICES) always overwrite. + """ + if driver_env_vars: + for key, value in driver_env_vars.items(): + os.environ.setdefault(key, value) + for key, value in env_vars.items(): + os.environ[key] = value + + self.local_rank = local_rank + super().__init__( + local_rank=local_rank, + **self._init_kwargs, + ) + + def _init_message_queues( + self, input_shm_handle: Handle, vllm_config: VllmConfig + ) -> None: + """ + Workers on the same node as the executor use shared memory for + both the broadcast (input) MQ and the response MQ. Workers on + different nodes use TCP (n_local_reader=0). + """ + self.rpc_broadcast_mq = MessageQueue.create_from_handle( + input_shm_handle, self.worker.rank + ) + + n_local = 1 if self._is_driver_node else 0 + # Use ray.util.get_node_ip_address() to get Ray's internal IP. + # get_ip() returns host's external IP which is typically not + # routable between nodes within the cluster. 
+        self.worker_response_mq = MessageQueue(
+            n_reader=1,
+            n_local_reader=n_local,
+            connect_ip=ray.util.get_node_ip_address(),
+        )
+        self.peer_response_handles: list[dict] = []
+
+    def wait_for_init(self) -> dict:
+        """Respond to the driver's wait_until_ready() barrier."""
+        assert self.worker_response_mq is not None
+        return {
+            "status": self.READY_STR,
+            "handle": self.worker_response_mq.export_handle(),
+        }
+
+    def run(self) -> None:
+        """Main entry point called via actor.run.remote()."""
+        try:
+            assert self.rpc_broadcast_mq is not None
+            self.rpc_broadcast_mq.wait_until_ready()
+            assert self.worker_response_mq is not None
+            self.worker_response_mq.wait_until_ready()
+
+            self.worker_busy_loop()
+        except Exception as e:
+            logger.exception("RayWorkerProc failed: %s", e)
+            raise
+        finally:
+            self.shutdown()
+
+
+class RayExecutorV2(MultiprocExecutor):
+    """Ray-based distributed executor using MessageQueue communication.
+
+    Inherits from MultiprocExecutor to reuse the MQ-based control plane
+    and NCCL data plane. Workers are Ray actors.
+
+    Async scheduling is enabled, inherited from MultiprocExecutor.
+    This is critical for RayExecutorV2 to be performant.
+    """
+
+    uses_ray: bool = True
+    supports_pp: bool = True
+
+    def __init__(self, vllm_config: VllmConfig):
+        super().__init__(vllm_config)
+
+    def _build_runtime_env(self) -> dict:
+        """Build a runtime_env dict for RayWorkerProc actors.
+
+        Driver env vars are applied separately via initialize_worker
+        with setdefault semantics.
+ """ + base = self.parallel_config.ray_runtime_env + runtime_env: dict = copy.deepcopy(dict(base)) if base else {} + + env_vars = runtime_env.setdefault("env_vars", {}) + env_vars.update({v: "1" for v in current_platform.ray_noset_device_env_vars}) + if self.parallel_config.ray_workers_use_nsight: + runtime_env["nsight"] = { + "t": "cuda,cudnn,cublas", + "o": "'worker_process_%p'", + "cuda-graph-trace": "node", + } + return runtime_env + + @staticmethod + def _get_actor_resource_kwargs() -> dict[str, Any]: + """Return Ray actor resource kwargs for the current platform.""" + num_devices = envs.VLLM_RAY_PER_WORKER_GPUS + device_key = current_platform.ray_device_key + if device_key == "GPU": + return {"num_gpus": num_devices} + return {"num_gpus": 0, "resources": {device_key: num_devices}} + + def _init_executor(self) -> None: + """Initialize the RayExecutorV2 executor.""" + self._finalizer = weakref.finalize(self, self.shutdown) + self.is_failed = False + self.failure_callback = None + self.shutting_down = False + self.shutdown_lock = threading.Lock() + + # Step 1: Initialize Ray cluster and retrieve placement group + if ray is None: + raise ImportError("Using Ray backend requires installation of ray.") + initialize_ray_cluster(self.parallel_config, require_gpu_on_driver=False) + placement_group = self.parallel_config.placement_group + + tp_size, pp_size, pcp_size = self._get_parallel_sizes() + assert self.world_size == tp_size * pp_size * pcp_size, ( + f"world_size ({self.world_size}) must be equal to the " + f"tensor_parallel_size ({tp_size}) x pipeline" + f"_parallel_size ({pp_size}) x prefill_context" + f"_parallel_size ({pcp_size}). " + ) + + # Step 2: Build bundle assignments for worker rank placement + # while respecting VLLM_RAY_BUNDLE_INDICES. 
+ if envs.VLLM_RAY_BUNDLE_INDICES: + bundle_to_node_id = get_bundles_for_indices( + placement_group, + list(map(int, envs.VLLM_RAY_BUNDLE_INDICES.split(","))), + self.world_size, + ) + else: + bundle_to_node_id = get_bundles_sorted_by_node(placement_group) + driver_node = ray.get_runtime_context().get_node_id() + + bundle_assignments: list[dict[str, Any]] = [] + for rank, (bundle_id_idx, node_id, node_ip) in enumerate(bundle_to_node_id): + bundle_assignments.append( + { + "rank": rank, + "bundle_id_idx": bundle_id_idx, + "node_id": node_id, + "node_ip": node_ip, + } + ) + + # Step 3: Resolve the IP for torch.distributed TCPStore. + # The TCPStore server runs on rank 0's node, so all workers + # must be able to reach this address. + dist_ip = bundle_assignments[0]["node_ip"] + distributed_init_method = get_distributed_init_method(dist_ip, get_open_port()) + + # Step 4: Create broadcast MessageQueue. + # Workers on the driver node use shared memory; the rest use TCP. + max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024 + n_local = sum(1 for a in bundle_assignments if a["node_id"] == driver_node) + self.rpc_broadcast_mq = MessageQueue( + self.world_size, + n_local, + max_chunk_bytes=max_chunk_bytes, + connect_ip=ray.util.get_node_ip_address(), + ) + scheduler_output_handle = self.rpc_broadcast_mq.export_handle() + + # Step 5: Spawn RayWorkerProc actors into PG bundles (deferred init). + # Workers are created lightweight here; full initialization happens + # in Step 7 after GPU IDs are discovered. + self.ray_worker_handles: list[RayWorkerHandle] = [] + instance_id = self.vllm_config.instance_id + + # Collect driver env vars and apply but don't overwrite node-local values. 
+ self.driver_env_vars = get_driver_env_vars( + worker_specific_vars=WORKER_SPECIFIC_ENV_VARS, + ) + + runtime_env = self._build_runtime_env() + resource_kwargs = self._get_actor_resource_kwargs() + + for bundle_idx in range(self.world_size): + bundle = bundle_assignments[bundle_idx] + is_driver_worker = self._is_driver_worker(bundle["rank"]) + is_driver_node = bundle["node_id"] == driver_node + + scheduling_strategy = PlacementGroupSchedulingStrategy( + placement_group=placement_group, + placement_group_bundle_index=bundle["bundle_id_idx"], + ) + + actor_name = build_actor_name( + instance_id, bundle["rank"], tp_size, pp_size, pcp_size + ) + + actor = ( + ray.remote(RayWorkerProc) + .options( + name=actor_name, + num_cpus=0, + **resource_kwargs, + scheduling_strategy=scheduling_strategy, + runtime_env=runtime_env, + ) + .remote( + vllm_config=self.vllm_config, + rank=bundle["rank"], + distributed_init_method=distributed_init_method, + input_shm_handle=scheduler_output_handle, + is_driver_worker=is_driver_worker, + is_driver_node=is_driver_node, + ) + ) + + handle = RayWorkerHandle( + actor=actor, + rank=bundle["rank"], + local_rank=-1, # Set in Step 7 after GPU ID discovery + node_id=bundle["node_id"], + bundle_id_idx=bundle["bundle_id_idx"], + ) + self.ray_worker_handles.append(handle) + + # Step 6: Discover GPU IDs assigned to each worker via Ray runtime context. + worker_node_and_gpu_ids = ray.get( + [h.actor.get_node_and_gpu_ids.remote() for h in self.ray_worker_handles] + ) + + node_workers: dict[str, list[int]] = defaultdict(list) + node_gpus: dict[str, list[int]] = defaultdict(list) + for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids): + node_workers[node_id].append(i) + node_gpus[node_id].extend(gpu_ids) + for node_id, gpu_ids in node_gpus.items(): + node_gpus[node_id] = sorted(gpu_ids) + + # Step 7: Initialize workers with correct local_rank and + # CUDA_VISIBLE_DEVICES. 
Each worker sees all GPUs assigned to + # this executor on its node; local_rank indexes into that set. + init_worker_refs = [] + for i, (node_id, _) in enumerate(worker_node_and_gpu_ids): + local_rank = node_workers[node_id].index(i) + worker_env_vars = { + current_platform.device_control_env_var: ",".join( + map(str, node_gpus[node_id]) + ), + } + self.ray_worker_handles[i].local_rank = local_rank + init_worker_refs.append( + self.ray_worker_handles[i].actor.initialize_worker.remote( + local_rank, worker_env_vars, self.driver_env_vars + ) + ) + ray.get(init_worker_refs) + + # Step 8: Collect response MQ handles + init_results = ray.get( + [h.actor.wait_for_init.remote() for h in self.ray_worker_handles] + ) + + self.response_mqs: list[MessageQueue] = [] + for i, result in enumerate(init_results): + if result["status"] != RayWorkerProc.READY_STR: + raise RuntimeError(f"Worker {i} failed to initialize: {result}") + self.response_mqs.append( + MessageQueue.create_from_handle(result["handle"], 0) + ) + + # Step 9: Start run() before wait_until_ready() to avoid + # deadlock — workers send subscriptions inside run(). 
+ for handle in self.ray_worker_handles: + handle.run() + + # Step 10: wait_until_ready() barrier + self.rpc_broadcast_mq.wait_until_ready() + for response_mq in self.response_mqs: + response_mq.wait_until_ready() + + self.futures_queue = deque[tuple[FutureWrapper, Any]]() + self._post_init_executor() + + self.start_worker_monitor() + self.output_rank = self._get_output_rank() + + def start_worker_monitor(self, inline=False) -> None: + """Monitor worker liveness via ray.wait() on run() ObjectRefs.""" + run_refs = [h.run_ref for h in self.ray_worker_handles if h.run_ref is not None] + if not run_refs: + raise RuntimeError("Ray workers have not started successfully.") + + self_ref = weakref.ref(self) + ref_to_rank = { + h.run_ref: h.rank for h in self.ray_worker_handles if h.run_ref is not None + } + + def _should_stop() -> bool: + executor = self_ref() + return not executor or executor.shutting_down + + def monitor_workers(): + # Poll with a timeout rather than blocking on ray.wait() + # because a blocking call would segfault if Ray is torn down + # while this thread is inside it. + while not _should_stop() and ray.is_initialized(): + try: + done, _ = ray.wait(run_refs, num_returns=1, timeout=5.0) + except Exception: + logger.exception( + "RayWorkerMonitor: unexpected error, exiting monitor thread" + ) + return + if not done or _should_stop(): + continue + + dead_ranks = [ref_to_rank[r] for r in done] + executor = self_ref() + if not executor: + return + executor.is_failed = True + logger.error( + "RayWorkerProc rank=%s died unexpectedly, shutting down executor.", + dead_ranks, + ) + executor.shutdown() + if executor.failure_callback is not None: + callback = executor.failure_callback + executor.failure_callback = None + callback() + return + + t = threading.Thread( + target=monitor_workers, daemon=True, name="RayWorkerMonitor" + ) + t.start() + self._monitor_thread = t + + def _join_monitor_thread(self) -> None: + """Wait for the monitor thread to exit. 
+ + Must be called before tearing down Ray resources — the monitor + may be inside ray.wait() which would segfault if Ray is shut + down underneath it. When the monitor itself calls shutdown() + on worker death, we skip the join because the thread is about + to return anyway. + """ + monitor = getattr(self, "_monitor_thread", None) + if ( + monitor is not None + and monitor.is_alive() + and threading.current_thread() is not monitor + ): + monitor.join(timeout=10) + + def shutdown(self) -> None: + """Properly shut down the executor and its workers.""" + lock = getattr(self, "shutdown_lock", None) + if lock is None: + return + + with lock: + if getattr(self, "shutting_down", False): + return + self.shutting_down = True + + self._join_monitor_thread() + + for handle in getattr(self, "ray_worker_handles", []): + try: + ray.kill(handle.actor) + logger.debug("Killed actor rank=%d", handle.rank) + except Exception: + logger.exception("Failed to kill actor rank=%d", handle.rank) + + if rpc_broadcast_mq := getattr(self, "rpc_broadcast_mq", None): + rpc_broadcast_mq.shutdown() + self.rpc_broadcast_mq = None + + for mq in getattr(self, "response_mqs", []): + mq.shutdown() + self.response_mqs = [] diff --git a/vllm/v1/executor/ray_utils.py b/vllm/v1/executor/ray_utils.py index c363a837b..67b43bdc1 100644 --- a/vllm/v1/executor/ray_utils.py +++ b/vllm/v1/executor/ray_utils.py @@ -26,6 +26,17 @@ if TYPE_CHECKING: logger = init_logger(__name__) PG_WAIT_TIMEOUT = 1800 +# Env vars that are worker-specific and must NOT be copied from the +# driver to Ray workers — they are set per-worker after GPU discovery. +WORKER_SPECIFIC_ENV_VARS: set[str] = { + "VLLM_HOST_IP", + "VLLM_HOST_PORT", + "LOCAL_RANK", + "CUDA_VISIBLE_DEVICES", + "HIP_VISIBLE_DEVICES", + "ROCR_VISIBLE_DEVICES", +} + try: import ray from ray.util import placement_group_table @@ -51,6 +62,8 @@ try: # that thread. 
self.compiled_dag_cuda_device_set = False + rpc_rank: int + def adjust_rank(self, rank_mapping: dict[int, int]) -> None: """ Adjust the rpc_rank based on the given mapping. @@ -214,13 +227,17 @@ def assert_ray_available(): def _verify_bundles( - placement_group: "PlacementGroup", parallel_config: ParallelConfig, device_str: str + placement_group: "PlacementGroup", + parallel_config: ParallelConfig, + device_str: str, + require_gpu_on_driver: bool = True, ): """Verify a given placement group has bundles located in the right place. There are 2 rules. - Warn if all tensor parallel workers cannot fit in a single node. - - Fail if driver node is not included in a placement group. + - Fail if driver node is not included in a placement group + (only when require_gpu_on_driver is True). """ assert ray.is_initialized(), ( "Ray is not initialized although distributed-executor-backend is ray." @@ -237,7 +254,7 @@ def _verify_bundles( node_id_to_bundle[node_id].append(bundles[bundle_idx]) driver_node_id = ray.get_runtime_context().get_node_id() - if driver_node_id not in node_id_to_bundle: + if require_gpu_on_driver and driver_node_id not in node_id_to_bundle: raise RuntimeError( f"driver node id {driver_node_id} is not included in a placement " f"group {placement_group.id}. 
Node id -> bundles " @@ -266,6 +283,115 @@ def _verify_bundles( ) +def build_actor_name( + instance_id: str, + rank: int, + tp_size: int, + pp_size: int, + pcp_size: int, +) -> str: + """Build a descriptive Ray actor name for dashboard visibility.""" + name = f"vllm_Worker_{instance_id}" + if tp_size > 1: + name += f"_TP{rank % tp_size}" + if pp_size > 1: + name += f"_PP{(rank // tp_size) % pp_size}" + if pcp_size > 1: + name += f"_PCP{rank // (tp_size * pp_size)}" + return name + + +def get_bundles_for_indices( + placement_group: "PlacementGroup", + bundle_indices: list[int], + world_size: int, +) -> list[tuple[int, str, str]]: + """ + Return GPU bundle indices paired with node IDs and node IPs for + explicit bundle indices specified via VLLM_RAY_BUNDLE_INDICES. + """ + assert len(bundle_indices) == world_size, ( + "VLLM_RAY_BUNDLE_INDICES must have the same size" + f" as the world size, but got {bundle_indices=} " + f"and {world_size=}" + ) + assert len(set(bundle_indices)) == len(bundle_indices), ( + "VLLM_RAY_BUNDLE_INDICES cannot have duplicate values," + f" but got {bundle_indices=}" + ) + + pg_data = placement_group_table(placement_group) + pg_bundle_to_node = pg_data["bundles_to_node_id"] + node_id_to_ip = { + n["NodeID"]: n["NodeManagerAddress"] for n in ray.nodes() if n["Alive"] + } + return [ + (bid, pg_bundle_to_node[bid], node_id_to_ip[pg_bundle_to_node[bid]]) + for bid in bundle_indices + ] + + +def get_bundles_sorted_by_node( + placement_group: "PlacementGroup", +) -> list[tuple[int, str, str]]: + """ + Return GPU bundle indices paired with node IDs and node IPs, + sorted driver-first. + + This utility has to be invoked from the driver node. 
+ + Example: 3-node cluster, driver on node-A, PG bundles spread + across nodes: + + Input: [ + (0, node-C), + (1, node-A), + (2, node-B), + (3, node-C), + (4, node-A), + (5, node-B), + ] + Output: [ + (1, node-A), + (4, node-A), + (2, node-B), + (5, node-B), + (0, node-C), + (3, node-C), + ] + """ + pg_data = placement_group_table(placement_group) + bundle_to_node = pg_data["bundles_to_node_id"] + + ray_device_key = current_platform.ray_device_key + if not ray_device_key: + raise ValueError( + f"current platform {current_platform.device_name} does not support ray." + ) + + node_id_to_ip = { + n["NodeID"]: n["NodeManagerAddress"] for n in ray.nodes() if n["Alive"] + } + + bundle_specs = placement_group.bundle_specs + assert bundle_specs is not None + bundle_to_node_id: list[tuple[int, str, str]] = [] + for bundle_idx, bundle in enumerate(bundle_specs): + if bundle.get(ray_device_key): + node_id = bundle_to_node.get(bundle_idx) + bundle_to_node_id.append((bundle_idx, node_id, node_id_to_ip[node_id])) + + driver_node = ray.get_runtime_context().get_node_id() + + def _sort_key(item): + _, node_id, _ = item + return (0 if node_id == driver_node else 1, node_id) + + bundle_to_node_id.sort(key=_sort_key) + + return bundle_to_node_id + + def _wait_until_pg_ready(current_placement_group: "PlacementGroup"): """Wait until a placement group is ready. @@ -352,6 +478,7 @@ def _wait_until_pg_removed(current_placement_group: "PlacementGroup"): def initialize_ray_cluster( parallel_config: ParallelConfig, ray_address: str | None = None, + require_gpu_on_driver: bool = True, ): """Initialize the distributed cluster with Ray. @@ -363,10 +490,18 @@ def initialize_ray_cluster( parallel_config: The configurations for parallel execution. ray_address: The address of the Ray cluster. If None, uses the default Ray cluster address. + require_gpu_on_driver: If True (default), require at least one GPU + on the current (driver) node and pin the first PG bundle to it. 
+ Set to False for executors like RayExecutorV2 where all GPU work + is delegated to remote Ray actors. """ assert_ray_available() from vllm.platforms import current_platform + # Disable Ray usage stats collection + if os.environ.get("RAY_USAGE_STATS_ENABLED", "0") != "1": + os.environ["RAY_USAGE_STATS_ENABLED"] = "0" + # Prevalidate GPU requirements before Ray processing if current_platform.is_cuda() and parallel_config.world_size > 1: available_gpus = current_platform.device_count() @@ -459,16 +594,20 @@ def initialize_ray_cluster( current_ip = get_ip() current_node_id = ray.get_runtime_context().get_node_id() current_node_resource = available_resources_per_node()[current_node_id] - if current_node_resource.get(device_str, 0) < 1: - raise ValueError( - f"Current node has no {device_str} available. " - f"{current_node_resource=}. vLLM engine cannot start without " - f"{device_str}. Make sure you have at least 1 {device_str} " - f"available in a node {current_node_id=} {current_ip=}." - ) - # This way, at least bundle is required to be created in a current - # node. - placement_group_specs[0][f"node:{current_ip}"] = 0.001 + # TODO (jeffreywang): require_gpu_on_driver should be always False + # after deprecating RayDistributedExecutor. + if require_gpu_on_driver: + if current_node_resource.get(device_str, 0) < 1: + raise ValueError( + f"Current node has no {device_str} available. " + f"{current_node_resource=}. vLLM engine cannot start " + f"without {device_str}. Make sure you have at least 1 " + f"{device_str} available in a node " + f"{current_node_id=} {current_ip=}." + ) + # This way, at least bundle is required to be created in a + # current node. + placement_group_specs[0][f"node:{current_ip}"] = 0.001 # By default, Ray packs resources as much as possible. 
current_placement_group = ray.util.placement_group( @@ -477,7 +616,9 @@ def initialize_ray_cluster( _wait_until_pg_ready(current_placement_group) assert current_placement_group is not None - _verify_bundles(current_placement_group, parallel_config, device_str) + _verify_bundles( + current_placement_group, parallel_config, device_str, require_gpu_on_driver + ) # Set the placement group in the parallel config parallel_config.placement_group = current_placement_group diff --git a/vllm/v1/worker/worker_base.py b/vllm/v1/worker/worker_base.py index 041fff637..6a80e3f37 100644 --- a/vllm/v1/worker/worker_base.py +++ b/vllm/v1/worker/worker_base.py @@ -195,8 +195,8 @@ class WorkerWrapperBase: All workers have rpc_rank=0, but they have different ranks in the TP group. """ - self.rpc_rank = rpc_rank - self.global_rank = self.rpc_rank if global_rank is None else global_rank + self.rpc_rank: int = rpc_rank + self.global_rank: int = self.rpc_rank if global_rank is None else global_rank # Initialized after init_worker is called self.worker: WorkerBase