[Core] Multiprocessing Pipeline Parallel support (#6130)
Co-authored-by: Murali Andoorveedu <muralidhar.andoorveedu@centml.ai>
This commit is contained in:
@@ -10,7 +10,8 @@ from vllm.executor.distributed_gpu_executor import ( # yapf: disable
|
||||
from vllm.executor.ray_utils import RayWorkerWrapper, ray
|
||||
from vllm.logger import init_logger
|
||||
from vllm.sequence import ExecuteModelRequest, SamplerOutput
|
||||
from vllm.utils import (error_on_invalid_device_count_status,
|
||||
from vllm.utils import (_run_task_with_lock,
|
||||
error_on_invalid_device_count_status,
|
||||
get_distributed_init_method, get_ip, get_open_port,
|
||||
get_vllm_instance_id, make_async)
|
||||
|
||||
@@ -240,27 +241,14 @@ class RayGPUExecutor(DistributedGPUExecutor):
|
||||
# broadcasted to.
|
||||
self.non_driver_workers: List[RayWorkerWrapper] = []
|
||||
|
||||
tp_driver_worker_ranks = []
|
||||
non_driver_worker_ranks = []
|
||||
for idx, rank in enumerate(worker_ranks[1:]):
|
||||
# Enforce rank order for correct rank to return final output.
|
||||
for rank, worker in sorted(zip(worker_ranks[1:], self.workers)):
|
||||
# We need to skip the driver worker, which we
|
||||
# do by skipping worker_ranks[0] which is always 0.
|
||||
if rank % self.parallel_config.tensor_parallel_size == 0:
|
||||
self.tp_driver_workers.append(self.workers[idx])
|
||||
tp_driver_worker_ranks.append(rank)
|
||||
self.tp_driver_workers.append(worker)
|
||||
else:
|
||||
self.non_driver_workers.append(self.workers[idx])
|
||||
non_driver_worker_ranks.append(rank)
|
||||
|
||||
# Enforce rank order for correct rank to return final output.
|
||||
self.tp_driver_workers = [
|
||||
worker for _, worker in sorted(
|
||||
zip(tp_driver_worker_ranks, self.tp_driver_workers))
|
||||
]
|
||||
self.non_driver_workers = [
|
||||
worker for _, worker in sorted(
|
||||
zip(non_driver_worker_ranks, self.non_driver_workers))
|
||||
]
|
||||
self.non_driver_workers.append(worker)
|
||||
|
||||
def _driver_execute_model(
|
||||
self, execute_model_req: Optional[ExecuteModelRequest]
|
||||
@@ -413,6 +401,7 @@ class RayGPUExecutorAsync(RayGPUExecutor, DistributedGPUExecutorAsync):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.pp_locks: Optional[List[asyncio.Lock]] = None
|
||||
self.use_ray_spmd_worker = envs.VLLM_USE_RAY_SPMD_WORKER
|
||||
if not self.use_ray_compiled_dag:
|
||||
self.driver_exec_method = make_async(
|
||||
@@ -437,6 +426,9 @@ class RayGPUExecutorAsync(RayGPUExecutor, DistributedGPUExecutorAsync):
|
||||
) -> List[SamplerOutput]:
|
||||
assert not self.use_ray_spmd_worker, (
|
||||
"driver_worker does not exist for VLLM_USE_RAY_SPMD_WORKER=1")
|
||||
if not self.tp_driver_workers:
|
||||
return await self.driver_exec_method("execute_model",
|
||||
execute_model_req)
|
||||
if self.pp_locks is None:
|
||||
# This locks each pipeline parallel stage so multiple virtual
|
||||
# engines can't execute on the same stage at the same time
|
||||
@@ -447,15 +439,11 @@ class RayGPUExecutorAsync(RayGPUExecutor, DistributedGPUExecutorAsync):
|
||||
for _ in range(self.parallel_config.pipeline_parallel_size)
|
||||
]
|
||||
|
||||
async def _run_task_with_lock(task, lock, *args, **kwargs):
|
||||
async with lock:
|
||||
return await task(*args, **kwargs)
|
||||
|
||||
tasks = []
|
||||
tasks.append(
|
||||
tasks = [
|
||||
asyncio.create_task(
|
||||
_run_task_with_lock(self.driver_exec_method, self.pp_locks[0],
|
||||
"execute_model", execute_model_req)))
|
||||
"execute_model", execute_model_req))
|
||||
]
|
||||
for pp_rank, driver_worker in enumerate(self.tp_driver_workers,
|
||||
start=1):
|
||||
tasks.append(
|
||||
|
||||
Reference in New Issue
Block a user