[Refactor] [8/N] Simplify the vLLM OpenAI responses API serving architecture (#32260)

Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
Author: Chauncey
Date: 2026-01-14 15:26:24 +08:00
Committed by: GitHub
parent 6388b50058
commit 9312a6c03a
21 changed files with 754 additions and 674 deletions


@@ -14,7 +14,7 @@ import socket
import tempfile
import uuid
from argparse import Namespace
from collections.abc import AsyncGenerator, AsyncIterator, Awaitable
from collections.abc import AsyncIterator, Awaitable
from contextlib import asynccontextmanager
from http import HTTPStatus
from typing import Annotated, Any
@@ -49,9 +49,6 @@ from vllm.entrypoints.openai.engine.protocol import (
    CompletionResponse,
    ErrorInfo,
    ErrorResponse,
    ResponsesRequest,
    ResponsesResponse,
    StreamingResponsesResponse,
    TranscriptionRequest,
    TranscriptionResponseVariant,
    TranslationRequest,
@@ -59,12 +56,12 @@ from vllm.entrypoints.openai.engine.protocol import (
)
from vllm.entrypoints.openai.engine.serving import OpenAIServing
from vllm.entrypoints.openai.orca_metrics import metrics_header
from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
from vllm.entrypoints.openai.serving_models import (
    BaseModelPath,
    OpenAIServingModels,
)
from vllm.entrypoints.openai.serving_responses import OpenAIServingResponses
from vllm.entrypoints.openai.serving_transcription import (
OpenAIServingTranscription,
OpenAIServingTranslation,
@@ -311,112 +308,6 @@ async def show_version():
    return JSONResponse(content=ver)
async def _convert_stream_to_sse_events(
    generator: AsyncGenerator[StreamingResponsesResponse, None],
) -> AsyncGenerator[str, None]:
    """Convert the generator to a stream of events in SSE format"""
    async for event in generator:
        event_type = getattr(event, "type", "unknown")
        # https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
        event_data = (
            f"event: {event_type}\ndata: {event.model_dump_json(indent=None)}\n\n"
        )
        yield event_data
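For reference, the helper above emits standard Server-Sent Events framing: an `event:` line naming the event type, a `data:` line carrying the JSON-serialized event, and a blank line terminating the frame. A tiny standalone illustration of that frame shape, using a made-up stand-in event model rather than a real vLLM type:

```python
# Standalone illustration of the SSE frame produced by the helper above.
# ExampleEvent is a made-up stand-in, not an actual StreamingResponsesResponse type.
from pydantic import BaseModel


class ExampleEvent(BaseModel):
    type: str = "response.completed"
    sequence_number: int = 3


event = ExampleEvent()
frame = f"event: {event.type}\ndata: {event.model_dump_json(indent=None)}\n\n"
print(frame)
# event: response.completed
# data: {"type":"response.completed","sequence_number":3}
```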
@router.post(
    "/v1/responses",
    dependencies=[Depends(validate_json_request)],
    responses={
        HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
        HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
        HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
        HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
    },
)
@with_cancellation
async def create_responses(request: ResponsesRequest, raw_request: Request):
    handler = responses(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Responses API"
        )
    try:
        generator = await handler.create_responses(request, raw_request)
    except Exception as e:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
        ) from e
    if isinstance(generator, ErrorResponse):
        return JSONResponse(
            content=generator.model_dump(), status_code=generator.error.code
        )
    elif isinstance(generator, ResponsesResponse):
        return JSONResponse(content=generator.model_dump())
    return StreamingResponse(
        content=_convert_stream_to_sse_events(generator), media_type="text/event-stream"
    )
@router.get("/v1/responses/{response_id}")
async def retrieve_responses(
    response_id: str,
    raw_request: Request,
    starting_after: int | None = None,
    stream: bool | None = False,
):
    handler = responses(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Responses API"
        )
    try:
        response = await handler.retrieve_responses(
            response_id,
            starting_after=starting_after,
            stream=stream,
        )
    except Exception as e:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
        ) from e
    if isinstance(response, ErrorResponse):
        return JSONResponse(
            content=response.model_dump(), status_code=response.error.code
        )
    elif isinstance(response, ResponsesResponse):
        return JSONResponse(content=response.model_dump())
    return StreamingResponse(
        content=_convert_stream_to_sse_events(response), media_type="text/event-stream"
    )
@router.post("/v1/responses/{response_id}/cancel")
async def cancel_responses(response_id: str, raw_request: Request):
    handler = responses(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Responses API"
        )
    try:
        response = await handler.cancel_responses(response_id)
    except Exception as e:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
        ) from e
    if isinstance(response, ErrorResponse):
        return JSONResponse(
            content=response.model_dump(), status_code=response.error.code
        )
    return JSONResponse(content=response.model_dump())
@router.post(
    "/v1/messages",
    dependencies=[Depends(validate_json_request)],
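The `/v1/responses` create, retrieve, and cancel routes removed above, together with the SSE helper, are deleted from `api_server.py`; judging by the registration added to `build_app` in the final hunk below, they presumably move into the new `vllm.entrypoints.openai.responses.api_router` module. That file is not shown in this excerpt, so the following is only a rough sketch of the shape such a module could take, assuming it keeps an `APIRouter` plus an `attach_router` entry point; everything below other than the module path and `attach_router` is an assumption.

```python
# Hypothetical sketch of vllm/entrypoints/openai/responses/api_router.py.
# Only the module path and the attach_router entry point are taken from the
# diff; the router object and handler body are illustrative assumptions.
from fastapi import APIRouter, FastAPI, Request

router = APIRouter()


@router.post("/v1/responses")
async def create_responses(raw_request: Request):
    # The relocated handler would keep the same dispatch as the removed code:
    # ErrorResponse -> JSON error, ResponsesResponse -> plain JSON body,
    # anything else -> StreamingResponse of SSE-framed events.
    ...


def attach_router(app: FastAPI) -> None:
    """Register the Responses API routes on the main FastAPI app."""
    app.include_router(router)
```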
@@ -844,6 +735,12 @@ def build_app(args: Namespace) -> FastAPI:
    )
    register_chat_api_router(app)
    from vllm.entrypoints.openai.responses.api_router import (
        attach_router as register_responses_api_router,
    )
    register_responses_api_router(app)
    from vllm.entrypoints.sagemaker.routes import register_sagemaker_routes
    register_sagemaker_routes(router)
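Once the router is attached in `build_app`, the endpoints should behave as before from a client's point of view. A hedged usage sketch against a locally running server via the OpenAI Python SDK (the base URL and model name are placeholders, not part of this change):

```python
# Illustrative client calls against a locally running vLLM server.
# http://localhost:8000 and the model name are placeholder assumptions.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

# Non-streaming: /v1/responses returns a single ResponsesResponse JSON body.
result = client.responses.create(model="Qwen/Qwen3-8B", input="Say hello.")
print(result.output_text)

# Streaming: the endpoint switches to text/event-stream and emits SSE frames,
# which the SDK surfaces as typed events.
stream = client.responses.create(model="Qwen/Qwen3-8B", input="Say hello.", stream=True)
for event in stream:
    print(event.type)
```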