[Data-parallel] Allow DP>1 for world_size > num_gpus on node (8) (#26367)

Signed-off-by: Patrick von Platen <patrick.v.platen@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Rui Qiao <ruisearch42@gmail.com>
This commit is contained in:
Patrick von Platen
2025-10-17 17:24:42 +02:00
committed by GitHub
parent 2ba60ec7fe
commit b038d9c40c
4 changed files with 96 additions and 22 deletions

View File

@@ -345,6 +345,7 @@ class CoreEngineActorManager:
world_size = vllm_config.parallel_config.world_size
placement_groups: list[PlacementGroup] = []
local_dp_ranks: list[int] = []
dp_master_ip_key = f"node:{dp_master_ip}"
nodes = sorted(
available_resources.values(), key=lambda x: dp_master_ip_key not in x
@@ -355,9 +356,25 @@ class CoreEngineActorManager:
dp_master_ip,
)
device_str = current_platform.ray_device_key
n_node_devices: list[int] = [
int(node_resources[device_str])
for node_resources in nodes
if device_str in node_resources
]
assert n_node_devices, f"No {device_str} found in Ray cluster."
max_device_per_node = max(n_node_devices)
pack_strategy = envs.VLLM_RAY_DP_PACK_STRATEGY
_supported_pack_strategies = ("strict", "fill", "span")
if pack_strategy not in _supported_pack_strategies:
raise ValueError(
f"{envs.VLLM_RAY_DP_PACK_STRATEGY} is not supported. "
"Make sure to set `VLLM_RAY_DP_PACK_STRATEGY` "
f"to one of {_supported_pack_strategies}"
)
all2all_backend = vllm_config.parallel_config.all2all_backend
if envs.VLLM_RAY_DP_PACK_STRATEGY == "fill" and (
if pack_strategy == "fill" and (
all2all_backend == "deepep_high_throughput"
or all2all_backend == "deepep_low_latency"
):
@@ -367,12 +384,42 @@ class CoreEngineActorManager:
"does not guarantee that. "
"Please use VLLM_RAY_DP_PACK_STRATEGY=strict instead."
)
logger.info(
"Using '%s' DP packing strategy based on VLLM_RAY_DP_PACK_STRATEGY",
envs.VLLM_RAY_DP_PACK_STRATEGY,
)
strict_local_size = envs.VLLM_RAY_DP_PACK_STRATEGY == "strict"
if pack_strategy in ("strict", "fill"):
placement_strategy = "STRICT_PACK"
else:
placement_strategy = "PACK"
assert world_size > max_device_per_node, (
f"World size {world_size} is smaller than the "
"maximum number of devices per node "
f"{max_device_per_node}. Make sure to set "
"`VLLM_RAY_DP_PACK_STRATEGY` to `strict` or `fill`"
)
        # if we need multiple nodes per dp group, we require for now that
        # available nodes are homogeneous
assert set(n_node_devices) == {max_device_per_node}, (
f"Nodes are not homogenous, {nodes}"
)
assert world_size % max_device_per_node == 0, (
f"For multi-node data parallel groups, world_size ({world_size}) must "
f"be a multiple of number of devices per node ({max_device_per_node})."
)
assert len(n_node_devices) * max_device_per_node >= world_size * dp_size, (
f"Not enough total available nodes ({len(n_node_devices)}) "
f"and devices per node ({max_device_per_node}) "
f"to satisfy required world size {world_size} and data parallel size "
f"{dp_size}"
)
assert dp_size_local == 1, (
f"data-parallel-size-local {dp_size_local} should be set as the "
"default (1) for VLLM_RAY_DP_PACK_STRATEGY=span. "
"The actual data-parallel-size-local will be auto determined."
)
# bundles collected for a single DP rank from multiple nodes,
# for "span" pack strategy
collected_bundles = []
for node_resources in nodes:
node_ip_keys = [
key
@@ -386,14 +433,14 @@ class CoreEngineActorManager:
node_ip_key = node_ip_keys[0]
node_ip = node_ip_key.split(":")[1]
# For now, each DP rank can only be assigned to one node
# TODO(rui): support allocating a single DP rank
# to multiple nodes
dp_size_available = (
int(node_resources[device_str]) // world_size
if device_str in node_resources
else 0
)
n_device_on_node = int(node_resources.get(device_str, 0))
if pack_strategy == "span" and n_device_on_node != 0:
# Strictly speaking,
# dp_size_available = n_device_on_node / world_size
# and is a fraction, but we use 1 for easier processing
dp_size_available = 1
else:
dp_size_available = n_device_on_node // world_size
if node_ip == dp_master_ip:
if dp_size_available < dp_size_local:
@@ -405,7 +452,7 @@ class CoreEngineActorManager:
dp_size_available,
)
dp_size_to_allocate = dp_size_local
elif strict_local_size:
elif pack_strategy == "strict":
if dp_size_available < dp_size_local:
logger.info(
"Skipping node %s as %s DP ranks could not fit, "
@@ -417,15 +464,31 @@ class CoreEngineActorManager:
continue
dp_size_to_allocate = dp_size_local
else:
# for "pack_strategy" in "fill" and "span"
# we always take everything that's available
dp_size_to_allocate = dp_size_available
for i in range(dp_size_to_allocate):
bundles = [{device_str: 1.0, "node:" + node_ip: 0.001}] * world_size + [
{"CPU": 1.0}
]
device_bundle = [{device_str: 1.0, "node:" + node_ip: 0.001}]
if pack_strategy == "span":
collected_bundles += device_bundle * n_device_on_node
assert len(collected_bundles) <= world_size, (
"collected_bundles should be <= world_size, "
f"but got {len(collected_bundles)=} and {world_size=}"
)
# we only create a placement group if we collected enough devices
if len(collected_bundles) < world_size:
continue
bundles = collected_bundles + [{"CPU": 1.0}]
collected_bundles = []
else:
bundles = device_bundle * world_size + [{"CPU": 1.0}]
pg = ray.util.placement_group(
name=f"dp_rank_{len(placement_groups)}",
strategy="STRICT_PACK",
strategy=placement_strategy,
bundles=bundles,
)
placement_groups.append(pg)