[Frontend][3/n] Improve pooling entrypoints | scoring. (#28631)
Signed-off-by: wang.yuqi <yuqi.wang@daocloud.io>
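
For orientation, a rough usage sketch of the flow this series moves toward: `LLM.score()` now routes every score type through a registered `ScoringIOProcessor` instead of dedicated `_embedding_score` / `_late_interaction_score` / `_cross_encoding_score` branches. The model name and printed field below are illustrative, not taken from this diff:

    from vllm import LLM

    # Illustrative reranker; any pooling-runner model with a supported
    # score_type should follow the same io-processor path after this change.
    llm = LLM(model="BAAI/bge-reranker-v2-m3", runner="pooling")

    outputs = llm.score(
        "What is the capital of France?",       # data_1: a single query
        [
            "Paris is the capital of France.",  # data_2: candidate documents
            "The Eiffel Tower is in Paris.",
        ],
    )
    for out in outputs:
        print(out.outputs.score)  # one relevance score per (query, doc) pair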
@@ -46,22 +46,16 @@ from vllm.entrypoints.chat_utils import (
     load_chat_template,
 )
 from vllm.entrypoints.pooling.io_processor_factories import init_pooling_io_processors
-from vllm.entrypoints.pooling.score.utils import (
-    ScoreData,
-    ScoreMultiModalParam,
-    _cosine_similarity,
-    compress_token_type_ids,
-    compute_maxsim_score,
-    get_score_prompt,
-    score_data_to_prompts,
-    validate_score_input,
-)
+from vllm.entrypoints.pooling.scoring.io_processor import (
+    ScoringIOProcessor,
+)
+from vllm.entrypoints.pooling.scoring.typing import ScoreInput
 from vllm.entrypoints.pooling.typing import OfflineInputsContext, OfflineOutputsContext
 from vllm.entrypoints.utils import log_non_default_args
 from vllm.inputs import (
     DataPrompt,
     EngineInput,
     PromptType,
     SingletonPrompt,
     TextPrompt,
     TokensPrompt,
 )
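
Pieced together from the call sites later in this diff, the new `ScoringIOProcessor` interface is presumably shaped roughly as below. The `Protocol` base, type hints, and docstrings are assumptions; only the attribute and method names come from the diff:

    from typing import Any, Protocol

    class ScoringIOProcessor(Protocol):
        # Inferred sketch, not the actual vLLM definition.
        pooling_task: str  # e.g. "classify", "embed", or "token_embed"

        def valid_inputs(self, data_1: Any, data_2: Any) -> Any:
            """Validate and pair the two score inputs into scoring data."""
            ...

        def pre_process_offline(self, ctx: Any) -> list[Any]:
            """Turn an OfflineInputsContext into engine-ready prompts."""
            ...

        def post_process_offline(self, ctx: Any) -> list[Any]:
            """Map engine outputs in an OfflineOutputsContext to results."""
            ...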
@@ -1161,7 +1155,9 @@ class LLM:
         if pooling_task in self.pooling_io_processors:
             io_processor = self.pooling_io_processors[pooling_task]
             processor_inputs = io_processor.pre_process_offline(
-                prompts_seq, tokenization_kwargs
+                ctx=OfflineInputsContext(
+                    prompts=prompts_seq, tokenization_kwargs=tokenization_kwargs
+                )
             )
             seq_lora_requests = self._lora_request_to_seq(
                 lora_request, len(prompts_seq)
@@ -1178,7 +1174,9 @@ class LLM:
             outputs = self._run_engine(
                 use_tqdm=use_tqdm, output_type=PoolingRequestOutput
             )
-            outputs = io_processor.post_process_offline(outputs)
+            outputs = io_processor.post_process_offline(
+                ctx=OfflineOutputsContext(outputs=outputs)
+            )
         else:
             outputs = self._run_completion(
                 prompts=prompts_seq,
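
The two hunks above replace positional arguments with single context objects. A minimal sketch of the presumed shape, with field names taken from the call sites and everything else assumed:

    from dataclasses import dataclass, field
    from typing import Any

    @dataclass
    class OfflineInputsContext:
        # Bundling request state means new fields (pooling_params,
        # chat_template, offset, ...) can be added without changing the
        # pre_process_offline signature at every call site.
        prompts: Any
        tokenization_kwargs: dict[str, Any] | None = None
        pooling_params: Any = None
        chat_template: str | None = None
        offset: int | None = None

    @dataclass
    class OfflineOutputsContext:
        outputs: list[Any] = field(default_factory=list)
        offset: int | None = None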
@@ -1378,188 +1376,10 @@ class LLM:
             tokenization_kwargs=tokenization_kwargs,
         )
 
-    def _embedding_score(
-        self,
-        data_1: list[ScoreData],
-        data_2: list[ScoreData],
-        *,
-        use_tqdm: bool | Callable[..., tqdm],
-        pooling_params: PoolingParams | None,
-        lora_request: list[LoRARequest] | LoRARequest | None,
-        tokenization_kwargs: dict[str, Any],
-    ) -> list[ScoringRequestOutput]:
-        tokenizer = self.get_tokenizer()
-
-        input_texts: list[str] = []
-        for text in data_1 + data_2:
-            if not isinstance(text, str):
-                raise NotImplementedError(
-                    "Embedding scores currently do not support multimodal input."
-                )
-            input_texts.append(text)
-
-        encoded_output = self.encode(
-            input_texts,
-            use_tqdm=use_tqdm,
-            lora_request=lora_request,
-            pooling_params=pooling_params,
-            pooling_task="embed",
-            tokenization_kwargs=tokenization_kwargs,
-        )
-
-        encoded_output_1 = encoded_output[0 : len(data_1)]
-        encoded_output_2 = encoded_output[len(data_1) :]
-
-        if len(encoded_output_1) == 1:
-            encoded_output_1 = encoded_output_1 * len(encoded_output_2)
-
-        scores = _cosine_similarity(
-            tokenizer=tokenizer,
-            embed_1=encoded_output_1,
-            embed_2=encoded_output_2,
-        )
-
-        return [ScoringRequestOutput.from_base(item) for item in scores]
-
-    def _late_interaction_score(
-        self,
-        data_1: list[ScoreData],
-        data_2: list[ScoreData],
-        *,
-        use_tqdm: bool | Callable[..., tqdm],
-        pooling_params: PoolingParams | None,
-        lora_request: list[LoRARequest] | LoRARequest | None,
-        tokenization_kwargs: dict[str, Any],
-    ) -> list[ScoringRequestOutput]:
-        """
-        Late interaction scoring (ColBERT MaxSim).
-
-        Encodes queries and documents into per-token embeddings, then computes
-        MaxSim: sum over query tokens of max similarity to any document token.
-        """
-        from vllm.outputs import PoolingOutput
-
-        tokenizer = self.get_tokenizer()
-
-        # Convert ScoreData to PromptType (handles both text and multimodal)
-        model_config = self.model_config
-        prompts_1 = score_data_to_prompts(data_1, "query", model_config)
-        prompts_2 = score_data_to_prompts(data_2, "document", model_config)
-
-        encoded_output: list[PoolingRequestOutput] = self.encode(
-            prompts_1 + prompts_2,
-            use_tqdm=use_tqdm,
-            lora_request=lora_request,
-            pooling_params=pooling_params,
-            pooling_task="token_embed",
-            tokenization_kwargs=tokenization_kwargs,
-        )
-
-        encoded_output_1: list[PoolingRequestOutput] = encoded_output[: len(prompts_1)]
-        encoded_output_2: list[PoolingRequestOutput] = encoded_output[len(prompts_1) :]
-
-        if len(encoded_output_1) == 1:
-            encoded_output_1 = encoded_output_1 * len(encoded_output_2)
-
-        # Compute MaxSim scores
-        scores: list[PoolingRequestOutput] = []
-        padding: list[int] = []
-        if (pad_token_id := tokenizer.pad_token_id) is not None:
-            padding = [pad_token_id]
-
-        for emb_1, emb_2 in zip(encoded_output_1, encoded_output_2):
-            # emb_1.outputs.data: [query_len, dim]
-            # emb_2.outputs.data: [doc_len, dim]
-            q_emb = emb_1.outputs.data
-            d_emb = emb_2.outputs.data
-
-            maxsim_score = compute_maxsim_score(q_emb, d_emb)
-
-            tokens = emb_1.prompt_token_ids + padding + emb_2.prompt_token_ids
-
-            scores.append(
-                PoolingRequestOutput(
-                    request_id=f"{emb_1.request_id}_{emb_2.request_id}",
-                    outputs=PoolingOutput(data=maxsim_score),
-                    prompt_token_ids=tokens,
-                    num_cached_tokens=emb_1.num_cached_tokens + emb_2.num_cached_tokens,
-                    finished=True,
-                )
-            )
-
-        return [ScoringRequestOutput.from_base(item) for item in scores]
-
-    def _cross_encoding_score(
-        self,
-        data_1: list[ScoreData],
-        data_2: list[ScoreData],
-        *,
-        use_tqdm: bool | Callable[..., tqdm],
-        pooling_params: PoolingParams | None,
-        lora_request: list[LoRARequest] | LoRARequest | None,
-        tokenization_kwargs: dict[str, Any],
-        score_template: str | None,
-    ) -> list[ScoringRequestOutput]:
-        model_config = self.model_config
-        tokenizer = self.get_tokenizer()
-
-        if is_mistral_tokenizer(tokenizer):
-            raise ValueError("Score API is not supported for Mistral tokenizer")
-
-        if len(data_1) == 1:
-            data_1 = data_1 * len(data_2)
-
-        if pooling_params is None:
-            pooling_params = PoolingParams(task="classify")
-        elif pooling_params.task is None:
-            pooling_params.task = "classify"
-
-        pooling_params_list = list[PoolingParams]()
-
-        prompts = list[PromptType]()
-
-        input_pairs = [(t1, t2) for t1, t2 in zip(data_1, data_2)]
-
-        for q, d in input_pairs:
-            _, engine_prompt = get_score_prompt(
-                model_config=model_config,
-                data_1=q,
-                data_2=d,
-                tokenizer=tokenizer,
-                tokenization_kwargs=tokenization_kwargs,
-                score_template=score_template,
-            )
-
-            if token_type_ids := engine_prompt.pop("token_type_ids", None):
-                params = pooling_params.clone()
-                compressed = compress_token_type_ids(token_type_ids)
-                params.extra_kwargs = {"compressed_token_type_ids": compressed}
-                pooling_params_list.append(params)
-            else:
-                pooling_params_list.append(pooling_params)
-
-            prompts.append(engine_prompt)
-
-        outputs = self._run_completion(
-            prompts=prompts,
-            params=pooling_params_list,
-            output_type=PoolingRequestOutput,
-            use_tqdm=use_tqdm,
-            lora_request=lora_request,
-        )
-
-        return [ScoringRequestOutput.from_base(item) for item in outputs]
-
     def score(
         self,
-        data_1: SingletonPrompt
-        | Sequence[SingletonPrompt]
-        | ScoreMultiModalParam
-        | list[ScoreMultiModalParam],
-        data_2: SingletonPrompt
-        | Sequence[SingletonPrompt]
-        | ScoreMultiModalParam
-        | list[ScoreMultiModalParam],
+        data_1: ScoreInput | list[ScoreInput],
+        data_2: ScoreInput | list[ScoreInput],
         /,
         *,
         use_tqdm: bool | Callable[..., tqdm] = True,
@@ -1606,83 +1426,71 @@ class LLM:
             A list of `ScoringRequestOutput` objects containing the
             generated scores in the same order as the input prompts.
         """
-        model_config = self.model_config
-
-        runner_type = model_config.runner_type
-        if runner_type != "pooling":
+        if self.runner_type != "pooling":
             raise ValueError(
                 "LLM.score() is only supported for pooling models. "
                 "Try passing `--runner pooling` to use the model as a "
                 "pooling model."
             )
 
-        supported_tasks = self.supported_tasks
         score_type = self.model_config.score_type
-        is_late_interaction = score_type == "late-interaction"
-        is_cross_encoder = score_type == "cross-encoder"
 
-        # Late interaction models (e.g., ColBERT) use token_embed for scoring
-        if not is_late_interaction and all(
-            t not in supported_tasks for t in ("embed", "classify")
+        if (
+            score_type == "cross-encoder"
+            and getattr(self.model_config.hf_config, "num_labels", 0) != 1
         ):
-            raise ValueError(
-                "Score API is not supported by this model. "
-                "Try converting the model using "
-                "`--convert embed` or `--convert classify`."
-            )
+            raise ValueError("Scoring API is only enabled for num_labels == 1.")
 
-        if is_cross_encoder and getattr(model_config.hf_config, "num_labels", 0) != 1:
-            raise ValueError("Score API is only enabled for num_labels == 1.")
+        if score_type is None or score_type not in self.pooling_io_processors:
+            raise ValueError("This model does not support the Scoring API.")
 
-        if not is_cross_encoder and chat_template is not None:
-            raise ValueError(
-                "chat_template is only supported for cross-encoder models."
-            )
+        io_processor = self.pooling_io_processors[score_type]
+        assert isinstance(io_processor, ScoringIOProcessor)
 
-        is_multimodal_model = model_config.is_multimodal_model
-        architecture = model_config.architecture
+        pooling_task = io_processor.pooling_task
+        scoring_data = io_processor.valid_inputs(data_1, data_2)
+        offset = len(scoring_data.data_1)
 
-        score_data_1, score_data_2 = validate_score_input(
-            data_1,  # type: ignore[arg-type]
-            data_2,  # type: ignore[arg-type]
-            is_multimodal_model=is_multimodal_model,
-            architecture=architecture,
+        ctx = OfflineInputsContext(
+            prompts=scoring_data,
+            pooling_params=pooling_params,
+            tokenization_kwargs=tokenization_kwargs,
+            chat_template=chat_template,
+            offset=offset,
         )
 
-        renderer = self.renderer
-        tok_params = renderer.default_cmpl_tok_params.with_kwargs(
-            **(tokenization_kwargs or {})
-        )
-        encode_kwargs = tok_params.get_encode_kwargs()
+        processor_inputs = io_processor.pre_process_offline(ctx)
 
-        if is_cross_encoder:
-            return self._cross_encoding_score(
-                score_data_1,
-                score_data_2,
-                use_tqdm=use_tqdm,
-                pooling_params=pooling_params,
-                lora_request=lora_request,
-                tokenization_kwargs=encode_kwargs,
-                score_template=chat_template,
-            )
-        elif is_late_interaction:
-            return self._late_interaction_score(
-                score_data_1,
-                score_data_2,
-                use_tqdm=use_tqdm,
-                pooling_params=pooling_params,
-                lora_request=lora_request,
-                tokenization_kwargs=encode_kwargs,
-            )
-        else:
-            return self._embedding_score(
-                score_data_1,
-                score_data_2,
-                use_tqdm=use_tqdm,
-                pooling_params=pooling_params,
-                lora_request=lora_request,
-                tokenization_kwargs=encode_kwargs,
-            )
+        seq_lora_requests = self._lora_request_to_seq(
+            lora_request, len(processor_inputs)
+        )
+
+        if ctx.pooling_params is None:
+            ctx.pooling_params = PoolingParams()
+        params_seq = self._params_to_seq(ctx.pooling_params, len(processor_inputs))
+
+        for param in params_seq:
+            if param.task is None:
+                param.task = pooling_task
+            elif param.task != pooling_task:
+                msg = f"You cannot overwrite {param.task=!r} with {pooling_task=!r}!"
+                raise ValueError(msg)
+
+        seq_priority = self._priority_to_seq(None, len(processor_inputs))
+
+        self._render_and_add_requests(
+            prompts=processor_inputs,
+            params=params_seq,
+            lora_requests=seq_lora_requests,
+            priorities=seq_priority,
+        )
+
+        outputs = self._run_engine(use_tqdm=use_tqdm, output_type=PoolingRequestOutput)
+        outputs = io_processor.post_process_offline(
+            ctx=OfflineOutputsContext(outputs=outputs, offset=offset),
+        )
 
         return [ScoringRequestOutput.from_base(item) for item in outputs]
 
     def start_profile(self, profile_prefix: str | None = None) -> None:
         """Start profiling with optional custom trace prefix.
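
One behavioral detail in the rewritten `score()`: a caller-supplied `PoolingParams.task` must either be unset, in which case it is filled in from `io_processor.pooling_task`, or match it exactly. A hedged illustration; the model name is an example and the error text is abbreviated:

    from vllm import LLM, PoolingParams

    llm = LLM(model="BAAI/bge-reranker-v2-m3", runner="pooling")  # illustrative

    # Fine: task is unset, so it is filled in from the io processor
    # ("classify" for a cross-encoder score type).
    llm.score("some query", "some document", pooling_params=PoolingParams())

    # Raises ValueError: "You cannot overwrite param.task='embed' with ..."
    llm.score("some query", "some document", pooling_params=PoolingParams(task="embed"))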