Signed-off-by: Jialin Ouyang <Jialin.Ouyang@gmail.com>
This commit is contained in:
@@ -53,6 +53,7 @@ class GCDebugger:
|
||||
self.config = config
|
||||
# Start time in micro second of this GC cycle
|
||||
self.start_time_ns: int = time.monotonic_ns()
|
||||
self.num_objects: int = 0
|
||||
# If config.top_objects is positive,
|
||||
# compute top collected objects by object types
|
||||
self.gc_top_collected_objects: str = ""
|
||||
@@ -68,19 +69,21 @@ class GCDebugger:
|
||||
# Before GC started, record GC start time
|
||||
# and top collected objects
|
||||
self.start_time_ns = time.monotonic_ns()
|
||||
if (top_objects := self.config.top_objects) > 0:
|
||||
self.gc_top_collected_objects = _compute_top_gc_collected_objects(
|
||||
gc.get_objects(generation), top_objects
|
||||
)
|
||||
objects = gc.get_objects(generation)
|
||||
self.num_objects = len(objects)
|
||||
self.gc_top_collected_objects = _compute_top_gc_collected_objects(
|
||||
objects, self.config.top_objects
|
||||
)
|
||||
elif phase == "stop":
|
||||
# After GC finished, Record GC elapsed time and
|
||||
# optionally top collected objects
|
||||
elpased_ms = (time.monotonic_ns() - self.start_time_ns) / 1e6
|
||||
logger.info(
|
||||
"GC took %.3fms to complete. "
|
||||
"Collected %s objects in GC generation %d.%s",
|
||||
"Collected %s objects (out of %d) in GC generation %d.%s",
|
||||
elpased_ms,
|
||||
str(info.get("collected", "?")),
|
||||
self.num_objects,
|
||||
generation,
|
||||
(
|
||||
f" Top collected objects: \n{self.gc_top_collected_objects}"
|
||||
|
||||
@@ -1013,8 +1013,8 @@ class Scheduler(SchedulerInterface):
|
||||
continue
|
||||
|
||||
req_index = model_runner_output.req_id_to_index[req_id]
|
||||
generated_token_ids: list[int] = (
|
||||
sampled_token_ids[req_index].tolist() if sampled_token_ids else []
|
||||
generated_token_ids = (
|
||||
sampled_token_ids[req_index] if sampled_token_ids else []
|
||||
)
|
||||
|
||||
scheduled_spec_token_ids = (
|
||||
|
||||
@@ -158,7 +158,7 @@ class ModelRunnerOutput:
|
||||
# num_generated_tokens is the number of tokens
|
||||
# generated in the current step. It can be different for
|
||||
# each request due to speculative/jump decoding.
|
||||
sampled_token_ids: list[np.ndarray]
|
||||
sampled_token_ids: list[list[int]]
|
||||
|
||||
# [num_reqs, max_num_logprobs + 1]
|
||||
# [num_reqs, max_num_logprobs + 1]
|
||||
@@ -220,7 +220,7 @@ def make_empty_encoder_model_runner_output(
|
||||
req_id_to_index: dict[str, int] = {rid: idx for idx, rid in enumerate(req_ids)}
|
||||
|
||||
# No tokens generated yet ⇒ one empty list per request
|
||||
sampled_token_ids: list[list[int]] = [np.array([0]) for _ in req_ids]
|
||||
sampled_token_ids: list[list[int]] = [[0] for _ in req_ids]
|
||||
|
||||
# Pooler outputs are not available yet ⇒ use None placeholders
|
||||
pooler_output: list[torch.Tensor | None] = [None for _ in req_ids]
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
from dataclasses import replace
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
|
||||
@@ -205,7 +204,7 @@ class RejectionSampler(nn.Module):
|
||||
def parse_output(
|
||||
output_token_ids: torch.Tensor,
|
||||
vocab_size: int,
|
||||
) -> list[np.ndarray]:
|
||||
) -> list[list[int]]:
|
||||
"""Parse the output of the rejection sampler.
|
||||
Args:
|
||||
output_token_ids: The sampled token IDs in shape
|
||||
@@ -221,7 +220,10 @@ class RejectionSampler(nn.Module):
|
||||
valid_mask = (output_token_ids_np != PLACEHOLDER_TOKEN_ID) & (
|
||||
output_token_ids_np < vocab_size
|
||||
)
|
||||
return [row[valid_mask[i]] for i, row in enumerate(output_token_ids_np)]
|
||||
outputs = [
|
||||
row[valid_mask[i]].tolist() for i, row in enumerate(output_token_ids_np)
|
||||
]
|
||||
return outputs
|
||||
|
||||
def apply_logits_processors(
|
||||
self,
|
||||
|
||||
@@ -496,7 +496,7 @@ class EagleProposer:
|
||||
|
||||
def prepare_next_token_ids_cpu(
|
||||
self,
|
||||
sampled_token_ids: list[np.ndarray],
|
||||
sampled_token_ids: list[list[int]],
|
||||
requests: dict[str, CachedRequestState],
|
||||
gpu_input_batch: InputBatch,
|
||||
num_scheduled_tokens: dict[str, int],
|
||||
@@ -511,7 +511,7 @@ class EagleProposer:
|
||||
req_ids = gpu_input_batch.req_ids
|
||||
next_token_ids: list[int] = []
|
||||
for i, token_ids in enumerate(sampled_token_ids):
|
||||
if token_ids.shape[0] > 0:
|
||||
if token_ids:
|
||||
# Common case.
|
||||
next_token_id = token_ids[-1]
|
||||
else:
|
||||
@@ -522,9 +522,10 @@ class EagleProposer:
|
||||
seq_len = req_state.num_computed_tokens + num_scheduled_tokens[req_id]
|
||||
next_token_id = req_state.get_token_id(seq_len)
|
||||
next_token_ids.append(next_token_id)
|
||||
return torch.tensor(
|
||||
next_token_ids = torch.tensor(
|
||||
next_token_ids, dtype=torch.int32, device=self.input_ids.device
|
||||
)
|
||||
return next_token_ids
|
||||
|
||||
def prepare_next_token_ids_padded(
|
||||
self,
|
||||
|
||||
@@ -54,7 +54,7 @@ class NgramProposer:
|
||||
# Trigger Numba JIT compilation for N-gram proposer.
|
||||
# This usually takes less than 1 second.
|
||||
self.propose(
|
||||
[np.array([])] * 1024,
|
||||
[[]] * 1024,
|
||||
[""] * 1024,
|
||||
np.zeros(1024, dtype=np.int32),
|
||||
np.zeros((1024, self.max_model_len), dtype=np.int32),
|
||||
@@ -131,7 +131,7 @@ class NgramProposer:
|
||||
|
||||
def propose(
|
||||
self,
|
||||
sampled_token_ids: list[np.ndarray],
|
||||
sampled_token_ids: list[list[int]],
|
||||
req_ids: list[str],
|
||||
num_tokens_no_spec: np.ndarray,
|
||||
token_ids_cpu: np.ndarray,
|
||||
@@ -140,7 +140,7 @@ class NgramProposer:
|
||||
# find which requests need ngram proposals
|
||||
valid_ngram_requests = []
|
||||
for i, sampled_ids in enumerate(sampled_token_ids):
|
||||
num_sampled_ids = sampled_ids.shape[0]
|
||||
num_sampled_ids = len(sampled_ids)
|
||||
if not num_sampled_ids:
|
||||
# Skip speculative decoding.
|
||||
continue
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
import numpy as np
|
||||
|
||||
from vllm.config import VllmConfig
|
||||
from vllm.v1.worker.gpu_input_batch import InputBatch
|
||||
|
||||
@@ -34,16 +32,16 @@ class SuffixDecodingProposer:
|
||||
def propose(
|
||||
self,
|
||||
input_batch: InputBatch,
|
||||
sampled_token_ids: list[np.ndarray],
|
||||
sampled_token_ids: list[list[int]],
|
||||
) -> list[list[int]]:
|
||||
"""
|
||||
Propose speculative tokens for each request in the input batch. Suffix Decoding
|
||||
will speculate a dynamic number of tokens for each request every decoding step,
|
||||
so each entry in the returned list may have different lengths.
|
||||
"""
|
||||
draft_token_ids: list[np.ndarray] = []
|
||||
draft_token_ids: list[list[int]] = []
|
||||
for i, sampled_ids in enumerate(sampled_token_ids):
|
||||
if sampled_ids.shape[0] == 0:
|
||||
if not sampled_ids:
|
||||
# Skip speculative decoding for partial prefills.
|
||||
draft_token_ids.append([])
|
||||
continue
|
||||
@@ -72,7 +70,7 @@ class SuffixDecodingProposer:
|
||||
self.suffix_cache.start_request(req_id, prompt_token_ids)
|
||||
|
||||
# Append the newly sampled ids to the suffix cache for this request.
|
||||
self.suffix_cache.add_active_response(req_id, sampled_ids.tolist())
|
||||
self.suffix_cache.add_active_response(req_id, sampled_ids)
|
||||
|
||||
# Suffix decoding only uses the most recent tokens up to max_tree_depth, so
|
||||
# we extract the pattern from the end of the input.
|
||||
|
||||
@@ -221,16 +221,14 @@ class AsyncGPUModelRunnerOutput(AsyncModelRunnerOutput):
|
||||
del self._sampled_token_ids
|
||||
max_gen_len = self.sampled_token_ids_cpu.shape[-1]
|
||||
if max_gen_len == 1:
|
||||
valid_sampled_token_ids: list[np.ndarray] = [
|
||||
row for row in self.sampled_token_ids_cpu.numpy()
|
||||
]
|
||||
valid_sampled_token_ids = self.sampled_token_ids_cpu.tolist()
|
||||
else:
|
||||
valid_sampled_token_ids = RejectionSampler.parse_output(
|
||||
self.sampled_token_ids_cpu,
|
||||
self.vocab_size,
|
||||
)
|
||||
for i in self._invalid_req_indices:
|
||||
valid_sampled_token_ids[i] = np.array([])
|
||||
valid_sampled_token_ids[i].clear()
|
||||
|
||||
output = self._model_runner_output
|
||||
output.sampled_token_ids = valid_sampled_token_ids
|
||||
@@ -2466,7 +2464,7 @@ class GPUModelRunner(
|
||||
) -> tuple[
|
||||
dict[str, int],
|
||||
LogprobsLists | None,
|
||||
list[np.ndarray],
|
||||
list[list[int]],
|
||||
dict[str, LogprobsTensors | None],
|
||||
list[str],
|
||||
dict[str, int],
|
||||
@@ -2492,7 +2490,6 @@ class GPUModelRunner(
|
||||
num_sampled_tokens = sampler_output.sampled_token_ids.shape[0]
|
||||
sampled_token_ids = sampler_output.sampled_token_ids
|
||||
invalid_req_indices = []
|
||||
valid_sampled_token_ids: list[np.ndarray]
|
||||
if not self.use_async_scheduling:
|
||||
# Get the valid generated tokens.
|
||||
max_gen_len = sampled_token_ids.shape[-1]
|
||||
@@ -2507,7 +2504,7 @@ class GPUModelRunner(
|
||||
)
|
||||
# Mask out the sampled tokens that should not be sampled.
|
||||
for i in discard_sampled_tokens_req_indices:
|
||||
valid_sampled_token_ids[int(i)] = np.array([])
|
||||
valid_sampled_token_ids[int(i)].clear()
|
||||
else:
|
||||
valid_sampled_token_ids = []
|
||||
invalid_req_indices = discard_sampled_tokens_req_indices.tolist()
|
||||
@@ -2537,24 +2534,19 @@ class GPUModelRunner(
|
||||
[0] if spec_decode_metadata and logprobs_tensors else None
|
||||
)
|
||||
for req_idx in range(num_sampled_tokens):
|
||||
sampled_ids: np.ndarray | None
|
||||
if self.use_async_scheduling:
|
||||
sampled_ids = (
|
||||
np.array([-1]) if req_idx not in invalid_req_indices_set else None
|
||||
)
|
||||
sampled_ids = [-1] if req_idx not in invalid_req_indices_set else None
|
||||
else:
|
||||
sampled_ids = valid_sampled_token_ids[req_idx]
|
||||
|
||||
num_sampled_ids: int = (
|
||||
sampled_ids.shape[0] if sampled_ids is not None else 0
|
||||
)
|
||||
num_sampled_ids: int = len(sampled_ids) if sampled_ids else 0
|
||||
|
||||
if cu_num_accepted_tokens is not None:
|
||||
cu_num_accepted_tokens.append(
|
||||
cu_num_accepted_tokens[-1] + num_sampled_ids
|
||||
)
|
||||
|
||||
if sampled_ids is None or num_sampled_ids == 0:
|
||||
if not sampled_ids:
|
||||
continue
|
||||
|
||||
start_idx = self.input_batch.num_tokens_no_spec[req_idx]
|
||||
@@ -2938,9 +2930,7 @@ class GPUModelRunner(
|
||||
|
||||
self.input_batch.prev_sampled_token_ids = None
|
||||
|
||||
def propose_draft_token_ids(
|
||||
sampled_token_ids: torch.Tensor | list[np.ndarray],
|
||||
) -> None:
|
||||
def propose_draft_token_ids(sampled_token_ids):
|
||||
assert spec_decode_common_attn_metadata is not None
|
||||
with record_function_or_nullcontext("gpu_model_runner: draft"):
|
||||
self._draft_token_ids = self.propose_draft_token_ids(
|
||||
@@ -3113,14 +3103,14 @@ class GPUModelRunner(
|
||||
def propose_draft_token_ids(
|
||||
self,
|
||||
scheduler_output: "SchedulerOutput",
|
||||
sampled_token_ids: torch.Tensor | list[np.ndarray],
|
||||
sampled_token_ids: torch.Tensor | list[list[int]],
|
||||
sampling_metadata: SamplingMetadata,
|
||||
hidden_states: torch.Tensor,
|
||||
sample_hidden_states: torch.Tensor,
|
||||
aux_hidden_states: list[torch.Tensor] | None,
|
||||
spec_decode_metadata: SpecDecodeMetadata | None,
|
||||
common_attn_metadata: CommonAttentionMetadata,
|
||||
) -> torch.Tensor | list[list[int]]:
|
||||
) -> list[list[int]] | torch.Tensor:
|
||||
num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
|
||||
spec_config = self.speculative_config
|
||||
assert spec_config is not None
|
||||
@@ -3154,7 +3144,7 @@ class GPUModelRunner(
|
||||
for num_draft, tokens in zip(
|
||||
spec_decode_metadata.num_draft_tokens, sampled_token_ids
|
||||
):
|
||||
indices.append(offset + tokens.shape[0] - 1)
|
||||
indices.append(offset + len(tokens) - 1)
|
||||
offset += num_draft + 1
|
||||
indices = torch.tensor(indices, device=self.device)
|
||||
hidden_states = sample_hidden_states[indices]
|
||||
@@ -5150,7 +5140,7 @@ class GPUModelRunner(
|
||||
|
||||
return kv_cache_spec
|
||||
|
||||
def _to_list(self, sampled_token_ids: torch.Tensor) -> list[np.ndarray]:
|
||||
def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]:
|
||||
# This is a short term mitigation for issue mentioned in
|
||||
# https://github.com/vllm-project/vllm/issues/22754.
|
||||
# `tolist` would trigger a cuda wise stream sync, which
|
||||
@@ -5163,4 +5153,4 @@ class GPUModelRunner(
|
||||
pinned.copy_(sampled_token_ids, non_blocking=True)
|
||||
self.transfer_event.record()
|
||||
self.transfer_event.synchronize()
|
||||
return [row for row in pinned.numpy()]
|
||||
return pinned.tolist()
|
||||
|
||||
@@ -1262,15 +1262,13 @@ class TPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
|
||||
|
||||
max_gen_len = selected_token_ids.shape[-1]
|
||||
if max_gen_len == 1:
|
||||
valid_sampled_token_ids: list[np.ndarray] = [
|
||||
row for row in selected_token_ids.numpy()
|
||||
]
|
||||
valid_sampled_token_ids = selected_token_ids.tolist()
|
||||
|
||||
# Mask out the sampled tokens that should not be sampled.
|
||||
# TODO: Keep in sync with gpu_model_runner.py, in particular
|
||||
# the "else" case here
|
||||
for i in discard_sampled_tokens_req_indices:
|
||||
valid_sampled_token_ids[i] = np.array([])
|
||||
valid_sampled_token_ids[i].clear()
|
||||
|
||||
# Append sampled tokens
|
||||
for i, req_state, seq_len in request_seq_lens:
|
||||
@@ -1283,7 +1281,7 @@ class TPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
|
||||
valid_mask = selected_token_ids != INVALID_TOKEN_ID
|
||||
gen_lens = valid_mask.sum(dim=1).tolist()
|
||||
valid_sampled_token_ids = [
|
||||
seq.numpy() for seq in selected_token_ids[valid_mask].split(gen_lens)
|
||||
seq.tolist() for seq in selected_token_ids[valid_mask].split(gen_lens)
|
||||
]
|
||||
self.input_batch.num_tokens[:num_reqs] += gen_lens
|
||||
for i, req_state, seq_len in request_seq_lens:
|
||||
|
||||
Reference in New Issue
Block a user