[Refactor] [10/N] to simplify the vLLM openai completion serving architecture (#32369)
Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
This commit is contained in:
@@ -22,10 +22,10 @@ from typing import Any
|
||||
import model_hosting_container_standards.sagemaker as sagemaker_standards
|
||||
import pydantic
|
||||
import uvloop
|
||||
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
|
||||
from fastapi import APIRouter, FastAPI, HTTPException, Request
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
from fastapi.responses import JSONResponse
|
||||
from starlette.concurrency import iterate_in_threadpool
|
||||
from starlette.datastructures import URL, Headers, MutableHeaders, State
|
||||
from starlette.types import ASGIApp, Message, Receive, Scope, Send
|
||||
@@ -33,36 +33,26 @@ from starlette.types import ASGIApp, Message, Receive, Scope, Send
|
||||
import vllm.envs as envs
|
||||
from vllm.engine.arg_utils import AsyncEngineArgs
|
||||
from vllm.engine.protocol import EngineClient
|
||||
from vllm.entrypoints.anthropic.protocol import (
|
||||
AnthropicError,
|
||||
AnthropicErrorResponse,
|
||||
AnthropicMessagesRequest,
|
||||
AnthropicMessagesResponse,
|
||||
)
|
||||
from vllm.entrypoints.anthropic.serving_messages import AnthropicServingMessages
|
||||
from vllm.entrypoints.anthropic.serving import AnthropicServingMessages
|
||||
from vllm.entrypoints.launcher import serve_http
|
||||
from vllm.entrypoints.logger import RequestLogger
|
||||
from vllm.entrypoints.openai.chat_completion.serving import OpenAIServingChat
|
||||
from vllm.entrypoints.openai.cli_args import make_arg_parser, validate_parsed_serve_args
|
||||
from vllm.entrypoints.openai.completion.serving import OpenAIServingCompletion
|
||||
from vllm.entrypoints.openai.engine.protocol import (
|
||||
CompletionRequest,
|
||||
CompletionResponse,
|
||||
ErrorInfo,
|
||||
ErrorResponse,
|
||||
)
|
||||
from vllm.entrypoints.openai.engine.serving import OpenAIServing
|
||||
from vllm.entrypoints.openai.orca_metrics import metrics_header
|
||||
from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses
|
||||
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
|
||||
from vllm.entrypoints.openai.serving_models import (
|
||||
BaseModelPath,
|
||||
from vllm.entrypoints.openai.models.protocol import BaseModelPath
|
||||
from vllm.entrypoints.openai.models.serving import (
|
||||
OpenAIServingModels,
|
||||
)
|
||||
from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses
|
||||
from vllm.entrypoints.openai.translations.serving import (
|
||||
OpenAIServingTranscription,
|
||||
OpenAIServingTranslation,
|
||||
)
|
||||
from vllm.entrypoints.openai.utils import validate_json_request
|
||||
from vllm.entrypoints.pooling.classify.serving import ServingClassification
|
||||
from vllm.entrypoints.pooling.embed.serving import OpenAIServingEmbedding
|
||||
from vllm.entrypoints.pooling.pooling.serving import OpenAIServingPooling
|
||||
@@ -75,12 +65,10 @@ from vllm.entrypoints.serve.tokenize.serving import OpenAIServingTokenization
|
||||
from vllm.entrypoints.tool_server import DemoToolServer, MCPToolServer, ToolServer
|
||||
from vllm.entrypoints.utils import (
|
||||
cli_env_setup,
|
||||
load_aware_call,
|
||||
log_non_default_args,
|
||||
process_chat_template,
|
||||
process_lora_modules,
|
||||
sanitize_message,
|
||||
with_cancellation,
|
||||
)
|
||||
from vllm.exceptions import VLLMValidationError
|
||||
from vllm.logger import init_logger
|
||||
@@ -99,7 +87,6 @@ prometheus_multiproc_dir: tempfile.TemporaryDirectory
|
||||
# Cannot use __name__ (https://github.com/vllm-project/vllm/pull/4765)
|
||||
logger = init_logger("vllm.entrypoints.openai.api_server")
|
||||
|
||||
ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL = "endpoint-load-metrics-format"
|
||||
|
||||
_running_tasks: set[asyncio.Task] = set()
|
||||
|
||||
@@ -231,22 +218,6 @@ def base(request: Request) -> OpenAIServing:
|
||||
return tokenization(request)
|
||||
|
||||
|
||||
def models(request: Request) -> OpenAIServingModels:
|
||||
return request.app.state.openai_serving_models
|
||||
|
||||
|
||||
def messages(request: Request) -> AnthropicServingMessages:
|
||||
return request.app.state.anthropic_serving_messages
|
||||
|
||||
|
||||
def chat(request: Request) -> OpenAIServingChat | None:
|
||||
return request.app.state.openai_serving_chat
|
||||
|
||||
|
||||
def completion(request: Request) -> OpenAIServingCompletion | None:
|
||||
return request.app.state.openai_serving_completion
|
||||
|
||||
|
||||
def tokenization(request: Request) -> OpenAIServingTokenization:
|
||||
return request.app.state.openai_serving_tokenization
|
||||
|
||||
@@ -278,116 +249,12 @@ async def get_server_load_metrics(request: Request):
|
||||
return JSONResponse(content={"server_load": request.app.state.server_load_metrics})
|
||||
|
||||
|
||||
@router.get("/v1/models")
|
||||
async def show_available_models(raw_request: Request):
|
||||
handler = models(raw_request)
|
||||
|
||||
models_ = await handler.show_available_models()
|
||||
return JSONResponse(content=models_.model_dump())
|
||||
|
||||
|
||||
@router.get("/version")
|
||||
async def show_version():
|
||||
ver = {"version": VLLM_VERSION}
|
||||
return JSONResponse(content=ver)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/v1/messages",
|
||||
dependencies=[Depends(validate_json_request)],
|
||||
responses={
|
||||
HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
|
||||
HTTPStatus.BAD_REQUEST.value: {"model": AnthropicErrorResponse},
|
||||
HTTPStatus.NOT_FOUND.value: {"model": AnthropicErrorResponse},
|
||||
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": AnthropicErrorResponse},
|
||||
},
|
||||
)
|
||||
@with_cancellation
|
||||
@load_aware_call
|
||||
async def create_messages(request: AnthropicMessagesRequest, raw_request: Request):
|
||||
def translate_error_response(response: ErrorResponse) -> JSONResponse:
|
||||
anthropic_error = AnthropicErrorResponse(
|
||||
error=AnthropicError(
|
||||
type=response.error.type,
|
||||
message=response.error.message,
|
||||
)
|
||||
)
|
||||
return JSONResponse(
|
||||
status_code=response.error.code, content=anthropic_error.model_dump()
|
||||
)
|
||||
|
||||
handler = messages(raw_request)
|
||||
if handler is None:
|
||||
error = base(raw_request).create_error_response(
|
||||
message="The model does not support Messages API"
|
||||
)
|
||||
return translate_error_response(error)
|
||||
|
||||
try:
|
||||
generator = await handler.create_messages(request, raw_request)
|
||||
except Exception as e:
|
||||
logger.exception("Error in create_messages: %s", e)
|
||||
return JSONResponse(
|
||||
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
|
||||
content=AnthropicErrorResponse(
|
||||
error=AnthropicError(
|
||||
type="internal_error",
|
||||
message=str(e),
|
||||
)
|
||||
).model_dump(),
|
||||
)
|
||||
|
||||
if isinstance(generator, ErrorResponse):
|
||||
return translate_error_response(generator)
|
||||
|
||||
elif isinstance(generator, AnthropicMessagesResponse):
|
||||
resp = generator.model_dump(exclude_none=True)
|
||||
logger.debug("Anthropic Messages Response: %s", resp)
|
||||
return JSONResponse(content=resp)
|
||||
|
||||
return StreamingResponse(content=generator, media_type="text/event-stream")
|
||||
|
||||
|
||||
@router.post(
|
||||
"/v1/completions",
|
||||
dependencies=[Depends(validate_json_request)],
|
||||
responses={
|
||||
HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
|
||||
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
||||
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
|
||||
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
||||
},
|
||||
)
|
||||
@with_cancellation
|
||||
@load_aware_call
|
||||
async def create_completion(request: CompletionRequest, raw_request: Request):
|
||||
metrics_header_format = raw_request.headers.get(
|
||||
ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, ""
|
||||
)
|
||||
handler = completion(raw_request)
|
||||
if handler is None:
|
||||
return base(raw_request).create_error_response(
|
||||
message="The model does not support Completions API"
|
||||
)
|
||||
|
||||
try:
|
||||
generator = await handler.create_completion(request, raw_request)
|
||||
except Exception as e:
|
||||
return handler.create_error_response(e)
|
||||
|
||||
if isinstance(generator, ErrorResponse):
|
||||
return JSONResponse(
|
||||
content=generator.model_dump(), status_code=generator.error.code
|
||||
)
|
||||
elif isinstance(generator, CompletionResponse):
|
||||
return JSONResponse(
|
||||
content=generator.model_dump(),
|
||||
headers=metrics_header(metrics_header_format),
|
||||
)
|
||||
|
||||
return StreamingResponse(content=generator, media_type="text/event-stream")
|
||||
|
||||
|
||||
def load_log_config(log_config_file: str | None) -> dict | None:
|
||||
if not log_config_file:
|
||||
return None
|
||||
@@ -486,7 +353,7 @@ def _extract_content_from_chunk(chunk_data: dict) -> str:
|
||||
from vllm.entrypoints.openai.chat_completion.protocol import (
|
||||
ChatCompletionStreamResponse,
|
||||
)
|
||||
from vllm.entrypoints.openai.engine.protocol import (
|
||||
from vllm.entrypoints.openai.completion.protocol import (
|
||||
CompletionStreamResponse,
|
||||
)
|
||||
|
||||
@@ -646,6 +513,22 @@ def build_app(args: Namespace) -> FastAPI:
|
||||
)
|
||||
|
||||
register_translations_api_router(app)
|
||||
|
||||
from vllm.entrypoints.openai.completion.api_router import (
|
||||
attach_router as register_completion_api_router,
|
||||
)
|
||||
|
||||
register_completion_api_router(app)
|
||||
from vllm.entrypoints.anthropic.api_router import (
|
||||
attach_router as register_anthropic_api_router,
|
||||
)
|
||||
|
||||
register_anthropic_api_router(app)
|
||||
from vllm.entrypoints.openai.models.api_router import (
|
||||
attach_router as register_models_api_router,
|
||||
)
|
||||
|
||||
register_models_api_router(app)
|
||||
from vllm.entrypoints.sagemaker.routes import register_sagemaker_routes
|
||||
|
||||
register_sagemaker_routes(router)
|
||||
|
||||
Reference in New Issue
Block a user