Files
vllm/vllm/v1/worker/gpu/pp_handler.py
2026-02-17 02:31:40 -08:00

110 lines
3.6 KiB
Python

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Pipeline Parallelism handler for V2 Model Runner."""
import torch
from vllm.distributed.parallel_state import get_pp_group
from vllm.v1.worker.gpu.sample.output import SamplerOutput
class PPHandler:
"""Pipeline parallelism handler for Model Runner V2.
Manages sampled token synchronization between PP ranks.
Only instantiated when PP is enabled (pp_size > 1).
"""
def __init__(self, device: torch.device):
self.device = device
def maybe_broadcast_sampled_tokens(
self,
sampler_output: SamplerOutput,
num_sampled: torch.Tensor,
num_rejected: torch.Tensor,
) -> None:
"""Broadcast sampled tokens from the last PP rank to all other ranks.
No-ops if this is not the last rank.
Broadcasts sampled_token_ids [num_reqs, max_sample_len], num_sampled
[num_reqs], and num_rejected [num_reqs] to support both regular decode
and speculative decoding.
Args:
sampler_output: SamplerOutput from sampling.
num_sampled: Number of accepted tokens per request.
num_rejected: Number of rejected tokens per request.
"""
pp = get_pp_group()
if not pp.is_last_rank:
return
torch.distributed.broadcast(
sampler_output.sampled_token_ids.contiguous(),
src=pp.last_rank,
group=pp.device_group,
)
# NOTE: num_sampled/num_rejected are only needed
# for speculative decoding.
torch.distributed.broadcast(
num_sampled.contiguous(),
src=pp.last_rank,
group=pp.device_group,
)
torch.distributed.broadcast(
num_rejected.contiguous(),
src=pp.last_rank,
group=pp.device_group,
)
def maybe_receive_sampled_tokens(
self,
num_reqs: int,
max_sample_len: int = 1,
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor] | None:
"""Receive sampled tokens broadcast by the last PP rank.
Returns None if this is the last rank (which samples, not receives).
Args:
num_reqs: Number of requests in the batch.
max_sample_len: Maximum number of tokens sampled per request
(1 for regular decode, >1 for speculative decoding).
Returns:
None if called on last rank.
Otherwise, tuple of (sampled_tokens, num_sampled, num_rejected):
- sampled_tokens: shape [num_reqs, max_sample_len]
- num_sampled: shape [num_reqs]
- num_rejected: shape [num_reqs]
"""
pp = get_pp_group()
if pp.is_last_rank:
return None
sampled_tokens = torch.empty(
num_reqs, max_sample_len, dtype=torch.int64, device=self.device
)
torch.distributed.broadcast(
sampled_tokens,
src=pp.last_rank,
group=pp.device_group,
)
# NOTE: num_sampled/num_rejected are only needed
# for speculative decoding.
num_sampled = torch.empty(num_reqs, dtype=torch.int32, device=self.device)
torch.distributed.broadcast(
num_sampled,
src=pp.last_rank,
group=pp.device_group,
)
num_rejected = torch.empty(num_reqs, dtype=torch.int32, device=self.device)
torch.distributed.broadcast(
num_rejected,
src=pp.last_rank,
group=pp.device_group,
)
return sampled_tokens, num_sampled, num_rejected