diff --git a/vllm/v1/worker/gpu/model_runner.py b/vllm/v1/worker/gpu/model_runner.py index 20da820c9..233cab715 100644 --- a/vllm/v1/worker/gpu/model_runner.py +++ b/vllm/v1/worker/gpu/model_runner.py @@ -168,6 +168,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): self.structured_outputs_worker = StructuredOutputsWorker( max_num_logits=self.max_num_reqs * (self.num_speculative_steps + 1), vocab_size=self.vocab_size, + device=self.device, ) # LoRA-related workers. self.lora_state = LoraState(max_num_reqs=self.max_num_reqs) diff --git a/vllm/v1/worker/gpu/structured_outputs.py b/vllm/v1/worker/gpu/structured_outputs.py index 2eaadbb0b..92d23668f 100644 --- a/vllm/v1/worker/gpu/structured_outputs.py +++ b/vllm/v1/worker/gpu/structured_outputs.py @@ -5,7 +5,7 @@ import torch from vllm.triton_utils import tl, triton from vllm.utils.math_utils import cdiv -from vllm.v1.worker.gpu.buffer_utils import UvaBufferPool +from vllm.v1.worker.gpu.buffer_utils import async_copy_to_gpu from vllm.v1.worker.gpu.input_batch import InputBatch @@ -14,13 +14,16 @@ class StructuredOutputsWorker: self, max_num_logits: int, vocab_size: int, + device: torch.device, ): - # NOTE(woosuk): Here, we use UvaBufferPool instead of UvaBackedTensor - # to save a unnecessary CPU-to-CPU copy. - self.logits_indices = UvaBufferPool(max_num_logits, torch.int32) - self.grammar_bitmask = UvaBufferPool( - (max_num_logits, cdiv(vocab_size, 32)), torch.int32 + self.logits_indices = torch.zeros( + max_num_logits, dtype=torch.int32, device=device ) + self.grammar_bitmask = torch.zeros( + (max_num_logits, cdiv(vocab_size, 32)), dtype=torch.int32, device=device + ) + self.device = device + self.copy_stream = torch.cuda.Stream() def apply_grammar_bitmask( self, @@ -32,6 +35,12 @@ class StructuredOutputsWorker: if not grammar_req_ids: return + # Asynchronously copy the bitmask to GPU. + with torch.cuda.stream(self.copy_stream): + bitmask = async_copy_to_gpu( + grammar_bitmask, out=self.grammar_bitmask[: grammar_bitmask.shape[0]] + ) + # Construct bitmask -> logits mapping mapping: list[int] = [] req_ids = input_batch.req_ids @@ -42,12 +51,19 @@ class StructuredOutputsWorker: logits_start_idx = cu_num_logits[req_idx] logits_end_idx = cu_num_logits[req_idx + 1] mapping.extend(range(logits_start_idx, logits_end_idx)) - # Copy the mapping. - mapping_np = np.array(mapping, dtype=np.int32) - logits_indices = self.logits_indices.copy_to_uva(mapping_np) - # Copy the bitmask. - bitmask = self.grammar_bitmask.copy_to_uva(grammar_bitmask) + # Asynchronously copy the mapping to GPU. + with torch.cuda.stream(self.copy_stream): + logits_indices = torch.tensor( + mapping, dtype=torch.int32, device="cpu", pin_memory=True + ) + logits_indices = self.logits_indices[: len(mapping)].copy_( + logits_indices, non_blocking=True + ) + + # Ensure all async copies are complete before launching the kernel. + current_stream = torch.cuda.current_stream() + current_stream.wait_stream(self.copy_stream) num_masks = bitmask.shape[0] assert num_masks == len(mapping) @@ -64,6 +80,10 @@ class StructuredOutputsWorker: BLOCK_SIZE=BLOCK_SIZE, ) + # Ensure the copy stream waits for the device tensors to finish being used + # before it re-uses or deallocates them + self.copy_stream.wait_stream(current_stream) + # Adapted from # https://github.com/mlc-ai/xgrammar/blob/main/python/xgrammar/kernels/apply_token_bitmask_inplace_triton.py