diff --git a/vllm/executor/ray_distributed_executor.py b/vllm/executor/ray_distributed_executor.py index cf834fdca..673d0fc5d 100644 --- a/vllm/executor/ray_distributed_executor.py +++ b/vllm/executor/ray_distributed_executor.py @@ -528,10 +528,18 @@ class RayDistributedExecutor(DistributedExecutorBase): envs.VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM) with InputNode() as input_data: # Example DAG: PP=2, TP=4 - # (ExecuteModelReq, None) -> 0 -> (ExecuteModelReq, IntermediateOutput) -> 4 -> SamplerOutput # noqa: E501 - # -> 1 -> (ExecuteModelReq, IntermediateOutput) -> 5 -> SamplerOutput # noqa: E501 - # -> 2 -> (ExecuteModelReq, IntermediateOutput) -> 6 -> SamplerOutput # noqa: E501 - # -> 3 -> (ExecuteModelReq, IntermediateOutput) -> 7 -> SamplerOutput # noqa: E501 + # + # For V0: + # ExecuteModelRequest -> 0 -> (ExecuteModelReq, IntermediateTensors) -> 4 -> SamplerOutput # noqa: E501 + # ExecuteModelRequest -> 1 -> (ExecuteModelReq, IntermediateTensors) -> 5 -> SamplerOutput # noqa: E501 + # ExecuteModelRequest -> 2 -> (ExecuteModelReq, IntermediateTensors) -> 6 -> SamplerOutput # noqa: E501 + # ExecuteModelRequest -> 3 -> (ExecuteModelReq, IntermediateTensors) -> 7 -> SamplerOutput # noqa: E501 + # + # For V1: + # SchedulerOutput -> 0 -> (SchedulerOutput, IntermediateTensors) -> 4 -> ModelRunnerOutput # noqa: E501 + # SchedulerOutput -> 1 -> (SchedulerOutput, IntermediateTensors) -> 5 -> ModelRunnerOutput # noqa: E501 + # SchedulerOutput -> 2 -> (SchedulerOutput, IntermediateTensors) -> 6 -> ModelRunnerOutput # noqa: E501 + # SchedulerOutput -> 3 -> (SchedulerOutput, IntermediateTensors) -> 7 -> ModelRunnerOutput # noqa: E501 # All workers in the first TP group will take in the # ExecuteModelRequest as input. diff --git a/vllm/executor/ray_utils.py b/vllm/executor/ray_utils.py index 7104004fc..a9661fe0e 100644 --- a/vllm/executor/ray_utils.py +++ b/vllm/executor/ray_utils.py @@ -114,8 +114,11 @@ try: def execute_model_ray( self, - scheduler_output: "SchedulerOutput", - ) -> "ModelRunnerOutput": + scheduler_output: Union["SchedulerOutput", + Tuple["SchedulerOutput", + "IntermediateTensors"]], + ) -> Union["ModelRunnerOutput", Tuple["SchedulerOutput", + "IntermediateTensors"]]: # this method is used to compile ray CG, # and it needs a special logic of self.setup_device_if_necessary() self.setup_device_if_necessary()