diff --git a/vllm/distributed/eplb/async_worker.py b/vllm/distributed/eplb/async_worker.py index 9d7366996..9b38c37f2 100644 --- a/vllm/distributed/eplb/async_worker.py +++ b/vllm/distributed/eplb/async_worker.py @@ -30,6 +30,7 @@ def start_async_worker( ep_group = get_ep_group().device_group rank = ep_group.rank() device_index = state.cuda_device_index + assert state.is_async def thread_target() -> None: assert device_index is not None @@ -42,9 +43,9 @@ def start_async_worker( transfer_run_periodically( state=state, ep_group=ep_group, + cuda_stream=cuda_stream, is_profile=is_profile, rank_mapping=rank_mapping, - cuda_stream=cuda_stream, ) ) except Exception as exc: # pragma: no cover - diagnostic path @@ -60,17 +61,16 @@ def start_async_worker( async def transfer_run_periodically( state: "EplbState", ep_group: ProcessGroup, + cuda_stream: torch.cuda.Stream, is_profile: bool = False, rank_mapping: dict[int, int] | None = None, - cuda_stream: torch.cuda.Stream = None, ) -> None: while True: await asyncio.to_thread(state.rearrange_event.wait) logger.info("async worker woke up for EPLB transfer") + assert state.is_async for model_state in state.model_states.values(): - if not model_state.is_async_enabled: - continue current_num_layers = model_state.model.num_moe_layers while ( model_state.rebalanced diff --git a/vllm/distributed/eplb/eplb_state.py b/vllm/distributed/eplb/eplb_state.py index a482c6f55..26571cd80 100644 --- a/vllm/distributed/eplb/eplb_state.py +++ b/vllm/distributed/eplb/eplb_state.py @@ -182,10 +182,6 @@ class EplbModelState: """ intermediate variable between `move_to_buffer` and `move_to_workspace`. """ - is_async_enabled: bool - """ - The flag indicates whether the EPLB is running in async mode. - """ cuda_device_index: int | None """ CUDA device index for the async EPLB worker thread. @@ -518,7 +514,6 @@ class EplbState: recv_expert_ids=np.array([]), recv_dst_rows=np.array([]), ), - is_async_enabled=self.is_async, cuda_device_index=self.cuda_device_index, new_physical_to_logical_map=new_physical_to_logical_map, new_logical_to_physical_map=new_logical_to_physical_map, @@ -630,19 +625,12 @@ class EplbState: if self.is_async: for eplb_model_state in self.model_states.values(): - if not eplb_model_state.is_async_enabled: - continue - all_ranks_buffer_ready = False if eplb_model_state.pending_global_ready_check: all_ranks_buffer_ready = self._all_ranks_buffer_ready( eplb_model_state ) - if ( - eplb_model_state.is_async_enabled - and eplb_model_state.ep_buffer_ready - and all_ranks_buffer_ready - ): + if eplb_model_state.ep_buffer_ready and all_ranks_buffer_ready: self.move_to_workspace( model_state=eplb_model_state, ep_group=ep_group, @@ -664,8 +652,8 @@ class EplbState: ) if self.expert_rearrangement_step >= self.expert_rearrangement_step_interval: - if any( - eplb_model_state.is_async_enabled and eplb_model_state.rebalanced + if self.is_async and any( + eplb_model_state.rebalanced for eplb_model_state in self.model_states.values() ): # Still performing asynchronous rearrangement @@ -822,7 +810,7 @@ class EplbState: eplb_model_state.physical_to_logical_map, ) - if not eplb_model_state.is_async_enabled or is_profile: + if not self.is_async or is_profile: # Update expert weights rearrange_expert_weights_inplace( eplb_model_state.physical_to_logical_map,