[Model Runner V2] Use a different stream for grammar bitmask h2d copy (#33059)
Signed-off-by: Woosuk Kwon <woosuk@inferact.ai> Signed-off-by: Woosuk Kwon <woosuk.kwon@berkeley.edu> Co-authored-by: Nick Hill <nhill@redhat.com>
This commit is contained in:
@@ -168,6 +168,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
|
||||
self.structured_outputs_worker = StructuredOutputsWorker(
|
||||
max_num_logits=self.max_num_reqs * (self.num_speculative_steps + 1),
|
||||
vocab_size=self.vocab_size,
|
||||
device=self.device,
|
||||
)
|
||||
# LoRA-related workers.
|
||||
self.lora_state = LoraState(max_num_reqs=self.max_num_reqs)
|
||||
|
||||
@@ -5,7 +5,7 @@ import torch
|
||||
|
||||
from vllm.triton_utils import tl, triton
|
||||
from vllm.utils.math_utils import cdiv
|
||||
from vllm.v1.worker.gpu.buffer_utils import UvaBufferPool
|
||||
from vllm.v1.worker.gpu.buffer_utils import async_copy_to_gpu
|
||||
from vllm.v1.worker.gpu.input_batch import InputBatch
|
||||
|
||||
|
||||
@@ -14,13 +14,16 @@ class StructuredOutputsWorker:
|
||||
self,
|
||||
max_num_logits: int,
|
||||
vocab_size: int,
|
||||
device: torch.device,
|
||||
):
|
||||
# NOTE(woosuk): Here, we use UvaBufferPool instead of UvaBackedTensor
|
||||
# to save an unnecessary CPU-to-CPU copy.
|
||||
self.logits_indices = UvaBufferPool(max_num_logits, torch.int32)
|
||||
self.grammar_bitmask = UvaBufferPool(
|
||||
(max_num_logits, cdiv(vocab_size, 32)), torch.int32
|
||||
self.logits_indices = torch.zeros(
|
||||
max_num_logits, dtype=torch.int32, device=device
|
||||
)
|
||||
self.grammar_bitmask = torch.zeros(
|
||||
(max_num_logits, cdiv(vocab_size, 32)), dtype=torch.int32, device=device
|
||||
)
|
||||
self.device = device
|
||||
self.copy_stream = torch.cuda.Stream()
|
||||
|
||||
def apply_grammar_bitmask(
|
||||
self,
|
||||
@@ -32,6 +35,12 @@ class StructuredOutputsWorker:
|
||||
if not grammar_req_ids:
|
||||
return
|
||||
|
||||
# Asynchronously copy the bitmask to GPU.
|
||||
with torch.cuda.stream(self.copy_stream):
|
||||
bitmask = async_copy_to_gpu(
|
||||
grammar_bitmask, out=self.grammar_bitmask[: grammar_bitmask.shape[0]]
|
||||
)
|
||||
|
||||
# Construct bitmask -> logits mapping
|
||||
mapping: list[int] = []
|
||||
req_ids = input_batch.req_ids
|
||||
@@ -42,12 +51,19 @@ class StructuredOutputsWorker:
|
||||
logits_start_idx = cu_num_logits[req_idx]
|
||||
logits_end_idx = cu_num_logits[req_idx + 1]
|
||||
mapping.extend(range(logits_start_idx, logits_end_idx))
|
||||
# Copy the mapping.
|
||||
mapping_np = np.array(mapping, dtype=np.int32)
|
||||
logits_indices = self.logits_indices.copy_to_uva(mapping_np)
|
||||
|
||||
# Copy the bitmask.
|
||||
bitmask = self.grammar_bitmask.copy_to_uva(grammar_bitmask)
|
||||
# Asynchronously copy the mapping to GPU.
|
||||
with torch.cuda.stream(self.copy_stream):
|
||||
logits_indices = torch.tensor(
|
||||
mapping, dtype=torch.int32, device="cpu", pin_memory=True
|
||||
)
|
||||
logits_indices = self.logits_indices[: len(mapping)].copy_(
|
||||
logits_indices, non_blocking=True
|
||||
)
|
||||
|
||||
# Ensure all async copies are complete before launching the kernel.
|
||||
current_stream = torch.cuda.current_stream()
|
||||
current_stream.wait_stream(self.copy_stream)
|
||||
|
||||
num_masks = bitmask.shape[0]
|
||||
assert num_masks == len(mapping)
|
||||
@@ -64,6 +80,10 @@ class StructuredOutputsWorker:
|
||||
BLOCK_SIZE=BLOCK_SIZE,
|
||||
)
|
||||
|
||||
# Ensure the copy stream waits for the device tensors to finish being used
|
||||
# before it re-uses or deallocates them
|
||||
self.copy_stream.wait_stream(current_stream)
|
||||
|
||||
|
||||
# Adapted from
|
||||
# https://github.com/mlc-ai/xgrammar/blob/main/python/xgrammar/kernels/apply_token_bitmask_inplace_triton.py
|
||||
|
||||
Reference in New Issue
Block a user