[CI] Reorganize scoring tests (#38207)

Signed-off-by: wang.yuqi <yuqi.wang@daocloud.io>
wang.yuqi
2026-03-26 20:07:01 +08:00
committed by GitHub
parent f2d16207c7
commit dcdc145893
20 changed files with 1595 additions and 975 deletions


@@ -0,0 +1,114 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import weakref
import pytest
from tests.entrypoints.pooling.scoring.util import EncoderScoringHfRunner
from vllm import LLM
from vllm.distributed import cleanup_dist_env_and_memory
from vllm.platforms import current_platform
MODEL_NAME = "intfloat/multilingual-e5-small"
PROMPT = "The chef prepared a delicious meal."
EMBEDDING_SIZE = 384
TEXTS_1 = [
"What is the capital of France?",
"What is the capital of Germany?",
]
TEXTS_2 = [
"The capital of France is Paris.",
"The capital of Germany is Berlin.",
]
DTYPE = "half"
@pytest.fixture(scope="module")
def llm():
# ROCm: Use FLEX_ATTENTION backend as it's the only attention backend
# that supports encoder-only models on ROCm.
attention_config = None
if current_platform.is_rocm():
attention_config = {"backend": "FLEX_ATTENTION"}
# pytest caches the fixture, so we use weakref.proxy to
# enable garbage collection
llm = LLM(
model=MODEL_NAME,
max_num_batched_tokens=32768,
tensor_parallel_size=1,
gpu_memory_utilization=0.75,
enforce_eager=True,
seed=0,
attention_config=attention_config,
)
yield weakref.proxy(llm)
del llm
cleanup_dist_env_and_memory()
@pytest.fixture(scope="module")
def hf_model():
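# HF reference scorer: sentence-transformer embeddings compared by cosine similarity.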
return EncoderScoringHfRunner(MODEL_NAME)
@pytest.mark.skip_global_cleanup
def test_1_to_1(llm, hf_model):
text_pair = [TEXTS_1[0], TEXTS_2[0]]
hf_outputs = hf_model.predict([text_pair]).tolist()
vllm_outputs = [
output.outputs.score for output in llm.score(text_pair[0], text_pair[1])
]
assert len(vllm_outputs) == 1
assert len(hf_outputs) == 1
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
@pytest.mark.skip_global_cleanup
def test_1_to_n(llm, hf_model):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[0], TEXTS_2[1]],
]
hf_outputs = hf_model.predict(text_pairs).tolist()
vllm_outputs = [output.outputs.score for output in llm.score(TEXTS_1[0], TEXTS_2)]
assert len(vllm_outputs) == 2
assert len(hf_outputs) == 2
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
assert hf_outputs[1] == pytest.approx(vllm_outputs[1], rel=0.01)
@pytest.mark.skip_global_cleanup
def test_n_to_n(llm, hf_model):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
hf_outputs = hf_model.predict(text_pairs).tolist()
vllm_outputs = [output.outputs.score for output in llm.score(TEXTS_1, TEXTS_2)]
assert len(vllm_outputs) == 2
assert len(hf_outputs) == 2
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
assert hf_outputs[1] == pytest.approx(vllm_outputs[1], rel=0.01)
def test_embed(llm):
outputs = llm.encode(PROMPT, pooling_task="embed", use_tqdm=False)
assert len(outputs) == 1
assert len(outputs[0].outputs.data) == EMBEDDING_SIZE


@@ -0,0 +1,414 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
import requests
from tests.entrypoints.pooling.scoring.util import EncoderScoringHfRunner
from tests.utils import RemoteOpenAIServer
from vllm.entrypoints.pooling.pooling.protocol import PoolingResponse
from vllm.entrypoints.pooling.score.protocol import RerankResponse, ScoreResponse
from vllm.platforms import current_platform
MODEL_NAME = "BAAI/bge-base-en-v1.5"
input_text = "This product was excellent and exceeded my expectations"
DTYPE = "half"
EMBEDDING_SIZE = 768
TEXTS_1 = [
"What is the capital of France?",
"What is the capital of Germany?",
]
TEXTS_2 = [
"The capital of France is Paris.",
"The capital of Germany is Berlin.",
]
@pytest.fixture(scope="module")
def server():
args = ["--enforce-eager", "--max-model-len", "100", "--dtype", DTYPE]
# ROCm: Use Flex Attention to support encoder-only self-attention.
if current_platform.is_rocm():
args.extend(["--attention-backend", "FLEX_ATTENTION"])
with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
yield remote_server
@pytest.fixture(scope="module")
def hf_model():
return EncoderScoringHfRunner(MODEL_NAME)
@pytest.mark.asyncio
async def test_score_api_queries_str_1_documents_str_1(
hf_model, server: RemoteOpenAIServer
):
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1[0],
"documents": TEXTS_2[0],
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict([[TEXTS_1[0], TEXTS_2[0]]]).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_str_1_documents_str_n(
hf_model, server: RemoteOpenAIServer
):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[0], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1[0],
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_str_n_documents_str_n(
hf_model, server: RemoteOpenAIServer
):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1,
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_vs_documents(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1,
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_vs_items(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1,
"items": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_text_1_vs_text_2(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"text_1": TEXTS_1,
"text_2": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_data_1_vs_data_2(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"data_1": TEXTS_1,
"data_2": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_rerank_api_texts(server: RemoteOpenAIServer):
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={
"model": MODEL_NAME,
"query": query,
"documents": documents,
},
)
rerank_response.raise_for_status()
rerank = RerankResponse.model_validate(rerank_response.json())
assert rerank.id is not None
assert rerank.results is not None
assert len(rerank.results) == 2
paris_result = next(r for r in rerank.results if r.index == 1)
brazil_result = next(r for r in rerank.results if r.index == 0)
assert paris_result.relevance_score > brazil_result.relevance_score
@pytest.mark.asyncio
async def test_rerank_api_top_n(server: RemoteOpenAIServer):
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
"Cross-encoder models are neat",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={"model": MODEL_NAME, "query": query, "documents": documents, "top_n": 2},
)
rerank_response.raise_for_status()
rerank = RerankResponse.model_validate(rerank_response.json())
assert rerank.id is not None
assert rerank.results is not None
assert len(rerank.results) == 2
assert rerank.results[0].index == 1
@pytest.mark.asyncio
async def test_rerank_api_max_model_len(server: RemoteOpenAIServer):
query = "What is the capital of France?" * 100
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={"model": MODEL_NAME, "query": query, "documents": documents},
)
assert rerank_response.status_code == 400
# Assert just a small fragment of the response
assert "Please reduce the length of the input prompt" in rerank_response.text
@pytest.mark.asyncio
async def test_score_api_max_model_len(server: RemoteOpenAIServer):
queries = "What is the capital of France?" * 20
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": queries,
"documents": documents,
},
)
assert score_response.status_code == 400
# Assert just a small fragment of the response
assert "Please reduce the length of the input prompt" in score_response.text
# Test truncation
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": queries,
"documents": documents,
"truncate_prompt_tokens": 101,
},
)
assert score_response.status_code == 400
assert "Please request a smaller truncation size." in score_response.text
@pytest.mark.asyncio
async def test_invocations(server: RemoteOpenAIServer):
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
request_args = {
"model": MODEL_NAME,
"query": query,
"documents": documents,
}
rerank_response = requests.post(server.url_for("rerank"), json=request_args)
rerank_response.raise_for_status()
invocation_response = requests.post(
server.url_for("invocations"), json=request_args
)
invocation_response.raise_for_status()
rerank_output = rerank_response.json()
invocation_output = invocation_response.json()
assert rerank_output.keys() == invocation_output.keys()
for rerank_result, invocations_result in zip(
rerank_output["results"], invocation_output["results"]
):
assert rerank_result.keys() == invocations_result.keys()
assert rerank_result["relevance_score"] == pytest.approx(
invocations_result["relevance_score"], rel=0.01
)
@pytest.mark.asyncio
async def test_pooling_embed(server: RemoteOpenAIServer):
response = requests.post(
server.url_for("pooling"),
json={
"model": MODEL_NAME,
"input": input_text,
"encoding_format": "float",
"task": "embed",
},
)
poolings = PoolingResponse.model_validate(response.json())
assert len(poolings.data) == 1
assert len(poolings.data[0].data) == EMBEDDING_SIZE
@pytest.mark.asyncio
@pytest.mark.parametrize("task", ["classify", "token_classify", "plugin"])
async def test_pooling_not_supported(server: RemoteOpenAIServer, task: str):
response = requests.post(
server.url_for("pooling"),
json={
"model": MODEL_NAME,
"input": input_text,
"encoding_format": "float",
"task": task,
},
)
assert response.json()["error"]["type"] == "BadRequestError"
assert response.json()["error"]["message"].startswith(f"Unsupported task: {task!r}")


@@ -0,0 +1,61 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
import pytest
from tests.models.language.pooling_mteb_test.mteb_score_utils import (
MTEB_RERANK_LANGS,
MTEB_RERANK_TASKS,
MTEB_RERANK_TOL,
RerankClientMtebEncoder,
ScoreClientMtebEncoder,
run_mteb_rerank,
)
from tests.utils import RemoteOpenAIServer
from vllm.platforms import current_platform
os.environ["VLLM_LOGGING_LEVEL"] = "WARNING"
MODEL_NAME = "cross-encoder/ms-marco-MiniLM-L-6-v2"
st_main_score = 0.33457
@pytest.fixture(scope="module")
def server():
args = ["--runner", "pooling", "--enforce-eager", "--disable-uvicorn-access-log"]
# ROCm: Use Flex Attention to support encoder-only self-attention.
if current_platform.is_rocm():
args.extend(["--attention-backend", "FLEX_ATTENTION"])
with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
yield remote_server
def test_mteb_score(server):
url = server.url_for("score")
encoder = ScoreClientMtebEncoder(MODEL_NAME, url)
vllm_main_score = run_mteb_rerank(encoder, MTEB_RERANK_TASKS, MTEB_RERANK_LANGS)
print("VLLM main score: ", vllm_main_score)
print("SentenceTransformer main score: ", st_main_score)
print("Difference: ", st_main_score - vllm_main_score)
# We are not concerned if the vLLM MTEB results are better than
# SentenceTransformers', so we only perform a one-sided test.
assert st_main_score - vllm_main_score < MTEB_RERANK_TOL
def test_mteb_rerank(server):
url = server.url_for("rerank")
encoder = RerankClientMtebEncoder(MODEL_NAME, url)
vllm_main_score = run_mteb_rerank(encoder, MTEB_RERANK_TASKS, MTEB_RERANK_LANGS)
print("VLLM main score: ", vllm_main_score)
print("SentenceTransformer main score: ", st_main_score)
print("Difference: ", st_main_score - vllm_main_score)
# We are not concerned if the vLLM MTEB results are better than
# SentenceTransformers', so we only perform a one-sided test.
assert st_main_score - vllm_main_score < MTEB_RERANK_TOL


@@ -0,0 +1,137 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import weakref
import pytest
import torch
from tests.models.utils import softmax
from vllm import LLM, PoolingParams
from vllm.distributed import cleanup_dist_env_and_memory
from vllm.platforms import current_platform
MODEL_NAME = "tomaarsen/Qwen3-Reranker-0.6B-seq-cls"
PROMPT = "The chef prepared a delicious meal."
TEXTS_1 = [
"What is the capital of France?",
"What is the capital of Germany?",
]
TEXTS_2 = [
"The capital of France is Paris.",
"The capital of Germany is Berlin.",
]
@pytest.fixture(scope="module")
def llm():
# ROCm: Use FLEX_ATTENTION backend as it's the only attention backend
# that supports encoder-only models on ROCm.
attention_config = None
if current_platform.is_rocm():
attention_config = {"backend": "FLEX_ATTENTION"}
# pytest caches the fixture, so we use weakref.proxy to
# enable garbage collection
llm = LLM(
model=MODEL_NAME,
max_num_batched_tokens=32768,
tensor_parallel_size=1,
gpu_memory_utilization=0.75,
enforce_eager=True,
seed=0,
attention_config=attention_config,
)
yield weakref.proxy(llm)
del llm
cleanup_dist_env_and_memory()
@pytest.fixture(scope="module")
def hf_model(hf_runner):
return hf_runner(MODEL_NAME, is_cross_encoder=True)
@pytest.mark.skip_global_cleanup
def test_1_to_1(llm, hf_model):
text_pair = [TEXTS_1[0], TEXTS_2[0]]
hf_outputs = hf_model.predict([text_pair]).tolist()
vllm_outputs = [
output.outputs.score for output in llm.score(text_pair[0], text_pair[1])
]
assert len(vllm_outputs) == 1
assert len(hf_outputs) == 1
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
@pytest.mark.skip_global_cleanup
def test_1_to_n(llm, hf_model):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[0], TEXTS_2[1]],
]
vllm_outputs = [output.outputs.score for output in llm.score(TEXTS_1[0], TEXTS_2)]
hf_outputs = hf_model.predict(text_pairs).tolist()
assert len(vllm_outputs) == 2
assert len(hf_outputs) == 2
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
assert hf_outputs[1] == pytest.approx(vllm_outputs[1], rel=0.01)
@pytest.mark.skip_global_cleanup
def test_n_to_n(llm, hf_model):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
vllm_outputs = [output.outputs.score for output in llm.score(TEXTS_1, TEXTS_2)]
hf_outputs = hf_model.predict(text_pairs).tolist()
assert len(vllm_outputs) == 2
assert len(hf_outputs) == 2
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
assert hf_outputs[1] == pytest.approx(vllm_outputs[1], rel=0.01)
@pytest.mark.skip_global_cleanup
def test_classify(llm):
outputs = llm.encode(PROMPT, pooling_task="classify", use_tqdm=False)
assert len(outputs) == 1
assert len(outputs[0].outputs.data) == 1
def test_pooling_params(llm: LLM):
def get_outputs(use_activation):
outputs = llm.score(
TEXTS_1[0],
TEXTS_2[0],
pooling_params=PoolingParams(use_activation=use_activation),
use_tqdm=False,
)
return torch.tensor([x.outputs.score for x in outputs])
default = get_outputs(use_activation=None)
w_activation = get_outputs(use_activation=True)
wo_activation = get_outputs(use_activation=False)
assert torch.allclose(default, w_activation, atol=1e-2), (
"Default should use activation."
)
assert not torch.allclose(w_activation, wo_activation, atol=1e-2), (
"wo_activation should not use activation."
)
assert torch.allclose(softmax(wo_activation), w_activation, atol=1e-2), (
"w_activation should be close to activation(wo_activation)."
)


@@ -0,0 +1,487 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
import requests
import torch
import torch.nn.functional as F
from tests.utils import RemoteOpenAIServer
from vllm.entrypoints.pooling.pooling.protocol import PoolingResponse
from vllm.entrypoints.pooling.score.protocol import RerankResponse, ScoreResponse
from vllm.platforms import current_platform
MODEL_NAME = "BAAI/bge-reranker-base"
DTYPE = "half"
input_text = "This product was excellent and exceeded my expectations"
input_tokens = [0, 3293, 12996, 509, 40881, 136, 204839, 297, 759, 202702, 2]
TEXTS_1 = [
"What is the capital of France?",
"What is the capital of Germany?",
]
TEXTS_2 = [
"The capital of France is Paris.",
"The capital of Germany is Berlin.",
]
@pytest.fixture(scope="module")
def server():
args = ["--enforce-eager", "--max-model-len", "100", "--dtype", DTYPE]
# ROCm: Use Flex Attention to support encoder-only self-attention.
if current_platform.is_rocm():
args.extend(["--attention-backend", "FLEX_ATTENTION"])
with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
yield remote_server
@pytest.fixture(scope="module")
def hf_model(hf_runner):
return hf_runner(MODEL_NAME, is_cross_encoder=True)
@pytest.mark.asyncio
async def test_basic(server: RemoteOpenAIServer):
# test /v1/models
response = requests.get(server.url_for("/v1/models"))
served_model = response.json()["data"][0]["id"]
assert served_model == MODEL_NAME
# test /tokenize
response = requests.post(
server.url_for("/tokenize"),
json={"model": MODEL_NAME, "prompt": input_text},
)
assert response.json()["tokens"] == input_tokens
@pytest.mark.asyncio
async def test_score_api_queries_str_1_documents_str_1(
hf_model, server: RemoteOpenAIServer
):
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1[0],
"documents": TEXTS_2[0],
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict([[TEXTS_1[0], TEXTS_2[0]]]).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_str_1_documents_str_n(
hf_model, server: RemoteOpenAIServer
):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[0], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1[0],
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_str_n_documents_str_n(
hf_model, server: RemoteOpenAIServer
):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1,
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_vs_documents(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1,
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_vs_items(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1,
"items": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_text_1_vs_text_2(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"text_1": TEXTS_1,
"text_2": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_data_1_vs_data_2(hf_model, server: RemoteOpenAIServer):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"data_1": TEXTS_1,
"data_2": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_rerank_api_texts(server: RemoteOpenAIServer):
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={
"model": MODEL_NAME,
"query": query,
"documents": documents,
},
)
rerank_response.raise_for_status()
rerank = RerankResponse.model_validate(rerank_response.json())
assert rerank.id is not None
assert rerank.results is not None
assert len(rerank.results) == 2
assert rerank.results[0].relevance_score >= 0.9
assert rerank.results[1].relevance_score <= 0.01
@pytest.mark.asyncio
async def test_rerank_api_top_n(server: RemoteOpenAIServer):
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
"Cross-encoder models are neat",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={"model": MODEL_NAME, "query": query, "documents": documents, "top_n": 2},
)
rerank_response.raise_for_status()
rerank = RerankResponse.model_validate(rerank_response.json())
assert rerank.id is not None
assert rerank.results is not None
assert len(rerank.results) == 2
assert rerank.results[0].relevance_score >= 0.9
assert rerank.results[1].relevance_score <= 0.01
@pytest.mark.asyncio
async def test_rerank_api_max_model_len(server: RemoteOpenAIServer):
query = "What is the capital of France?" * 100
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={"model": MODEL_NAME, "query": query, "documents": documents},
)
assert rerank_response.status_code == 400
# Assert just a small fragment of the response
assert "Please reduce the length of the input prompt" in rerank_response.text
@pytest.mark.asyncio
async def test_score_api_max_model_len(server: RemoteOpenAIServer):
queries = "What is the capital of France?" * 20
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": queries,
"documents": documents,
},
)
assert score_response.status_code == 400
# Assert just a small fragment of the response
assert "Please reduce the length of the input prompt" in score_response.text
# Test truncation
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": queries,
"documents": documents,
"truncate_prompt_tokens": 101,
},
)
assert score_response.status_code == 400
assert "Please request a smaller truncation size." in score_response.text
@pytest.mark.asyncio
async def test_invocations(server: RemoteOpenAIServer):
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
request_args = {
"model": MODEL_NAME,
"query": query,
"documents": documents,
}
rerank_response = requests.post(server.url_for("rerank"), json=request_args)
rerank_response.raise_for_status()
invocation_response = requests.post(
server.url_for("invocations"), json=request_args
)
invocation_response.raise_for_status()
rerank_output = rerank_response.json()
invocation_output = invocation_response.json()
assert rerank_output.keys() == invocation_output.keys()
for rerank_result, invocations_result in zip(
rerank_output["results"], invocation_output["results"]
):
assert rerank_result.keys() == invocations_result.keys()
assert rerank_result["relevance_score"] == pytest.approx(
invocations_result["relevance_score"], rel=0.01
)
@pytest.mark.asyncio
async def test_use_activation(server: RemoteOpenAIServer):
async def get_outputs(use_activation):
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
response = requests.post(
server.url_for("rerank"),
json={
"model": MODEL_NAME,
"query": query,
"documents": documents,
"use_activation": use_activation,
},
)
outputs = response.json()
return torch.tensor([x["relevance_score"] for x in outputs["results"]])
default = await get_outputs(use_activation=None)
w_activation = await get_outputs(use_activation=True)
wo_activation = await get_outputs(use_activation=False)
assert torch.allclose(default, w_activation, atol=1e-2), (
"Default should use activation."
)
assert not torch.allclose(w_activation, wo_activation, atol=1e-2), (
"wo_activation should not use activation."
)
assert torch.allclose(F.sigmoid(wo_activation), w_activation, atol=1e-2), (
"w_activation should be close to activation(wo_activation)."
)
@pytest.mark.asyncio
async def test_pooling_classify(server: RemoteOpenAIServer):
response = requests.post(
server.url_for("pooling"),
json={
"model": MODEL_NAME,
"input": input_text,
"encoding_format": "float",
"task": "classify",
},
)
poolings = PoolingResponse.model_validate(response.json())
assert len(poolings.data) == 1
assert len(poolings.data[0].data) == 1
@pytest.mark.asyncio
async def test_pooling_token_classify(server: RemoteOpenAIServer):
response = requests.post(
server.url_for("pooling"),
json={
"model": MODEL_NAME,
"task": "token_classify",
"input": input_text,
"encoding_format": "float",
},
)
poolings = PoolingResponse.model_validate(response.json())
assert len(poolings.data) == 1
assert len(poolings.data[0].data) == len(input_tokens)
assert len(poolings.data[0].data[0]) == 1
@pytest.mark.asyncio
@pytest.mark.parametrize("task", ["embed", "token_embed", "plugin"])
async def test_pooling_not_supported(server: RemoteOpenAIServer, task: str):
response = requests.post(
server.url_for("pooling"),
json={
"model": MODEL_NAME,
"input": input_text,
"encoding_format": "float",
"task": task,
},
)
assert response.json()["error"]["type"] == "BadRequestError"
assert response.json()["error"]["message"].startswith(f"Unsupported task: {task!r}")


@@ -0,0 +1,365 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import json
import pytest
import requests
from tests.utils import VLLM_PATH, RemoteOpenAIServer
from vllm.entrypoints.pooling.score.protocol import RerankResponse, ScoreResponse
from vllm.multimodal.utils import encode_image_url, fetch_image
from vllm.platforms import current_platform
MODEL_NAME = "Qwen/Qwen3-VL-Reranker-2B"
HF_OVERRIDES = {
"architectures": ["Qwen3VLForSequenceClassification"],
"classifier_from_token": ["no", "yes"],
"is_original_qwen3_reranker": True,
}
ROCM_ATTN_BACKENDS = [
"ROCM_ATTN",
"ROCM_AITER_FA",
"TRITON_ATTN",
"FLEX_ATTENTION",
]
ATTN_BACKENDS = ROCM_ATTN_BACKENDS if current_platform.is_rocm() else ["auto"]
# Per-backend tolerance with explicit entries; "default" is the fallback
BACKEND_TOL: dict[str, float] = {
"default": 0.05, # 5% tolerance for other backends (e.g. FLASH_ATTN)
# Relaxed tolerances for ROCm attn
# See: https://github.com/vllm-project/vllm/issues/35569
"ROCM_ATTN": 0.09, # gfx950:~8.45%, gfx942:~3.70%
"ROCM_AITER_FA": 0.045, # gfx950:~2.00%, gfx942:~0.80%
"TRITON_ATTN": 0.045, # gfx950:~3.00%, gfx942:~2.20%
"FLEX_ATTENTION": 0.045, # gfx950:~3.25%, gfx942:~1.10%
}
# ROCm: disable skinny GEMM to avoid non-deterministic results from
# atomic reductions in wvSplitKrc kernel.
# See: https://github.com/vllm-project/vllm/pull/33493#issuecomment-3906083975
ROCM_ENV_OVERRIDES = (
{"VLLM_ROCM_USE_SKINNY_GEMM": "0"} if current_platform.is_rocm() else {}
)
# ROCm: disable prefix caching and eliminate batch variance to reduce
# test flakiness.
ROCM_EXTRA_ARGS = (
["--no-enable-prefix-caching", "--max-num-seqs", "1"]
if current_platform.is_rocm()
else []
)
def get_tol(backend: str) -> float:
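"""Return the relative tolerance for a backend, falling back to the default."""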
return BACKEND_TOL.get(backend, BACKEND_TOL["default"])
def assert_score(actual: float, expected: float, backend: str, label: str):
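"""Assert that actual is within the backend-specific relative tolerance of expected."""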
tol = get_tol(backend)
diff = abs(actual - expected)
rel_diff = diff / abs(expected) if expected != 0 else diff
print(
f"[{backend}] {label}: actual={actual:.6f} expected={expected:.6f} "
f"diff={diff:.6f} rel_diff={rel_diff:.4f} tol={tol}"
)
assert actual == pytest.approx(expected, rel=tol), (
f"[{backend}] {label}: score mismatch — "
f"actual={actual:.6f}, expected={expected:.6f}, "
f"rel_diff={rel_diff:.4f}, tol={tol}"
)
query = "A cat standing in the snow."
document = "This product was excellent and exceeded my expectations."
image_url = "https://vllm-public-assets.s3.us-west-2.amazonaws.com/multimodal_asset/cat_snow.jpg"
documents = [
{
"type": "text",
"text": document,
},
{
"type": "image_url",
"image_url": {"url": image_url},
},
{
"type": "image_url",
"image_url": {"url": encode_image_url(fetch_image(image_url))},
},
]
TEXT_VS_TEXT = 0.10040374100208282
TEXT_VS_IMAGE = 0.7423753142356873
TEXT_VS_TEXT_PLUS_IMAGE = 0.5298863053321838
@pytest.fixture(scope="module", params=ATTN_BACKENDS)
def server(request):
backend = request.param
print(f"\n=== Starting server with attention backend: {backend} ===")
args = [
"--enforce-eager",
"--max-model-len",
"8192",
"--chat-template",
str(VLLM_PATH / "examples/pooling/score/template/qwen3_vl_reranker.jinja"),
]
env = {}
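# Pin the attention backend under test; "auto" leaves backend selection to vLLM.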
if backend != "auto":
args += ["--attention-config", json.dumps({"backend": backend})]
args += ROCM_EXTRA_ARGS
env = dict(ROCM_ENV_OVERRIDES)
if backend != "ROCM_AITER_FA":
env["VLLM_ROCM_USE_AITER"] = "0"
with RemoteOpenAIServer(
MODEL_NAME, args, override_hf_configs=HF_OVERRIDES, env_dict=env
) as remote_server:
print(f"=== Server ready with backend: {backend} ===")
yield remote_server, backend
@pytest.mark.asyncio
async def test_score_api_queries_str_documents_str(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
score_response = requests.post(
remote_server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": query,
"documents": document,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
assert score.usage.prompt_tokens == 81
assert_score(score.data[0].score, TEXT_VS_TEXT, backend, "text_vs_text")
@pytest.mark.asyncio
async def test_score_api_queries_str_documents_text_content(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
score_response = requests.post(
remote_server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": query,
"documents": {"content": [documents[0]]},
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
assert score.usage.prompt_tokens == 81
assert_score(score.data[0].score, TEXT_VS_TEXT, backend, "text_vs_text")
@pytest.mark.asyncio
async def test_score_api_queries_str_documents_image_url_content(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
score_response = requests.post(
remote_server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": query,
"documents": {"content": [documents[1]]},
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
assert score.usage.prompt_tokens == 98
assert_score(score.data[0].score, TEXT_VS_IMAGE, backend, "text_vs_image")
@pytest.mark.asyncio
async def test_score_api_queries_str_documents_image_base64_content(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
score_response = requests.post(
remote_server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": query,
"documents": {"content": [documents[2]]},
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
assert score.usage.prompt_tokens == 98
assert_score(score.data[0].score, TEXT_VS_IMAGE, backend, "text_vs_image_base64")
@pytest.mark.asyncio
async def test_score_api_queries_str_documents_image_url_plus_text_content(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
score_response = requests.post(
remote_server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": query,
"documents": {"content": [documents[0], documents[1]]},
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
assert score.usage.prompt_tokens == 108
assert_score(
score.data[0].score, TEXT_VS_TEXT_PLUS_IMAGE, backend, "text_vs_text_plus_image"
)
@pytest.mark.asyncio
async def test_score_api_queries_str_documents_list(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
score_response = requests.post(
remote_server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": query,
"documents": [
document,
{"content": [documents[0]]},
{"content": [documents[1]]},
{"content": [documents[0], documents[1]]},
],
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 4
assert score.usage.prompt_tokens == 368
assert_score(score.data[0].score, TEXT_VS_TEXT, backend, "list[0]_text_vs_text")
assert_score(score.data[1].score, TEXT_VS_TEXT, backend, "list[1]_text_vs_text")
assert_score(score.data[2].score, TEXT_VS_IMAGE, backend, "list[2]_text_vs_image")
assert_score(
score.data[3].score,
TEXT_VS_TEXT_PLUS_IMAGE,
backend,
"list[3]_text_vs_text_plus_image",
)
@pytest.mark.asyncio
async def test_rerank_api_queries_str_documents_list(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
rerank_response = requests.post(
remote_server.url_for("rerank"),
json={
"model": MODEL_NAME,
"query": query,
"documents": [
document,
{"content": [documents[0]]},
{"content": [documents[1]]},
{"content": [documents[0], documents[1]]},
],
},
)
rerank_response.raise_for_status()
rerank = RerankResponse.model_validate(rerank_response.json())
assert rerank.id is not None
assert rerank.model is not None
assert rerank.usage is not None
assert len(rerank.results) == 4
rerank.results.sort(key=lambda x: x.index)
assert_score(
rerank.results[0].relevance_score,
TEXT_VS_TEXT,
backend,
"rerank[0]_text_vs_text",
)
assert_score(
rerank.results[1].relevance_score,
TEXT_VS_TEXT,
backend,
"rerank[1]_text_vs_text",
)
assert_score(
rerank.results[2].relevance_score,
TEXT_VS_IMAGE,
backend,
"rerank[2]_text_vs_image",
)
assert_score(
rerank.results[3].relevance_score,
TEXT_VS_TEXT_PLUS_IMAGE,
backend,
"rerank[3]_text_vs_text_plus_image",
)
@pytest.mark.asyncio
async def test_score_api_queries_list_documents_list(
server: tuple[RemoteOpenAIServer, str],
):
remote_server, backend = server
score_response = requests.post(
remote_server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": [query] * 4,
"documents": [
document,
{"content": [documents[0]]},
{"content": [documents[1]]},
{"content": [documents[0], documents[1]]},
],
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 4
assert score.usage.prompt_tokens == 368
assert_score(score.data[0].score, TEXT_VS_TEXT, backend, "paired[0]_text_vs_text")
assert_score(score.data[1].score, TEXT_VS_TEXT, backend, "paired[1]_text_vs_text")
assert_score(score.data[2].score, TEXT_VS_IMAGE, backend, "paired[2]_text_vs_image")
assert_score(
score.data[3].score,
TEXT_VS_TEXT_PLUS_IMAGE,
backend,
"paired[3]_text_vs_text_plus_image",
)


@@ -0,0 +1,119 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import weakref
import pytest
from vllm import LLM
from vllm.distributed import cleanup_dist_env_and_memory
from vllm.platforms import current_platform
from .util import ColBERTScoringHfRunner
MODEL_NAME = "answerdotai/answerai-colbert-small-v1"
COLBERT_DIM = 96
LINEAR_WEIGHTS_KEY = "linear.weight"
PROMPT = "The chef prepared a delicious meal."
TEXTS_1 = [
"What is the capital of France?",
"What is the capital of Germany?",
]
TEXTS_2 = [
"The capital of France is Paris.",
"The capital of Germany is Berlin.",
]
DTYPE = "half"
@pytest.fixture(scope="module")
def llm():
# ROCm: Use FLEX_ATTENTION backend as it's the only attention backend
# that supports encoder-only models on ROCm.
attention_config = None
if current_platform.is_rocm():
attention_config = {"backend": "FLEX_ATTENTION"}
# pytest caches the fixture, so we use weakref.proxy to
# enable garbage collection
llm = LLM(
model=MODEL_NAME,
max_num_batched_tokens=32768,
tensor_parallel_size=1,
gpu_memory_utilization=0.75,
enforce_eager=True,
seed=0,
attention_config=attention_config,
)
yield weakref.proxy(llm)
del llm
cleanup_dist_env_and_memory()
@pytest.fixture(scope="module")
def hf_model():
return ColBERTScoringHfRunner(
model_name=MODEL_NAME, linear_weights_key=LINEAR_WEIGHTS_KEY
)
@pytest.mark.skip_global_cleanup
def test_1_to_1(llm, hf_model):
text_pair = [TEXTS_1[0], TEXTS_2[0]]
hf_outputs = hf_model.predict([text_pair]).tolist()
vllm_outputs = [
output.outputs.score for output in llm.score(text_pair[0], text_pair[1])
]
assert len(vllm_outputs) == 1
assert len(hf_outputs) == 1
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
@pytest.mark.skip_global_cleanup
def test_1_to_n(llm, hf_model):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[0], TEXTS_2[1]],
]
hf_outputs = hf_model.predict(text_pairs).tolist()
vllm_outputs = [output.outputs.score for output in llm.score(TEXTS_1[0], TEXTS_2)]
assert len(vllm_outputs) == 2
assert len(hf_outputs) == 2
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
assert hf_outputs[1] == pytest.approx(vllm_outputs[1], rel=0.01)
@pytest.mark.skip_global_cleanup
def test_n_to_n(llm, hf_model):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
hf_outputs = hf_model.predict(text_pairs).tolist()
vllm_outputs = [output.outputs.score for output in llm.score(TEXTS_1, TEXTS_2)]
assert len(vllm_outputs) == 2
assert len(hf_outputs) == 2
assert hf_outputs[0] == pytest.approx(vllm_outputs[0], rel=0.01)
assert hf_outputs[1] == pytest.approx(vllm_outputs[1], rel=0.01)
def test_token_embed(llm):
outputs = llm.encode(PROMPT, pooling_task="token_embed", use_tqdm=False)
assert len(outputs) == 1
assert outputs[0].outputs.data.shape == (9, COLBERT_DIM)


@@ -0,0 +1,232 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Online API tests for ColBERT late interaction scoring."""
import pytest
import requests
from tests.utils import RemoteOpenAIServer
from vllm.entrypoints.pooling.score.protocol import RerankResponse, ScoreResponse
from .util import ColBERTScoringHfRunner
MODEL_NAME = "answerdotai/answerai-colbert-small-v1"
COLBERT_DIM = 96
MAX_MODEL_LEN = 512
LINEAR_WEIGHTS_KEY = "linear.weight"
TEXTS_1 = [
"What is the capital of France?",
"What is the capital of Germany?",
]
TEXTS_2 = [
"The capital of France is Paris.",
"The capital of Germany is Berlin.",
]
@pytest.fixture(scope="module")
def server():
args = [
"--max-model-len",
str(MAX_MODEL_LEN),
]
with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
yield remote_server
@pytest.fixture(scope="module")
def hf_model():
return ColBERTScoringHfRunner(
model_name=MODEL_NAME, linear_weights_key=LINEAR_WEIGHTS_KEY
)
@pytest.mark.asyncio
async def test_score_api_queries_str_1_documents_str_1(
hf_model, server: RemoteOpenAIServer
):
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1[0],
"documents": TEXTS_2[0],
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 1
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict([[TEXTS_1[0], TEXTS_2[0]]]).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_str_1_documents_str_n(
hf_model, server: RemoteOpenAIServer
):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[0], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1[0],
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_score_api_queries_str_n_documents_str_n(
hf_model, server: RemoteOpenAIServer
):
text_pairs = [
[TEXTS_1[0], TEXTS_2[0]],
[TEXTS_1[1], TEXTS_2[1]],
]
score_response = requests.post(
server.url_for("score"),
json={
"model": MODEL_NAME,
"queries": TEXTS_1,
"documents": TEXTS_2,
},
)
score_response.raise_for_status()
score = ScoreResponse.model_validate(score_response.json())
assert score.id is not None
assert score.data is not None
assert len(score.data) == 2
vllm_outputs = [d.score for d in score.data]
hf_outputs = hf_model.predict(text_pairs).tolist()
for i in range(len(vllm_outputs)):
assert hf_outputs[i] == pytest.approx(vllm_outputs[i], rel=0.01)
@pytest.mark.asyncio
async def test_rerank_api_texts(server: RemoteOpenAIServer):
"""Test ColBERT rerank endpoint."""
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={
"model": MODEL_NAME,
"query": query,
"documents": documents,
},
)
rerank_response.raise_for_status()
rerank = RerankResponse.model_validate(rerank_response.json())
assert rerank.id is not None
assert rerank.results is not None
assert len(rerank.results) == 2
paris_result = next(r for r in rerank.results if r.index == 1)
brazil_result = next(r for r in rerank.results if r.index == 0)
assert paris_result.relevance_score > brazil_result.relevance_score
@pytest.mark.asyncio
async def test_rerank_api_top_n(server: RemoteOpenAIServer):
"""Test ColBERT rerank with top_n parameter."""
query = "What is the capital of France?"
documents = [
"The capital of Brazil is Brasilia.",
"The capital of France is Paris.",
"Machine learning is a field of AI.",
]
rerank_response = requests.post(
server.url_for("rerank"),
json={
"model": MODEL_NAME,
"query": query,
"documents": documents,
"top_n": 2,
},
)
rerank_response.raise_for_status()
rerank = RerankResponse.model_validate(rerank_response.json())
assert len(rerank.results) == 2
assert rerank.results[0].index == 1
@pytest.mark.asyncio
async def test_token_embed(server: RemoteOpenAIServer):
"""Test ColBERT token_embed task via pooling endpoint."""
text = "What is the capital of France?"
pooling_response = requests.post(
server.url_for("pooling"),
json={
"model": MODEL_NAME,
"input": text,
"task": "token_embed",
},
)
pooling_response.raise_for_status()
pooling = pooling_response.json()
assert "data" in pooling
assert len(pooling["data"]) == 1
embeddings = pooling["data"][0]["data"]
assert isinstance(embeddings, list)
assert len(embeddings) > 0
assert len(embeddings[0]) == COLBERT_DIM
@pytest.mark.asyncio
async def test_embed_not_supported(server: RemoteOpenAIServer):
"""Test that ColBERT model does not support 'embed' task."""
task = "embed"
text = "What is the capital of France?"
response = requests.post(
server.url_for("pooling"),
json={
"model": MODEL_NAME,
"input": text,
"task": task,
},
)
assert response.json()["error"]["type"] == "BadRequestError"
assert response.json()["error"]["message"].startswith(f"Unsupported task: {task!r}")


@@ -0,0 +1,353 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from unittest.mock import patch
import pytest
from vllm.config import ModelConfig
from vllm.entrypoints.chat_utils import ChatTemplateResolutionError
from vllm.entrypoints.pooling.score.utils import (
get_score_prompt,
)
from vllm.inputs import TokensPrompt
from vllm.tokenizers import get_tokenizer
# A cross-encoder model for testing
CROSS_ENCODER_MODEL_ID = "cross-encoder/ms-marco-MiniLM-L-6-v2"
def assert_prompt_tokenization_consistent(
tokenizer, full_prompt, engine_prompt, add_special_tokens=True
):
"""Verify that engine_prompt token_ids match tokenizing full_prompt."""
expected_ids = tokenizer(full_prompt, add_special_tokens=add_special_tokens)[
"input_ids"
]
actual_ids = engine_prompt["prompt_token_ids"]
assert actual_ids == expected_ids, (
f"Token IDs don't match.\nExpected: {expected_ids}\nActual: {actual_ids}"
)
@pytest.fixture(scope="module")
def cross_encoder_model_config():
return ModelConfig(
CROSS_ENCODER_MODEL_ID,
runner="pooling",
)
@pytest.fixture(scope="module")
def cross_encoder_tokenizer(cross_encoder_model_config):
return get_tokenizer(
CROSS_ENCODER_MODEL_ID,
trust_remote_code=cross_encoder_model_config.trust_remote_code,
)
@pytest.fixture(scope="module")
def llm_reranker_model_config():
"""Model config for LLM-as-reranker style (no pad token)."""
config = ModelConfig(
CROSS_ENCODER_MODEL_ID,
runner="pooling",
)
# use_sep_token is a property that reads from hf_config,
# so we set it there to override the default (True)
config.hf_config.use_sep_token = False
return config
@pytest.fixture
def tokenization_kwargs():
"""Common tokenization kwargs used across tests."""
return {"add_special_tokens": True, "return_tensors": None}
@pytest.fixture
def mock_model_with_score_template():
"""Mock model class that supports score template and tracks post_process calls."""
class MockModelWithScoreTemplate:
supports_score_template = True
post_process_called: list[TokensPrompt] = []
@staticmethod
def get_score_template(p1: str, p2: str) -> str:
return f"[QUERY]{p1}[SEP][DOC]{p2}"
@staticmethod
def post_process_tokens(prompt: TokensPrompt) -> None:
MockModelWithScoreTemplate.post_process_called.append(prompt)
return MockModelWithScoreTemplate
@pytest.fixture
def mock_model_no_score_template():
"""Mock model class that does not support score template."""
class MockModelNoScoreTemplate:
supports_score_template = False
return MockModelNoScoreTemplate
class TestGetScorePrompt:
"""Tests for the get_score_prompt function."""
def test_tokenization_kwargs_passed_through(
self,
llm_reranker_model_config,
cross_encoder_tokenizer,
):
"""Test that tokenization kwargs are properly passed through."""
data_1 = "Query text"
data_2 = "Document text"
# Test with truncation - custom kwargs for this test
custom_tokenization_kwargs = {
"add_special_tokens": True,
"return_tensors": None,
"truncation": True,
"max_length": 20,
}
full_prompt, engine_prompt = get_score_prompt(
llm_reranker_model_config,
cross_encoder_tokenizer,
custom_tokenization_kwargs,
data_1,
data_2,
)
assert isinstance(full_prompt, str)
assert "prompt_token_ids" in engine_prompt
# With max_length=20 and truncation, should not exceed this
assert len(engine_prompt["prompt_token_ids"]) <= 20
# Since truncation was applied, token_ids should be a prefix of full encoding
full_ids = cross_encoder_tokenizer(full_prompt, add_special_tokens=True)[
"input_ids"
]
actual_ids = engine_prompt["prompt_token_ids"]
assert full_ids[: len(actual_ids)] == actual_ids, (
f"Token IDs are not a prefix of full encoding.\n"
f"Full IDs: {full_ids}\n"
f"Actual IDs: {actual_ids}"
)
def test_model_supports_score_template(
self,
cross_encoder_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
mock_model_with_score_template,
):
"""Test when model supports score template (no score_template arg)."""
with patch(
"vllm.model_executor.model_loader.get_model_cls",
return_value=mock_model_with_score_template,
):
full_prompt, engine_prompt = get_score_prompt(
cross_encoder_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
"query text",
"document text",
)
assert full_prompt == "[QUERY]query text[SEP][DOC]document text"
assert "prompt_token_ids" in engine_prompt
assert len(engine_prompt["prompt_token_ids"]) > 0
assert_prompt_tokenization_consistent(
cross_encoder_tokenizer, full_prompt, engine_prompt
)
def test_model_supports_score_template_but_custom_template_provided(
self,
cross_encoder_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
mock_model_with_score_template,
):
"""Test when model supports score template but custom template is provided."""
template = (
'TEMPLATE_USED {{ messages[0]["content"] }} {{ messages[1]["content"] }}'
)
with (
patch(
"vllm.model_executor.model_loader.get_model_cls",
return_value=mock_model_with_score_template,
),
):
full_prompt, engine_prompt = get_score_prompt(
cross_encoder_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
"query",
"doc",
score_template=template, # Providing a template
)
assert "prompt_token_ids" in engine_prompt
assert full_prompt == "TEMPLATE_USED query doc"
assert_prompt_tokenization_consistent(
cross_encoder_tokenizer, full_prompt, engine_prompt
)
def test_not_using_default_template(
self,
llm_reranker_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
mock_model_no_score_template,
):
# FIXME: For now, we only apply a template when one is explicitly provided.
# We cannot rely on the tokenizer's chat template because many models
# inherit junk templates from their base LLM, which breaks both the models
# and the tests that use them.
with (
patch(
"vllm.model_executor.model_loader.get_model_cls",
return_value=mock_model_no_score_template,
),
patch(
"vllm.entrypoints.pooling.score.utils.safe_apply_chat_template",
return_value="test querytest doc",
),
):
full_prompt, engine_prompt = get_score_prompt(
llm_reranker_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
"test query",
"test doc",
)
assert full_prompt == "test querytest doc"
assert "prompt_token_ids" in engine_prompt
assert_prompt_tokenization_consistent(
cross_encoder_tokenizer, full_prompt, engine_prompt
)
def test_fallback_with_sep_token(
self,
cross_encoder_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
mock_model_no_score_template,
):
"""Test fallback path when ChatTemplateResolutionError
and use_sep_token=True."""
with (
patch(
"vllm.model_executor.model_loader.get_model_cls",
return_value=mock_model_no_score_template,
),
patch(
"vllm.entrypoints.pooling.score.utils.safe_apply_chat_template",
side_effect=ChatTemplateResolutionError("No template"),
),
):
full_prompt, engine_prompt = get_score_prompt(
cross_encoder_model_config, # use_sep_token=True
cross_encoder_tokenizer,
tokenization_kwargs,
"query",
"document",
)
assert "prompt_token_ids" in engine_prompt
# Should have token_type_ids from text_pair encoding
assert "token_type_ids" in engine_prompt
assert "query" in full_prompt
assert "document" in full_prompt
assert full_prompt != "querydocument"
assert (
engine_prompt["prompt_token_ids"]
== cross_encoder_tokenizer(
"query", text_pair="document", add_special_tokens=True
)["input_ids"]
)
# FIXME(?): add_special_tokens=False is needed because in this case
# full_prompt is obtained by decoding the tokenized prompt, which includes
# special tokens and we would get duplicated special tokens otherwise.
# This is inconsistent with other cases.
assert_prompt_tokenization_consistent(
cross_encoder_tokenizer,
full_prompt,
engine_prompt,
add_special_tokens=False,
)
def test_fallback_without_sep_token(
self,
llm_reranker_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
mock_model_no_score_template,
):
"""Test fallback path when ChatTemplateResolutionError
and use_sep_token=False."""
with (
patch(
"vllm.model_executor.model_loader.get_model_cls",
return_value=mock_model_no_score_template,
),
patch(
"vllm.entrypoints.pooling.score.utils.safe_apply_chat_template",
side_effect=ChatTemplateResolutionError("No template"),
),
):
full_prompt, engine_prompt = get_score_prompt(
llm_reranker_model_config, # use_sep_token=False
cross_encoder_tokenizer,
tokenization_kwargs,
"query",
"document",
)
assert full_prompt == "querydocument"
assert "prompt_token_ids" in engine_prompt
assert_prompt_tokenization_consistent(
cross_encoder_tokenizer, full_prompt, engine_prompt
)
def test_post_process_tokens_called(
self,
cross_encoder_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
mock_model_with_score_template,
):
"""Test that post_process_tokens is called on the engine prompt."""
# Reset the call tracker
mock_model_with_score_template.post_process_called.clear()
with (
patch(
"vllm.model_executor.model_loader.get_model_cls",
return_value=mock_model_with_score_template,
),
patch(
"vllm.entrypoints.pooling.score.utils.safe_apply_chat_template",
side_effect=ChatTemplateResolutionError("No template"),
),
):
full_prompt, engine_prompt = get_score_prompt(
cross_encoder_model_config,
cross_encoder_tokenizer,
tokenization_kwargs,
"query",
"doc",
)
# post_process_tokens should have been called once
assert len(mock_model_with_score_template.post_process_called) == 1
assert mock_model_with_score_template.post_process_called[0] is engine_prompt
assert_prompt_tokenization_consistent(
cross_encoder_tokenizer, full_prompt, engine_prompt
)


@@ -0,0 +1,69 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import torch
import torch.nn.functional as F
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
from transformers import AutoModel, AutoTokenizer
from tests.conftest import HfRunner
from vllm.entrypoints.pooling.score.utils import compute_maxsim_score
class ColBERTScoringHfRunner(torch.nn.Module):
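"""Reference ColBERT scorer built on HF Transformers.
Encodes each text into per-token embeddings, projects them through the
checkpoint's linear head, L2-normalizes, and scores pairs with MaxSim
late interaction."""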
def __init__(self, model_name, linear_weights_key):
super().__init__()
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
extra = {}
if self.device.type == "cpu":
extra["attn_implementation"] = "eager"
self.model = AutoModel.from_pretrained(
model_name,
**extra,
).to(self.device)
self.model.eval()
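# The ColBERT projection head is not part of the AutoModel graph, so load
# its weights directly from the checkpoint's safetensors file.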
path = hf_hub_download(model_name, filename="model.safetensors")
weights = load_file(path)
self.linear_weight = weights[linear_weights_key].to(self.device).float()
@torch.inference_mode()
def forward(self, texts):
embeddings = []
for text in texts:
inputs = self.tokenizer(text, return_tensors="pt").to(self.device)
hidden = self.model(**inputs).last_hidden_state.float()
projected = F.linear(hidden, self.linear_weight.float())
normalised = F.normalize(projected, p=2, dim=-1)
embeddings.append(normalised.squeeze(0).cpu())
return embeddings
@torch.inference_mode()
def predict(self, prompts: list[list[str]], *args, **kwargs):
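# MaxSim late interaction: for each query token, take the maximum
# similarity over document tokens, then sum the maxima.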
hf_embeddings = [self(prompt) for prompt in prompts]
hf_outputs = [
compute_maxsim_score(*map(torch.tensor, pair)).item()
for pair in hf_embeddings
]
return torch.as_tensor(hf_outputs)
class EncoderScoringHfRunner(HfRunner):
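"""Bi-encoder reference scorer: embeds each text with a sentence
transformer and scores pairs by cosine similarity."""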
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, is_sentence_transformer=True)
@torch.inference_mode()
def predict(self, prompts: list[list[str]], *args, **kwargs):
hf_embeddings = [self.encode(prompt) for prompt in prompts]
hf_outputs = [
F.cosine_similarity(*map(torch.tensor, pair), dim=0)
for pair in hf_embeddings
]
return torch.as_tensor(hf_outputs)