Generative Scoring (#34539)

Signed-off-by: Vedant Jhaveri <vjhaveri@linkedin.com>
Co-authored-by: Vedant Jhaveri <vjhaveri@linkedin.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Cyrus Leung <tlleungac@connect.ust.hk>
This commit is contained in:
Vedant V Jhaveri
2026-03-31 16:02:11 -07:00
committed by GitHub
parent 36f1dc19ae
commit 2e56975657
13 changed files with 1265 additions and 3 deletions

View File

@@ -73,8 +73,11 @@ In addition, we have the following custom APIs:
- [Cohere Embed API](../models/pooling_models/embed.md#cohere-embed-api) (`/v2/embed`)
- Compatible with [Cohere's Embed API](https://docs.cohere.com/reference/embed)
- Works with any [embedding model](../models/pooling_models/embed.md#supported-models), including multimodal models.
- [Score API](../models/pooling_models/scoring.md#score-api) (`/score`)
- Applicable to [score models](../models/pooling_models/scoring.md).
- [Score API](../models/pooling_models/scoring.md#score-api) (`/score`, `/v1/score`)
- Applicable to [score models](../models/pooling_models/scoring.md) (cross-encoder, bi-encoder, late-interaction).
- [Generative Scoring API](#generative-scoring-api) (`/generative_scoring`)
- Applicable to [CausalLM models](../models/generative_models.md) (task `"generate"`).
- Computes next-token probabilities for specified `label_token_ids`.
- [Rerank API](../models/pooling_models/scoring.md#rerank-api) (`/rerank`, `/v1/rerank`, `/v2/rerank`)
- Implements [Jina AI's v1 rerank API](https://jina.ai/reranker/)
- Also compatible with [Cohere's v1 & v2 rerank APIs](https://docs.cohere.com/v2/reference/rerank)
@@ -481,6 +484,71 @@ This approach is more robust than index-based access (`messages[0]`, `messages[1
Example template file: [examples/pooling/score/template/nemotron-rerank.jinja](../../examples/pooling/score/template/nemotron-rerank.jinja)
### Generative Scoring API
The `/generative_scoring` endpoint uses a CausalLM model (e.g., Llama, Qwen, Mistral) to compute the probability of specified token IDs appearing as the next token. Each item (document) is concatenated with the query to form a prompt, and the model predicts how likely each label token is to follow that prompt. This lets you score items against a query: for example, ask "Is this the capital of France?" and score each city by how likely the model is to answer "Yes".
This endpoint is automatically available when the server is started with a generative model (task `"generate"`). It is separate from the pooling-based [Score API](#score-api), which uses cross-encoder, bi-encoder, or late-interaction models.
**Requirements:**
- The `label_token_ids` parameter is **required** and must contain **at least one token ID**.
- When two label tokens are provided, the score equals `P(label_token_ids[0]) / (P(label_token_ids[0]) + P(label_token_ids[1]))` (softmax over the two labels), as in the worked example below.
- When more than two labels are provided, the score is the softmax-normalized probability of the first label token across all label tokens.
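For example, with next-token logprobs of -0.2 for the first label and -1.6 for the second, the score is `exp(-0.2) / (exp(-0.2) + exp(-1.6)) ≈ 0.80`.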
#### Example
```bash
curl -X POST http://localhost:8000/generative_scoring \
-H "Content-Type: application/json" \
-d '{
"model": "Qwen/Qwen3-0.6B",
"query": "Is this city the capital of France?",
"items": ["Paris", "London", "Berlin"],
"label_token_ids": [9454, 2753]
}'
```
Here, each item is appended to the query to form prompts like `"Is this city the capital of France? Paris"`, `"... London"`, etc. The model then predicts the next token, and the score reflects the probability of "Yes" (token 9454) vs "No" (token 2753).
??? console "Response"
```json
{
"id": "generative-scoring-abc123",
"object": "list",
"created": 1234567890,
"model": "Qwen/Qwen3-0.6B",
"data": [
{"index": 0, "object": "score", "score": 0.95},
{"index": 1, "object": "score", "score": 0.12},
{"index": 2, "object": "score", "score": 0.08}
],
"usage": {"prompt_tokens": 45, "total_tokens": 48, "completion_tokens": 3}
}
```
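Equivalently, a minimal Python client sketch (assuming the `requests` package and a server listening on `localhost:8000`):

```python
import requests

response = requests.post(
    "http://localhost:8000/generative_scoring",
    json={
        "model": "Qwen/Qwen3-0.6B",
        "query": "Is this city the capital of France?",
        "items": ["Paris", "London", "Berlin"],
        "label_token_ids": [9454, 2753],
    },
)
response.raise_for_status()
for item in response.json()["data"]:
    print(item["index"], item["score"])
```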
#### How it works
1. **Prompt Construction**: For each item, builds `prompt = query + item` (or `item + query` if `item_first=true`)
2. **Forward Pass**: Runs the model on each prompt to get next-token logits
3. **Probability Extraction**: Extracts logprobs for the specified `label_token_ids`
4. **Softmax Normalization**: Applies softmax over only the label tokens (when `apply_softmax=true`)
5. **Score**: Returns the normalized probability of the first label token
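A minimal sketch of steps 3-5 in plain Python, assuming `label_logprobs` holds the model's next-token logprobs for the label tokens of one prompt (values shown are hypothetical):

```python
import math

label_token_ids = [9454, 2753]             # "Yes", "No"
label_logprobs = {9454: -0.5, 2753: -2.0}  # hypothetical model logprobs

# Softmax over only the label tokens (apply_softmax=true, the default).
max_lp = max(label_logprobs.values())
exp_vals = {tid: math.exp(lp - max_lp) for tid, lp in label_logprobs.items()}
total = sum(exp_vals.values())
probs = {tid: v / total for tid, v in exp_vals.items()}

# The score is the normalized probability of the first label token.
score = probs[label_token_ids[0]]
print(round(score, 3))  # ~0.818
```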
#### Finding Token IDs
To find the token IDs for your labels, use the tokenizer:
```python
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-0.6B")
yes_id = tokenizer.encode("Yes", add_special_tokens=False)[0]
no_id = tokenizer.encode("No", add_special_tokens=False)[0]
print(f"Yes: {yes_id}, No: {no_id}")
```
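Note that many BPE tokenizers map `" Yes"` (with a leading space) to a different ID than `"Yes"`; which variant you want depends on whether your query ends with a space. A quick check, reusing the `tokenizer` from the snippet above:

```python
for label in ("Yes", " Yes", "No", " No"):
    print(repr(label), tokenizer.encode(label, add_special_tokens=False))
```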
## Ray Serve LLM
Ray Serve LLM enables scalable, production-grade serving of the vLLM engine. It integrates tightly with vLLM and extends it with features such as auto-scaling, load balancing, and back-pressure.

View File

@@ -0,0 +1,2 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

View File

@@ -0,0 +1,325 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Tests for the Generative Scoring API.
Tests cover:
1. Protocol models (request/response construction)
2. Probability computation (softmax normalization)
3. Input validation
4. Score formula: P(token[0]) / (P(token[0]) + P(token[1]))
5. Prompt building and item ordering
"""
import math
from dataclasses import dataclass, field
from typing import Any
from unittest.mock import MagicMock
import pytest
from vllm.config.multimodal import MultiModalConfig
from vllm.entrypoints.openai.engine.protocol import ErrorResponse
from vllm.entrypoints.openai.generative_scoring.serving import (
GenerativeScoringItemResult,
GenerativeScoringRequest,
GenerativeScoringResponse,
OpenAIServingGenerativeScoring,
)
from vllm.entrypoints.openai.models.protocol import BaseModelPath
from vllm.entrypoints.openai.models.serving import OpenAIServingModels
from vllm.logprobs import Logprob
from vllm.outputs import CompletionOutput, RequestOutput
from vllm.tokenizers import get_tokenizer
from vllm.v1.engine.async_llm import AsyncLLM
MODEL_NAME = "Qwen/Qwen3-0.6B"
BASE_MODEL_PATHS = [BaseModelPath(name=MODEL_NAME, model_path=MODEL_NAME)]
@dataclass
class MockHFConfig:
model_type: str = "any"
@dataclass
class MockModelConfig:
task = "generate"
runner_type = "generate"
tokenizer = MODEL_NAME
trust_remote_code = False
tokenizer_mode = "auto"
max_model_len = 100
tokenizer_revision = None
multimodal_config = MultiModalConfig()
hf_config = MockHFConfig()
logits_processor_pattern = None
logits_processors: list[str] | None = None
diff_sampling_param: dict | None = None
allowed_local_media_path: str = ""
allowed_media_domains: list[str] | None = None
encoder_config = None
generation_config: str = "auto"
media_io_kwargs: dict[str, dict[str, Any]] = field(default_factory=dict)
skip_tokenizer_init = False
vocab_size = 151936
def get_diff_sampling_param(self):
return self.diff_sampling_param or {}
def get_vocab_size(self):
return self.vocab_size
def _create_mock_engine():
"""Create a mock AsyncLLM engine."""
mock_engine = MagicMock(spec=AsyncLLM)
mock_engine.get_tokenizer.return_value = get_tokenizer(MODEL_NAME)
mock_engine.errored = False
mock_engine.model_config = MockModelConfig()
mock_engine.input_processor = MagicMock()
mock_engine.io_processor = MagicMock()
# renderer is accessed by OpenAIServing.__init__ and serving.py
mock_renderer = MagicMock()
mock_renderer.tokenizer = get_tokenizer(MODEL_NAME)
mock_engine.renderer = mock_renderer
return mock_engine
def _create_serving(mock_engine) -> OpenAIServingGenerativeScoring:
"""Create an OpenAIServingGenerativeScoring instance with mocks."""
models = OpenAIServingModels(
engine_client=mock_engine,
base_model_paths=BASE_MODEL_PATHS,
)
return OpenAIServingGenerativeScoring(mock_engine, models, request_logger=None)
def _create_mock_request_output(logprobs_dict: dict[int, float]) -> RequestOutput:
"""Create a mock RequestOutput with specified logprobs."""
logprobs_with_objs = {
tid: Logprob(logprob=lp, rank=i + 1)
for i, (tid, lp) in enumerate(logprobs_dict.items())
}
completion_output = CompletionOutput(
index=0,
text="",
token_ids=[100],
cumulative_logprob=-1.0,
logprobs=[logprobs_with_objs],
finish_reason="length",
)
return RequestOutput(
request_id="test-request",
prompt="test prompt",
prompt_token_ids=[1, 2, 3],
prompt_logprobs=None,
outputs=[completion_output],
finished=True,
)
class TestProtocolModels:
"""Tests for GenerativeScoringRequest and GenerativeScoringResponse."""
def test_request_and_response_all_fields(self):
"""Test request construction with all field types and response structure."""
# Test request with string inputs
req_str = GenerativeScoringRequest(
query="Is this the capital?",
items=["Paris", "London"],
label_token_ids=[9454, 2753],
)
assert req_str.query == "Is this the capital?"
assert req_str.items == ["Paris", "London"]
assert req_str.label_token_ids == [9454, 2753]
assert req_str.apply_softmax is True # default
assert req_str.item_first is False # default
assert req_str.add_special_tokens is True # default
# Test request with pre-tokenized inputs and custom options
req_tok = GenerativeScoringRequest(
query=[100, 200, 300],
items=[[400, 500], [600, 700]],
label_token_ids=[1234, 5678],
apply_softmax=False,
item_first=True,
add_special_tokens=False,
)
assert req_tok.query == [100, 200, 300]
assert req_tok.items == [[400, 500], [600, 700]]
assert req_tok.apply_softmax is False
assert req_tok.item_first is True
assert req_tok.add_special_tokens is False
# Test response structure
response = GenerativeScoringResponse(
model="test-model",
data=[
GenerativeScoringItemResult(index=0, score=0.7),
GenerativeScoringItemResult(index=1, score=0.4),
],
usage={"prompt_tokens": 10, "total_tokens": 12, "completion_tokens": 2},
)
assert response.object == "list"
assert response.model == "test-model"
assert len(response.data) == 2
assert response.data[0].score == 0.7
assert response.data[0].object == "score"
assert response.data[1].score == 0.4
assert response.usage.prompt_tokens == 10
class TestProbabilityComputation:
"""Tests for _compute_probabilities with both softmax modes."""
@pytest.mark.parametrize(
"label_logprobs,apply_softmax,should_sum_to_one",
[
({100: -1.0, 200: -2.0}, True, True),
({100: -100.0, 200: -100.5}, True, True), # numerical stability
({100: -1.0, 200: -2.0}, False, False),
],
ids=["softmax_basic", "softmax_extreme_values", "true_probs"],
)
def test_compute_probabilities(
self, label_logprobs, apply_softmax, should_sum_to_one
):
"""Test probability computation for softmax and true probability modes."""
serving = OpenAIServingGenerativeScoring.__new__(OpenAIServingGenerativeScoring)
probs = serving._compute_probabilities(
label_logprobs, apply_softmax=apply_softmax
)
# Verify sum behavior
total = sum(probs.values())
if should_sum_to_one:
assert abs(total - 1.0) < 1e-6
else:
assert total < 1.0
# Verify math
if apply_softmax:
max_lp = max(label_logprobs.values())
exp_vals = {k: math.exp(v - max_lp) for k, v in label_logprobs.items()}
sum_exp = sum(exp_vals.values())
for tid, lp in label_logprobs.items():
assert abs(probs[tid] - exp_vals[tid] / sum_exp) < 1e-9
else:
for tid, lp in label_logprobs.items():
assert abs(probs[tid] - math.exp(lp)) < 1e-9
def test_score_formula(self):
"""Test the score formula: P(token[0]) / (P(token[0]) + P(token[1]))."""
serving = OpenAIServingGenerativeScoring.__new__(OpenAIServingGenerativeScoring)
# With logprobs -0.5 and -2.0, softmax gives higher prob to first token
logprobs = {9454: -0.5, 2753: -2.0}
probs = serving._compute_probabilities(logprobs, apply_softmax=True)
# Score = P(9454) / (P(9454) + P(2753)) = P(9454) since they sum to 1
score = probs[9454]
# Manual calculation
exp_0 = math.exp(-0.5)
exp_1 = math.exp(-2.0)
expected_score = exp_0 / (exp_0 + exp_1)
assert abs(score - expected_score) < 1e-9
assert score > 0.5 # First token has higher logprob, so higher probability
class TestValidation:
"""Tests for input validation errors."""
@pytest.mark.asyncio
@pytest.mark.parametrize(
"request_kwargs,expected_error",
[
(
{"query": "q", "items": ["i"], "label_token_ids": [999999, 999998]},
"out of vocabulary",
),
(
{"query": "q", "items": [], "label_token_ids": [100, 200]},
"at least one item",
),
],
ids=["invalid_token_id", "empty_items"],
)
async def test_validation_errors(self, request_kwargs, expected_error):
"""Test that invalid inputs return appropriate errors."""
mock_engine = _create_mock_engine()
serving = _create_serving(mock_engine)
request = GenerativeScoringRequest(model=MODEL_NAME, **request_kwargs)
result = await serving.create_generative_scoring(request, None)
assert isinstance(result, ErrorResponse)
assert expected_error in result.error.message.lower()
class TestPromptBuilding:
"""Tests for prompt construction and item ordering."""
@pytest.mark.asyncio
@pytest.mark.parametrize(
"item_first,expected",
[
(False, [[100, 101, 200, 201], [100, 101, 300, 301]]), # query + item
(True, [[200, 201, 100, 101], [300, 301, 100, 101]]), # item + query
],
ids=["query_first", "item_first"],
)
async def test_item_ordering(self, item_first, expected):
"""Test that item_first flag controls prompt concatenation order."""
mock_engine = _create_mock_engine()
serving = _create_serving(mock_engine)
request = GenerativeScoringRequest(
query=[100, 101],
items=[[200, 201], [300, 301]],
label_token_ids=[500, 501],
item_first=item_first,
)
engine_inputs, _ = await serving._build_prompts(
request, MagicMock(), max_model_len=4096
)
for i, exp in enumerate(expected):
assert engine_inputs[i]["prompt_token_ids"] == exp
class TestGeneration:
"""Tests for the full generation flow with mocked engine."""
@pytest.mark.asyncio
async def test_successful_generation(self):
"""Test successful score generation returns valid response."""
mock_engine = _create_mock_engine()
serving = _create_serving(mock_engine)
mock_logprobs = {1234: -0.5, 5678: -2.0, 100: -3.0}
mock_output = _create_mock_request_output(mock_logprobs)
async def mock_generate(*args, **kwargs):
yield mock_output
mock_engine.generate = mock_generate
request = GenerativeScoringRequest(
model=MODEL_NAME,
query="Is Paris the capital?",
items=["Yes", "No"],
label_token_ids=[1234, 5678],
)
result = await serving.create_generative_scoring(request, None)
assert isinstance(result, GenerativeScoringResponse)
assert len(result.data) == 2
for item_result in result.data:
assert 0.0 <= item_result.score <= 1.0
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,157 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""End-to-end tests for the Generative Scoring API.
Tests verify the full HTTP request/response flow using RemoteOpenAIServer.
"""
import pytest
import requests
from ....utils import RemoteOpenAIServer
MODEL_NAME = "Qwen/Qwen3-0.6B"
@pytest.fixture(scope="module")
def server():
args = [
"--dtype",
"bfloat16",
"--max-model-len",
"512",
"--enforce-eager",
"--max-num-seqs",
"32",
]
with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
yield remote_server
class TestGenerativeScoringAPI:
"""End-to-end tests for the Generative Scoring API."""
@pytest.mark.asyncio
async def test_basic_score_and_response_structure(self, server: RemoteOpenAIServer):
"""Test basic generative scoring request and verify response structure."""
response = requests.post(
server.url_for("generative_scoring"),
json={
"model": MODEL_NAME,
"query": "Is Paris the capital of France? Answer Yes or No: ",
"items": ["Paris is beautiful.", "London is rainy."],
"label_token_ids": [9454, 2753],
},
)
assert response.status_code == 200, f"Response: {response.text}"
data = response.json()
# Verify response structure
assert data["id"].startswith("generative-scoring-")
assert data["object"] == "list"
assert "model" in data
assert "usage" in data
assert len(data["data"]) == 2
# Verify each result
for i, result in enumerate(data["data"]):
assert result["index"] == i
assert result["object"] == "score"
assert 0.0 <= result["score"] <= 1.0
# Verify usage tracking
usage = data["usage"]
assert usage["prompt_tokens"] > 0
assert usage["completion_tokens"] > 0
assert (
usage["total_tokens"] == usage["prompt_tokens"] + usage["completion_tokens"]
)
@pytest.mark.asyncio
async def test_multiple_items(self, server: RemoteOpenAIServer):
"""Test generative scoring request with multiple items."""
response = requests.post(
server.url_for("generative_scoring"),
json={
"model": MODEL_NAME,
"query": "Is this city a capital? ",
"items": ["Paris", "London", "Berlin", "New York", "Tokyo"],
"label_token_ids": [9454, 2753],
},
)
assert response.status_code == 200
data = response.json()
assert len(data["data"]) == 5
@pytest.mark.asyncio
async def test_validation_missing_label_token_ids(self, server: RemoteOpenAIServer):
"""Test that missing label_token_ids returns a validation error."""
response = requests.post(
server.url_for("generative_scoring"),
json={
"model": MODEL_NAME,
"query": "Test query",
"items": ["item1", "item2"],
},
)
# Missing required field returns 400 (manual JSON parsing)
assert response.status_code == 400
@pytest.mark.asyncio
async def test_validation_empty_items(self, server: RemoteOpenAIServer):
"""Test that empty items returns an error."""
response = requests.post(
server.url_for("generative_scoring"),
json={
"model": MODEL_NAME,
"query": "Test query",
"items": [],
"label_token_ids": [100, 200],
},
)
assert response.status_code == 400
@pytest.mark.asyncio
@pytest.mark.parametrize(
"label_token_ids,expected_status",
[
([9999999999, 9999999998], 400), # Out of vocab range
],
ids=["invalid_token_ids"],
)
async def test_validation_errors(
self, server: RemoteOpenAIServer, label_token_ids, expected_status
):
"""Test validation errors for various invalid inputs."""
response = requests.post(
server.url_for("generative_scoring"),
json={
"model": MODEL_NAME,
"query": "Test query",
"items": ["item1"],
"label_token_ids": label_token_ids,
},
)
assert response.status_code == expected_status
@pytest.mark.asyncio
async def test_score_consistency(self, server: RemoteOpenAIServer):
"""Test that scores are deterministic across identical requests."""
request_body = {
"model": MODEL_NAME,
"query": "Is this consistent? ",
"items": ["Yes it is."],
"label_token_ids": [100, 200],
}
r1 = requests.post(server.url_for("generative_scoring"), json=request_body)
r2 = requests.post(server.url_for("generative_scoring"), json=request_body)
assert r1.status_code == 200 and r2.status_code == 200
r1_score = r1.json()["data"][0]["score"]
r2_score = r2.json()["data"][0]["score"]
assert abs(r1_score - r2_score) < 1e-6
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -246,6 +246,13 @@ def build_app(
register_pooling_api_routers(app, supported_tasks, model_config)
if "generate" in supported_tasks:
from vllm.entrypoints.openai.generative_scoring.api_router import (
register_generative_scoring_api_router,
)
register_generative_scoring_api_router(app)
app.root_path = args.root_path
app.add_middleware(
CORSMiddleware,
@@ -413,6 +420,13 @@ async def init_app_state(
init_pooling_state(engine_client, state, args, request_logger, supported_tasks)
if "generate" in supported_tasks:
from vllm.entrypoints.openai.generative_scoring.api_router import (
init_generative_scoring_state,
)
await init_generative_scoring_state(engine_client, state, args, request_logger)
state.enable_server_load_tracking = args.enable_server_load_tracking
state.server_load_metrics = 0

View File

@@ -0,0 +1,2 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

View File

@@ -0,0 +1,87 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from http import HTTPStatus
from typing import TYPE_CHECKING
from fastapi import APIRouter, Depends, FastAPI, Request
from fastapi.responses import JSONResponse
from vllm.entrypoints.openai.engine.protocol import ErrorResponse
from vllm.entrypoints.openai.generative_scoring.serving import (
GenerativeScoringResponse,
OpenAIServingGenerativeScoring,
)
from vllm.entrypoints.openai.utils import validate_json_request
from vllm.entrypoints.utils import load_aware_call, with_cancellation
from vllm.logger import init_logger
if TYPE_CHECKING:
from argparse import Namespace
from starlette.datastructures import State
from vllm.engine.protocol import EngineClient
from vllm.entrypoints.logger import RequestLogger
router = APIRouter()
logger = init_logger(__name__)
def generative_scoring(request: Request) -> OpenAIServingGenerativeScoring | None:
return request.app.state.serving_generative_scoring
@router.post(
"/generative_scoring",
dependencies=[Depends(validate_json_request)],
responses={
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
},
)
@with_cancellation
@load_aware_call
async def create_generative_scoring(raw_request: Request):
handler = generative_scoring(raw_request)
if handler is None:
raise NotImplementedError(
"The model does not support the Generative Scoring API"
)
raw_body = await raw_request.json()
from vllm.entrypoints.openai.generative_scoring.serving import (
GenerativeScoringRequest,
)
gen_request = GenerativeScoringRequest(**raw_body)
result = await handler.create_generative_scoring(gen_request, raw_request)
if isinstance(result, ErrorResponse):
return JSONResponse(content=result.model_dump(), status_code=result.error.code)
elif isinstance(result, GenerativeScoringResponse):
return JSONResponse(content=result.model_dump())
raise ValueError(f"Unexpected response type: {type(result)}")
def register_generative_scoring_api_router(app: FastAPI):
app.include_router(router)
async def init_generative_scoring_state(
engine_client: "EngineClient",
state: "State",
args: "Namespace",
request_logger: "RequestLogger | None",
):
from vllm.entrypoints.openai.generative_scoring.serving import (
OpenAIServingGenerativeScoring,
)
state.serving_generative_scoring = OpenAIServingGenerativeScoring(
engine_client,
state.openai_serving_models,
request_logger=request_logger,
)

View File

@@ -0,0 +1,491 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Generative Scoring implementation for generative models.
This module implements generative scoring functionality that computes the
probability of specified token IDs appearing as the next token after a
given query+item prompt. This works on any generative model that produces
logits (task="generate").
"""
import asyncio
import math
import time
from collections.abc import AsyncGenerator, Mapping
from typing import Literal
from fastapi import Request
from pydantic import Field
from vllm.engine.protocol import EngineClient
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.engine.protocol import (
ErrorResponse,
OpenAIBaseModel,
UsageInfo,
)
from vllm.entrypoints.openai.engine.serving import OpenAIServing
from vllm.entrypoints.openai.models.serving import OpenAIServingModels
from vllm.inputs import EngineInput, tokens_input
from vllm.logger import init_logger
from vllm.outputs import RequestOutput
from vllm.sampling_params import SamplingParams
from vllm.tokenizers import TokenizerLike
from vllm.tracing import (
contains_trace_headers,
extract_trace_headers,
log_tracing_disabled_warning,
)
from vllm.utils import random_uuid
from vllm.utils.async_utils import merge_async_iterators
logger = init_logger(__name__)
# ============================================================================
# Protocol definitions
# ============================================================================
class GenerativeScoringRequest(OpenAIBaseModel):
"""Request for computing generative scoring.
Attributes:
model: The model to use for scoring. Optional, follows existing patterns.
query: The query text or pre-tokenized query token IDs.
items: The item text(s) or pre-tokenized item token IDs.
label_token_ids: List of token IDs to compute probabilities for.
apply_softmax: Whether to normalize probabilities using softmax over only
the label_token_ids (True) or return true model probabilities over
the full vocab for those ids (False).
item_first: If True, prepend items to query. Otherwise append items to query.
add_special_tokens: Whether to add special tokens when tokenizing.
"""
model: str | None = None
query: str | list[int] = Field(
...,
description="The query text or pre-tokenized query token IDs.",
)
items: list[str] | list[list[int]] = Field(
...,
description="List of item texts or pre-tokenized item token IDs.",
)
label_token_ids: list[int] = Field(
...,
description="List of token IDs to compute probabilities for.",
)
apply_softmax: bool = Field(
default=True,
description=(
"If True, normalize probabilities using softmax over only the "
"label_token_ids. If False, return the true model probabilities "
"over the full vocab for those ids."
),
)
item_first: bool = Field(
default=False,
description="If True, prepend items to query. Otherwise append items to query.",
)
add_special_tokens: bool = Field(
default=True,
description="Whether to add special tokens when tokenizing.",
)
priority: int = Field(
default=0,
description=(
"The priority of the request (lower means earlier handling; default: 0)."
),
)
request_id: str = Field(
default_factory=random_uuid,
description="The request_id related to this request.",
)
class GenerativeScoringItemResult(OpenAIBaseModel):
"""Result for a single item in the generative scoring response.
Attributes:
index: The index of this item in the input items list.
object: Type of object, always "score".
score: The probability score for the first label token.
"""
index: int
object: Literal["score"] = "score"
score: float
class GenerativeScoringResponse(OpenAIBaseModel):
"""Response from the generative scoring computation.
Attributes:
id: Unique identifier for this response.
object: Type of object, always "list".
created: Unix timestamp of when the response was created.
model: The model used for scoring.
data: List of scoring results, one per input item.
usage: Token usage information.
"""
id: str = Field(default="")
object: Literal["list"] = "list"
created: int = Field(default_factory=lambda: int(time.time()))
model: str
data: list[GenerativeScoringItemResult]
usage: UsageInfo
# ============================================================================
# Serving class
# ============================================================================
class OpenAIServingGenerativeScoring(OpenAIServing):
"""Serving class for generative scoring computation.
This class handles computing the probability of specified token IDs
appearing as the next token after concatenating query and item prompts.
The key operation is:
1. For each item, build a prompt: query + item (or item + query if item_first)
2. Run a forward pass to get the next token distribution
3. Extract probabilities for the specified label_token_ids
4. Normalize either over the full vocab (apply_softmax=False) or
over just the label_token_ids (apply_softmax=True)
"""
def __init__(
self,
engine_client: EngineClient,
models: OpenAIServingModels,
*,
request_logger: RequestLogger | None,
) -> None:
super().__init__(
engine_client=engine_client,
models=models,
request_logger=request_logger,
)
async def create_generative_scoring(
self,
request: GenerativeScoringRequest,
raw_request: Request | None = None,
) -> GenerativeScoringResponse | ErrorResponse:
"""Create generative scoring for the given request.
Args:
request: The GenerativeScoringRequest containing query, items, and
label_token_ids.
raw_request: The raw FastAPI request object.
Returns:
GenerativeScoringResponse with probabilities for each item, or
ErrorResponse if an error occurred.
"""
# Check model
error_check_ret = await self._check_model(request) # type: ignore[arg-type]
if error_check_ret is not None:
return error_check_ret
# Check if engine is alive
if self.engine_client.errored:
raise self.engine_client.dead_error
# Get tokenizer
tokenizer = self.renderer.tokenizer
if tokenizer is None:
return self.create_error_response(
"Tokenizer not available. Cannot process generative scoring request."
)
# Validate label_token_ids
vocab_size = self.model_config.get_vocab_size()
for token_id in request.label_token_ids:
if token_id < 0 or token_id >= vocab_size:
return self.create_error_response(
f"label_token_id {token_id} is out of vocabulary range "
f"[0, {vocab_size}). Please provide valid token IDs."
)
if len(request.label_token_ids) == 0:
return self.create_error_response(
"label_token_ids must contain at least one token ID."
)
# Validate items
if len(request.items) == 0:
return self.create_error_response("items must contain at least one item.")
# Note: Mixed item types (string and token list) are validated by
# Pydantic at request parsing time, so we don't need to check here.
try:
lora_request = self._maybe_get_adapters(request) # type: ignore[arg-type]
except (ValueError, TypeError, RuntimeError) as e:
logger.exception("Error preparing request components")
return self.create_error_response(e)
base_id = self._base_request_id(raw_request, default=request.request_id)
request_id = f"generative-scoring-{base_id}"
created_time = int(time.time())
# Build prompts for each item
try:
engine_inputs, prompt_token_counts = await self._build_prompts(
request, tokenizer, self.model_config.max_model_len
)
except (ValueError, TypeError) as e:
logger.exception("Error building prompts")
return self.create_error_response(e)
# Create sampling params for scoring
# We use max_tokens=1 with logprob_token_ids to efficiently get
# logprobs for only the specified label tokens (not full vocab)
# Note: temperature/top_k/top_p don't affect logprobs - they only
# affect the sampling distribution. Logprobs are computed from raw
# logits via log_softmax before any sampling transformations.
sampling_params = SamplingParams(
max_tokens=1,
logprobs=len(request.label_token_ids),
logprob_token_ids=request.label_token_ids,
n=1,
)
# Get trace headers
trace_headers = (
None
if raw_request is None
else await self._get_trace_headers(raw_request.headers)
)
# Schedule requests for all inputs
generators: list[AsyncGenerator[RequestOutput, None]] = []
for i, engine_input in enumerate(engine_inputs):
request_id_item = f"{request_id}-{i}"
self._log_inputs(
request_id_item,
engine_input,
params=sampling_params,
lora_request=lora_request,
)
generator = self.engine_client.generate(
engine_input,
sampling_params,
request_id_item,
lora_request=lora_request,
trace_headers=trace_headers,
priority=request.priority,
)
generators.append(generator)
# Collect results
result_generator = merge_async_iterators(*generators)
results: list[RequestOutput | None] = [None] * len(engine_inputs)
try:
async for i, res in result_generator:
results[i] = res
except asyncio.CancelledError:
return self.create_error_response("Client disconnected")
except Exception as e:
logger.exception("Error during generation")
return self.create_error_response(e)
# Process results to extract label token probabilities
item_results: list[GenerativeScoringItemResult] = []
total_prompt_tokens = 0
total_completion_tokens = 0
for i, result in enumerate(results):
if result is None:
return self.create_error_response(
f"Failed to generate result for item {i}"
)
# Check for errors
if result.outputs and result.outputs[0].finish_reason == "error":
return self.create_error_response(f"Generation error for item {i}")
# Get logprobs from the generated token
if not result.outputs or len(result.outputs) == 0:
return self.create_error_response(f"No output generated for item {i}")
output = result.outputs[0]
if output.logprobs is None or len(output.logprobs) == 0:
return self.create_error_response(
f"No logprobs available for item {i}. "
"This might indicate an issue with logprobs configuration."
)
# The logprobs dict maps token_id -> Logprob object. With
# logprob_token_ids set, it contains entries for the requested label
# tokens (plus the sampled token).
logprobs_dict = output.logprobs[0]
# Extract logprobs for label tokens
label_logprobs: dict[int, float] = {}
missing_tokens = []
for token_id in request.label_token_ids:
if token_id in logprobs_dict:
label_logprobs[token_id] = logprobs_dict[token_id].logprob
else:
missing_tokens.append(token_id)
if missing_tokens:
return self.create_error_response(
f"Token IDs {missing_tokens} not found in logprobs for item {i}. "
"This might indicate the tokens are outside the model's vocabulary."
)
# Compute probabilities based on apply_softmax setting
token_probs = self._compute_probabilities(
label_logprobs,
apply_softmax=request.apply_softmax,
)
# Use the first label token's probability as the score
first_label_id = request.label_token_ids[0]
score = token_probs[first_label_id]
item_results.append(
GenerativeScoringItemResult(
index=i,
score=score,
)
)
# Update token counts
total_prompt_tokens += prompt_token_counts[i]
total_completion_tokens += len(output.token_ids)
# Build response
model_name = self.models.model_name(lora_request)
response = GenerativeScoringResponse(
id=request_id,
created=created_time,
model=model_name,
data=item_results,
usage=UsageInfo(
prompt_tokens=total_prompt_tokens,
total_tokens=total_prompt_tokens + total_completion_tokens,
completion_tokens=total_completion_tokens,
),
)
return response
async def _build_prompts(
self,
request: GenerativeScoringRequest,
tokenizer: TokenizerLike,
max_model_len: int,
) -> tuple[list[EngineInput], list[int]]:
"""Build prompts by concatenating query and items.
Uses the Renderer's tokenizer to tokenize text inputs, then
creates EngineInput via tokens_input() for engine consumption.
Args:
request: The request containing query, items, and settings.
tokenizer: The tokenizer to use.
max_model_len: Maximum model context length for truncation.
Returns:
Tuple of (list of EngineInput, list of prompt token counts).
"""
# Tokenize query if it's a string
if isinstance(request.query, str):
query_token_ids = tokenizer.encode(
request.query,
add_special_tokens=request.add_special_tokens,
)
else:
query_token_ids = request.query
engine_inputs: list[EngineInput] = []
prompt_token_counts: list[int] = []
for item in request.items:
# Tokenize item if it's a string
if isinstance(item, str):
# Don't add special tokens for items to avoid duplicate BOS/EOS
item_token_ids = tokenizer.encode(
item,
add_special_tokens=False,
)
else:
item_token_ids = item
# Concatenate based on item_first setting
if request.item_first:
prompt_token_ids = item_token_ids + query_token_ids
else:
prompt_token_ids = query_token_ids + item_token_ids
# Truncate to max_model_len - 1 to leave room for 1 output token
max_prompt_len = max_model_len - 1
if len(prompt_token_ids) > max_prompt_len:
prompt_token_ids = prompt_token_ids[:max_prompt_len]
engine_inputs.append(tokens_input(prompt_token_ids))
prompt_token_counts.append(len(prompt_token_ids))
return engine_inputs, prompt_token_counts
def _compute_probabilities(
self,
label_logprobs: dict[int, float],
apply_softmax: bool,
) -> dict[int, float]:
"""Compute probabilities from logprobs.
Args:
label_logprobs: Dictionary mapping token_id to logprob.
apply_softmax: If True, normalize over only the label tokens.
If False, return true model probabilities (exp(logprob)).
Returns:
Dictionary mapping token_id to probability.
"""
if apply_softmax:
# Normalize over only the label tokens (subset softmax)
# softmax(gathered_logits) over the subset
logprobs_list = list(label_logprobs.values())
max_logprob = max(logprobs_list)
# Compute exp(logprob - max) for numerical stability
exp_values = {
token_id: math.exp(logprob - max_logprob)
for token_id, logprob in label_logprobs.items()
}
sum_exp = sum(exp_values.values())
return {
token_id: exp_val / sum_exp for token_id, exp_val in exp_values.items()
}
else:
# Return true model probabilities
# Since logprobs are already log(softmax(logits)),
# we just need to exp() them
return {
token_id: math.exp(logprob)
for token_id, logprob in label_logprobs.items()
}
async def _get_trace_headers(
self,
headers: Mapping[str, str],
) -> Mapping[str, str] | None:
"""Extract trace headers from request headers."""
if not contains_trace_headers(headers):
return None
if not await self.engine_client.is_tracing_enabled():
log_tracing_disabled_warning()
return None
return extract_trace_headers(headers)

View File

@@ -232,6 +232,12 @@ class SamplingParams(
prompt_logprobs: int | None = None
"""Number of log probabilities to return per prompt token.
When set to -1, return all `vocab_size` log probabilities."""
logprob_token_ids: list[int] | None = None
"""Specific token IDs to return logprobs for. More efficient than
logprobs=-1 when you only need logprobs for a small set of tokens.
When set, logprobs for exactly these token IDs will be returned,
in addition to the sampled token. This is useful for scoring tasks
where you want to compare probabilities of specific label tokens."""
flat_logprobs: bool = False
"""Whether to return logprobs in flatten format (i.e. FlatLogprob)
for better performance.
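For illustration, a hedged sketch of how `logprob_token_ids` might be used offline (assumes the standard `vllm.LLM` API and that per-step logprobs are returned as token-id to `Logprob` mappings, as in the serving path added by this PR; the token IDs are hypothetical):

```python
from vllm import LLM, SamplingParams

llm = LLM(model="Qwen/Qwen3-0.6B")
params = SamplingParams(
    max_tokens=1,
    logprobs=2,                      # number of label tokens
    logprob_token_ids=[9454, 2753],  # assumed "Yes"/"No" token IDs
)
outputs = llm.generate(["Is Paris the capital of France? Answer Yes or No:"], params)
# Logprobs of the first generated step should include the requested label tokens.
step_logprobs = outputs[0].outputs[0].logprobs[0]
print({tid: lp.logprob for tid, lp in step_logprobs.items()})
```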

View File

@@ -40,5 +40,10 @@ class SamplingMetadata:
# Loaded logits processors
logitsprocs: LogitsProcessors
# Specific token IDs to compute logprobs for (more efficient than full vocab)
# When set, logprobs are computed only for these token IDs using gather
# req_index -> list of token IDs to get logprobs for
logprob_token_ids: dict[int, list[int]] | None = None
# Speculative token ids
spec_token_ids: list[list[int]] | None = None

View File

@@ -13,6 +13,7 @@ from vllm.v1.sample.ops.bad_words import apply_bad_words
from vllm.v1.sample.ops.logprobs import batched_count_greater_than
from vllm.v1.sample.ops.penalties import apply_all_penalties
from vllm.v1.sample.ops.topk_topp_sampler import TopKTopPSampler
from vllm.v1.worker.gpu.sample.logprob import compute_token_logprobs
_SAMPLING_EPS = 1e-5
@@ -102,8 +103,16 @@ class Sampler(nn.Module):
# return int32 (while PyTorch argmax and topk return int64).
sampled = sampled.long()
# Handle logprob_token_ids if specified (more efficient than full vocab)
# This is used by generative_scoring API to get logprobs for specific tokens
logprob_token_ids_tensors = None
if sampling_metadata.logprob_token_ids:
logprob_token_ids_tensors = self.gather_specific_token_logprobs(
logits, sampling_metadata.logprob_token_ids, sampled
)
if num_logprobs is None:
logprobs_tensors = None
logprobs_tensors = logprob_token_ids_tensors
elif num_logprobs == -1:
# Return the full unsorted and unranked logprobs.
logprobs_tensors = LogprobsTensors(
@@ -115,6 +124,11 @@ class Sampler(nn.Module):
raw_logprobs, num_logprobs, token_ids=sampled
)
# If we have both num_logprobs and logprob_token_ids, prefer
# logprob_token_ids as it's more specific
if logprob_token_ids_tensors is not None and num_logprobs is not None:
logprobs_tensors = logprob_token_ids_tensors
# Use int32 to reduce the tensor size.
sampled = sampled.to(torch.int32)
@@ -128,6 +142,77 @@ class Sampler(nn.Module):
)
return sampler_output
def gather_specific_token_logprobs(
self,
logits: torch.Tensor,
logprob_token_ids: dict[int, list[int]],
sampled: torch.Tensor,
) -> LogprobsTensors | None:
"""Compute logprobs for specific token IDs using Triton kernel.
This method handles heterogeneous token ID lists across requests by
padding shorter lists to max length and using a fused Triton kernel
for efficient log_softmax + gather computation.
Benchmarks show the Triton kernel approach is ~1.4x faster than sparse
gather for batch sizes > 1 due to the fused kernel reducing memory
bandwidth requirements.
Args:
logits: [batch_size, vocab_size] tensor of logits
logprob_token_ids: dict mapping req_index -> list of token IDs
sampled: [batch_size] tensor of sampled token IDs
Returns:
LogprobsTensors with logprobs for the specified tokens, or None
if no requests have logprob_token_ids.
"""
if not logprob_token_ids:
return None
batch_size = logits.shape[0]
device = logits.device
# Find max number of tokens across all requests
max_num_tokens = max(len(tids) for tids in logprob_token_ids.values())
# Create padded token_ids tensor: [batch_size, max_num_tokens + 1]
# +1 for sampled token in first position
token_ids_tensor = torch.zeros(
batch_size, max_num_tokens + 1, dtype=torch.int64, device=device
)
token_ids_tensor[:, 0] = sampled # First column is sampled token
# Create mask for valid positions (True = valid, False = padded)
valid_mask = torch.zeros(
batch_size, max_num_tokens + 1, dtype=torch.bool, device=device
)
valid_mask[:, 0] = True # Sampled token is always valid
# Fill in token IDs for each request
for req_idx, token_ids in logprob_token_ids.items():
num_tokens = len(token_ids)
token_ids_tensor[req_idx, 1 : num_tokens + 1] = torch.tensor(
token_ids, dtype=torch.int64, device=device
)
valid_mask[req_idx, 1 : num_tokens + 1] = True
# Compute logprobs using the fused Triton kernel (log_softmax + gather)
logprobs = compute_token_logprobs(logits, token_ids_tensor)
# Mask invalid (padded) positions with -inf
logprobs = logprobs.masked_fill(~valid_mask, float("-inf"))
# Compute ranks for the sampled token
sampled_logits = logits.gather(-1, sampled.unsqueeze(-1))
token_ranks = (logits > sampled_logits).sum(dim=-1)
return LogprobsTensors(
logprob_token_ids=token_ids_tensor.to(torch.int32),
logprobs=logprobs,
selected_token_ranks=token_ranks,
)
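# Example of the padded layout (hypothetical values): with batch_size=2,
# logprob_token_ids={0: [11, 12], 1: [21]} and sampled=[5, 7], the padded
# token_ids_tensor is
#   [[5, 11, 12],
#    [7, 21,  0]]
# and valid_mask marks position (1, 2) as padding, so its logprob is set
# to -inf and ignored downstream.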
@staticmethod
def apply_temperature(
logits: torch.Tensor,

View File

@@ -235,6 +235,10 @@ class InputBatch:
self.num_logprobs: dict[str, int] = {}
# req_id -> list of specific token IDs to compute logprobs for
# More efficient than num_logprobs=-1 when only a few tokens are needed
self.logprob_token_ids: dict[str, list[int]] = {}
# To accumulate prompt logprobs tensor chunks across prefill steps.
self.in_progress_prompt_logprobs_cpu: dict[str, LogprobsTensors] = {}
@@ -395,6 +399,10 @@ class InputBatch:
else sampling_params.logprobs
)
# Store specific token IDs to compute logprobs for (more efficient)
if sampling_params.logprob_token_ids is not None:
self.logprob_token_ids[req_id] = sampling_params.logprob_token_ids
if sampling_params.allowed_token_ids:
self.has_allowed_token_ids.add(req_id)
if self.allowed_token_ids_mask_cpu_tensor is None:
@@ -522,6 +530,7 @@ class InputBatch:
self.repetition_penalties_reqs.discard(req_id)
self.generators.pop(req_index, None)
self.num_logprobs.pop(req_id, None)
self.logprob_token_ids.pop(req_id, None)
self.in_progress_prompt_logprobs_cpu.pop(req_id, None)
if self.prev_req_id_to_index is not None:
self.prev_req_id_to_index.pop(req_id, None)
@@ -865,6 +874,15 @@ class InputBatch:
)
allowed_token_ids_mask = self.allowed_token_ids_mask[:num_reqs]
# Build per-request logprob_token_ids mapping: req_index -> token_ids
logprob_token_ids_by_index: dict[int, list[int]] | None = None
if self.logprob_token_ids:
logprob_token_ids_by_index = {}
for req_id, token_ids in self.logprob_token_ids.items():
if req_id in self.req_id_to_index:
req_index = self.req_id_to_index[req_id]
logprob_token_ids_by_index[req_index] = token_ids
return SamplingMetadata(
temperature=temperature,
all_greedy=self.all_greedy,
@@ -873,6 +891,7 @@ class InputBatch:
top_k=None if self.no_top_k else self.top_k[:num_reqs],
generators=self.generators,
max_num_logprobs=self.max_num_logprobs,
logprob_token_ids=logprob_token_ids_by_index,
prompt_token_ids=prompt_token_ids,
frequency_penalties=self.frequency_penalties[:num_reqs],
presence_penalties=self.presence_penalties[:num_reqs],

View File

@@ -5600,6 +5600,7 @@ class GPUModelRunner(
top_k=dummy_tensors(logits.size(1) - 1),
generators={},
max_num_logprobs=None,
logprob_token_ids=None,
no_penalties=True,
prompt_token_ids=None,
frequency_penalties=dummy_tensors(0.1),