[Frontend] Add GPU-less render serving path (vllm launch render) (#36166)

Sage
2026-03-08 17:35:09 +02:00
committed by GitHub
parent b7332b058c
commit 4497431df6
10 changed files with 712 additions and 273 deletions
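
Usage note (not part of the patch): the new render path exposes /v1/chat/completions/render and /v1/completions/render on a server that has no GPU-backed engine; requests are preprocessed (chat template applied, tokenized) and the resulting engine prompts are returned instead of generations. Below is a minimal client-side sketch of calling the new chat render endpoint, assuming a render-only server built from this patch is already listening on localhost:8000 and serves a model registered as "my-model"; the host, port, and model name are illustrative assumptions, only the endpoint path comes from this diff.

# Hypothetical client sketch; host, port, and model name are assumptions.
import json
import urllib.request

payload = {
    "model": "my-model",  # must match the server's --model / --served-model-name
    "messages": [{"role": "user", "content": "Hello!"}],
}
req = urllib.request.Request(
    "http://localhost:8000/v1/chat/completions/render",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    # The render endpoint returns the preprocessed engine prompts, not a completion.
    print(json.load(resp))

The same pattern applies to /v1/completions/render with a "prompt" field in place of "messages".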

View File

@@ -8,7 +8,7 @@ import uvloop
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.entrypoints.cli.types import CLISubcommand
from vllm.entrypoints.openai.api_server import (
build_and_serve,
build_and_serve_renderer,
setup_server,
)
from vllm.entrypoints.openai.cli_args import (
@@ -109,19 +109,17 @@ def cmd_init() -> list[CLISubcommand]:
async def run_launch_fastapi(args: argparse.Namespace) -> None:
"""Run the online serving layer with FastAPI (no GPU inference)."""
from vllm.config import VllmConfig
from vllm.v1.engine.launch import LaunchEngineClient
# 1. Socket binding
listen_address, sock = setup_server(args)
# 2. Create LaunchEngineClient (no GPU)
# 2. Build and serve the API server
engine_args = AsyncEngineArgs.from_cli_args(args)
model_config = engine_args.create_model_config()
vllm_config = VllmConfig(model_config=model_config)
engine_client = LaunchEngineClient.from_vllm_config(vllm_config)
# 3. Build app, initialize state, and start serving
shutdown_task = await build_and_serve(engine_client, listen_address, sock, args)
shutdown_task = await build_and_serve_renderer(
vllm_config, listen_address, sock, args
)
try:
await shutdown_task
finally:

View File

@@ -22,6 +22,7 @@ from fastapi.middleware.cors import CORSMiddleware
from starlette.datastructures import State
import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.protocol import EngineClient
from vllm.entrypoints.chat_utils import load_chat_template
@@ -198,7 +199,7 @@ def build_app(
register_sagemaker_api_router(app, supported_tasks)
if any(task in supported_tasks for task in ("generate", "render")):
if "generate" in supported_tasks:
from vllm.entrypoints.openai.generate.api_router import (
register_generate_api_routers,
)
@@ -223,6 +224,13 @@ def build_app(
elastic_ep_attach_router(app)
if "generate" in supported_tasks or "render" in supported_tasks:
from vllm.entrypoints.serve.render.api_router import (
attach_router as attach_render_router,
)
attach_render_router(app)
if "transcription" in supported_tasks:
from vllm.entrypoints.openai.speech_to_text.api_router import (
attach_router as register_speech_to_text_api_router,
@@ -363,7 +371,7 @@ async def init_app_state(
trust_request_chat_template=args.trust_request_chat_template,
)
if any(task in supported_tasks for task in ("generate", "render")):
if "generate" in supported_tasks:
from vllm.entrypoints.openai.generate.api_router import init_generate_state
await init_generate_state(
@@ -393,6 +401,64 @@ async def init_app_state(
state.server_load_metrics = 0
async def init_render_app_state(
vllm_config: VllmConfig,
state: State,
args: Namespace,
) -> None:
"""Initialise FastAPI app state for a CPU-only render server.
Unlike :func:`init_app_state` this function does not require an
:class:`~vllm.engine.protocol.EngineClient`; it bootstraps the
preprocessing pipeline (renderer, io_processor, input_processor)
directly from the :class:`~vllm.config.VllmConfig`.
"""
from vllm.entrypoints.chat_utils import load_chat_template
from vllm.entrypoints.serve.render.serving import OpenAIServingRender
from vllm.plugins.io_processors import get_io_processor
from vllm.renderers import renderer_from_config
served_model_names = args.served_model_name or [args.model]
if args.enable_log_requests:
request_logger = RequestLogger(max_log_len=args.max_log_len)
else:
request_logger = None
renderer = renderer_from_config(vllm_config)
io_processor = get_io_processor(
vllm_config, renderer, vllm_config.model_config.io_processor_plugin
)
resolved_chat_template = load_chat_template(args.chat_template)
state.openai_serving_render = OpenAIServingRender(
model_config=vllm_config.model_config,
renderer=renderer,
io_processor=io_processor,
served_model_names=served_model_names,
request_logger=request_logger,
chat_template=resolved_chat_template,
chat_template_content_format=args.chat_template_content_format,
trust_request_chat_template=args.trust_request_chat_template,
enable_auto_tools=args.enable_auto_tool_choice,
exclude_tools_when_tool_choice_none=args.exclude_tools_when_tool_choice_none,
tool_parser=args.tool_call_parser,
default_chat_template_kwargs=args.default_chat_template_kwargs,
log_error_stack=args.log_error_stack,
)
# Expose models endpoint via the render handler.
state.openai_serving_models = state.openai_serving_render
state.vllm_config = vllm_config
# Disable stats logging — there is no engine to poll.
state.log_stats = False
state.engine_client = None
state.args = args
state.enable_server_load_tracking = False
state.server_load_metrics = 0
def create_server_socket(addr: tuple[str, int]) -> socket.socket:
family = socket.AF_INET
if is_valid_ipv6_address(addr[0]):
@@ -494,7 +560,6 @@ async def build_and_serve(
supported_tasks = await engine_client.get_supported_tasks()
logger.info("Supported tasks: %s", supported_tasks)
app = build_app(args, supported_tasks)
await init_app_state(engine_client, app.state, args, supported_tasks)
@@ -522,6 +587,51 @@ async def build_and_serve(
)
async def build_and_serve_renderer(
vllm_config: VllmConfig,
listen_address: str,
sock: socket.socket,
args: Namespace,
**uvicorn_kwargs,
) -> asyncio.Task:
"""Build FastAPI app for a CPU-only render server, initialize state, and
start serving.
Returns the shutdown task for the caller to await.
"""
# Get uvicorn log config (from file or with endpoint filter)
log_config = get_uvicorn_log_config(args)
if log_config is not None:
uvicorn_kwargs["log_config"] = log_config
app = build_app(args, ("render",))
await init_render_app_state(vllm_config, app.state, args)
logger.info("Starting vLLM server on %s", listen_address)
return await serve_http(
app,
sock=sock,
enable_ssl_refresh=args.enable_ssl_refresh,
host=args.host,
port=args.port,
log_level=args.uvicorn_log_level,
# NOTE: When the 'disable_uvicorn_access_log' value is True,
# no access log will be output.
access_log=not args.disable_uvicorn_access_log,
timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE,
ssl_keyfile=args.ssl_keyfile,
ssl_certfile=args.ssl_certfile,
ssl_ca_certs=args.ssl_ca_certs,
ssl_cert_reqs=args.ssl_cert_reqs,
ssl_ciphers=args.ssl_ciphers,
h11_max_incomplete_event_size=args.h11_max_incomplete_event_size,
h11_max_header_count=args.h11_max_header_count,
**uvicorn_kwargs,
)
async def run_server(args, **uvicorn_kwargs) -> None:
"""Run a single-worker API server."""

View File

@@ -71,34 +71,5 @@ async def create_chat_completion(request: ChatCompletionRequest, raw_request: Re
return StreamingResponse(content=generator, media_type="text/event-stream")
@router.post(
"/v1/chat/completions/render",
dependencies=[Depends(validate_json_request)],
response_model=list,
responses={
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse},
},
)
async def render_chat_completion(request: ChatCompletionRequest, raw_request: Request):
"""Render chat completion request and return conversation and engine
prompts without generating."""
handler = chat(raw_request)
if handler is None:
base_server = raw_request.app.state.openai_serving_tokenization
return base_server.create_error_response(
message="The model does not support Chat Completions API"
)
result = await handler.render_chat_request(request)
if isinstance(result, ErrorResponse):
return JSONResponse(content=result.model_dump(), status_code=result.error.code)
return JSONResponse(content=result)
def attach_router(app: FastAPI):
app.include_router(router)

View File

@@ -69,32 +69,5 @@ async def create_completion(request: CompletionRequest, raw_request: Request):
return StreamingResponse(content=generator, media_type="text/event-stream")
@router.post(
"/v1/completions/render",
dependencies=[Depends(validate_json_request)],
response_model=list,
responses={
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
},
)
async def render_completion(request: CompletionRequest, raw_request: Request):
"""render completion request and return engine prompts without generating."""
handler = completion(raw_request)
if handler is None:
base_server = raw_request.app.state.openai_serving_tokenization
return base_server.create_error_response(
message="The model does not support Completions API"
)
result = await handler.render_completion_request(request)
if isinstance(result, ErrorResponse):
return JSONResponse(content=result.model_dump(), status_code=result.error.code)
return JSONResponse(content=result)
def attach_router(app: FastAPI):
app.include_router(router)

View File

@@ -111,7 +111,7 @@ async def init_generate_state(
enable_log_outputs=args.enable_log_outputs,
enable_log_deltas=args.enable_log_deltas,
)
if any(task in supported_tasks for task in ("generate", "render"))
if "generate" in supported_tasks
else None
)
# Warm up chat template processing to avoid first-request latency
@@ -126,7 +126,7 @@ async def init_generate_state(
enable_prompt_tokens_details=args.enable_prompt_tokens_details,
enable_force_include_usage=args.enable_force_include_usage,
)
if any(task in supported_tasks for task in ("generate", "render"))
if "generate" in supported_tasks
else None
)
state.anthropic_serving_messages = (
@@ -160,3 +160,26 @@ async def init_generate_state(
if "generate" in supported_tasks
else None
)
# Render endpoints are always backed by OpenAIServingRender so that
# /v1/chat/completions/render and /v1/completions/render work on both
# generate-mode and render-only servers.
from vllm.entrypoints.serve.render.serving import OpenAIServingRender
state.openai_serving_render = OpenAIServingRender(
model_config=engine_client.model_config,
renderer=engine_client.renderer,
io_processor=engine_client.io_processor,
served_model_names=[
mp.name for mp in state.openai_serving_models.base_model_paths
],
request_logger=request_logger,
chat_template=resolved_chat_template,
chat_template_content_format=args.chat_template_content_format,
trust_request_chat_template=args.trust_request_chat_template,
enable_auto_tools=args.enable_auto_tool_choice,
exclude_tools_when_tool_choice_none=args.exclude_tools_when_tool_choice_none,
tool_parser=args.tool_call_parser,
default_chat_template_kwargs=args.default_chat_template_kwargs,
log_error_stack=args.log_error_stack,
)

View File

@@ -22,8 +22,12 @@ def engine_client(request: Request) -> EngineClient:
@router.get("/health", response_class=Response)
async def health(raw_request: Request) -> Response:
"""Health check."""
client = engine_client(raw_request)
if client is None:
# Render-only servers have no engine; they are always healthy.
return Response(status_code=200)
try:
await engine_client(raw_request).check_health()
await client.check_health()
return Response(status_code=200)
except EngineDeadError:
return Response(status_code=503)

View File

@@ -0,0 +1,2 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

View File

@@ -0,0 +1,87 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from http import HTTPStatus
from fastapi import APIRouter, Depends, FastAPI, Request
from fastapi.responses import JSONResponse
from vllm.entrypoints.openai.chat_completion.protocol import ChatCompletionRequest
from vllm.entrypoints.openai.completion.protocol import CompletionRequest
from vllm.entrypoints.openai.engine.protocol import ErrorResponse
from vllm.entrypoints.openai.utils import validate_json_request
from vllm.entrypoints.serve.render.serving import OpenAIServingRender
from vllm.entrypoints.utils import create_error_response
from vllm.logger import init_logger
logger = init_logger(__name__)
router = APIRouter()
def render(request: Request) -> OpenAIServingRender | None:
return getattr(request.app.state, "openai_serving_render", None)
@router.post(
"/v1/chat/completions/render",
dependencies=[Depends(validate_json_request)],
response_model=list,
responses={
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse},
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
},
)
async def render_chat_completion(request: ChatCompletionRequest, raw_request: Request):
handler = render(raw_request)
if handler is None:
error = create_error_response(
message="The model does not support Chat Completions Render API",
err_type="NotFoundError",
status_code=HTTPStatus.NOT_FOUND,
)
return JSONResponse(
status_code=HTTPStatus.NOT_FOUND, content=error.model_dump()
)
result = await handler.render_chat_request(request)
if isinstance(result, ErrorResponse):
return JSONResponse(content=result.model_dump(), status_code=result.error.code)
return JSONResponse(content=result)
@router.post(
"/v1/completions/render",
dependencies=[Depends(validate_json_request)],
response_model=list,
responses={
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
},
)
async def render_completion(request: CompletionRequest, raw_request: Request):
handler = render(raw_request)
if handler is None:
error = create_error_response(
message="The model does not support Completions Render API",
err_type="NotFoundError",
status_code=HTTPStatus.NOT_FOUND,
)
return JSONResponse(
status_code=HTTPStatus.NOT_FOUND, content=error.model_dump()
)
result = await handler.render_completion_request(request)
if isinstance(result, ErrorResponse):
return JSONResponse(content=result.model_dump(), status_code=result.error.code)
return JSONResponse(content=result)
def attach_router(app: FastAPI) -> None:
app.include_router(router)

View File

@@ -0,0 +1,475 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import sys
import traceback
from collections.abc import Callable, Sequence
from http import HTTPStatus
from typing import Any
import jinja2
from openai_harmony import Message as OpenAIMessage
from vllm.config import ModelConfig
from vllm.entrypoints.chat_utils import (
ChatTemplateContentFormatOption,
ConversationMessage,
)
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.chat_completion.protocol import ChatCompletionRequest
from vllm.entrypoints.openai.completion.protocol import CompletionRequest
from vllm.entrypoints.openai.engine.protocol import (
ErrorInfo,
ErrorResponse,
ModelCard,
ModelList,
ModelPermission,
)
from vllm.entrypoints.openai.parser.harmony_utils import (
get_developer_message,
get_system_message,
parse_chat_inputs_to_harmony_messages,
render_for_completion,
)
from vllm.entrypoints.utils import sanitize_message
from vllm.inputs.data import ProcessorInputs, PromptType, SingletonPrompt, TokensPrompt
from vllm.logger import init_logger
from vllm.parser import ParserManager
from vllm.renderers import BaseRenderer, merge_kwargs
from vllm.renderers.inputs.preprocess import parse_model_prompt, prompt_to_seq
from vllm.tokenizers import TokenizerLike
from vllm.tool_parsers import ToolParser
from vllm.utils.mistral import is_mistral_tokenizer
from vllm.utils.mistral import mt as _mt
logger = init_logger(__name__)
class OpenAIServingRender:
def __init__(
self,
model_config: ModelConfig,
renderer: BaseRenderer,
io_processor: Any,
served_model_names: list[str],
*,
request_logger: RequestLogger | None,
chat_template: str | None,
chat_template_content_format: ChatTemplateContentFormatOption,
trust_request_chat_template: bool = False,
enable_auto_tools: bool = False,
exclude_tools_when_tool_choice_none: bool = False,
tool_parser: str | None = None,
default_chat_template_kwargs: dict[str, Any] | None = None,
log_error_stack: bool = False,
) -> None:
self.model_config = model_config
self.renderer = renderer
self.io_processor = io_processor
self.served_model_names = served_model_names
self.request_logger = request_logger
self.chat_template = chat_template
self.chat_template_content_format: ChatTemplateContentFormatOption = (
chat_template_content_format
)
self.trust_request_chat_template = trust_request_chat_template
self.enable_auto_tools = enable_auto_tools
self.exclude_tools_when_tool_choice_none = exclude_tools_when_tool_choice_none
self.tool_parser: Callable[[TokenizerLike], ToolParser] | None = (
ParserManager.get_tool_parser(
tool_parser_name=tool_parser,
enable_auto_tools=enable_auto_tools,
model_name=model_config.model,
)
)
self.default_chat_template_kwargs: dict[str, Any] = (
default_chat_template_kwargs or {}
)
self.log_error_stack = log_error_stack
self.use_harmony = model_config.hf_config.model_type == "gpt_oss"
self.supports_browsing = False
self.supports_code_interpreter = False
async def render_chat_request(
self,
request: ChatCompletionRequest,
) -> tuple[list[ConversationMessage], list[ProcessorInputs]] | ErrorResponse:
"""Copied from OpenAIServingChat.render_chat_request.
Differences: engine_client.errored check removed (no engine client).
"""
error_check_ret = await self._check_model(request)
if error_check_ret is not None:
logger.error("Error with model %s", error_check_ret)
return error_check_ret
try:
tokenizer = self.renderer.tokenizer
tool_parser = self.tool_parser
if is_mistral_tokenizer(tokenizer):
# because of issues with pydantic we need to potentially
# re-serialize the tool_calls field of the request
# for more info: see comment in `maybe_serialize_tool_calls`
_mt.maybe_serialize_tool_calls(request) # type: ignore[arg-type]
_mt.truncate_tool_call_ids(request) # type: ignore[arg-type]
_mt.validate_request_params(request)
# Check if tool parsing is unavailable (common condition)
tool_parsing_unavailable = (
tool_parser is None
and not is_mistral_tokenizer(tokenizer)
and not self.use_harmony
)
# Validate tool_choice when tool parsing is required but unavailable
if tool_parsing_unavailable and request.tool_choice not in (
None,
"none",
):
if request.tool_choice == "auto" and not self.enable_auto_tools:
# for hf tokenizers, "auto" tools requires
# --enable-auto-tool-choice and --tool-call-parser
return self.create_error_response(
'"auto" tool choice requires '
"--enable-auto-tool-choice and --tool-call-parser to be set"
)
elif request.tool_choice != "auto":
# "required" or named tool requires tool parser
return self.create_error_response(
f'tool_choice="{request.tool_choice}" requires '
"--tool-call-parser to be set"
)
if request.tools is None or (
request.tool_choice == "none"
and self.exclude_tools_when_tool_choice_none
):
tool_dicts = None
else:
tool_dicts = [tool.model_dump() for tool in request.tools]
if not self.use_harmony:
# Common case.
error_check_ret = self._validate_chat_template(
request_chat_template=request.chat_template,
chat_template_kwargs=request.chat_template_kwargs,
trust_request_chat_template=self.trust_request_chat_template,
)
if error_check_ret is not None:
return error_check_ret
conversation, engine_prompts = await self._preprocess_chat(
request,
request.messages,
default_template=self.chat_template,
default_template_content_format=self.chat_template_content_format,
default_template_kwargs=self.default_chat_template_kwargs,
tool_dicts=tool_dicts,
tool_parser=tool_parser,
)
else:
# For GPT-OSS.
should_include_tools = tool_dicts is not None
conversation, engine_prompts = self._make_request_with_harmony(
request, should_include_tools
)
except (ValueError, TypeError, RuntimeError, jinja2.TemplateError) as e:
logger.exception("Error in preprocessing prompt inputs")
return self.create_error_response(e)
return conversation, engine_prompts
async def render_completion_request(
self,
request: CompletionRequest,
) -> list[ProcessorInputs] | ErrorResponse:
"""Copied from OpenAIServingCompletion.render_completion_request.
Differences: engine_client.errored check removed (no engine client).
"""
error_check_ret = await self._check_model(request)
if error_check_ret is not None:
return error_check_ret
# Return error for unsupported features.
if request.suffix is not None:
return self.create_error_response("suffix is not currently supported")
if request.echo and request.prompt_embeds is not None:
return self.create_error_response("Echo is unsupported with prompt embeds.")
if request.prompt_logprobs is not None and request.prompt_embeds is not None:
return self.create_error_response(
"prompt_logprobs is not compatible with prompt embeds."
)
try:
engine_prompts = await self._preprocess_completion(
request,
prompt_input=request.prompt,
prompt_embeds=request.prompt_embeds,
)
except (ValueError, TypeError, RuntimeError, jinja2.TemplateError) as e:
logger.exception("Error in preprocessing prompt inputs")
return self.create_error_response(e)
return engine_prompts
def _make_request_with_harmony(
self,
request: ChatCompletionRequest,
should_include_tools: bool = True,
):
"""Copied from OpenAIServingChat._make_request_with_harmony."""
messages: list[OpenAIMessage] = []
# because of issues with pydantic we need to potentially
# re-serialize the tool_calls field of the request
# for more info: see comment in `maybe_serialize_tool_calls`
_mt.maybe_serialize_tool_calls(request) # type: ignore[arg-type]
# Add system message.
# NOTE: In Chat Completion API, browsing is enabled by default
# if the model supports it. TODO: Support browsing.
assert not self.supports_browsing
assert not self.supports_code_interpreter
sys_msg = get_system_message(
reasoning_effort=request.reasoning_effort,
browser_description=None,
python_description=None,
with_custom_tools=should_include_tools,
)
messages.append(sys_msg)
# Add developer message.
if request.tools:
dev_msg = get_developer_message(
tools=request.tools if should_include_tools else None # type: ignore[arg-type]
)
messages.append(dev_msg)
# Add user message.
messages.extend(parse_chat_inputs_to_harmony_messages(request.messages))
# Render prompt token ids.
prompt_token_ids = render_for_completion(messages)
engine_prompt = TokensPrompt(prompt_token_ids=prompt_token_ids)
# Add cache_salt if provided in the request
if request.cache_salt is not None:
engine_prompt["cache_salt"] = request.cache_salt
return messages, [engine_prompt]
async def show_available_models(self) -> ModelList:
"""Returns the models served by this render server."""
max_model_len = self.model_config.max_model_len
return ModelList(
data=[
ModelCard(
id=name,
max_model_len=max_model_len,
root=self.model_config.model,
permission=[ModelPermission()],
)
for name in self.served_model_names
]
)
def create_error_response(
self,
message: str | Exception,
err_type: str = "BadRequestError",
status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
param: str | None = None,
) -> ErrorResponse:
"""Copied from OpenAIServing.create_error_response."""
exc: Exception | None = None
if isinstance(message, Exception):
exc = message
from vllm.exceptions import VLLMValidationError
if isinstance(exc, VLLMValidationError):
err_type = "BadRequestError"
status_code = HTTPStatus.BAD_REQUEST
param = exc.parameter
elif isinstance(exc, (ValueError, TypeError, RuntimeError, OverflowError)):
# Common validation errors from user input
err_type = "BadRequestError"
status_code = HTTPStatus.BAD_REQUEST
param = None
elif isinstance(exc, NotImplementedError):
err_type = "NotImplementedError"
status_code = HTTPStatus.NOT_IMPLEMENTED
param = None
elif exc.__class__.__name__ == "TemplateError":
# jinja2.TemplateError (avoid importing jinja2)
err_type = "BadRequestError"
status_code = HTTPStatus.BAD_REQUEST
param = None
else:
err_type = "InternalServerError"
status_code = HTTPStatus.INTERNAL_SERVER_ERROR
param = None
message = str(exc)
if self.log_error_stack:
exc_type, _, _ = sys.exc_info()
if exc_type is not None:
traceback.print_exc()
else:
traceback.print_stack()
return ErrorResponse(
error=ErrorInfo(
message=sanitize_message(message),
type=err_type,
code=status_code.value,
param=param,
)
)
def _is_model_supported(self, model_name: str) -> bool:
"""Simplified from OpenAIServing._is_model_supported (no LoRA support)."""
return model_name in self.served_model_names
async def _check_model(
self,
request: Any,
) -> ErrorResponse | None:
"""Simplified from OpenAIServing._check_model (no LoRA support)."""
if self._is_model_supported(request.model):
return None
return self.create_error_response(
message=f"The model `{request.model}` does not exist.",
err_type="NotFoundError",
status_code=HTTPStatus.NOT_FOUND,
param="model",
)
def _validate_chat_template(
self,
request_chat_template: str | None,
chat_template_kwargs: dict[str, Any] | None,
trust_request_chat_template: bool,
) -> ErrorResponse | None:
"""Copied from OpenAIServing._validate_chat_template."""
if not trust_request_chat_template and (
request_chat_template is not None
or (
chat_template_kwargs
and chat_template_kwargs.get("chat_template") is not None
)
):
return self.create_error_response(
"Chat template is passed with request, but "
"--trust-request-chat-template is not set. "
"Refused request with untrusted chat template."
)
return None
async def _preprocess_completion(
self,
request: Any,
prompt_input: str | list[str] | list[int] | list[list[int]] | None,
prompt_embeds: bytes | list[bytes] | None,
) -> list[ProcessorInputs]:
"""Copied from OpenAIServing._preprocess_completion."""
prompts = list[SingletonPrompt | bytes]()
if prompt_embeds is not None: # embeds take higher priority
prompts.extend(prompt_to_seq(prompt_embeds))
if prompt_input is not None:
prompts.extend(prompt_to_seq(prompt_input))
return await self._preprocess_cmpl(request, prompts)
async def _preprocess_cmpl(
self,
request: Any,
prompts: Sequence[PromptType | bytes],
) -> list[ProcessorInputs]:
"""Copied from OpenAIServing._preprocess_cmpl."""
renderer = self.renderer
model_config = self.model_config
parsed_prompts = [
(
prompt
if isinstance(prompt, bytes)
else parse_model_prompt(model_config, prompt)
)
for prompt in prompts
]
tok_params = request.build_tok_params(model_config)
return await renderer.render_cmpl_async(
parsed_prompts,
tok_params,
prompt_extras={
k: v
for k in ("mm_processor_kwargs", "cache_salt")
if (v := getattr(request, k, None)) is not None
},
)
async def _preprocess_chat(
self,
request: Any,
messages: list[Any],
default_template: str | None,
default_template_content_format: ChatTemplateContentFormatOption,
default_template_kwargs: dict[str, Any] | None,
tool_dicts: list[dict[str, Any]] | None = None,
tool_parser: Callable[[TokenizerLike], ToolParser] | None = None,
) -> tuple[list[ConversationMessage], list[ProcessorInputs]]:
"""Copied from OpenAIServing._preprocess_chat.
Differences: isinstance check is ChatCompletionRequest-only
(ResponsesRequest not supported here); TODO comment dropped accordingly.
"""
renderer = self.renderer
default_template_kwargs = merge_kwargs(
default_template_kwargs,
dict(
tools=tool_dicts,
tokenize=is_mistral_tokenizer(renderer.tokenizer),
),
)
tok_params = request.build_tok_params(self.model_config)
chat_params = request.build_chat_params(
default_template, default_template_content_format
).with_defaults(default_template_kwargs)
(conversation,), (engine_prompt,) = await renderer.render_chat_async(
[messages],
chat_params,
tok_params,
prompt_extras={
k: v
for k in ("mm_processor_kwargs", "cache_salt")
if (v := getattr(request, k, None)) is not None
},
)
# tool parsing is done only if a tool_parser has been set and if
# tool_choice is not "none" (if tool_choice is "none" but a tool_parser
# is set, we want to prevent parsing a tool_call hallucinated by the LLM)
if tool_parser is not None:
tool_choice = getattr(request, "tool_choice", "none")
if tool_choice != "none":
if not isinstance(request, ChatCompletionRequest):
msg = (
"Tool usage is only supported "
" for ChatCompletionRequest, but got "
f"{type(request).__name__}"
)
raise NotImplementedError(msg)
tokenizer = renderer.get_tokenizer()
request = tool_parser(tokenizer).adjust_request(request=request) # type: ignore[arg-type]
return conversation, [engine_prompt]

View File

@@ -1,204 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
LaunchEngineClient: A lightweight EngineClient for GPU-less online serving.
This implements the EngineClient protocol without AsyncLLM or EngineCore,
enabling preprocessing (tokenization, rendering) and postprocessing
(detokenization) without GPU inference.
"""
from collections.abc import AsyncGenerator, Iterable, Mapping
from typing import Any
from vllm.config import VllmConfig
from vllm.engine.protocol import EngineClient, StreamingInput
from vllm.inputs import ProcessorInputs, PromptType
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.outputs import PoolingRequestOutput, RequestOutput
from vllm.plugins.io_processors import get_io_processor
from vllm.pooling_params import PoolingParams
from vllm.renderers import renderer_from_config
from vllm.sampling_params import SamplingParams
from vllm.tasks import SupportedTask
from vllm.v1.engine import EngineCoreRequest, PauseMode
from vllm.v1.engine.input_processor import InputProcessor
logger = init_logger(__name__)
class LaunchEngineClient(EngineClient):
"""GPU-less EngineClient that only supports preprocessing/postprocessing.
This is a Null Object at the EngineClient level, bypassing AsyncLLM
entirely. It initializes renderer, io_processor, and input_processor
for tokenization and rendering, but raises NotImplementedError for
any inference-related operations.
"""
def __init__(
self,
vllm_config: VllmConfig,
) -> None:
self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.renderer = renderer = renderer_from_config(self.vllm_config)
self.io_processor = get_io_processor(
self.vllm_config,
self.renderer,
self.model_config.io_processor_plugin,
)
# Convert TokPrompt --> EngineCoreRequest.
self.input_processor = InputProcessor(self.vllm_config, renderer)
@classmethod
def from_vllm_config(
cls,
vllm_config: VllmConfig,
) -> "LaunchEngineClient":
"""Create a LaunchEngineClient from a VllmConfig without GPU."""
return cls(
vllm_config=vllm_config,
)
# -- Task support --
async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
return ("render",)
# -- Inference (not supported) --
async def generate(
self,
prompt: EngineCoreRequest
| PromptType
| ProcessorInputs
| AsyncGenerator[StreamingInput, None],
sampling_params: SamplingParams,
request_id: str,
*,
prompt_text: str | None = None,
lora_request: LoRARequest | None = None,
tokenization_kwargs: dict[str, Any] | None = None,
trace_headers: Mapping[str, str] | None = None,
priority: int = 0,
data_parallel_rank: int | None = None,
reasoning_ended: bool | None = None,
) -> AsyncGenerator[RequestOutput, None]:
raise NotImplementedError(
"LaunchEngineClient does not support inference. "
"Use vllm serve for generation requests."
)
# yield is needed to make this an async generator
yield # type: ignore[misc] # pragma: no cover
# -- Request management (no-op) --
async def abort(
self, request_id: str | Iterable[str], internal: bool = False
) -> None:
pass
# -- Generation control (no-op) --
async def pause_generation(
self,
*,
mode: PauseMode = "abort",
wait_for_inflight_requests: bool | None = None,
clear_cache: bool = True,
) -> None:
pass
async def resume_generation(self) -> None:
pass
async def is_paused(self) -> bool:
return False
def shutdown(self, timeout: float | None = None) -> None:
pass
async def encode(
self,
prompt: PromptType | ProcessorInputs,
pooling_params: PoolingParams,
request_id: str,
lora_request: LoRARequest | None = None,
trace_headers: Mapping[str, str] | None = None,
priority: int = 0,
tokenization_kwargs: dict[str, Any] | None = None,
reasoning_ended: bool | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]:
raise NotImplementedError(
"LaunchEngineClient does not support inference. "
"Use vllm serve for encoding requests."
)
yield # type: ignore[misc] # pragma: no cover
# -- Observability (no-op / defaults) --
async def is_tracing_enabled(self) -> bool:
return False
async def do_log_stats(self) -> None:
pass
async def check_health(self) -> None:
pass
async def start_profile(self) -> None:
pass
async def stop_profile(self) -> None:
pass
# -- Cache management (no-op) --
async def reset_mm_cache(self) -> None:
pass
async def reset_prefix_cache(
self, reset_running_requests: bool = False, reset_connector: bool = False
) -> bool:
return True
async def reset_encoder_cache(self) -> None:
pass
# -- Power management (no-op) --
async def sleep(self, level: int = 1, mode: PauseMode = "abort") -> None:
pass
async def wake_up(self, tags: list[str] | None = None) -> None:
pass
async def is_sleeping(self) -> bool:
return False
# -- LoRA (not supported) --
async def add_lora(self, lora_request: LoRARequest) -> bool:
return False
# -- Status properties --
@property
def is_running(self) -> bool:
return True
@property
def is_stopped(self) -> bool:
return False
@property
def errored(self) -> bool:
return False
@property
def dead_error(self) -> BaseException:
return RuntimeError("LaunchEngineClient does not support inference")