[Feat][Executor] Introduce RayExecutorV2 (#36836)

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
This commit is contained in:
Jeffrey Wang
2026-04-01 14:34:29 -07:00
committed by GitHub
parent cb268e4e55
commit de5e6c44c6
14 changed files with 1603 additions and 30 deletions

View File

@@ -224,6 +224,20 @@ steps:
commands:
- ./.buildkite/scripts/run-multi-node-test.sh /vllm-workspace/tests 2 2 $IMAGE_TAG "VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py | grep 'Same node test passed' && NUM_NODES=2 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_node_count.py | grep 'Node count test passed' && python3 ../examples/offline_inference/data_parallel.py -dp=2 -tp=1 --dp-num-nodes=2 --dp-node-rank=0 --dp-master-addr=192.168.10.10 --dp-master-port=12345 --enforce-eager --trust-remote-code && VLLM_MULTI_NODE=1 pytest -v -s distributed/test_multi_node_assignment.py && VLLM_MULTI_NODE=1 pytest -v -s distributed/test_pipeline_parallel.py" "VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py | grep 'Same node test passed' && NUM_NODES=2 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_node_count.py | grep 'Node count test passed' && python3 ../examples/offline_inference/data_parallel.py -dp=2 -tp=1 --dp-num-nodes=2 --dp-node-rank=1 --dp-master-addr=192.168.10.10 --dp-master-port=12345 --enforce-eager --trust-remote-code"
- label: MessageQueue TCP Multi-Node (2 GPUs)
timeout_in_minutes: 10
working_dir: "/vllm-workspace/tests"
num_devices: 1
num_nodes: 2
no_plugin: true
optional: true
source_file_dependencies:
- vllm/distributed/device_communicators/shm_broadcast.py
- vllm/distributed/parallel_state.py
- tests/distributed/test_mq_tcp_multinode.py
commands:
- ./.buildkite/scripts/run-multi-node-test.sh /vllm-workspace/tests 2 1 $IMAGE_TAG "torchrun --nnodes 2 --nproc-per-node=1 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_mq_tcp_multinode.py" "torchrun --nnodes 2 --nproc-per-node=1 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_mq_tcp_multinode.py"
- label: Distributed NixlConnector PD accuracy (4 GPUs)
timeout_in_minutes: 30
working_dir: "/vllm-workspace/tests"
@@ -294,3 +308,23 @@ steps:
commands:
- pytest -v -s distributed/test_pp_cudagraph.py
- pytest -v -s distributed/test_pipeline_parallel.py
- label: RayExecutorV2 (4 GPUs)
timeout_in_minutes: 60
working_dir: "/vllm-workspace/tests"
num_devices: 4
source_file_dependencies:
- vllm/v1/executor/ray_executor_v2.py
- vllm/v1/executor/abstract.py
- vllm/v1/executor/multiproc_executor.py
- tests/distributed/test_ray_v2_executor.py
- tests/distributed/test_ray_v2_executor_e2e.py
- tests/distributed/test_pipeline_parallel.py
- tests/basic_correctness/test_basic_correctness.py
commands:
- export VLLM_USE_RAY_V2_EXECUTOR_BACKEND=1
- export NCCL_CUMEM_HOST_ENABLE=0
- pytest -v -s distributed/test_ray_v2_executor.py
- pytest -v -s distributed/test_ray_v2_executor_e2e.py
- pytest -v -s distributed/test_pipeline_parallel.py -k "ray"
- TARGET_TEST_SUITE=L4 pytest -v -s basic_correctness/test_basic_correctness.py -k "ray"

View File

@@ -1,5 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
import random
import msgspec
@@ -166,3 +167,31 @@ class MockSubscriber:
self.sub.close()
for replay in self.replay_sockets:
replay.close()
@pytest.fixture
def enable_ray_v2_backend():
    """Configure the Ray V2 executor backend via env vars and guarantee a
    clean Ray session both before and after each test."""
    import ray

    tracked_keys = (
        "VLLM_USE_RAY_V2_EXECUTOR_BACKEND",
        "VLLM_ENABLE_V1_MULTIPROCESSING",
    )
    previous = {key: os.environ.get(key) for key in tracked_keys}
    os.environ["VLLM_USE_RAY_V2_EXECUTOR_BACKEND"] = "1"
    os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
    if ray.is_initialized():
        ray.shutdown()
    try:
        yield
    finally:
        if ray.is_initialized():
            ray.shutdown()
        # Restore the pre-test environment: put back saved values and
        # drop keys that did not exist before.
        for key, value in previous.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value

View File

@@ -0,0 +1,119 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Multi-node integration test for MessageQueue TCP fallback.
Verifies that when writer and readers span separate nodes (Docker containers
with isolated /dev/shm), `create_from_process_group` correctly detects
cross-node ranks via `in_the_same_node_as()` and falls back to ZMQ TCP
transport — and that data actually arrives.
"""
import numpy as np
import torch.distributed as dist
from vllm.distributed.device_communicators.shm_broadcast import MessageQueue
from vllm.distributed.parallel_state import in_the_same_node_as
def main():
    """Multi-node MessageQueue TCP fallback test driver.

    Rank 0 is the writer; all other ranks are readers. Readers that share
    the writer's node use shared memory, remote readers fall back to ZMQ
    TCP. Runs three data-transfer phases and asserts correct delivery.
    """
    dist.init_process_group(backend="gloo")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    assert world_size >= 2, (
        f"Need at least 2 ranks across nodes, got world_size={world_size}"
    )
    # Verify that in_the_same_node_as detects cross-node correctly
    status = in_the_same_node_as(dist.group.WORLD, source_rank=0)
    local_count = sum(status)
    print(
        f"[Rank {rank}] in_the_same_node_as(source=0): {status} "
        f"(local={local_count}/{world_size})"
    )
    # With 2 Docker containers (1 proc each), rank 0 and rank 1
    # should be on different nodes.
    assert local_count < world_size, (
        f"Expected cross-node ranks but all {world_size} ranks appear local."
    )
    # Create MessageQueue
    writer_rank = 0
    mq = MessageQueue.create_from_process_group(
        dist.group.WORLD,
        max_chunk_bytes=1024 * 1024,  # 1 MiB
        max_chunks=10,
        writer_rank=writer_rank,
    )
    _check_transport_selection(mq, status, rank, writer_rank)
    _run_simple_object_phase(mq, rank, writer_rank)
    _run_numpy_array_phase(mq, rank, writer_rank)
    _run_large_payload_phase(mq, rank, writer_rank)
    # Done -- cleanup
    dist.barrier()
    print(f"[Rank {rank}] All MessageQueue TCP multi-node tests passed!")
    dist.destroy_process_group()


def _check_transport_selection(mq, status, rank, writer_rank):
    """Assert each rank was given the expected transport (shm vs TCP)."""
    if rank == writer_rank:
        print(
            f"[Rank {rank}] Writer: n_local_reader={mq.n_local_reader}, "
            f"n_remote_reader={mq.n_remote_reader}"
        )
        assert mq.n_remote_reader > 0, (
            "Writer should have at least 1 remote (TCP) reader in a multi-node setup."
        )
    elif status[rank]:
        assert mq._is_local_reader, (
            f"Rank {rank} is on the same node as writer but is not a local reader."
        )
        print(f"[Rank {rank}] Reader: local (shared memory)")
    else:
        assert mq._is_remote_reader, (
            f"Rank {rank} is on a different node but is not a remote (TCP) reader."
        )
        print(f"[Rank {rank}] Reader: remote (TCP)")


def _run_simple_object_phase(mq, rank, writer_rank):
    """Round-trip a single small Python object through the queue."""
    dist.barrier()
    if rank == writer_rank:
        mq.enqueue("hello_from_node0")
    else:
        msg = mq.dequeue(timeout=10)
        assert msg == "hello_from_node0"
    dist.barrier()
    print(f"[Rank {rank}] Simple object test passed")


def _run_numpy_array_phase(mq, rank, writer_rank):
    """Round-trip 100 random-size numpy arrays.

    All ranks seed identically, so readers can regenerate the expected
    arrays locally and compare against what arrives over the queue.
    """
    np.random.seed(42)
    arrays = [
        np.random.randint(0, 100, size=np.random.randint(100, 5000)) for _ in range(100)
    ]
    dist.barrier()
    if rank == writer_rank:
        for arr in arrays:
            mq.enqueue(arr)
    else:
        for i, expected in enumerate(arrays):
            received = mq.dequeue(timeout=10)
            assert np.array_equal(expected, received), (
                f"Array mismatch at index {i}: "
                f"expected shape {expected.shape}, got shape {received.shape}"
            )
    dist.barrier()
    print(f"[Rank {rank}] Numpy array test passed")


def _run_large_payload_phase(mq, rank, writer_rank):
    """Round-trip a payload larger than max_chunk_bytes (forces chunking)."""
    dist.barrier()
    big_array = np.zeros(200_000, dtype=np.int64)  # ~1.6 MiB > 1 MiB chunk
    if rank == writer_rank:
        mq.enqueue(big_array)
    else:
        received = mq.dequeue(timeout=10)
        assert np.array_equal(big_array, received)
    dist.barrier()
    print(f"[Rank {rank}] Large payload test passed")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,345 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Integration tests for RayExecutorV2 at the executor level.
Validates executor initialization, placement group support, RPC calls,
and distributed execution with various TP/PP configurations.
"""
import gc
import threading
from unittest.mock import patch
import pytest
import ray
from vllm import LLM
from vllm.config import VllmConfig
from vllm.engine.arg_utils import EngineArgs
from vllm.v1.executor.ray_executor_v2 import RayExecutorV2
pytestmark = pytest.mark.usefixtures("enable_ray_v2_backend")
MODEL = "facebook/opt-125m"
def create_vllm_config(
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
    max_model_len: int = 256,
    gpu_memory_utilization: float = 0.3,
    placement_group=None,
) -> VllmConfig:
    """Build a small VllmConfig for MODEL using the Ray backend.

    When ``placement_group`` is given, it is attached to the parallel
    config so the executor uses the externally managed group.
    """
    args = EngineArgs(
        model=MODEL,
        tensor_parallel_size=tensor_parallel_size,
        pipeline_parallel_size=pipeline_parallel_size,
        max_model_len=max_model_len,
        gpu_memory_utilization=gpu_memory_utilization,
        distributed_executor_backend="ray",
        enforce_eager=True,
    )
    config = args.create_engine_config()
    if placement_group is not None:
        config.parallel_config.placement_group = placement_group
    return config
def ensure_ray_initialized():
    """Start Ray unless a session is already running."""
    if ray.is_initialized():
        return
    ray.init(ignore_reinit_error=True)
@pytest.fixture
def create_placement_group(request):
    """Yield a PACK placement group with ``request.param`` GPU bundles,
    removing it once the test finishes."""
    ensure_ray_initialized()
    gpu_count = request.param
    pg = ray.util.placement_group(
        [{"GPU": 1, "CPU": 1} for _ in range(gpu_count)], strategy="PACK"
    )
    ray.get(pg.ready())
    yield pg
    ray.util.remove_placement_group(pg)
@pytest.fixture
def executor(request):
    """Build a RayExecutorV2 from ``request.param`` (a VllmConfig) and
    shut it down after the test."""
    instance = RayExecutorV2(vllm_config=request.param)
    yield instance
    instance.shutdown()
def assert_executor(executor, tp_size, pp_size):
    """Shared sanity checks run after executor construction."""
    world_size = tp_size * pp_size
    assert executor.world_size == world_size
    assert len(executor.ray_worker_handles) == world_size
    assert len(executor.response_mqs) == world_size
    # Output is produced by the first rank of the last PP stage.
    assert executor._get_output_rank() == (pp_size - 1) * tp_size
    if pp_size > 1:
        assert executor.max_concurrent_batches == pp_size
    executor.check_health()
    assert not executor.is_failed
    observed_ranks = sorted(handle.rank for handle in executor.ray_worker_handles)
    assert observed_ranks == list(range(world_size))
    assert all(
        handle.node_id is not None for handle in executor.ray_worker_handles
    )
@pytest.mark.parametrize("tp_size, pp_size", [(1, 1), (2, 1), (4, 1), (2, 2)])
def test_ray_v2_executor(tp_size, pp_size):
    """Validate RayExecutorV2 with various TP/PP configs."""
    config = create_vllm_config(
        tensor_parallel_size=tp_size, pipeline_parallel_size=pp_size
    )
    ex = RayExecutorV2(vllm_config=config)
    try:
        assert_executor(ex, tp_size, pp_size)
    finally:
        ex.shutdown()
@pytest.mark.parametrize(
    "tp_size, pp_size, create_placement_group",
    [(2, 1, 2), (4, 1, 4), (2, 2, 4)],
    indirect=["create_placement_group"],
)
def test_ray_v2_executor_pg(tp_size, pp_size, create_placement_group):
    """Validate RayExecutorV2 with various TP/PP configs using external PG."""
    config = create_vllm_config(
        tensor_parallel_size=tp_size,
        pipeline_parallel_size=pp_size,
        placement_group=create_placement_group,
    )
    ex = RayExecutorV2(vllm_config=config)
    try:
        assert_executor(ex, tp_size, pp_size)
    finally:
        ex.shutdown()
@pytest.mark.parametrize(
    "executor",
    [create_vllm_config(tensor_parallel_size=2)],
    indirect=True,
)
def test_ray_v2_executor_failure_callback(executor):
    """A callback registered before failure stays pending; one registered
    after the executor is marked failed fires immediately."""
    invocations = []

    def record_invocation():
        invocations.append(True)

    executor.register_failure_callback(record_invocation)
    assert not invocations
    executor.is_failed = True
    executor.register_failure_callback(record_invocation)
    assert invocations
@pytest.mark.parametrize(
    "executor",
    [create_vllm_config(tensor_parallel_size=2)],
    indirect=True,
)
def test_ray_v2_executor_collective_rpc(executor):
    """The executor stays healthy and exposes the RPC broadcast queue."""
    executor.check_health()
    assert not executor.is_failed
    assert executor.rpc_broadcast_mq is not None
@pytest.mark.parametrize(
    "executor",
    [create_vllm_config(tensor_parallel_size=2)],
    indirect=True,
)
def test_ray_v2_executor_driver_node_rank_0(executor):
    """Validate that driver node workers get the lowest ranks."""
    driver_node = ray.get_runtime_context().get_node_id()
    assert all(
        handle.node_id == driver_node for handle in executor.ray_worker_handles
    )
    rank0 = next(h for h in executor.ray_worker_handles if h.rank == 0)
    assert rank0.node_id == driver_node
@pytest.mark.parametrize(
    "executor",
    [create_vllm_config(tensor_parallel_size=2)],
    indirect=True,
)
def test_ray_v2_executor_worker_death(executor):
    """Validate executor detects worker death via ray.wait()."""
    death_detected = threading.Event()
    executor.register_failure_callback(death_detected.set)
    assert not executor.is_failed
    # Kill one worker actor externally.
    ray.kill(executor.ray_worker_handles[1].actor, no_restart=True)
    # The monitor thread should notice the death and fire the callback.
    assert death_detected.wait(timeout=30)
    assert executor.is_failed
    assert executor.shutting_down
def test_ray_v2_executor_shutdown():
    """Validate graceful shutdown: ray.kill() terminates all worker actors."""
    ex = RayExecutorV2(vllm_config=create_vllm_config(tensor_parallel_size=2))
    assert ex.rpc_broadcast_mq is not None
    assert len(ex.response_mqs) == ex.world_size
    actors = [handle.actor for handle in ex.ray_worker_handles]
    ex.shutdown()
    # Every actor must be dead: any remote call should raise.
    for actor in actors:
        with pytest.raises(ray.exceptions.RayActorError):
            ray.get(actor.wait_for_init.remote(), timeout=5)
    assert ex.rpc_broadcast_mq is None
    assert not ex.response_mqs
@pytest.mark.parametrize(
    "executor",
    [create_vllm_config(tensor_parallel_size=2)],
    indirect=True,
)
def test_ray_v2_run_refs_stored_for_monitoring(executor):
    """Every worker handle stores a still-pending run_ref for monitoring."""
    for handle in executor.ray_worker_handles:
        assert handle.run_ref is not None
        ready, _pending = ray.wait([handle.run_ref], timeout=0)
        assert not ready, "run_ref should be pending"
@pytest.mark.parametrize("tp_size, pp_size", [(2, 1), (2, 2)])
def test_ray_v2_single_node_generation(tp_size, pp_size):
    """End-to-end LLM generation with RayExecutorV2."""
    llm = LLM(
        model=MODEL,
        tensor_parallel_size=tp_size,
        pipeline_parallel_size=pp_size,
        distributed_executor_backend="ray",
        enforce_eager=True,
        max_model_len=256,
        gpu_memory_utilization=0.3,
    )
    try:
        prompts = [
            "Hello, my name is",
            "The capital of France is",
            "The future of AI is",
        ]
        results = llm.generate(prompts)
        assert len(results) == len(prompts)
        for request_output in results:
            completions = request_output.outputs
            assert len(completions) > 0
            assert len(completions[0].text) > 0
    finally:
        llm.llm_engine.model_executor.shutdown()
        del llm
        gc.collect()
@pytest.mark.parametrize(
    "bundle_indices, expected_bundle_ids, create_placement_group",
    [("2,3", [2, 3], 4), ("3,2", [3, 2], 4)],
    indirect=["create_placement_group"],
)
def test_ray_v2_bundle_indices_env(
    bundle_indices, expected_bundle_ids, create_placement_group, monkeypatch
):
    """Validate explicit VLLM_RAY_BUNDLE_INDICES bundle placement."""
    monkeypatch.setenv("VLLM_RAY_BUNDLE_INDICES", bundle_indices)
    config = create_vllm_config(
        tensor_parallel_size=2,
        placement_group=create_placement_group,
    )
    ex = RayExecutorV2(vllm_config=config)
    try:
        by_rank = sorted(ex.ray_worker_handles, key=lambda h: h.rank)
        assert [h.bundle_id_idx for h in by_rank] == expected_bundle_ids
        assert_executor(ex, tp_size=2, pp_size=1)
    finally:
        ex.shutdown()
@pytest.mark.parametrize(
    "bundle_indices, expected_error, create_placement_group",
    [
        ("1,1", "cannot have duplicate values,", 4),
        ("0,1,2", "must have the same size", 4),
    ],
    indirect=["create_placement_group"],
)
def test_ray_v2_invalid_bundle_indices(
    bundle_indices, expected_error, create_placement_group, monkeypatch
):
    """Malformed VLLM_RAY_BUNDLE_INDICES values must be rejected."""
    monkeypatch.setenv("VLLM_RAY_BUNDLE_INDICES", bundle_indices)
    config = create_vllm_config(
        tensor_parallel_size=2, placement_group=create_placement_group
    )
    with pytest.raises(AssertionError, match=expected_error):
        RayExecutorV2(vllm_config=config)
@pytest.mark.parametrize("tp_size, pp_size", [(2, 1), (2, 2)])
def test_ray_v2_single_node_generation_with_pg(tp_size, pp_size):
    """E2E LLM generation with a user-provided placement group.

    Fixes over the naive form: ``llm`` is pre-bound to None so the
    ``finally`` block cannot raise NameError (masking the real error)
    when LLM construction fails, and the placement group is removed on
    exit instead of being leaked into later tests.
    """
    ensure_ray_initialized()
    bundles = [{"GPU": 1, "CPU": 1} for _ in range(tp_size * pp_size)]
    pg = ray.util.placement_group(bundles, strategy="PACK")
    ray.get(pg.ready())
    llm = None
    try:
        with patch.object(ray.util, "get_current_placement_group", return_value=pg):
            llm = LLM(
                model=MODEL,
                tensor_parallel_size=tp_size,
                pipeline_parallel_size=pp_size,
                distributed_executor_backend="ray",
                enforce_eager=True,
                max_model_len=256,
                gpu_memory_utilization=0.3,
            )
            prompts = [
                "Hello, my name is",
                "The capital of France is",
                "The future of AI is",
            ]
            outputs = llm.generate(prompts)
            assert len(outputs) == len(prompts)
            for output in outputs:
                assert len(output.outputs) > 0
                assert len(output.outputs[0].text) > 0
    finally:
        if llm is not None:
            llm.llm_engine.model_executor.shutdown()
            del llm
        gc.collect()
        ray.util.remove_placement_group(pg)

View File

@@ -0,0 +1,209 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Orchestration-level integration tests for RayExecutorV2.
"""
import gc
import os
import pathlib
import pytest
import ray
pytestmark = pytest.mark.usefixtures("enable_ray_v2_backend")
MODEL = "facebook/opt-125m"
def _get_env_var(worker, name):
return os.environ.get(name)
def _ray_init():
    """Start Ray with the project root on workers' PYTHONPATH.

    Without this, workers cannot unpickle actor classes defined in the
    ``tests`` package, causing FunctionActorManager to fall back to
    TemporaryActor which drops async method signatures.
    """
    repo_root = str(pathlib.Path(__file__).resolve().parents[2])
    runtime_env = {"env_vars": {"PYTHONPATH": repo_root}}
    ray.init(ignore_reinit_error=True, runtime_env=runtime_env)
@pytest.fixture
def ray_init():
    """Fixture wrapper around _ray_init for tests that need Ray running."""
    _ray_init()
class _AsyncLLMActor:
    """Ray actor hosting an AsyncLLM engine backed by RayExecutorV2.

    Improvement: the generation loop duplicated between ``generate`` and
    ``generate_and_get_worker_envs`` is factored into ``_generate_text``.
    """

    def start(self, pg, bundle_indices=None, ray_runtime_env=None):
        """Build an AsyncLLM bound to placement group ``pg``."""
        os.environ["VLLM_USE_RAY_V2_EXECUTOR_BACKEND"] = "1"
        # Needed so collective_rpc can pickle _get_env_var over the
        # AsyncLLM -> EngineCore ZMQ boundary.
        os.environ["VLLM_ALLOW_INSECURE_SERIALIZATION"] = "1"
        if bundle_indices is not None:
            os.environ["VLLM_RAY_BUNDLE_INDICES"] = bundle_indices
        else:
            os.environ.pop("VLLM_RAY_BUNDLE_INDICES", None)
        from vllm.engine.arg_utils import AsyncEngineArgs
        from vllm.v1.engine.async_llm import AsyncLLM
        from vllm.v1.executor.abstract import Executor

        engine_args = AsyncEngineArgs(
            model=MODEL,
            tensor_parallel_size=2,
            distributed_executor_backend="ray",
            enforce_eager=True,
            max_model_len=256,
            gpu_memory_utilization=0.8,
        )
        vllm_config = engine_args.create_engine_config()
        vllm_config.parallel_config.placement_group = pg
        if ray_runtime_env is not None:
            vllm_config.parallel_config.ray_runtime_env = ray_runtime_env
        executor_class = Executor.get_class(vllm_config)
        self.engine = AsyncLLM(
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_stats=False,
            log_requests=False,
        )

    async def _generate_text(self, prompt):
        """Run one generation to completion and return the output text."""
        from vllm.sampling_params import SamplingParams

        params = SamplingParams(max_tokens=16)
        result = None
        async for output in self.engine.generate(
            prompt, params, request_id="test_request_id"
        ):
            result = output
        assert result is not None
        return result.outputs[0].text

    async def generate(self, prompt):
        """Generate and return text for ``prompt``."""
        return await self._generate_text(prompt)

    async def generate_and_get_worker_envs(self, prompt, env_names):
        """Generate text, then query each env var on all workers via RPC."""
        text = await self._generate_text(prompt)
        env_results = {}
        for name in env_names:
            env_results[name] = await self.engine.collective_rpc(
                _get_env_var, timeout=10, args=(name,)
            )
        return text, env_results

    def shutdown(self):
        # Tolerates being called before start() ever ran.
        if engine := getattr(self, "engine", None):
            engine.shutdown()
            del self.engine
        gc.collect()
AsyncLLMActor = ray.remote(num_cpus=0, max_concurrency=1)(_AsyncLLMActor)
def test_multi_replicas(ray_init):
    """Two independent engines, each in its own placement group,
    generate concurrently."""
    pg1 = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK")
    pg2 = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK")
    ray.get([pg1.ready(), pg2.ready()])
    replicas = [AsyncLLMActor.remote(), AsyncLLMActor.remote()]
    ray.get(replicas[0].start.remote(pg1))
    ray.get(replicas[1].start.remote(pg2))
    texts = ray.get(
        [replica.generate.remote("Hello world") for replica in replicas]
    )
    assert all(len(text) > 0 for text in texts)
def test_multi_replicas_with_bundle_indices(ray_init):
    """Two engines share one placement group via disjoint bundle indices."""
    pg = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 4, strategy="PACK")
    ray.get(pg.ready())
    replicas = [AsyncLLMActor.remote(), AsyncLLMActor.remote()]
    ray.get(replicas[0].start.remote(pg, bundle_indices="2,1"))
    ray.get(replicas[1].start.remote(pg, bundle_indices="0,3"))
    texts = ray.get(
        [replica.generate.remote("Hello world") for replica in replicas]
    )
    assert all(len(text) > 0 for text in texts)
def test_env_var_and_runtime_env_propagation():
    """
    Verify env vars (NCCL_, HF_) and parallel_config.ray_runtime_env
    propagate to RayWorkerProc actors.
    """
    sentinel_vars = {
        "NCCL_DEBUG": "INFO",
        "HF_TOKEN": "test_sentinel_token",
    }
    os.environ.update(sentinel_vars)
    try:
        # Called directly (not via the ray_init fixture) because sentinel
        # env vars must be in os.environ before ray.init() so that Ray
        # worker processes inherit them.
        _ray_init()
        pg = ray.util.placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK")
        ray.get(pg.ready())
        # Include the project root so that RayWorkerProc actors can
        # unpickle _get_env_var.
        project_root = str(pathlib.Path(__file__).resolve().parents[2])
        runtime_env = {
            "env_vars": {
                "RAY_RUNTIME_ENV_TEST": "ray_runtime_env",
                "PYTHONPATH": project_root,
            },
        }
        actor = AsyncLLMActor.remote()
        ray.get(actor.start.remote(pg, ray_runtime_env=runtime_env))
        queried_names = list(sentinel_vars) + ["RAY_RUNTIME_ENV_TEST"]
        text, env_results = ray.get(
            actor.generate_and_get_worker_envs.remote("Hello world", queried_names)
        )
        assert len(text) > 0
        for name, expected in sentinel_vars.items():
            assert all(val == expected for val in env_results[name])
        assert all(
            val == "ray_runtime_env" for val in env_results["RAY_RUNTIME_ENV_TEST"]
        )
    finally:
        for key in sentinel_vars:
            os.environ.pop(key, None)

View File

@@ -0,0 +1,51 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Tests for vllm.v1.executor.ray_env_utils."""
import os
from unittest.mock import patch
from vllm.v1.executor.ray_env_utils import get_driver_env_vars
# Worker-specific vars the tests pass as the exclusion set for
# get_driver_env_vars (a small stand-in for the production set).
WORKER_VARS: set[str] = {
    "CUDA_VISIBLE_DEVICES",
    "LOCAL_RANK",
}
class TestDefaultPropagation:
    """All env vars are propagated unless explicitly excluded."""

    @patch.dict(os.environ, {"NCCL_DEBUG": "INFO"}, clear=False)
    def test_nccl_prefix(self):
        propagated = get_driver_env_vars(WORKER_VARS)
        assert propagated["NCCL_DEBUG"] == "INFO"

    @patch.dict(os.environ, {"HF_TOKEN": "secret"}, clear=False)
    def test_hf_token(self):
        propagated = get_driver_env_vars(WORKER_VARS)
        assert "HF_TOKEN" in propagated

    @patch.dict(os.environ, {"LMCACHE_LOCAL_CPU": "True"}, clear=False)
    def test_lmcache_prefix(self):
        propagated = get_driver_env_vars(WORKER_VARS)
        assert "LMCACHE_LOCAL_CPU" in propagated

    @patch.dict(os.environ, {"PYTHONHASHSEED": "42"}, clear=False)
    def test_pythonhashseed(self):
        propagated = get_driver_env_vars(WORKER_VARS)
        assert propagated["PYTHONHASHSEED"] == "42"

    @patch.dict(os.environ, {"MYLIB_FOO": "bar"}, clear=False)
    def test_arbitrary_var_propagated(self):
        propagated = get_driver_env_vars(WORKER_VARS)
        assert propagated["MYLIB_FOO"] == "bar"
class TestExclusion:
    """Worker-specific and blacklisted vars are never propagated."""

    @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1"}, clear=False)
    def test_worker_specific_excluded(self):
        propagated = get_driver_env_vars(WORKER_VARS)
        assert "CUDA_VISIBLE_DEVICES" not in propagated

    @patch.dict(os.environ, {"LMCACHE_LOCAL_CPU": "True"}, clear=False)
    @patch(
        "vllm.v1.executor.ray_env_utils.RAY_NON_CARRY_OVER_ENV_VARS",
        {"LMCACHE_LOCAL_CPU"},
    )
    def test_non_carry_over_blacklist(self):
        propagated = get_driver_env_vars(WORKER_VARS)
        assert "LMCACHE_LOCAL_CPU" not in propagated

View File

@@ -0,0 +1,100 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from unittest.mock import MagicMock, patch
import pytest
from vllm.v1.executor.ray_utils import get_bundles_sorted_by_node
# Synthetic node IDs / IPs used by the bundle-sorting tests below.
NODE_A = "node_a"
NODE_B = "node_b"
NODE_C = "node_c"
IP_A = "10.0.0.1"
IP_B = "10.0.0.2"
IP_C = "10.0.0.3"
NODE_ID_TO_IP = {NODE_A: IP_A, NODE_B: IP_B, NODE_C: IP_C}
# Shape mirrors ray.nodes() entries consumed by get_bundles_sorted_by_node.
MOCK_RAY_NODES = [
    {"NodeID": NODE_A, "NodeManagerAddress": IP_A, "Alive": True},
    {"NodeID": NODE_B, "NodeManagerAddress": IP_B, "Alive": True},
    {"NodeID": NODE_C, "NodeManagerAddress": IP_C, "Alive": True},
]
@pytest.mark.parametrize(
    "bundles_to_node_id,bundle_specs,expected",
    [
        pytest.param(
            {0: NODE_C, 1: NODE_A, 2: NODE_B, 3: NODE_C, 4: NODE_A, 5: NODE_B},
            [{"GPU": 1}] * 6,
            [
                (1, NODE_A, IP_A),
                (4, NODE_A, IP_A),
                (2, NODE_B, IP_B),
                (5, NODE_B, IP_B),
                (0, NODE_C, IP_C),
                (3, NODE_C, IP_C),
            ],
        ),
        pytest.param(
            {0: NODE_B, 1: NODE_B, 2: NODE_A, 3: NODE_A},
            [{"GPU": 1}] * 4,
            [
                (2, NODE_A, IP_A),
                (3, NODE_A, IP_A),
                (0, NODE_B, IP_B),
                (1, NODE_B, IP_B),
            ],
        ),
        pytest.param(
            {0: NODE_C, 1: NODE_B, 2: NODE_C, 3: NODE_B},
            [{"GPU": 1}] * 4,
            [
                (1, NODE_B, IP_B),
                (3, NODE_B, IP_B),
                (0, NODE_C, IP_C),
                (2, NODE_C, IP_C),
            ],
        ),
        pytest.param(
            {0: NODE_A, 1: NODE_A, 2: NODE_A},
            [{"GPU": 1}] * 3,
            [(0, NODE_A, IP_A), (1, NODE_A, IP_A), (2, NODE_A, IP_A)],
        ),
        pytest.param(
            {},
            [],
            [],
        ),
        pytest.param(
            {0: NODE_A, 1: NODE_B, 2: NODE_A},
            [{"CPU": 1}, {"GPU": 1}, {"GPU": 1}],
            [(2, NODE_A, IP_A), (1, NODE_B, IP_B)],
        ),
    ],
)
def test_get_bundles_sorted_by_node(bundles_to_node_id, bundle_specs, expected):
    """Bundles come back grouped by node with only GPU bundles kept
    (CPU-only bundles are dropped, per the last case)."""
    fake_pg = MagicMock()
    fake_pg.bundle_specs = bundle_specs
    fake_ctx = MagicMock()
    fake_ctx.get_node_id.return_value = NODE_A
    with (
        patch(
            "vllm.v1.executor.ray_utils.placement_group_table",
            return_value={"bundles_to_node_id": bundles_to_node_id},
        ),
        patch("vllm.v1.executor.ray_utils.ray") as fake_ray,
        patch("vllm.v1.executor.ray_utils.current_platform") as fake_platform,
    ):
        fake_ray.get_runtime_context.return_value = fake_ctx
        fake_ray.nodes.return_value = MOCK_RAY_NODES
        fake_platform.ray_device_key = "GPU"
        assert get_bundles_sorted_by_node(fake_pg) == expected

View File

@@ -59,6 +59,7 @@ if TYPE_CHECKING:
VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: Literal["auto", "nccl", "shm"] = "auto"
VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM: bool = False
VLLM_USE_RAY_WRAPPED_PP_COMM: bool = True
VLLM_USE_RAY_V2_EXECUTOR_BACKEND: bool = False
VLLM_XLA_USE_SPMD: bool = False
VLLM_WORKER_MULTIPROC_METHOD: Literal["fork", "spawn"] = "fork"
VLLM_ASSETS_CACHE: str = os.path.join(VLLM_CACHE_ROOT, "assets")
@@ -753,6 +754,12 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_USE_RAY_WRAPPED_PP_COMM": lambda: bool(
int(os.getenv("VLLM_USE_RAY_WRAPPED_PP_COMM", "1"))
),
# When True and distributed_executor_backend="ray", use RayExecutorV2
# (MQ-based) instead of RayDistributedExecutor (compiled-graph backend).
# TODO (jeffreywang): Enable by default in vLLM 0.20.0.
"VLLM_USE_RAY_V2_EXECUTOR_BACKEND": lambda: bool(
int(os.getenv("VLLM_USE_RAY_V2_EXECUTOR_BACKEND", "0"))
),
# Use dedicated multiprocess context for workers.
# Both spawn and fork work
"VLLM_WORKER_MULTIPROC_METHOD": env_with_choices(

View File

@@ -7,6 +7,7 @@ from concurrent.futures import Future
from functools import cached_property
from typing import TYPE_CHECKING, Literal, TypeVar, overload
import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.distributed.kv_transfer.kv_connector.utils import KVOutputAggregator
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
@@ -57,6 +58,11 @@ class Executor(ABC):
)
executor_class = distributed_executor_backend
elif distributed_executor_backend == "ray":
if envs.VLLM_USE_RAY_V2_EXECUTOR_BACKEND:
from vllm.v1.executor.ray_executor_v2 import RayExecutorV2
executor_class = RayExecutorV2
else:
from vllm.v1.executor.ray_executor import RayDistributedExecutor
executor_class = RayDistributedExecutor

