Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
95c0f928cd | ||
|
|
c9b1e977dc | ||
|
|
1ff2393897 | ||
|
|
5bec0b0ba3 | ||
|
|
6da1310f91 | ||
|
|
bc46be5daf | ||
|
|
8e39d39fd4 | ||
|
|
46fa044cc1 | ||
|
|
ab43e37158 | ||
|
|
f45d010120 | ||
|
|
244b922088 |
@@ -11,6 +11,9 @@ torchaudio==2.10.0
|
||||
torchvision==0.25.0 # Required for phi3v processor. See https://github.com/pytorch/vision?tab=readme-ov-file#installation for corresponding version
|
||||
# FlashInfer should be updated together with the Dockerfile
|
||||
flashinfer-python==0.6.4
|
||||
# Cap nvidia-cudnn-frontend (transitive dep of flashinfer) due to
|
||||
# breaking changes in 1.19.0
|
||||
nvidia-cudnn-frontend>=1.13.0,<1.19.0
|
||||
|
||||
# QuACK and Cutlass DSL for FA4 (cute-DSL implementation)
|
||||
nvidia-cutlass-dsl>=4.4.0.dev1
|
||||
|
||||
172
tests/reasoning/test_nemotron_v3_reasoning_parser.py
Normal file
172
tests/reasoning/test_nemotron_v3_reasoning_parser.py
Normal file
@@ -0,0 +1,172 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
|
||||
from typing import TypedDict
|
||||
|
||||
import pytest
|
||||
import regex as re
|
||||
|
||||
from tests.reasoning.utils import run_reasoning_extraction
|
||||
from vllm.entrypoints.openai.chat_completion.protocol import ChatCompletionRequest
|
||||
from vllm.reasoning import ReasoningParser, ReasoningParserManager
|
||||
|
||||
parser_name = "nemotron_v3"
|
||||
|
||||
|
||||
class ReasoningCase(TypedDict):
|
||||
output: str
|
||||
reasoning: str | None
|
||||
content: str | None
|
||||
|
||||
|
||||
class FakeNemotronTokenizer:
|
||||
def __init__(self):
|
||||
self._vocab = {
|
||||
"<think>": 1,
|
||||
"</think>": 2,
|
||||
}
|
||||
self._pattern = re.compile(r"(<think>|</think>)")
|
||||
|
||||
def get_vocab(self) -> dict[str, int]:
|
||||
return self._vocab
|
||||
|
||||
def tokenize(self, text: str) -> list[str]:
|
||||
tokens: list[str] = []
|
||||
for part in self._pattern.split(text):
|
||||
if part:
|
||||
tokens.append(part)
|
||||
return tokens
|
||||
|
||||
def convert_tokens_to_string(self, tokens: list[str]) -> str:
|
||||
return "".join(tokens)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tokenizer():
|
||||
return FakeNemotronTokenizer()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"streaming,param_dict",
|
||||
[
|
||||
pytest.param(
|
||||
False,
|
||||
{
|
||||
"output": "This is a reasoning section</think>This is the rest",
|
||||
"reasoning": "This is a reasoning section",
|
||||
"content": "This is the rest",
|
||||
},
|
||||
id="without_start_token",
|
||||
),
|
||||
pytest.param(
|
||||
True,
|
||||
{
|
||||
"output": "This is a reasoning section</think>This is the rest",
|
||||
"reasoning": "This is a reasoning section",
|
||||
"content": "This is the rest",
|
||||
},
|
||||
id="without_start_token_streaming",
|
||||
),
|
||||
pytest.param(
|
||||
False,
|
||||
{
|
||||
"output": "<think>This is a reasoning section</think>This is the rest",
|
||||
"reasoning": "This is a reasoning section",
|
||||
"content": "This is the rest",
|
||||
},
|
||||
id="with_start_token",
|
||||
),
|
||||
pytest.param(
|
||||
True,
|
||||
{
|
||||
"output": "<think>This is a reasoning section</think>This is the rest",
|
||||
"reasoning": "This is a reasoning section",
|
||||
"content": "This is the rest",
|
||||
},
|
||||
id="with_start_token_streaming",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_nemotron_v3_reasoning(
|
||||
tokenizer: FakeNemotronTokenizer,
|
||||
streaming: bool,
|
||||
param_dict: ReasoningCase,
|
||||
):
|
||||
output = tokenizer.tokenize(param_dict["output"])
|
||||
model_output = [tokenizer.convert_tokens_to_string([token]) for token in output]
|
||||
parser: ReasoningParser = ReasoningParserManager.get_reasoning_parser(parser_name)(
|
||||
tokenizer
|
||||
)
|
||||
|
||||
reasoning, content = run_reasoning_extraction(
|
||||
parser, model_output, streaming=streaming
|
||||
)
|
||||
|
||||
assert reasoning == param_dict["reasoning"]
|
||||
assert content == param_dict["content"]
|
||||
|
||||
|
||||
def test_nemotron_v3_without_thinking_returns_content(
|
||||
tokenizer: FakeNemotronTokenizer,
|
||||
):
|
||||
parser_cls = ReasoningParserManager.get_reasoning_parser(parser_name)
|
||||
parser = parser_cls(tokenizer)
|
||||
request = ChatCompletionRequest(
|
||||
model="test-model",
|
||||
messages=[],
|
||||
chat_template_kwargs={"enable_thinking": False},
|
||||
)
|
||||
|
||||
reasoning, content = run_reasoning_extraction(
|
||||
parser,
|
||||
["This is plain content"],
|
||||
request=request,
|
||||
streaming=False,
|
||||
)
|
||||
|
||||
assert reasoning is None
|
||||
assert content == "This is plain content"
|
||||
|
||||
|
||||
def test_nemotron_v3_force_nonempty_content_returns_content(
|
||||
tokenizer: FakeNemotronTokenizer,
|
||||
):
|
||||
parser_cls = ReasoningParserManager.get_reasoning_parser(parser_name)
|
||||
parser = parser_cls(tokenizer)
|
||||
request = ChatCompletionRequest(
|
||||
model="test-model",
|
||||
messages=[],
|
||||
chat_template_kwargs={"force_nonempty_content": True},
|
||||
)
|
||||
|
||||
reasoning, content = run_reasoning_extraction(
|
||||
parser,
|
||||
["<think>This is plain content"],
|
||||
request=request,
|
||||
streaming=False,
|
||||
)
|
||||
|
||||
assert reasoning is None
|
||||
assert content == "This is plain content"
|
||||
|
||||
|
||||
def test_nemotron_v3_with_thinking_keeps_truncated_reasoning(
|
||||
tokenizer: FakeNemotronTokenizer,
|
||||
):
|
||||
parser_cls = ReasoningParserManager.get_reasoning_parser(parser_name)
|
||||
parser = parser_cls(tokenizer)
|
||||
request = ChatCompletionRequest(
|
||||
model="test-model",
|
||||
messages=[],
|
||||
chat_template_kwargs={"enable_thinking": True},
|
||||
)
|
||||
|
||||
reasoning, content = run_reasoning_extraction(
|
||||
parser,
|
||||
["This is truncated reasoning"],
|
||||
request=request,
|
||||
streaming=False,
|
||||
)
|
||||
|
||||
assert reasoning == "This is truncated reasoning"
|
||||
assert content is None
|
||||
@@ -35,12 +35,6 @@ class TrtLlmFp8Experts(mk.FusedMoEExpertsMonolithic):
|
||||
):
|
||||
super().__init__(moe_config, quant_config)
|
||||
|
||||
if moe_config.moe_parallel_config.use_ep and quant_config.is_per_tensor:
|
||||
raise NotImplementedError(
|
||||
"EP parallelism is not supported with TRTLLM"
|
||||
"per-tensor FP8 quantization."
|
||||
)
|
||||
|
||||
self.routing_method_type = moe_config.routing_method
|
||||
self.topk = moe_config.experts_per_token
|
||||
self.intermediate_size_per_partition = (
|
||||
@@ -182,9 +176,6 @@ class TrtLlmFp8Experts(mk.FusedMoEExpertsMonolithic):
|
||||
assert not apply_router_weight_on_input
|
||||
assert activation == MoEActivation.SILU
|
||||
|
||||
if e_score_correction_bias is not None:
|
||||
e_score_correction_bias = e_score_correction_bias.to(hidden_states.dtype)
|
||||
|
||||
if self.routing_method_type == RoutingMethodType.DeepSeekV3:
|
||||
router_logits = router_logits.to(torch.float32)
|
||||
|
||||
@@ -240,12 +231,11 @@ class TrtLlmFp8Experts(mk.FusedMoEExpertsMonolithic):
|
||||
) -> torch.Tensor:
|
||||
# Delay import for non-CUDA.
|
||||
import flashinfer
|
||||
from flashinfer.fused_moe.core import ActivationType
|
||||
|
||||
# Confirm supported activation function.
|
||||
assert activation in [MoEActivation.SILU, MoEActivation.RELU2_NO_MUL]
|
||||
|
||||
activation_type = ActivationType(activation_to_flashinfer_int(activation))
|
||||
activation_type = activation_to_flashinfer_int(activation)
|
||||
|
||||
# Confirm Llama-4 routing is proper.
|
||||
if self.routing_method_type == RoutingMethodType.Llama4:
|
||||
|
||||
@@ -323,4 +323,5 @@ class TrtLlmNvFp4ExpertsMonolithic(
|
||||
routed_scaling_factor=routed_scaling_factor,
|
||||
routing_method_type=self.routing_method_type,
|
||||
do_finalize=True,
|
||||
activation_type=activation_to_flashinfer_int(activation),
|
||||
)[0]
|
||||
|
||||
@@ -912,7 +912,7 @@ class BatchedTritonExperts(mk.FusedMoEExpertsModular):
|
||||
|
||||
@staticmethod
|
||||
def _supports_no_act_and_mul() -> bool:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _supports_quant_scheme(
|
||||
|
||||
@@ -1944,7 +1944,7 @@ class TritonExperts(mk.FusedMoEExpertsModular):
|
||||
|
||||
@staticmethod
|
||||
def _supports_no_act_and_mul() -> bool:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _supports_quant_scheme(
|
||||
@@ -1983,6 +1983,9 @@ class TritonExperts(mk.FusedMoEExpertsModular):
|
||||
MoEActivation.GELU,
|
||||
MoEActivation.SWIGLUOAI,
|
||||
MoEActivation.SWIGLUSTEP,
|
||||
MoEActivation.SILU_NO_MUL,
|
||||
MoEActivation.GELU_NO_MUL,
|
||||
MoEActivation.RELU2_NO_MUL,
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -68,6 +68,10 @@ _REASONING_PARSERS_TO_REGISTER = {
|
||||
"mistral_reasoning_parser",
|
||||
"MistralReasoningParser",
|
||||
),
|
||||
"nemotron_v3": (
|
||||
"nemotron_v3_reasoning_parser",
|
||||
"NemotronV3ReasoningParser",
|
||||
),
|
||||
"olmo3": (
|
||||
"olmo3_reasoning_parser",
|
||||
"Olmo3ReasoningParser",
|
||||
|
||||
35
vllm/reasoning/nemotron_v3_reasoning_parser.py
Normal file
35
vllm/reasoning/nemotron_v3_reasoning_parser.py
Normal file
@@ -0,0 +1,35 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
from vllm.entrypoints.openai.chat_completion.protocol import (
|
||||
ChatCompletionRequest,
|
||||
)
|
||||
from vllm.entrypoints.openai.responses.protocol import (
|
||||
ResponsesRequest,
|
||||
)
|
||||
from vllm.reasoning.deepseek_r1_reasoning_parser import DeepSeekR1ReasoningParser
|
||||
|
||||
|
||||
class NemotronV3ReasoningParser(DeepSeekR1ReasoningParser):
|
||||
"""
|
||||
Reasoning parser for Nemotron V3 models.
|
||||
"""
|
||||
|
||||
def extract_reasoning(
|
||||
self, model_output: str, request: ChatCompletionRequest | ResponsesRequest
|
||||
) -> tuple[str | None, str | None]:
|
||||
reasoning_content, final_content = super().extract_reasoning(
|
||||
model_output, request
|
||||
)
|
||||
chat_template_kwargs = getattr(request, "chat_template_kwargs", None)
|
||||
|
||||
if (
|
||||
chat_template_kwargs
|
||||
and (
|
||||
chat_template_kwargs.get("enable_thinking") is False
|
||||
or chat_template_kwargs.get("force_nonempty_content") is True
|
||||
)
|
||||
and final_content is None
|
||||
):
|
||||
reasoning_content, final_content = final_content, reasoning_content
|
||||
|
||||
return reasoning_content, final_content
|
||||
@@ -30,3 +30,8 @@ def round_up(x: int, y: int) -> int:
|
||||
def round_down(x: int, y: int) -> int:
|
||||
"""Round down x to the nearest multiple of y."""
|
||||
return (x // y) * y
|
||||
|
||||
|
||||
def largest_power_of_2_divisor(n: int) -> int:
|
||||
"""Return the largest power-of-2 that divides *n* (isolate lowest set bit)."""
|
||||
return n & (-n)
|
||||
|
||||
@@ -86,6 +86,26 @@ class AttentionBackend(ABC):
|
||||
) -> tuple[int, ...]:
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def get_kv_cache_block_dim(
|
||||
cls,
|
||||
block_size: int,
|
||||
num_kv_heads: int,
|
||||
head_size: int,
|
||||
cache_dtype_str: str = "auto",
|
||||
) -> int:
|
||||
"""Discover which tensor dim is the block index, since different
|
||||
backends lay out dims differently."""
|
||||
_S = 1234567
|
||||
shape = cls.get_kv_cache_shape(
|
||||
_S,
|
||||
block_size,
|
||||
num_kv_heads,
|
||||
head_size,
|
||||
cache_dtype_str=cache_dtype_str,
|
||||
)
|
||||
return shape.index(_S)
|
||||
|
||||
@staticmethod
|
||||
def get_kv_cache_stride_order(
|
||||
include_num_layers_dimension: bool = False,
|
||||
|
||||
@@ -372,12 +372,14 @@ class DeepseekV32IndexerMetadataBuilder(AttentionMetadataBuilder):
|
||||
|
||||
# [7, 6, 8, 0] -> [7, 7, 7, 6, 8, 8, 8, 8]
|
||||
expanded_base = torch.repeat_interleave(
|
||||
seq_lens - decode_lens, decode_lens
|
||||
seq_lens - decode_lens, decode_lens, output_size=actual_expanded
|
||||
)
|
||||
|
||||
# [0, 3, 4, 8] -> [0, 0, 0, 3, 4, 4, 4, 4]
|
||||
expanded_starts = torch.repeat_interleave(
|
||||
common_attn_metadata.query_start_loc[:num_decodes], decode_lens
|
||||
common_attn_metadata.query_start_loc[:num_decodes],
|
||||
decode_lens,
|
||||
output_size=actual_expanded,
|
||||
)
|
||||
|
||||
# [0, 1, 2, 0, 0, 1, 2, 3]
|
||||
@@ -395,7 +397,9 @@ class DeepseekV32IndexerMetadataBuilder(AttentionMetadataBuilder):
|
||||
# Give each of the flattened entries the same block table row as the
|
||||
# original request.
|
||||
self.expanded_block_table_buffer[:actual_expanded] = (
|
||||
torch.repeat_interleave(block_table, decode_lens, dim=0)
|
||||
torch.repeat_interleave(
|
||||
block_table, decode_lens, dim=0, output_size=actual_expanded
|
||||
)
|
||||
)
|
||||
if actual_expanded < num_decode_tokens:
|
||||
self.expanded_block_table_buffer[
|
||||
|
||||
@@ -489,6 +489,13 @@ class KVCacheManager:
|
||||
# Only create new KVCacheBlocks for non-empty blocks
|
||||
return KVCacheBlocks(blocks) if any(blocks) else self.empty_kv_cache_blocks
|
||||
|
||||
def take_new_block_ids(self) -> list[int]:
|
||||
"""Drain and return new attention block IDs for zeroing."""
|
||||
ids: list[int] = []
|
||||
for mgr in self.coordinator.single_type_managers:
|
||||
ids.extend(mgr.take_new_block_ids())
|
||||
return ids
|
||||
|
||||
def new_step_starts(self) -> None:
|
||||
"""Called when a new step is started."""
|
||||
self.coordinator.new_step_starts()
|
||||
|
||||
@@ -233,6 +233,11 @@ class SchedulerOutput:
|
||||
# EC Cache Connector metadata
|
||||
ec_connector_metadata: ECConnectorMetadata | None = None
|
||||
|
||||
# Block IDs freshly allocated from the pool during this scheduling step.
|
||||
# The worker zeros the corresponding GPU memory before the blocks are used,
|
||||
# preventing stale NaN/data from corrupting attention or SSM computation.
|
||||
new_block_ids_to_zero: list[int] | None = None
|
||||
|
||||
@classmethod
|
||||
def make_empty(cls) -> "SchedulerOutput":
|
||||
return cls(
|
||||
|
||||
@@ -48,7 +48,7 @@ from vllm.v1.core.sched.output import (
|
||||
from vllm.v1.core.sched.request_queue import SchedulingPolicy, create_request_queue
|
||||
from vllm.v1.core.sched.utils import check_stop, remove_all
|
||||
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs
|
||||
from vllm.v1.kv_cache_interface import KVCacheConfig, MambaSpec
|
||||
from vllm.v1.kv_cache_interface import KVCacheConfig
|
||||
from vllm.v1.metrics.perf import ModelMetrics, PerfStats
|
||||
from vllm.v1.metrics.stats import PrefixCacheStats, SchedulerStats
|
||||
from vllm.v1.outputs import DraftTokenIds, KVConnectorOutput, ModelRunnerOutput
|
||||
@@ -233,13 +233,8 @@ class Scheduler(SchedulerInterface):
|
||||
self.use_pp = self.parallel_config.pipeline_parallel_size > 1
|
||||
self.use_v2_model_runner = envs.VLLM_USE_V2_MODEL_RUNNER
|
||||
|
||||
def has_mamba_layers(kv_cache_config: KVCacheConfig) -> bool:
|
||||
return any(
|
||||
isinstance(group_spec.kv_cache_spec, MambaSpec)
|
||||
for group_spec in kv_cache_config.kv_cache_groups
|
||||
)
|
||||
|
||||
self.has_mamba_layers = has_mamba_layers(kv_cache_config)
|
||||
self.has_mamba_layers = kv_cache_config.has_mamba_layers
|
||||
self.needs_kv_cache_zeroing = kv_cache_config.needs_kv_cache_zeroing
|
||||
self.need_mamba_block_aligned_split = (
|
||||
self.has_mamba_layers and self.cache_config.mamba_cache_mode == "align"
|
||||
)
|
||||
@@ -871,6 +866,12 @@ class Scheduler(SchedulerInterface):
|
||||
self.prev_step_scheduled_req_ids.clear()
|
||||
self.prev_step_scheduled_req_ids.update(num_scheduled_tokens.keys())
|
||||
|
||||
new_block_ids_to_zero = (
|
||||
(self.kv_cache_manager.take_new_block_ids() or None)
|
||||
if self.needs_kv_cache_zeroing
|
||||
else None
|
||||
)
|
||||
|
||||
scheduler_output = SchedulerOutput(
|
||||
scheduled_new_reqs=new_reqs_data,
|
||||
scheduled_cached_reqs=cached_reqs_data,
|
||||
@@ -886,6 +887,7 @@ class Scheduler(SchedulerInterface):
|
||||
# the previous and the current steps.
|
||||
finished_req_ids=self.finished_req_ids,
|
||||
free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
|
||||
new_block_ids_to_zero=new_block_ids_to_zero,
|
||||
)
|
||||
|
||||
# NOTE(Kuntai): this function is designed for multiple purposes:
|
||||
|
||||
@@ -55,6 +55,7 @@ class SingleTypeKVCacheManager(ABC):
|
||||
self.kv_cache_spec = kv_cache_spec
|
||||
self.block_pool = block_pool
|
||||
self.enable_caching = enable_caching
|
||||
self.new_block_ids: list[int] = []
|
||||
|
||||
# Mapping from request ID to blocks to track the blocks allocated
|
||||
# for each request, so that we can free the blocks when the request
|
||||
@@ -208,6 +209,8 @@ class SingleTypeKVCacheManager(ABC):
|
||||
cdiv(num_total_computed_tokens, self.block_size) - len(req_blocks)
|
||||
)
|
||||
req_blocks.extend(allocated_blocks)
|
||||
if type(self.kv_cache_spec) is FullAttentionSpec:
|
||||
self.new_block_ids.extend(b.block_id for b in allocated_blocks)
|
||||
|
||||
def allocate_new_blocks(
|
||||
self, request_id: str, num_tokens: int, num_tokens_main_model: int
|
||||
@@ -234,8 +237,16 @@ class SingleTypeKVCacheManager(ABC):
|
||||
else:
|
||||
new_blocks = self.block_pool.get_new_blocks(num_new_blocks)
|
||||
req_blocks.extend(new_blocks)
|
||||
if type(self.kv_cache_spec) is FullAttentionSpec:
|
||||
self.new_block_ids.extend(b.block_id for b in new_blocks)
|
||||
return new_blocks
|
||||
|
||||
def take_new_block_ids(self) -> list[int]:
|
||||
"""Drain and return block IDs allocated since the last call."""
|
||||
ids = self.new_block_ids
|
||||
self.new_block_ids = []
|
||||
return ids
|
||||
|
||||
def cache_blocks(self, request: Request, num_tokens: int) -> None:
|
||||
"""
|
||||
Cache the blocks for the request.
|
||||
|
||||
@@ -489,3 +489,11 @@ class KVCacheConfig:
|
||||
For models with multiple types of attention, there will be multiple groups,
|
||||
see `_get_kv_cache_config_uniform_page_size` for more details.
|
||||
"""
|
||||
|
||||
@property
|
||||
def has_mamba_layers(self) -> bool:
|
||||
return any(isinstance(g.kv_cache_spec, MambaSpec) for g in self.kv_cache_groups)
|
||||
|
||||
@property
|
||||
def needs_kv_cache_zeroing(self) -> bool:
|
||||
return self.has_mamba_layers
|
||||
|
||||
@@ -187,6 +187,7 @@ from vllm.v1.worker.workspace import lock_workspace
|
||||
|
||||
from .utils import (
|
||||
AttentionGroup,
|
||||
KVBlockZeroer,
|
||||
add_kv_sharing_layers_to_kv_cache_groups,
|
||||
bind_kv_cache,
|
||||
prepare_kernel_block_sizes,
|
||||
@@ -918,6 +919,26 @@ class GPUModelRunner(
|
||||
decode_threshold=self.reorder_batch_threshold,
|
||||
)
|
||||
|
||||
def _init_kv_zero_meta(self) -> None:
|
||||
"""One-time precomputation for _zero_block_ids.
|
||||
|
||||
Delegates to KVBlockZeroer.init_meta with the runner's state.
|
||||
Called from gpu_worker.py outside the CuMem pool context.
|
||||
"""
|
||||
self._kv_block_zeroer = KVBlockZeroer(self.device, self.pin_memory)
|
||||
self._kv_block_zeroer.init_meta(
|
||||
attn_groups_iter=self._kv_cache_spec_attn_group_iterator(),
|
||||
kernel_block_sizes=self._kernel_block_sizes,
|
||||
cache_dtype=self.cache_config.cache_dtype,
|
||||
runner_only_attn_layers=self.runner_only_attn_layers,
|
||||
static_forward_context=(self.compilation_config.static_forward_context),
|
||||
)
|
||||
|
||||
def _zero_block_ids(self, block_ids: list[int]) -> None:
|
||||
"""Zero the KV cache memory for the given block IDs."""
|
||||
if hasattr(self, "_kv_block_zeroer"):
|
||||
self._kv_block_zeroer.zero_block_ids(block_ids)
|
||||
|
||||
# Note: used for model runner override.
|
||||
def _init_device_properties(self) -> None:
|
||||
"""Initialize attributes from torch.cuda.get_device_properties"""
|
||||
@@ -951,6 +972,11 @@ class GPUModelRunner(
|
||||
for req_id in scheduler_output.finished_req_ids:
|
||||
self.input_batch.remove_request(req_id)
|
||||
|
||||
# Zero GPU memory for freshly allocated cache blocks to prevent
|
||||
# stale NaN/data from corrupting attention or SSM computation.
|
||||
if scheduler_output.new_block_ids_to_zero:
|
||||
self._zero_block_ids(scheduler_output.new_block_ids_to_zero)
|
||||
|
||||
# Free the cached encoder outputs.
|
||||
for mm_hash in scheduler_output.free_encoder_mm_hashes:
|
||||
self.encoder_cache.pop(mm_hash, None)
|
||||
@@ -6066,6 +6092,7 @@ class GPUModelRunner(
|
||||
kernel_block_sizes = prepare_kernel_block_sizes(
|
||||
kv_cache_config, self.attn_groups
|
||||
)
|
||||
self._kernel_block_sizes = kernel_block_sizes
|
||||
|
||||
# create metadata builders
|
||||
self.initialize_metadata_builders(kv_cache_config, kernel_block_sizes)
|
||||
|
||||
@@ -480,6 +480,14 @@ class Worker(WorkerBase):
|
||||
else:
|
||||
self.model_runner.initialize_kv_cache(kv_cache_config)
|
||||
|
||||
# Build KV-zero metadata outside the CuMem pool so the bookkeeping
|
||||
# GPU tensors (seg_addrs, block-id buffers) use the standard PyTorch
|
||||
# allocator and are not discarded during sleep/wake cycles.
|
||||
if kv_cache_config.needs_kv_cache_zeroing and hasattr(
|
||||
self.model_runner, "_init_kv_zero_meta"
|
||||
):
|
||||
self.model_runner._init_kv_zero_meta()
|
||||
|
||||
@instrument(span_name="Warmup (GPU)")
|
||||
def compile_or_warm_up_model(self) -> float:
|
||||
warmup_sizes = []
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
import math
|
||||
from collections import defaultdict
|
||||
from collections.abc import Iterable
|
||||
from dataclasses import dataclass, field
|
||||
from itertools import product as iprod
|
||||
from typing import Any
|
||||
|
||||
import torch
|
||||
|
||||
@@ -12,6 +15,8 @@ from vllm.model_executor.layers.attention import Attention
|
||||
from vllm.model_executor.models.interfaces import MultiModalEmbeddings
|
||||
from vllm.model_executor.models.utils import extract_layer_index
|
||||
from vllm.platforms import current_platform
|
||||
from vllm.triton_utils import tl, triton
|
||||
from vllm.utils.math_utils import largest_power_of_2_divisor
|
||||
from vllm.utils.mem_utils import MemorySnapshot, format_gib
|
||||
from vllm.v1.attention.backend import (
|
||||
AttentionBackend,
|
||||
@@ -21,6 +26,7 @@ from vllm.v1.attention.backend import (
|
||||
from vllm.v1.kv_cache_interface import (
|
||||
AttentionSpec,
|
||||
EncoderOnlyAttentionSpec,
|
||||
FullAttentionSpec,
|
||||
KVCacheConfig,
|
||||
KVCacheGroupSpec,
|
||||
KVCacheSpec,
|
||||
@@ -31,6 +37,186 @@ from vllm.v1.kv_cache_interface import (
|
||||
logger = init_logger(__name__)
|
||||
|
||||
|
||||
@triton.jit
|
||||
def _zero_kv_blocks_kernel(
|
||||
seg_addrs_ptr,
|
||||
block_ids_ptr,
|
||||
n_blocks,
|
||||
N_SEGS: tl.constexpr,
|
||||
PAGE_SIZE_EL: tl.constexpr,
|
||||
BLOCK_SIZE: tl.constexpr,
|
||||
):
|
||||
"""Zero KV cache blocks across all segments in a single launch.
|
||||
|
||||
Each segment is a contiguous region of one block's data. For backends
|
||||
where blocks are outermost (block_dim=0) there is one segment per
|
||||
buffer. For backends where K/V is outermost (block_dim=1) there are
|
||||
two segments per buffer (one for K, one for V).
|
||||
|
||||
seg_addrs_ptr holds absolute byte addresses (int64) for each segment,
|
||||
allowing segments to live in different CUDA allocations.
|
||||
|
||||
Programs are mapped as (block_index, seg_index, chunk_index).
|
||||
"""
|
||||
pid = tl.program_id(0)
|
||||
chunks = PAGE_SIZE_EL // BLOCK_SIZE
|
||||
work_per_block = N_SEGS * chunks
|
||||
block_index = pid // work_per_block
|
||||
if block_index >= n_blocks:
|
||||
return
|
||||
remainder = pid % work_per_block
|
||||
seg_index = remainder // chunks
|
||||
chunk_index = remainder % chunks
|
||||
block_id = tl.load(block_ids_ptr + block_index)
|
||||
seg_addr = tl.load(seg_addrs_ptr + seg_index)
|
||||
ptr = tl.cast(seg_addr, tl.pointer_type(tl.int32))
|
||||
offset = (
|
||||
block_id.to(tl.int64) * PAGE_SIZE_EL + chunk_index.to(tl.int64) * BLOCK_SIZE
|
||||
)
|
||||
cols = tl.arange(0, BLOCK_SIZE).to(tl.int64)
|
||||
tl.store(ptr + offset + cols, tl.zeros([BLOCK_SIZE], dtype=tl.int32))
|
||||
|
||||
|
||||
class KVBlockZeroer:
|
||||
"""Manages efficient zeroing of KV cache blocks via a Triton kernel.
|
||||
|
||||
Call :meth:`init_meta` once after KV caches are allocated to precompute
|
||||
segment addresses, then call :meth:`zero_block_ids` each step to zero
|
||||
newly-allocated blocks.
|
||||
"""
|
||||
|
||||
def __init__(self, device: torch.device, pin_memory: bool):
|
||||
self.device = device
|
||||
self.pin_memory = pin_memory
|
||||
self._meta: tuple[torch.Tensor, int, int, int] | None = None
|
||||
self._id_cap: int = 0
|
||||
self._ids_pinned: torch.Tensor | None = None
|
||||
self._ids_gpu: torch.Tensor | None = None
|
||||
|
||||
def init_meta(
|
||||
self,
|
||||
attn_groups_iter: Iterable["AttentionGroup"],
|
||||
kernel_block_sizes: list[int],
|
||||
cache_dtype: str,
|
||||
runner_only_attn_layers: set[str],
|
||||
static_forward_context: dict[str, Any],
|
||||
) -> None:
|
||||
"""One-time precomputation for zero_block_ids.
|
||||
|
||||
Builds absolute-address table for the Triton zeroing kernel.
|
||||
Each entry is the absolute byte address of a segment start on the
|
||||
GPU, so segments in different CUDA allocations work correctly.
|
||||
|
||||
Block IDs from the scheduler reference logical blocks whose size
|
||||
may differ from the kernel block size (virtual block splitting).
|
||||
PAGE_SIZE_EL accounts for this ratio so that
|
||||
``block_id * PAGE_SIZE_EL`` lands at the correct offset.
|
||||
|
||||
Only AttentionSpec layers are processed; Mamba layers are skipped.
|
||||
"""
|
||||
seen_ptrs: set[int] = set()
|
||||
seg_addrs: list[int] = []
|
||||
page_size_el: int | None = None
|
||||
|
||||
for group in attn_groups_iter:
|
||||
spec = group.kv_cache_spec
|
||||
if type(spec) is not FullAttentionSpec:
|
||||
continue
|
||||
if group.kv_cache_group_id >= len(kernel_block_sizes):
|
||||
continue
|
||||
kernel_bs = kernel_block_sizes[group.kv_cache_group_id]
|
||||
ratio = spec.block_size // kernel_bs
|
||||
block_dim = group.backend.get_kv_cache_block_dim(
|
||||
kernel_bs,
|
||||
spec.num_kv_heads,
|
||||
spec.head_size,
|
||||
cache_dtype_str=cache_dtype,
|
||||
)
|
||||
|
||||
for layer_name in group.layer_names:
|
||||
if layer_name in runner_only_attn_layers:
|
||||
continue
|
||||
kv = static_forward_context[layer_name].kv_cache[0]
|
||||
if isinstance(kv, list):
|
||||
continue
|
||||
dp = kv.data_ptr()
|
||||
if dp in seen_ptrs:
|
||||
continue
|
||||
seen_ptrs.add(dp)
|
||||
|
||||
el = kv.element_size()
|
||||
cur_bytes = kv.stride(block_dim) * el
|
||||
assert cur_bytes % 4 == 0
|
||||
kernel_block_el = cur_bytes // 4
|
||||
cur_page_el = kernel_block_el * ratio
|
||||
if page_size_el is None:
|
||||
page_size_el = cur_page_el
|
||||
else:
|
||||
assert page_size_el == cur_page_el, (
|
||||
f"Non-uniform page sizes: {page_size_el} vs {cur_page_el}"
|
||||
)
|
||||
|
||||
block_stride_bytes = cur_bytes
|
||||
outer_dims = [
|
||||
d
|
||||
for d in range(block_dim)
|
||||
if kv.stride(d) * el > block_stride_bytes
|
||||
]
|
||||
outer_strides = [kv.stride(d) * el for d in outer_dims]
|
||||
for outer in iprod(*(range(kv.shape[d]) for d in outer_dims)):
|
||||
off_bytes = sum(i * s for i, s in zip(outer, outer_strides))
|
||||
seg_addrs.append(dp + off_bytes)
|
||||
|
||||
if not seg_addrs or page_size_el is None:
|
||||
self._meta = None
|
||||
return
|
||||
|
||||
blk_size = min(largest_power_of_2_divisor(page_size_el), 1024)
|
||||
self._id_cap = 8192
|
||||
self._ids_pinned = torch.empty(
|
||||
self._id_cap,
|
||||
dtype=torch.int64,
|
||||
pin_memory=self.pin_memory,
|
||||
)
|
||||
self._ids_gpu = torch.empty(self._id_cap, dtype=torch.int64, device=self.device)
|
||||
self._meta = (
|
||||
torch.tensor(seg_addrs, dtype=torch.int64, device=self.device),
|
||||
page_size_el,
|
||||
blk_size,
|
||||
len(seg_addrs),
|
||||
)
|
||||
|
||||
def zero_block_ids(self, block_ids: list[int]) -> None:
|
||||
"""Zero the KV cache memory for the given block IDs."""
|
||||
if not block_ids or self._meta is None:
|
||||
return
|
||||
seg_addrs, page_size_el, blk_size, n_segs = self._meta
|
||||
n_blocks = len(block_ids)
|
||||
if n_blocks > self._id_cap:
|
||||
self._id_cap = n_blocks * 2
|
||||
self._ids_pinned = torch.empty(
|
||||
self._id_cap,
|
||||
dtype=torch.int64,
|
||||
pin_memory=self.pin_memory,
|
||||
)
|
||||
self._ids_gpu = torch.empty(
|
||||
self._id_cap, dtype=torch.int64, device=self.device
|
||||
)
|
||||
assert self._ids_pinned is not None and self._ids_gpu is not None
|
||||
self._ids_pinned[:n_blocks].numpy()[:] = block_ids
|
||||
idx = self._ids_gpu[:n_blocks]
|
||||
idx.copy_(self._ids_pinned[:n_blocks], non_blocking=True)
|
||||
grid = (n_blocks * n_segs * (page_size_el // blk_size),)
|
||||
_zero_kv_blocks_kernel[grid](
|
||||
seg_addrs,
|
||||
idx,
|
||||
n_blocks,
|
||||
N_SEGS=n_segs,
|
||||
PAGE_SIZE_EL=page_size_el,
|
||||
BLOCK_SIZE=blk_size,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AttentionGroup:
|
||||
backend: type[AttentionBackend]
|
||||
|
||||
Reference in New Issue
Block a user