From 28a82bb5e63ffebddac300788869793d633c15b5 Mon Sep 17 00:00:00 2001 From: Robert Shaw <114415538+robertgshaw2-redhat@users.noreply.github.com> Date: Tue, 11 Nov 2025 00:59:08 -0500 Subject: [PATCH] [Bugfix] Fix Stream Sync for Shared Expert Overlap (#28430) Signed-off-by: Vadim Gimpelson Signed-off-by: Robert Shaw Co-authored-by: Vadim Gimpelson (cherry picked from commit e605e8e3233f895340f46665f93ab37b307491aa) --- .../gsm8k/configs/Qwen1.5-MoE-W4A16-CT.yaml | 3 -- vllm/model_executor/layers/fused_moe/layer.py | 45 +++++++------------ 2 files changed, 15 insertions(+), 33 deletions(-) diff --git a/tests/evals/gsm8k/configs/Qwen1.5-MoE-W4A16-CT.yaml b/tests/evals/gsm8k/configs/Qwen1.5-MoE-W4A16-CT.yaml index ea9c95158..9297bf6dd 100644 --- a/tests/evals/gsm8k/configs/Qwen1.5-MoE-W4A16-CT.yaml +++ b/tests/evals/gsm8k/configs/Qwen1.5-MoE-W4A16-CT.yaml @@ -3,6 +3,3 @@ accuracy_threshold: 0.45 num_questions: 1319 num_fewshot: 5 max_model_len: 4096 -# Duo stream incompatabilbe with this model: https://github.com/vllm-project/vllm/issues/28220 -env: - VLLM_DISABLE_SHARED_EXPERTS_STREAM: "1" diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py index f86a93e30..1b7bba3fc 100644 --- a/vllm/model_executor/layers/fused_moe/layer.py +++ b/vllm/model_executor/layers/fused_moe/layer.py @@ -2439,28 +2439,6 @@ class FusedMoE(CustomOp): staged_hidden_states.copy_(hidden_states, non_blocking=True) staged_router_logits.copy_(router_logits, non_blocking=True) - # If there are shared experts but we are not using a modular kernel, - # the shared experts must be called here - if has_separate_shared_experts: - assert self.shared_experts is not None - - if self.shared_experts_stream is not None: - # For chunked, we start the shared experts stream here - # (Note that no concurrency with the router/gate) - self.shared_experts_stream.wait_stream(current_stream()) - - with torch.cuda.stream(self.shared_experts_stream): - # Note that staged_hidden_states clone() is necessary - # here to avoid conflict with the main stream - shared_output = self.shared_experts( - staged_hidden_states.clone() - ) - else: - shared_output = self.shared_experts(staged_hidden_states) - - else: - shared_output = None - # Matrix multiply. final_hidden_states = self.quant_method.apply( layer=self, @@ -2489,11 +2467,7 @@ class FusedMoE(CustomOp): if has_separate_shared_experts: assert not isinstance(final_hidden_states, tuple) assert self.shared_experts is not None - - # Here we finish the shared experts stream - if self.shared_experts_stream is not None: - current_stream().wait_stream(self.shared_experts_stream) - + shared_output = self.shared_experts(staged_hidden_states) final_hidden_states = ( shared_output, final_hidden_states, @@ -2602,11 +2576,22 @@ class FusedMoE(CustomOp): assert self.shared_experts is not None if self.shared_experts_stream is not None: + # Clone BEFORE switching streams to avoid race condition + # where routed_expert kernel may mutate hidden_states. + hidden_states_clone = hidden_states.clone() + self.shared_experts_stream.wait_stream(current_stream()) + # Run shared experts in parallel on a separate stream with torch.cuda.stream(self.shared_experts_stream): - # Note that hidden_states clone() is necessary here to avoid - # conflict with the main stream - shared_output = self.shared_experts(hidden_states.clone()) + shared_output = self.shared_experts(hidden_states_clone) + + # Record that the clone will be used by shared_experts_stream + # to avoid gc issue from deallocation of hidden_states_clone + # For more details: https://docs.pytorch.org/docs/stable/generated/torch.Tensor.record_stream.html # noqa: E501 + # NOTE: we dont need shared_output.record_stream(current_stream()) + # because we synch the streams before using shared_output. + hidden_states_clone.record_stream(self.shared_experts_stream) + else: shared_output = self.shared_experts(hidden_states) else: