[Refactor] Remove numpy split in async scheduling (#32034)

Signed-off-by: yewentao256 <zhyanwentao@126.com>
This commit is contained in:
Wentao Ye
2026-01-09 14:09:02 -05:00
committed by GitHub
parent f32c629eb4
commit 28ae32a5d3

View File

@@ -9,7 +9,6 @@ from collections.abc import AsyncGenerator, Iterable, Mapping
from copy import copy
from typing import Any, cast
import numpy as np
import torch
import vllm.envs as envs
@@ -32,7 +31,6 @@ from vllm.transformers_utils.config import maybe_register_config_serialize_by_va
from vllm.usage.usage_lib import UsageContext
from vllm.utils.async_utils import cancel_task_threadsafe
from vllm.utils.collection_utils import as_list
from vllm.utils.math_utils import cdiv
from vllm.v1.engine import EngineCoreRequest
from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError
@@ -495,6 +493,7 @@ class AsyncLLM(EngineClient):
log_stats = self.log_stats
logger_manager = self.logger_manager
input_processor = self.input_processor
chunk_size = envs.VLLM_V1_OUTPUT_PROC_CHUNK_SIZE
async def output_handler():
    try:
@@ -510,15 +509,10 @@ class AsyncLLM(EngineClient):
# Split outputs into chunks of at most
# VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
# event loop for too long.
if num_outputs <= envs.VLLM_V1_OUTPUT_PROC_CHUNK_SIZE: engine_core_outputs = outputs.outputs
slices = (outputs.outputs,) for start in range(0, num_outputs, chunk_size):
else: end = start + chunk_size
slices = np.array_split( outputs_slice = engine_core_outputs[start:end]
outputs.outputs,
cdiv(num_outputs, envs.VLLM_V1_OUTPUT_PROC_CHUNK_SIZE),
)
for i, outputs_slice in enumerate(slices):
# 2) Process EngineCoreOutputs.
processed_outputs = output_processor.process_outputs(
    outputs_slice, outputs.timestamp, iteration_stats
@@ -527,7 +521,7 @@ class AsyncLLM(EngineClient):
assert not processed_outputs.request_outputs
# Allow other asyncio tasks to run between chunks
if i + 1 < len(slices): if end < num_outputs:
    await asyncio.sleep(0)
# 3) Abort any reqs that finished due to stop strings.