[misc] remove engine_use_ray (#8126)
This commit is contained in:
@@ -16,7 +16,7 @@ from vllm.engine.llm_engine import (DecoderPromptComponents, LLMEngine,
|
||||
PromptComponents, SchedulerOutputState)
|
||||
from vllm.engine.metrics_types import StatLoggerBase
|
||||
from vllm.executor.executor_base import ExecutorAsyncBase
|
||||
from vllm.executor.ray_utils import initialize_ray_cluster, ray
|
||||
from vllm.executor.ray_utils import initialize_ray_cluster
|
||||
from vllm.inputs import (EncoderDecoderLLMInputs, LLMInputs, PromptInputs,
|
||||
SingletonPromptInputs)
|
||||
from vllm.inputs.parse import is_explicit_encoder_decoder_prompt
|
||||
@@ -30,7 +30,6 @@ from vllm.sampling_params import SamplingParams
|
||||
from vllm.sequence import ExecuteModelRequest
|
||||
from vllm.transformers_utils.tokenizer import AnyTokenizer
|
||||
from vllm.usage.usage_lib import UsageContext
|
||||
from vllm.utils import print_warning_once
|
||||
|
||||
logger = init_logger(__name__)
|
||||
ENGINE_ITERATION_TIMEOUT_S = envs.VLLM_ENGINE_ITERATION_TIMEOUT_S
|
||||
@@ -590,9 +589,6 @@ class AsyncLLMEngine:
|
||||
worker_use_ray: Whether to use Ray for model workers. Required for
|
||||
distributed execution. Should be the same as
|
||||
`parallel_config.worker_use_ray`.
|
||||
engine_use_ray: Whether to make LLMEngine a Ray actor. If so, the
|
||||
async frontend will be executed in a separate process from the
|
||||
model workers.
|
||||
log_requests: Whether to log the requests.
|
||||
start_engine_loop: If True, the background task to run the engine
|
||||
will be automatically started in the generate call.
|
||||
@@ -604,41 +600,23 @@ class AsyncLLMEngine:
|
||||
|
||||
def __init__(self,
|
||||
worker_use_ray: bool,
|
||||
engine_use_ray: bool,
|
||||
*args,
|
||||
log_requests: bool = True,
|
||||
start_engine_loop: bool = True,
|
||||
**kwargs) -> None:
|
||||
self.worker_use_ray = worker_use_ray
|
||||
self.engine_use_ray = engine_use_ray
|
||||
self.log_requests = log_requests
|
||||
self.engine = self._init_engine(*args, **kwargs)
|
||||
self.engine = self._engine_class(*args, **kwargs)
|
||||
|
||||
# This ensures quick processing of request outputs
|
||||
# so the append to asyncio queues is not delayed,
|
||||
# especially for multi-step.
|
||||
#
|
||||
# TODO: Currently, disabled for engine_use_ray, ask
|
||||
# Cody/Will/Woosuk about this case.
|
||||
self.use_process_request_outputs_callback = not self.engine_use_ray
|
||||
self.use_process_request_outputs_callback = True
|
||||
if self.use_process_request_outputs_callback:
|
||||
self.engine.process_request_outputs_callback = \
|
||||
self.process_request_outputs
|
||||
|
||||
if self.engine_use_ray:
|
||||
print_warning_once(
|
||||
"DEPRECATED. `--engine-use-ray` is deprecated and will "
|
||||
"be removed in a future update. "
|
||||
"See https://github.com/vllm-project/vllm/issues/7045.")
|
||||
|
||||
if envs.VLLM_ALLOW_ENGINE_USE_RAY:
|
||||
print_warning_once(
|
||||
"VLLM_ALLOW_ENGINE_USE_RAY is set, force engine use Ray")
|
||||
else:
|
||||
raise ValueError("`--engine-use-ray` is deprecated. "
|
||||
"Set `VLLM_ALLOW_ENGINE_USE_RAY=1` to "
|
||||
"force use it")
|
||||
|
||||
self.background_loop: Optional[asyncio.Future] = None
|
||||
# We need to keep a reference to unshielded
|
||||
# task as well to prevent it from being garbage
|
||||
@@ -725,16 +703,11 @@ class AsyncLLMEngine:
|
||||
# Create the engine configs.
|
||||
engine_config = engine_args.create_engine_config()
|
||||
|
||||
if engine_args.engine_use_ray:
|
||||
from vllm.executor import ray_utils
|
||||
ray_utils.assert_ray_available()
|
||||
|
||||
executor_class = cls._get_executor_cls(engine_config)
|
||||
|
||||
# Create the async LLM engine.
|
||||
engine = cls(
|
||||
executor_class.uses_ray,
|
||||
engine_args.engine_use_ray,
|
||||
**engine_config.to_dict(),
|
||||
executor_class=executor_class,
|
||||
log_requests=not engine_args.disable_log_requests,
|
||||
@@ -777,10 +750,6 @@ class AsyncLLMEngine:
|
||||
self,
|
||||
lora_request: Optional[LoRARequest] = None,
|
||||
) -> AnyTokenizer:
|
||||
if self.engine_use_ray:
|
||||
return await self.engine.get_tokenizer.remote( # type: ignore
|
||||
lora_request)
|
||||
|
||||
return await (self.engine.get_tokenizer_group().
|
||||
get_lora_tokenizer_async(lora_request))
|
||||
|
||||
@@ -814,26 +783,6 @@ class AsyncLLMEngine:
|
||||
self._background_loop_unshielded = None
|
||||
self.background_loop = None
|
||||
|
||||
def _init_engine(self, *args,
|
||||
**kwargs) -> Union[_AsyncLLMEngine, "ray.ObjectRef"]:
|
||||
if not self.engine_use_ray:
|
||||
engine_class = self._engine_class
|
||||
elif self.worker_use_ray:
|
||||
engine_class = ray.remote(num_cpus=0)(self._engine_class).remote
|
||||
else:
|
||||
# FIXME(woosuk): This is a bit hacky. Be careful when changing the
|
||||
# order of the arguments.
|
||||
cache_config = kwargs["cache_config"]
|
||||
parallel_config = kwargs["parallel_config"]
|
||||
if (parallel_config.tensor_parallel_size == 1
|
||||
and parallel_config.pipeline_parallel_size == 1):
|
||||
num_gpus = cache_config.gpu_memory_utilization
|
||||
else:
|
||||
num_gpus = 1
|
||||
engine_class = ray.remote(num_gpus=num_gpus)(
|
||||
self._engine_class).remote
|
||||
return engine_class(*args, **kwargs)
|
||||
|
||||
async def engine_step(self, virtual_engine: int) -> bool:
|
||||
"""Kick the engine to process the waiting requests.
|
||||
|
||||
@@ -844,13 +793,8 @@ class AsyncLLMEngine:
|
||||
|
||||
for new_request in new_requests:
|
||||
# Add the request into the vLLM engine's waiting queue.
|
||||
# TODO: Maybe add add_request_batch to reduce Ray overhead
|
||||
try:
|
||||
if self.engine_use_ray:
|
||||
await self.engine.add_request.remote( # type: ignore
|
||||
**new_request)
|
||||
else:
|
||||
await self.engine.add_request_async(**new_request)
|
||||
await self.engine.add_request_async(**new_request)
|
||||
except ValueError as e:
|
||||
# TODO: use a vLLM specific error for failed validation
|
||||
self._request_tracker.process_exception(
|
||||
@@ -862,10 +806,7 @@ class AsyncLLMEngine:
|
||||
if aborted_requests:
|
||||
await self._engine_abort(aborted_requests)
|
||||
|
||||
if self.engine_use_ray:
|
||||
request_outputs = await self.engine.step.remote() # type: ignore
|
||||
else:
|
||||
request_outputs = await self.engine.step_async(virtual_engine)
|
||||
request_outputs = await self.engine.step_async(virtual_engine)
|
||||
|
||||
# Put the outputs into the corresponding streams.
|
||||
# If used as a callback, then already invoked inside
|
||||
@@ -891,16 +832,10 @@ class AsyncLLMEngine:
|
||||
return all_finished
|
||||
|
||||
async def _engine_abort(self, request_ids: Iterable[str]):
|
||||
if self.engine_use_ray:
|
||||
await self.engine.abort_request.remote(request_ids) # type: ignore
|
||||
else:
|
||||
self.engine.abort_request(request_ids)
|
||||
self.engine.abort_request(request_ids)
|
||||
|
||||
async def run_engine_loop(self):
|
||||
if self.engine_use_ray:
|
||||
pipeline_parallel_size = 1 # type: ignore
|
||||
else:
|
||||
pipeline_parallel_size = \
|
||||
pipeline_parallel_size = \
|
||||
self.engine.parallel_config.pipeline_parallel_size
|
||||
has_requests_in_progress = [False] * pipeline_parallel_size
|
||||
while True:
|
||||
@@ -912,12 +847,7 @@ class AsyncLLMEngine:
|
||||
# timeout, and unblocks the RPC thread in the workers so that
|
||||
# they can process any other queued control plane messages,
|
||||
# such as add/remove lora adapters.
|
||||
if self.engine_use_ray:
|
||||
await (self.engine.stop_remote_worker_execution_loop.
|
||||
remote() # type: ignore
|
||||
)
|
||||
else:
|
||||
await self.engine.stop_remote_worker_execution_loop_async()
|
||||
await self.engine.stop_remote_worker_execution_loop_async()
|
||||
await self._request_tracker.wait_for_new_requests()
|
||||
logger.debug("Got new requests!")
|
||||
requests_in_progress = [
|
||||
@@ -938,17 +868,9 @@ class AsyncLLMEngine:
|
||||
for task in done:
|
||||
result = task.result()
|
||||
virtual_engine = requests_in_progress.index(task)
|
||||
if self.engine_use_ray:
|
||||
has_unfinished_requests = (
|
||||
await (self.engine.
|
||||
has_unfinished_requests_for_virtual_engine.
|
||||
remote( # type: ignore
|
||||
virtual_engine)))
|
||||
else:
|
||||
has_unfinished_requests = (
|
||||
self.engine.
|
||||
has_unfinished_requests_for_virtual_engine(
|
||||
virtual_engine))
|
||||
has_unfinished_requests = (
|
||||
self.engine.has_unfinished_requests_for_virtual_engine(
|
||||
virtual_engine))
|
||||
if result or has_unfinished_requests:
|
||||
requests_in_progress[virtual_engine] = (
|
||||
asyncio.create_task(
|
||||
@@ -1190,52 +1112,29 @@ class AsyncLLMEngine:
|
||||
|
||||
async def get_model_config(self) -> ModelConfig:
|
||||
"""Get the model configuration of the vLLM engine."""
|
||||
if self.engine_use_ray:
|
||||
return await self.engine.get_model_config.remote() # type: ignore
|
||||
else:
|
||||
return self.engine.get_model_config()
|
||||
return self.engine.get_model_config()
|
||||
|
||||
async def get_parallel_config(self) -> ParallelConfig:
|
||||
"""Get the parallel configuration of the vLLM engine."""
|
||||
if self.engine_use_ray:
|
||||
return await self.engine.get_parallel_config.remote( # type: ignore
|
||||
)
|
||||
else:
|
||||
return self.engine.get_parallel_config()
|
||||
return self.engine.get_parallel_config()
|
||||
|
||||
async def get_decoding_config(self) -> DecodingConfig:
|
||||
"""Get the decoding configuration of the vLLM engine."""
|
||||
if self.engine_use_ray:
|
||||
return await self.engine.get_decoding_config.remote( # type: ignore
|
||||
)
|
||||
else:
|
||||
return self.engine.get_decoding_config()
|
||||
return self.engine.get_decoding_config()
|
||||
|
||||
async def get_scheduler_config(self) -> SchedulerConfig:
|
||||
"""Get the scheduling configuration of the vLLM engine."""
|
||||
if self.engine_use_ray:
|
||||
return await self.engine.get_scheduler_config.remote( # type: ignore
|
||||
)
|
||||
else:
|
||||
return self.engine.get_scheduler_config()
|
||||
return self.engine.get_scheduler_config()
|
||||
|
||||
async def get_lora_config(self) -> LoRAConfig:
|
||||
"""Get the lora configuration of the vLLM engine."""
|
||||
if self.engine_use_ray:
|
||||
return await self.engine.get_lora_config.remote( # type: ignore
|
||||
)
|
||||
else:
|
||||
return self.engine.get_lora_config()
|
||||
return self.engine.get_lora_config()
|
||||
|
||||
async def do_log_stats(
|
||||
self,
|
||||
scheduler_outputs: Optional[SchedulerOutputs] = None,
|
||||
model_output: Optional[List[SamplerOutput]] = None) -> None:
|
||||
if self.engine_use_ray:
|
||||
await self.engine.do_log_stats.remote( # type: ignore
|
||||
scheduler_outputs, model_output)
|
||||
else:
|
||||
self.engine.do_log_stats()
|
||||
self.engine.do_log_stats()
|
||||
|
||||
async def check_health(self) -> None:
|
||||
"""Raises an error if engine is unhealthy."""
|
||||
@@ -1244,37 +1143,17 @@ class AsyncLLMEngine:
|
||||
if self.is_stopped:
|
||||
raise AsyncEngineDeadError("Background loop is stopped.")
|
||||
|
||||
if self.engine_use_ray:
|
||||
try:
|
||||
await self.engine.check_health.remote() # type: ignore
|
||||
except ray.exceptions.RayActorError as e:
|
||||
raise RuntimeError("Engine is dead.") from e
|
||||
else:
|
||||
await self.engine.check_health_async()
|
||||
await self.engine.check_health_async()
|
||||
logger.debug("Health check took %fs", time.perf_counter() - t)
|
||||
|
||||
async def is_tracing_enabled(self) -> bool:
|
||||
if self.engine_use_ray:
|
||||
return await self.engine.is_tracing_enabled.remote( # type: ignore
|
||||
)
|
||||
else:
|
||||
return self.engine.is_tracing_enabled()
|
||||
return self.engine.is_tracing_enabled()
|
||||
|
||||
def add_logger(self, logger_name: str, logger: StatLoggerBase) -> None:
|
||||
if self.engine_use_ray:
|
||||
ray.get(
|
||||
self.engine.add_logger.remote( # type: ignore
|
||||
logger_name=logger_name, logger=logger))
|
||||
else:
|
||||
self.engine.add_logger(logger_name=logger_name, logger=logger)
|
||||
self.engine.add_logger(logger_name=logger_name, logger=logger)
|
||||
|
||||
def remove_logger(self, logger_name: str) -> None:
|
||||
if self.engine_use_ray:
|
||||
ray.get(
|
||||
self.engine.remove_logger.remote( # type: ignore
|
||||
logger_name=logger_name))
|
||||
else:
|
||||
self.engine.remove_logger(logger_name=logger_name)
|
||||
self.engine.remove_logger(logger_name=logger_name)
|
||||
|
||||
async def start_profile(self) -> None:
|
||||
self.engine.model_executor._run_workers("start_profile")
|
||||
|
||||
Reference in New Issue
Block a user