[V1][Core][1/n] Logging and Metrics (#11962)

Signed-off-by: rshaw@neuralmagic.com <rshaw@neuralmagic.com>
This commit is contained in:
Robert Shaw
2025-01-12 16:02:02 -05:00
committed by GitHub
parent 263a870ee1
commit 9597a095f2
11 changed files with 129 additions and 84 deletions

View File

@@ -17,9 +17,9 @@ from vllm.transformers_utils.config import (
maybe_register_config_serialize_by_value)
from vllm.utils import get_exception_traceback, zmq_socket_ctx
from vllm.v1.core.scheduler import Scheduler
from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
EngineCoreProfile, EngineCoreRequest,
EngineCoreRequestType, EngineCoreRequestUnion)
from vllm.v1.engine import (EngineCoreOutputs, EngineCoreProfile,
EngineCoreRequest, EngineCoreRequestType,
EngineCoreRequestUnion)
from vllm.v1.engine.mm_input_mapper import MMInputMapperServer
from vllm.v1.executor.abstract import Executor
from vllm.v1.request import Request, RequestStatus
@@ -28,9 +28,7 @@ from vllm.version import __version__ as VLLM_VERSION
logger = init_logger(__name__)
POLLING_TIMEOUT_MS = 5000
POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
LOGGING_TIME_S = 5
POLLING_TIMEOUT_S = 2.5
class EngineCore:
@@ -40,10 +38,8 @@ class EngineCore:
self,
vllm_config: VllmConfig,
executor_class: Type[Executor],
log_stats: bool = False,
):
assert vllm_config.model_config.runner_type != "pooling"
self.log_stats = log_stats
logger.info("Initializing an LLM engine (v%s) with config: %s",
VLLM_VERSION, vllm_config)
@@ -62,8 +58,6 @@ class EngineCore:
vllm_config.cache_config,
vllm_config.lora_config)
self._last_logging_time = time.time()
self.mm_input_mapper_server = MMInputMapperServer(
vllm_config.model_config)
@@ -114,11 +108,12 @@ class EngineCore:
self.scheduler.finish_requests(request_ids,
RequestStatus.FINISHED_ABORTED)
def step(self) -> List[EngineCoreOutput]:
def step(self) -> EngineCoreOutputs:
"""Schedule, execute, and make output."""
if not self.scheduler.has_unfinished_requests():
return []
return EngineCoreOutputs(
outputs=[], scheduler_stats=self.scheduler.make_stats())
scheduler_output = self.scheduler.schedule()
output = self.model_executor.execute_model(scheduler_output)
@@ -145,7 +140,9 @@ class EngineCoreProc(EngineCore):
executor_class: Type[Executor],
log_stats: bool = False,
):
super().__init__(vllm_config, executor_class, log_stats)
super().__init__(vllm_config, executor_class)
self.log_stats = log_stats
# Background Threads and Queues for IO. These enable us to
# overlap ZMQ socket IO with GPU since they release the GIL,
@@ -153,7 +150,7 @@ class EngineCoreProc(EngineCore):
# model forward pass.
# Threads handle Socket <-> Queues and core_busy_loop uses Queue.
self.input_queue: queue.Queue[EngineCoreRequestUnion] = queue.Queue()
self.output_queue: queue.Queue[List[EngineCoreOutput]] = queue.Queue()
self.output_queue: queue.Queue[EngineCoreOutputs] = queue.Queue()
threading.Thread(target=self.process_input_socket,
args=(input_path, ),
daemon=True).start()
@@ -217,8 +214,10 @@ class EngineCoreProc(EngineCore):
self._handle_client_request(req)
break
except queue.Empty:
self._log_stats()
logger.debug("EngineCore busy loop waiting.")
# Break out the loop so we can log_stats in step().
if self.log_stats:
break
except BaseException:
raise
@@ -230,28 +229,9 @@ class EngineCoreProc(EngineCore):
# 3) Step the engine core.
outputs = self.step()
# 4) Put EngineCoreOutputs into the output queue.
# 5) Put EngineCoreOutputs into the output queue.
self.output_queue.put_nowait(outputs)
self._log_stats()
def _log_stats(self):
"""Log basic stats every LOGGING_TIME_S"""
if not self.log_stats:
return
now = time.time()
if now - self._last_logging_time > LOGGING_TIME_S:
logger.info(
"RUNNING: %s | WAITING: %s",
len(self.scheduler.running),
len(self.scheduler.waiting),
)
self._last_logging_time = now
def _handle_client_request(self, request: EngineCoreRequestUnion) -> None:
"""Handle EngineCoreRequest or EngineCoreABORT from Client."""
@@ -301,7 +281,6 @@ class EngineCoreProc(EngineCore):
with zmq_socket_ctx(output_path, zmq.constants.PUSH) as socket:
while True:
engine_core_outputs = self.output_queue.get()
outputs = EngineCoreOutputs(outputs=engine_core_outputs)
outputs = self.output_queue.get()
encoder.encode_into(outputs, buffer)
socket.send_multipart((buffer, ), copy=False)