import asyncio
import copy
import os
import pickle
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from vllm.config import (CacheConfig, DeviceConfig, LoRAConfig, ModelConfig,
                         ParallelConfig, SchedulerConfig,
                         VisionLanguageConfig)
from vllm.engine.ray_utils import RayWorkerVllm, ray
from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from vllm.executor.utils import check_block_size_valid
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                        make_async, set_cuda_visible_devices)

if ray is not None:
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

if TYPE_CHECKING:
    from ray.util.placement_group import PlacementGroup

logger = init_logger(__name__)

# If the env var is set, it uses Ray's compiled DAG API,
# which optimizes the control plane overhead.
# Run vLLM with VLLM_USE_RAY_COMPILED_DAG=1 to enable it.
USE_RAY_COMPILED_DAG = bool(os.getenv("VLLM_USE_RAY_COMPILED_DAG", 0))


class RayGPUExecutor(ExecutorBase):

    def __init__(
        self,
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        lora_config: Optional[LoRAConfig],
        vision_language_config: Optional[VisionLanguageConfig],
    ) -> None:
        self.model_config = model_config
        self.cache_config = cache_config
        self.lora_config = lora_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.vision_language_config = vision_language_config

        assert self.parallel_config.worker_use_ray
        placement_group = self.parallel_config.placement_group

        # Disable Ray usage stats collection.
        ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0")
        if ray_usage != "1":
            os.environ["RAY_USAGE_STATS_ENABLED"] = "0"

        # Create the parallel GPU workers.
        self._init_workers_ray(placement_group)

        # Profile the memory usage and initialize the cache.
        self._init_cache()

        self.forward_dag = None
        if USE_RAY_COMPILED_DAG:
            self.forward_dag = self._compiled_ray_dag()

    def _init_workers_ray(self, placement_group: "PlacementGroup",
                          **ray_remote_kwargs):
        if self.parallel_config.tensor_parallel_size == 1:
            # For the single GPU case, we use a ray worker with constrained
            # memory.
            num_gpus = self.cache_config.gpu_memory_utilization
        else:
            # Otherwise, the ray workers are allocated with a full GPU.
            num_gpus = 1

        # The driver dummy worker does not actually use any resources.
        # It holds the resource for the driver worker.
        self.driver_dummy_worker: Optional[RayWorkerVllm] = None
        # The remaining workers are the actual ray actors.
        self.workers: List[RayWorkerVllm] = []

        # Create the workers.
        driver_ip = get_ip()
        for bundle_id, bundle in enumerate(placement_group.bundle_specs):
            if not bundle.get("GPU", 0):
                continue
            scheduling_strategy = PlacementGroupSchedulingStrategy(
                placement_group=placement_group,
                placement_group_capture_child_tasks=True,
                placement_group_bundle_index=bundle_id,
            )
            worker = ray.remote(
                num_cpus=0,
                num_gpus=num_gpus,
                scheduling_strategy=scheduling_strategy,
                **ray_remote_kwargs,
            )(RayWorkerVllm).remote(self.model_config.trust_remote_code)

            worker_ip = ray.get(worker.get_node_ip.remote())
            if worker_ip == driver_ip and self.driver_dummy_worker is None:
                # If the worker is on the same node as the driver, we use it
                # as the resource holder for the driver process.
                self.driver_dummy_worker = worker
            else:
                # Otherwise, add it to the list of workers.
                self.workers.append(worker)

        if self.driver_dummy_worker is None:
            raise ValueError(
                "Ray does not allocate any GPUs on the driver node. Consider "
                "adjusting the Ray placement group or running the driver on a "
                "GPU node.")

        # Get the set of GPU IDs used on each node.
        driver_node_id, driver_gpu_ids = ray.get(
            self.driver_dummy_worker.get_node_and_gpu_ids.remote())
        worker_node_and_gpu_ids = ray.get(
            [worker.get_node_and_gpu_ids.remote() for worker in self.workers])

        node_workers = defaultdict(list)
        node_gpus = defaultdict(list)

        node_workers[driver_node_id].append(0)
        node_gpus[driver_node_id].extend(driver_gpu_ids)
        for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids,
                                               start=1):
            node_workers[node_id].append(i)
            node_gpus[node_id].extend(gpu_ids)
        for node_id, gpu_ids in node_gpus.items():
            node_gpus[node_id] = sorted(gpu_ids)

        # Set CUDA_VISIBLE_DEVICES for the driver and workers.
        set_cuda_visible_devices(node_gpus[driver_node_id])
        for worker, (node_id, _) in zip(self.workers, worker_node_and_gpu_ids):
            worker.set_cuda_visible_devices.remote(node_gpus[node_id])

        distributed_init_method = get_distributed_init_method(
            driver_ip, get_open_port())

        # Lazy import the Worker to avoid importing torch.cuda/xformers
        # before CUDA_VISIBLE_DEVICES is set in the Worker.
        from vllm.worker.worker import Worker

        model_config = copy.deepcopy(self.model_config)
        parallel_config = copy.deepcopy(self.parallel_config)
        scheduler_config = copy.deepcopy(self.scheduler_config)
        device_config = copy.deepcopy(self.device_config)
        lora_config = copy.deepcopy(self.lora_config)
        kv_cache_dtype = self.cache_config.cache_dtype

        # Initialize the actual workers with the Worker class.
        for rank, (worker, (node_id, _)) in enumerate(
                zip(self.workers, worker_node_and_gpu_ids),
                start=1,
        ):
            local_rank = node_workers[node_id].index(rank)
            worker.init_worker.remote(
                lambda rank=rank, local_rank=local_rank: Worker(
                    model_config,
                    parallel_config,
                    scheduler_config,
                    device_config,
                    local_rank,
                    rank,
                    distributed_init_method,
                    lora_config=lora_config,
                    kv_cache_dtype=kv_cache_dtype,
                ))

        # Initialize the driver worker with the Worker class.
        driver_rank = 0
        driver_local_rank = node_workers[driver_node_id].index(driver_rank)
        self.driver_worker = Worker(
            self.model_config,
            self.parallel_config,
            self.scheduler_config,
            self.device_config,
            driver_local_rank,
            driver_rank,
            distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            kv_cache_dtype=kv_cache_dtype,
            is_driver_worker=True,
        )

        # FIXME(woosuk): We are not properly initializing pynccl when
        # we have multiple nodes.
        self._run_workers("init_device")
        self._run_workers(
            "load_model",
            max_concurrent_workers=self.parallel_config.
            max_parallel_loading_workers,
        )

    def _init_cache(self) -> None:
        """Profiles the memory usage and initializes the KV cache.

        The engine first profiles the existing memory usage. Then, it
        calculates the maximum possible number of GPU and CPU blocks that
        can be allocated with the remaining free memory. More details can
        be found in the
        :meth:`~vllm.worker.worker.Worker.profile_num_available_blocks`
        method from class :class:`~vllm.worker.Worker`.

        Afterwards, as there may be multiple workers, we take the minimum
        number of blocks across all workers to ensure this can be applied
        to all of them.

        Finally, the engine initializes the KV cache with the calculated
        number of blocks.

        .. tip::
            You may limit the usage of GPU memory by adjusting the
            `gpu_memory_utilization` parameter.
        """
        # Get the maximum number of blocks that can be allocated on GPU and
        # CPU.
        num_blocks = self._run_workers(
            "profile_num_available_blocks",
            block_size=self.cache_config.block_size,
            gpu_memory_utilization=self.cache_config.gpu_memory_utilization,
            cpu_swap_space=self.cache_config.swap_space_bytes,
            cache_dtype=self.cache_config.cache_dtype,
        )

        # Since we use a shared centralized controller, we take the minimum
        # number of blocks across all workers to make sure all the memory
        # operators can be applied to all workers.
        num_gpu_blocks = min(b[0] for b in num_blocks)
        num_cpu_blocks = min(b[1] for b in num_blocks)

        logger.info(f"# GPU blocks: {num_gpu_blocks}, "
                    f"# CPU blocks: {num_cpu_blocks}")

        check_block_size_valid(num_gpu_blocks, self.cache_config.block_size,
                               self.model_config.max_model_len)

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

        # Initialize the cache.
        self._run_workers("init_cache_engine", cache_config=self.cache_config)
        # Warm up the model. This includes capturing the model into CUDA graph
        # if enforce_eager is False.
        self._run_workers("warm_up_model")

    def execute_model(self,
                      seq_group_metadata_list: List[SequenceGroupMetadata],
                      blocks_to_swap_in: Dict[int, int],
                      blocks_to_swap_out: Dict[int, int],
                      blocks_to_copy: Dict[int, List[int]]) -> SamplerOutput:
        all_outputs = self._run_workers(
            "execute_model",
            driver_kwargs={
                "seq_group_metadata_list": seq_group_metadata_list,
                "blocks_to_swap_in": blocks_to_swap_in,
                "blocks_to_swap_out": blocks_to_swap_out,
                "blocks_to_copy": blocks_to_copy,
            },
            use_ray_compiled_dag=USE_RAY_COMPILED_DAG)

        # Only the driver worker returns the sampling results.
        output = all_outputs[0]
        return output

    def add_lora(self, lora_request: LoRARequest) -> bool:
        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
        return self._run_workers(
            "add_lora",
            lora_request=lora_request,
        )

    def remove_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return self._run_workers(
            "remove_lora",
            lora_id=lora_id,
        )

    def list_loras(self) -> List[int]:
        return self._run_workers("list_loras")

    def _run_workers(
        self,
        method: str,
        *args,
        driver_args: Optional[List[Any]] = None,
        driver_kwargs: Optional[Dict[str, Any]] = None,
        max_concurrent_workers: Optional[int] = None,
        use_ray_compiled_dag: bool = False,
        **kwargs,
    ) -> Any:
        """Runs the given method on all workers."""

        if max_concurrent_workers:
            raise NotImplementedError(
                "max_concurrent_workers is not supported yet.")

        if use_ray_compiled_dag:
            # Right now, compiled DAG can only accept a single
            # input. TODO(sang): Fix it.
            output_channels = self.forward_dag.execute(1)
        else:
            # Start the ray workers first.
            ray_worker_outputs = [
                worker.execute_method.remote(method, *args, **kwargs)
                for worker in self.workers
            ]

        if driver_args is None:
            driver_args = args
        if driver_kwargs is None:
            driver_kwargs = kwargs

        # Start the driver worker after all the ray workers.
        driver_worker_output = getattr(self.driver_worker,
                                       method)(*driver_args, **driver_kwargs)

        # Get the results of the ray workers.
        if self.workers:
            if use_ray_compiled_dag:
                try:
                    ray_worker_outputs = [
                        pickle.loads(chan.begin_read())
                        for chan in output_channels
                    ]
                finally:
                    # Has to call end_read in order to reuse the DAG.
                    for chan in output_channels:
                        chan.end_read()
            else:
                ray_worker_outputs = ray.get(ray_worker_outputs)

        return [driver_worker_output] + ray_worker_outputs

    def _compiled_ray_dag(self):
        import pkg_resources
        required_version = "2.9"
        current_version = pkg_resources.get_distribution("ray").version
        # Compare parsed versions rather than raw strings so that e.g.
        # "2.10" is not considered older than "2.9".
        if (pkg_resources.parse_version(current_version) <
                pkg_resources.parse_version(required_version)):
            raise ValueError(f"Ray version {required_version} or greater is "
                             f"required, but found {current_version}")

        from ray.dag import InputNode, MultiOutputNode
        assert self.parallel_config.worker_use_ray

        # Right now, compiled DAG requires at least 1 arg. We send
        # a dummy value for now. It will be fixed soon.
        with InputNode() as input_data:
            forward_dag = MultiOutputNode([
                worker.execute_model_compiled_dag_remote.bind(input_data)
                for worker in self.workers
            ])
        return forward_dag.experimental_compile()

    def check_health(self) -> None:
        """Raises an error if the engine is unhealthy."""
        self._check_if_any_actor_is_dead()

    def _check_if_any_actor_is_dead(self):
        if not self.workers:
            return

        dead_actors = []
        for actor in self.workers:
            actor_state = ray.state.actors(actor._ray_actor_id.hex())  # pylint: disable=protected-access
            if actor_state["State"] == "DEAD":
                dead_actors.append(actor)
        if dead_actors:
            raise RuntimeError("At least one Worker is dead. "
                               f"Dead Workers: {dead_actors}. ")


class RayGPUExecutorAsync(RayGPUExecutor, ExecutorAsyncBase):

    async def _run_workers_async(
        self,
        method: str,
        *args,
        driver_args: Optional[List[Any]] = None,
        driver_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Any:
        """Runs the given method on all workers."""
        coros = []

        if driver_args is None:
            driver_args = args
        if driver_kwargs is None:
            driver_kwargs = kwargs

        # Run the driver worker asynchronously.
        driver_executor = make_async(getattr(self.driver_worker, method))
        coros.append(driver_executor(*driver_args, **driver_kwargs))

        # Run the ray workers asynchronously.
        for worker in self.workers:
            coros.append(worker.execute_method.remote(method, *args, **kwargs))

        all_outputs = await asyncio.gather(*coros)
        return all_outputs

    async def execute_model_async(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> SamplerOutput:
        all_outputs = await self._run_workers_async(
            "execute_model",
            driver_kwargs={
                "seq_group_metadata_list": seq_group_metadata_list,
                "blocks_to_swap_in": blocks_to_swap_in,
                "blocks_to_swap_out": blocks_to_swap_out,
                "blocks_to_copy": blocks_to_copy,
            })

        # Only the driver worker returns the sampling results.
        output = all_outputs[0]
        return output

    async def check_health_async(self) -> None:
        """Raises an error if the engine is unhealthy."""
        self._check_if_any_actor_is_dead()
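
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept in comments so nothing runs on
# import). The executor is normally constructed and driven by vLLM's engine;
# assuming the various *Config objects have already been built elsewhere, the
# calls mirror the signatures defined in this module:
#
#     executor = RayGPUExecutor(model_config, cache_config, parallel_config,
#                               scheduler_config, device_config,
#                               lora_config=None,
#                               vision_language_config=None)
#     output = executor.execute_model(seq_group_metadata_list,
#                                     blocks_to_swap_in={},
#                                     blocks_to_swap_out={},
#                                     blocks_to_copy={})
#
# The config and metadata variable names above are placeholders; only the
# call signatures are taken from the class definitions in this file.
# ---------------------------------------------------------------------------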