View File

@@ -0,0 +1,18 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
from vllm.ray.ray_env import RAY_NON_CARRY_OVER_ENV_VARS
def get_driver_env_vars(
    worker_specific_vars: set[str],
) -> dict[str, str]:
    """Return driver env vars to propagate to Ray workers.

    Returns everything from ``os.environ`` except ``worker_specific_vars``
    and user-configured exclusions (``RAY_NON_CARRY_OVER_ENV_VARS``).
    """
    excluded = worker_specific_vars | RAY_NON_CARRY_OVER_ENV_VARS
    return {
        name: os.environ[name] for name in os.environ if name not in excluded
    }

View File

@@ -23,6 +23,7 @@ from vllm.v1.core.sched.output import GrammarOutput, SchedulerOutput
from vllm.v1.engine import ReconfigureDistributedRequest, ReconfigureRankType
from vllm.v1.executor.abstract import Executor
from vllm.v1.executor.ray_utils import (
WORKER_SPECIFIC_ENV_VARS,
FutureWrapper,
RayWorkerWrapper,
initialize_ray_cluster,
@@ -62,17 +63,6 @@ class RayWorkerMetaData:
class RayDistributedExecutor(Executor):
"""Ray-based distributed executor"""
# These env vars are worker-specific, therefore are NOT copied
# from the driver to the workers
WORKER_SPECIFIC_ENV_VARS = {
"VLLM_HOST_IP",
"VLLM_HOST_PORT",
"LOCAL_RANK",
"CUDA_VISIBLE_DEVICES",
"HIP_VISIBLE_DEVICES",
"ROCR_VISIBLE_DEVICES",
}
uses_ray: bool = True
supports_pp: bool = True
@@ -335,7 +325,7 @@ class RayDistributedExecutor(Executor):
# Environment variables to copy from driver to workers
env_vars_to_copy = get_env_vars_to_copy(
exclude_vars=self.WORKER_SPECIFIC_ENV_VARS,
exclude_vars=WORKER_SPECIFIC_ENV_VARS,
additional_vars=set(current_platform.additional_env_vars),
destination="workers",
)

View File

@@ -0,0 +1,524 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import copy
import os
import threading
import weakref
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Any
import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.distributed.device_communicators.shm_broadcast import (
Handle,
MessageQueue,
)
from vllm.logger import init_logger
from vllm.platforms import current_platform
from vllm.utils.network_utils import (
get_distributed_init_method,
get_open_port,
)
from vllm.v1.executor.multiproc_executor import (
FutureWrapper,
MultiprocExecutor,
WorkerProc,
)
from vllm.v1.executor.ray_env_utils import get_driver_env_vars
from vllm.v1.executor.ray_utils import (
WORKER_SPECIFIC_ENV_VARS,
build_actor_name,
get_bundles_for_indices,
get_bundles_sorted_by_node,
initialize_ray_cluster,
ray,
)
if ray is not None:
    from ray.actor import ActorHandle
    from ray.types import ObjectRef
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
else:
    # Stub BOTH annotation names. RayWorkerHandle's dataclass field
    # annotations (ActorHandle, ObjectRef) are evaluated at class-creation
    # time, so leaving ObjectRef undefined would raise NameError on import
    # when Ray is not installed.
    ActorHandle = None
    ObjectRef = None
logger = init_logger(__name__)
@dataclass
class RayWorkerHandle:
    """Handle for a Ray worker actor, compatible with MultiprocExecutor."""

    actor: ActorHandle
    """Ray worker actor"""
    rank: int
    """Rank of the worker"""
    local_rank: int
    """Local rank of the worker"""
    node_id: str
    """Node ID of the worker"""
    bundle_id_idx: int = -1
    """Placement group bundle index for the worker"""
    run_ref: ObjectRef | None = None
    """run() ObjectRef used as a sentinel for health monitoring"""

    def run(self):
        """Start the worker's busy loop"""
        # Keep the ObjectRef: the executor's monitor ray.wait()s on it
        # to detect worker death (see run_ref docstring above).
        self.run_ref = self.actor.run.remote()
class RayWorkerProc(WorkerProc):
    """Worker process that runs inside a Ray actor.

    Initialization is split into two phases:

    1. __init__: lightweight setup, stores init args (no device/model init)
    2. initialize_worker: called after GPU IDs are discovered, completes
       the full WorkerProc initialization with the correct local_rank and
       CUDA_VISIBLE_DEVICES.

    CUDA_VISIBLE_DEVICES setup flow:

    1. RayExecutorV2 enables RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES so Ray does
       not set CUDA_VISIBLE_DEVICES on RayWorkerProc actors at creation time.
    2. Each actor is scheduled with a placement group and bundle index; Ray resolves
       the physical GPU ID for that bundle at placement time.
    3. After placement, the worker discovers that GPU ID and sets
       CUDA_VISIBLE_DEVICES before finishing WorkerProc initialization.

    There is no workaround for this unset-and-reset sequence when the placement group
    is externally managed: scheduling must complete before CUDA_VISIBLE_DEVICES can
    match the GPU tied to the worker's bundle.

    This sequence allows multiple vLLM instances to coexist on the same node:
    each instance is unaware which physical devices others hold, and the
    externally managed placement group avoids CUDA_VISIBLE_DEVICES conflicts
    by binding workers to specific placement group bundles.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        rank: int,
        distributed_init_method: str,
        input_shm_handle: Handle,
        is_driver_worker: bool,
        is_driver_node: bool = False,
    ):
        # Defer WorkerProc.__init__ until GPU IDs are known.
        # Phase 1 deliberately does NOT call super().__init__(); phase 2
        # (initialize_worker) performs it once local_rank is resolved.
        self._is_driver_node = is_driver_node
        self._init_kwargs = dict(
            vllm_config=vllm_config,
            rank=rank,
            distributed_init_method=distributed_init_method,
            input_shm_handle=input_shm_handle,
            # Each Ray actor is its own process, so no in-process lock is
            # shared with sibling workers. NOTE(review): assumes WorkerProc
            # accepts shared_worker_lock=None — confirm against WorkerProc.
            shared_worker_lock=None,
            is_driver_worker=is_driver_worker,
        )

    def get_node_and_gpu_ids(self) -> tuple[str, list[int]]:
        """Return (node_id, gpu_ids) assigned to this actor by Ray."""
        node_id = ray.get_runtime_context().get_node_id()
        device_key = current_platform.ray_device_key
        if not device_key:
            raise RuntimeError(
                f"current platform {current_platform.device_name} does not support ray."
            )
        # Ray reports accelerator IDs as strings; normalize them to ints.
        gpu_ids = ray.get_runtime_context().get_accelerator_ids()[device_key]
        return node_id, [int(x) for x in gpu_ids]

    def initialize_worker(
        self,
        local_rank: int,
        env_vars: dict[str, str],
        driver_env_vars: dict[str, str] | None = None,
    ) -> None:
        """Complete initialization after GPU assignment is known.

        *driver_env_vars* are applied with ``setdefault`` — they fill
        in missing vars but never overwrite node-local values.
        *env_vars* (e.g. CUDA_VISIBLE_DEVICES) always overwrite.
        """
        if driver_env_vars:
            for key, value in driver_env_vars.items():
                os.environ.setdefault(key, value)
        # Applied after driver vars so per-worker values always win.
        for key, value in env_vars.items():
            os.environ[key] = value
        self.local_rank = local_rank
        # Phase 2: run the full WorkerProc initialization that was
        # deferred in __init__.
        super().__init__(
            local_rank=local_rank,
            **self._init_kwargs,
        )

    def _init_message_queues(
        self, input_shm_handle: Handle, vllm_config: VllmConfig
    ) -> None:
        """
        Workers on the same node as the executor use shared memory for
        both the broadcast (input) MQ and the response MQ. Workers on
        different nodes use TCP (n_local_reader=0).
        """
        # Attach to the executor's broadcast queue as reader `rank`.
        self.rpc_broadcast_mq = MessageQueue.create_from_handle(
            input_shm_handle, self.worker.rank
        )
        n_local = 1 if self._is_driver_node else 0
        # Use ray.util.get_node_ip_address() to get Ray's internal IP.
        # get_ip() returns host's external IP which is typically not
        # routable between nodes within the cluster.
        self.worker_response_mq = MessageQueue(
            n_reader=1,
            n_local_reader=n_local,
            connect_ip=ray.util.get_node_ip_address(),
        )
        # This executor exchanges no peer-to-peer response handles.
        self.peer_response_handles: list[dict] = []

    def wait_for_init(self) -> dict:
        """Respond to the driver's wait_until_ready() barrier."""
        assert self.worker_response_mq is not None
        # The driver checks "status" and rebuilds the response MQ from
        # the exported "handle".
        return {
            "status": self.READY_STR,
            "handle": self.worker_response_mq.export_handle(),
        }

    def run(self) -> None:
        """Main entry point called via actor.run.remote()."""
        try:
            assert self.rpc_broadcast_mq is not None
            # Both queues must complete their subscription handshake
            # before entering the busy loop.
            self.rpc_broadcast_mq.wait_until_ready()
            assert self.worker_response_mq is not None
            self.worker_response_mq.wait_until_ready()
            self.worker_busy_loop()
        except Exception as e:
            logger.exception("RayWorkerProc failed: %s", e)
            raise
        finally:
            # Always release worker resources, even on a clean exit.
            self.shutdown()
class RayExecutorV2(MultiprocExecutor):
    """Ray-based distributed executor using MessageQueue communication.

    Inherits from MultiprocExecutor to reuse the MQ-based control plane
    and NCCL data plane. Workers are Ray actors.

    Async scheduling is enabled, inherited from MultiprocExecutor.
    This is critical for RayExecutorV2 to be performant.
    """

    # Executor capability flags consumed by the engine.
    uses_ray: bool = True
    supports_pp: bool = True

    def __init__(self, vllm_config: VllmConfig):
        # No extra constructor state; all Ray-specific setup happens in
        # _init_executor().
        super().__init__(vllm_config)

    def _build_runtime_env(self) -> dict:
        """Build a runtime_env dict for RayWorkerProc actors.

        Driver env vars are applied separately via initialize_worker
        with setdefault semantics.
        """
        base = self.parallel_config.ray_runtime_env
        # Deep-copy so the mutations below never leak into the
        # user-supplied runtime env stored on parallel_config.
        runtime_env: dict = copy.deepcopy(dict(base)) if base else {}
        env_vars = runtime_env.setdefault("env_vars", {})
        # Tell Ray NOT to set device-visibility env vars on the actors;
        # the executor sets them itself after GPU discovery (see
        # RayWorkerProc's CUDA_VISIBLE_DEVICES setup flow).
        env_vars.update({v: "1" for v in current_platform.ray_noset_device_env_vars})
        if self.parallel_config.ray_workers_use_nsight:
            # Profile workers with Nsight Systems when requested.
            runtime_env["nsight"] = {
                "t": "cuda,cudnn,cublas",
                "o": "'worker_process_%p'",
                "cuda-graph-trace": "node",
            }
        return runtime_env

    @staticmethod
    def _get_actor_resource_kwargs() -> dict[str, Any]:
        """Return Ray actor resource kwargs for the current platform."""
        num_devices = envs.VLLM_RAY_PER_WORKER_GPUS
        device_key = current_platform.ray_device_key
        if device_key == "GPU":
            return {"num_gpus": num_devices}
        # Non-GPU accelerators are requested via Ray custom resources.
        return {"num_gpus": 0, "resources": {device_key: num_devices}}

    def _init_executor(self) -> None:
        """Initialize the RayExecutorV2 executor."""
        # Ensure workers are torn down even if the executor is GC'd
        # without an explicit shutdown() call.
        self._finalizer = weakref.finalize(self, self.shutdown)
        self.is_failed = False
        self.failure_callback = None
        self.shutting_down = False
        self.shutdown_lock = threading.Lock()

        # Step 1: Initialize Ray cluster and retrieve placement group
        if ray is None:
            raise ImportError("Using Ray backend requires installation of ray.")
        # require_gpu_on_driver=False: all GPU work runs in remote actors,
        # so the driver node itself needs no accelerator.
        initialize_ray_cluster(self.parallel_config, require_gpu_on_driver=False)
        placement_group = self.parallel_config.placement_group

        tp_size, pp_size, pcp_size = self._get_parallel_sizes()
        assert self.world_size == tp_size * pp_size * pcp_size, (
            f"world_size ({self.world_size}) must be equal to the "
            f"tensor_parallel_size ({tp_size}) x pipeline"
            f"_parallel_size ({pp_size}) x prefill_context"
            f"_parallel_size ({pcp_size}). "
        )

        # Step 2: Build bundle assignments for worker rank placement
        # while respecting VLLM_RAY_BUNDLE_INDICES.
        if envs.VLLM_RAY_BUNDLE_INDICES:
            bundle_to_node_id = get_bundles_for_indices(
                placement_group,
                list(map(int, envs.VLLM_RAY_BUNDLE_INDICES.split(","))),
                self.world_size,
            )
        else:
            bundle_to_node_id = get_bundles_sorted_by_node(placement_group)
        driver_node = ray.get_runtime_context().get_node_id()
        bundle_assignments: list[dict[str, Any]] = []
        # Rank i maps to the i-th bundle of the (driver-first) ordering
        # produced above.
        for rank, (bundle_id_idx, node_id, node_ip) in enumerate(bundle_to_node_id):
            bundle_assignments.append(
                {
                    "rank": rank,
                    "bundle_id_idx": bundle_id_idx,
                    "node_id": node_id,
                    "node_ip": node_ip,
                }
            )

        # Step 3: Resolve the IP for torch.distributed TCPStore.
        # The TCPStore server runs on rank 0's node, so all workers
        # must be able to reach this address.
        dist_ip = bundle_assignments[0]["node_ip"]
        distributed_init_method = get_distributed_init_method(dist_ip, get_open_port())

        # Step 4: Create broadcast MessageQueue.
        # Workers on the driver node use shared memory; the rest use TCP.
        max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
        n_local = sum(1 for a in bundle_assignments if a["node_id"] == driver_node)
        self.rpc_broadcast_mq = MessageQueue(
            self.world_size,
            n_local,
            max_chunk_bytes=max_chunk_bytes,
            connect_ip=ray.util.get_node_ip_address(),
        )
        scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

        # Step 5: Spawn RayWorkerProc actors into PG bundles (deferred init).
        # Workers are created lightweight here; full initialization happens
        # in Step 7 after GPU IDs are discovered.
        self.ray_worker_handles: list[RayWorkerHandle] = []
        instance_id = self.vllm_config.instance_id
        # Collect driver env vars and apply but don't overwrite node-local values.
        self.driver_env_vars = get_driver_env_vars(
            worker_specific_vars=WORKER_SPECIFIC_ENV_VARS,
        )
        runtime_env = self._build_runtime_env()
        resource_kwargs = self._get_actor_resource_kwargs()
        for bundle_idx in range(self.world_size):
            bundle = bundle_assignments[bundle_idx]
            is_driver_worker = self._is_driver_worker(bundle["rank"])
            is_driver_node = bundle["node_id"] == driver_node
            # Pin the actor to its PG bundle; Ray resolves the physical
            # device at placement time.
            scheduling_strategy = PlacementGroupSchedulingStrategy(
                placement_group=placement_group,
                placement_group_bundle_index=bundle["bundle_id_idx"],
            )
            actor_name = build_actor_name(
                instance_id, bundle["rank"], tp_size, pp_size, pcp_size
            )
            actor = (
                ray.remote(RayWorkerProc)
                .options(
                    name=actor_name,
                    num_cpus=0,
                    **resource_kwargs,
                    scheduling_strategy=scheduling_strategy,
                    runtime_env=runtime_env,
                )
                .remote(
                    vllm_config=self.vllm_config,
                    rank=bundle["rank"],
                    distributed_init_method=distributed_init_method,
                    input_shm_handle=scheduler_output_handle,
                    is_driver_worker=is_driver_worker,
                    is_driver_node=is_driver_node,
                )
            )
            handle = RayWorkerHandle(
                actor=actor,
                rank=bundle["rank"],
                local_rank=-1,  # Set in Step 7 after GPU ID discovery
                node_id=bundle["node_id"],
                bundle_id_idx=bundle["bundle_id_idx"],
            )
            self.ray_worker_handles.append(handle)

        # Step 6: Discover GPU IDs assigned to each worker via Ray runtime context.
        worker_node_and_gpu_ids = ray.get(
            [h.actor.get_node_and_gpu_ids.remote() for h in self.ray_worker_handles]
        )
        node_workers: dict[str, list[int]] = defaultdict(list)
        node_gpus: dict[str, list[int]] = defaultdict(list)
        for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
            node_workers[node_id].append(i)
            node_gpus[node_id].extend(gpu_ids)
        for node_id, gpu_ids in node_gpus.items():
            # Sort so local_rank indexes devices deterministically.
            node_gpus[node_id] = sorted(gpu_ids)

        # Step 7: Initialize workers with correct local_rank and
        # CUDA_VISIBLE_DEVICES. Each worker sees all GPUs assigned to
        # this executor on its node; local_rank indexes into that set.
        init_worker_refs = []
        for i, (node_id, _) in enumerate(worker_node_and_gpu_ids):
            local_rank = node_workers[node_id].index(i)
            worker_env_vars = {
                current_platform.device_control_env_var: ",".join(
                    map(str, node_gpus[node_id])
                ),
            }
            self.ray_worker_handles[i].local_rank = local_rank
            init_worker_refs.append(
                self.ray_worker_handles[i].actor.initialize_worker.remote(
                    local_rank, worker_env_vars, self.driver_env_vars
                )
            )
        ray.get(init_worker_refs)

        # Step 8: Collect response MQ handles
        init_results = ray.get(
            [h.actor.wait_for_init.remote() for h in self.ray_worker_handles]
        )
        self.response_mqs: list[MessageQueue] = []
        for i, result in enumerate(init_results):
            if result["status"] != RayWorkerProc.READY_STR:
                raise RuntimeError(f"Worker {i} failed to initialize: {result}")
            # The driver is the single (remote) reader of each worker's
            # response queue, hence reader rank 0.
            self.response_mqs.append(
                MessageQueue.create_from_handle(result["handle"], 0)
            )

        # Step 9: Start run() before wait_until_ready() to avoid
        # deadlock — workers send subscriptions inside run().
        for handle in self.ray_worker_handles:
            handle.run()

        # Step 10: wait_until_ready() barrier
        self.rpc_broadcast_mq.wait_until_ready()
        for response_mq in self.response_mqs:
            response_mq.wait_until_ready()

        self.futures_queue = deque[tuple[FutureWrapper, Any]]()
        self._post_init_executor()
        self.start_worker_monitor()
        self.output_rank = self._get_output_rank()

    def start_worker_monitor(self, inline=False) -> None:
        """Monitor worker liveness via ray.wait() on run() ObjectRefs."""
        # NOTE(review): `inline` is unused here; presumably kept for
        # signature compatibility with MultiprocExecutor — confirm.
        run_refs = [h.run_ref for h in self.ray_worker_handles if h.run_ref is not None]
        if not run_refs:
            raise RuntimeError("Ray workers have not started successfully.")
        # Hold only a weakref so the monitor thread never keeps the
        # executor alive.
        self_ref = weakref.ref(self)
        ref_to_rank = {
            h.run_ref: h.rank for h in self.ray_worker_handles if h.run_ref is not None
        }

        def _should_stop() -> bool:
            # Stop when the executor was GC'd or is shutting down.
            executor = self_ref()
            return not executor or executor.shutting_down

        def monitor_workers():
            # Poll with a timeout rather than blocking on ray.wait()
            # because a blocking call would segfault if Ray is torn down
            # while this thread is inside it.
            while not _should_stop() and ray.is_initialized():
                try:
                    done, _ = ray.wait(run_refs, num_returns=1, timeout=5.0)
                except Exception:
                    logger.exception(
                        "RayWorkerMonitor: unexpected error, exiting monitor thread"
                    )
                    return
                if not done or _should_stop():
                    continue
                # A resolved run() ref means that worker's busy loop
                # exited; treat it as a fatal worker death.
                dead_ranks = [ref_to_rank[r] for r in done]
                executor = self_ref()
                if not executor:
                    return
                executor.is_failed = True
                logger.error(
                    "RayWorkerProc rank=%s died unexpectedly, shutting down executor.",
                    dead_ranks,
                )
                executor.shutdown()
                if executor.failure_callback is not None:
                    # Consume the callback exactly once.
                    callback = executor.failure_callback
                    executor.failure_callback = None
                    callback()
                return

        t = threading.Thread(
            target=monitor_workers, daemon=True, name="RayWorkerMonitor"
        )
        t.start()
        self._monitor_thread = t

    def _join_monitor_thread(self) -> None:
        """Wait for the monitor thread to exit.

        Must be called before tearing down Ray resources — the monitor
        may be inside ray.wait() which would segfault if Ray is shut
        down underneath it. When the monitor itself calls shutdown()
        on worker death, we skip the join because the thread is about
        to return anyway.
        """
        monitor = getattr(self, "_monitor_thread", None)
        if (
            monitor is not None
            and monitor.is_alive()
            and threading.current_thread() is not monitor
        ):
            monitor.join(timeout=10)

    def shutdown(self) -> None:
        """Properly shut down the executor and its workers."""
        # getattr guards throughout: shutdown() may run from the weakref
        # finalizer before _init_executor() finished setting attributes.
        lock = getattr(self, "shutdown_lock", None)
        if lock is None:
            return
        with lock:
            if getattr(self, "shutting_down", False):
                return
            self.shutting_down = True
            # Join the monitor before killing actors (see
            # _join_monitor_thread for why).
            self._join_monitor_thread()
            for handle in getattr(self, "ray_worker_handles", []):
                try:
                    ray.kill(handle.actor)
                    logger.debug("Killed actor rank=%d", handle.rank)
                except Exception:
                    # Best-effort: keep tearing down the remaining actors.
                    logger.exception("Failed to kill actor rank=%d", handle.rank)
            if rpc_broadcast_mq := getattr(self, "rpc_broadcast_mq", None):
                rpc_broadcast_mq.shutdown()
                self.rpc_broadcast_mq = None
            for mq in getattr(self, "response_mqs", []):
                mq.shutdown()
            self.response_mqs = []

View File

@@ -26,6 +26,17 @@ if TYPE_CHECKING:
logger = init_logger(__name__)
PG_WAIT_TIMEOUT = 1800
# Env vars that are worker-specific and must NOT be copied from the
# driver to Ray workers — they are set per-worker after GPU discovery.
# The three *_VISIBLE_DEVICES entries cover the CUDA, ROCm/HIP, and
# ROCR device-visibility mechanisms respectively.
WORKER_SPECIFIC_ENV_VARS: set[str] = {
    "VLLM_HOST_IP",
    "VLLM_HOST_PORT",
    "LOCAL_RANK",
    "CUDA_VISIBLE_DEVICES",
    "HIP_VISIBLE_DEVICES",
    "ROCR_VISIBLE_DEVICES",
}
try:
import ray
from ray.util import placement_group_table
@@ -51,6 +62,8 @@ try:
# that thread.
self.compiled_dag_cuda_device_set = False
rpc_rank: int
def adjust_rank(self, rank_mapping: dict[int, int]) -> None:
"""
Adjust the rpc_rank based on the given mapping.
@@ -214,13 +227,17 @@ def assert_ray_available():
def _verify_bundles(
placement_group: "PlacementGroup", parallel_config: ParallelConfig, device_str: str
placement_group: "PlacementGroup",
parallel_config: ParallelConfig,
device_str: str,
require_gpu_on_driver: bool = True,
):
"""Verify a given placement group has bundles located in the right place.
There are 2 rules.
- Warn if all tensor parallel workers cannot fit in a single node.
- Fail if driver node is not included in a placement group.
- Fail if driver node is not included in a placement group
(only when require_gpu_on_driver is True).
"""
assert ray.is_initialized(), (
"Ray is not initialized although distributed-executor-backend is ray."
@@ -237,7 +254,7 @@ def _verify_bundles(
node_id_to_bundle[node_id].append(bundles[bundle_idx])
driver_node_id = ray.get_runtime_context().get_node_id()
if driver_node_id not in node_id_to_bundle:
if require_gpu_on_driver and driver_node_id not in node_id_to_bundle:
raise RuntimeError(
f"driver node id {driver_node_id} is not included in a placement "
f"group {placement_group.id}. Node id -> bundles "
@@ -266,6 +283,115 @@ def _verify_bundles(
)
def build_actor_name(
    instance_id: str,
    rank: int,
    tp_size: int,
    pp_size: int,
    pcp_size: int,
) -> str:
    """Compose a descriptive Ray actor name for dashboard visibility.

    The TP/PP/PCP coordinates are derived from the flat rank and are
    appended only for parallel dimensions whose size exceeds 1.
    """
    parts = [f"vllm_Worker_{instance_id}"]
    if tp_size > 1:
        parts.append(f"TP{rank % tp_size}")
    if pp_size > 1:
        parts.append(f"PP{(rank // tp_size) % pp_size}")
    if pcp_size > 1:
        parts.append(f"PCP{rank // (tp_size * pp_size)}")
    return "_".join(parts)
def get_bundles_for_indices(
    placement_group: "PlacementGroup",
    bundle_indices: list[int],
    world_size: int,
) -> list[tuple[int, str, str]]:
    """Resolve explicit bundle indices (from VLLM_RAY_BUNDLE_INDICES)
    into (bundle_index, node_id, node_ip) triples, one per worker rank.
    """
    assert len(bundle_indices) == world_size, (
        "VLLM_RAY_BUNDLE_INDICES must have the same size"
        f" as the world size, but got {bundle_indices=} "
        f"and {world_size=}"
    )
    assert len(set(bundle_indices)) == len(bundle_indices), (
        "VLLM_RAY_BUNDLE_INDICES cannot have duplicate values,"
        f" but got {bundle_indices=}"
    )
    bundle_node_map = placement_group_table(placement_group)["bundles_to_node_id"]
    # Only alive nodes are resolvable to an IP.
    ip_by_node = {
        node["NodeID"]: node["NodeManagerAddress"]
        for node in ray.nodes()
        if node["Alive"]
    }
    resolved: list[tuple[int, str, str]] = []
    for bid in bundle_indices:
        node = bundle_node_map[bid]
        resolved.append((bid, node, ip_by_node[node]))
    return resolved
def get_bundles_sorted_by_node(
    placement_group: "PlacementGroup",
) -> list[tuple[int, str, str]]:
    """Return (bundle_index, node_id, node_ip) triples for every
    device-carrying bundle, ordered with the driver node's bundles
    first and the remaining bundles grouped by node ID.

    This utility has to be invoked from the driver node (the driver's
    node ID anchors the sort order).

    Example: 3-node cluster, driver on node-A, PG bundles spread
    across nodes:

        Input:  [(0, node-C), (1, node-A), (2, node-B),
                 (3, node-C), (4, node-A), (5, node-B)]
        Output: [(1, node-A), (4, node-A), (2, node-B),
                 (5, node-B), (0, node-C), (3, node-C)]
    """
    bundle_to_node = placement_group_table(placement_group)["bundles_to_node_id"]
    ray_device_key = current_platform.ray_device_key
    if not ray_device_key:
        raise ValueError(
            f"current platform {current_platform.device_name} does not support ray."
        )
    # Only alive nodes are resolvable to an IP.
    ip_by_node = {
        node["NodeID"]: node["NodeManagerAddress"]
        for node in ray.nodes()
        if node["Alive"]
    }
    bundle_specs = placement_group.bundle_specs
    assert bundle_specs is not None
    entries: list[tuple[int, str, str]] = []
    for idx, spec in enumerate(bundle_specs):
        # Skip bundles that reserve no accelerator of the platform's kind.
        if not spec.get(ray_device_key):
            continue
        node_id = bundle_to_node.get(idx)
        entries.append((idx, node_id, ip_by_node[node_id]))
    driver_node = ray.get_runtime_context().get_node_id()
    # Driver-node bundles sort before all others (False < True); ties
    # are broken by node ID so bundles stay grouped per node.
    entries.sort(key=lambda item: (item[1] != driver_node, item[1]))
    return entries
def _wait_until_pg_ready(current_placement_group: "PlacementGroup"):
"""Wait until a placement group is ready.
@@ -352,6 +478,7 @@ def _wait_until_pg_removed(current_placement_group: "PlacementGroup"):
def initialize_ray_cluster(
parallel_config: ParallelConfig,
ray_address: str | None = None,
require_gpu_on_driver: bool = True,
):
"""Initialize the distributed cluster with Ray.
@@ -363,10 +490,18 @@ def initialize_ray_cluster(
parallel_config: The configurations for parallel execution.
ray_address: The address of the Ray cluster. If None, uses
the default Ray cluster address.
require_gpu_on_driver: If True (default), require at least one GPU
on the current (driver) node and pin the first PG bundle to it.
Set to False for executors like RayExecutorV2 where all GPU work
is delegated to remote Ray actors.
"""
assert_ray_available()
from vllm.platforms import current_platform
# Disable Ray usage stats collection
if os.environ.get("RAY_USAGE_STATS_ENABLED", "0") != "1":
os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
# Prevalidate GPU requirements before Ray processing
if current_platform.is_cuda() and parallel_config.world_size > 1:
available_gpus = current_platform.device_count()
@@ -459,15 +594,19 @@ def initialize_ray_cluster(
current_ip = get_ip()
current_node_id = ray.get_runtime_context().get_node_id()
current_node_resource = available_resources_per_node()[current_node_id]
# TODO (jeffreywang): require_gpu_on_driver should be always False
# after deprecating RayDistributedExecutor.
if require_gpu_on_driver:
if current_node_resource.get(device_str, 0) < 1:
raise ValueError(
f"Current node has no {device_str} available. "
f"{current_node_resource=}. vLLM engine cannot start without "
f"{device_str}. Make sure you have at least 1 {device_str} "
f"available in a node {current_node_id=} {current_ip=}."
f"{current_node_resource=}. vLLM engine cannot start "
f"without {device_str}. Make sure you have at least 1 "
f"{device_str} available in a node "
f"{current_node_id=} {current_ip=}."
)
# This way, at least bundle is required to be created in a current
# node.
# This way, at least bundle is required to be created in a
# current node.
placement_group_specs[0][f"node:{current_ip}"] = 0.001
# By default, Ray packs resources as much as possible.
@@ -477,7 +616,9 @@ def initialize_ray_cluster(
_wait_until_pg_ready(current_placement_group)
assert current_placement_group is not None
_verify_bundles(current_placement_group, parallel_config, device_str)
_verify_bundles(
current_placement_group, parallel_config, device_str, require_gpu_on_driver
)
# Set the placement group in the parallel config
parallel_config.placement_group = current_placement_group

View File

@@ -195,8 +195,8 @@ class WorkerWrapperBase:
All workers have rpc_rank=0, but they have different ranks in the TP
group.
"""
self.rpc_rank = rpc_rank
self.global_rank = self.rpc_rank if global_rank is None else global_rank
self.rpc_rank: int = rpc_rank
self.global_rank: int = self.rpc_rank if global_rank is None else global_rank
# Initialized after init_worker is called
self.worker: WorkerBase