[core][distributed] exact ray placement control (#12732)
Signed-off-by: youkaichao <youkaichao@gmail.com>
This commit is contained in:
@@ -129,13 +129,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
|
||||
|
||||
def _init_workers_ray(self, placement_group: "PlacementGroup",
|
||||
**ray_remote_kwargs):
|
||||
if (self.parallel_config.tensor_parallel_size == 1
|
||||
and self.parallel_config.pipeline_parallel_size == 1):
|
||||
# For single GPU case, we use a ray worker with constrained memory.
|
||||
num_gpus = self.cache_config.gpu_memory_utilization
|
||||
else:
|
||||
# Otherwise, the ray workers are allocated with a full GPU.
|
||||
num_gpus = 1
|
||||
num_gpus = envs.VLLM_RAY_PER_WORKER_GPUS
|
||||
|
||||
# The driver dummy worker does not actually use any resources.
|
||||
# It holds the resource for the driver worker.
|
||||
@@ -155,12 +149,29 @@ class RayDistributedExecutor(DistributedExecutorBase):
|
||||
logger.info("use_ray_spmd_worker: %s", self.use_ray_spmd_worker)
|
||||
|
||||
# Create the workers.
|
||||
driver_ip = get_ip()
|
||||
rank = 0
|
||||
bundle_indices: List[int]
|
||||
if envs.VLLM_RAY_BUNDLE_INDICES:
|
||||
# Use the bundle indices specified by the user.
|
||||
bundle_indices = list(
|
||||
map(int, envs.VLLM_RAY_BUNDLE_INDICES.split(",")))
|
||||
assert len(bundle_indices) == self.parallel_config.world_size, \
|
||||
("VLLM_RAY_BUNDLE_INDICES must have the same size"
|
||||
f" as the world size, but got {bundle_indices=} "
|
||||
f"and {self.parallel_config.world_size=}")
|
||||
assert len(set(bundle_indices)) == len(bundle_indices), \
|
||||
("VLLM_RAY_BUNDLE_INDICES cannot have duplicate values,"
|
||||
f" but got {bundle_indices=}")
|
||||
else:
|
||||
# use the first N bundles that have GPU resources.
|
||||
bundle_indices = []
|
||||
for bundle_id, bundle in enumerate(placement_group.bundle_specs):
|
||||
if bundle.get(current_platform.ray_device_key, 0):
|
||||
bundle_indices.append(bundle_id)
|
||||
bundle_indices = bundle_indices[:self.parallel_config.world_size]
|
||||
|
||||
worker_metadata: List[RayWorkerMetaData] = []
|
||||
for bundle_id, bundle in enumerate(placement_group.bundle_specs):
|
||||
if not bundle.get(current_platform.ray_device_key, 0):
|
||||
continue
|
||||
driver_ip = get_ip()
|
||||
for rank, bundle_id in enumerate(bundle_indices):
|
||||
scheduling_strategy = PlacementGroupSchedulingStrategy(
|
||||
placement_group=placement_group,
|
||||
placement_group_capture_child_tasks=True,
|
||||
@@ -187,7 +198,6 @@ class RayDistributedExecutor(DistributedExecutorBase):
|
||||
rpc_rank=rank)
|
||||
worker_metadata.append(
|
||||
RayWorkerMetaData(worker=worker, created_rank=rank))
|
||||
rank += 1
|
||||
|
||||
worker_ips = ray.get([
|
||||
each.worker.get_node_ip.remote() # type: ignore[attr-defined]
|
||||
|
||||
Reference in New Issue
Block a user