diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 7e7e3ca55..e73e7e1fb 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -1930,6 +1930,7 @@ class NixlConnectorWorker: if now < expires: break count = self.consumer_notification_counts_by_req.pop(req_id, 0) + self.xfer_stats.record_kv_expired_req() logger.warning( "Releasing expired KV blocks for request %s which were " "retrieved by %d decode worker(s) within %d seconds.", @@ -2499,13 +2500,14 @@ class NixlKVConnectorStats(KVConnectorStats): def reset(self): # Must be serializable - self.data: dict[str, list[float]] = { + self.data: dict[str, list[float | int]] = { "transfer_duration": [], "post_duration": [], "bytes_transferred": [], "num_descriptors": [], "num_failed_transfers": [], "num_failed_notifications": [], + "num_kv_expired_reqs": [], } def record_transfer(self, res: nixlXferTelemetry): @@ -2517,11 +2519,15 @@ class NixlKVConnectorStats(KVConnectorStats): def record_failed_transfer(self): """Record a failed NIXL transfer operation.""" - self.data["num_failed_transfers"].append(1.0) + self.data["num_failed_transfers"].append(1) def record_failed_notification(self): """Record a failed NIXL notification (send_notif).""" - self.data["num_failed_notifications"].append(1.0) + self.data["num_failed_notifications"].append(1) + + def record_kv_expired_req(self): + """Record a request that had its KV blocks expire.""" + self.data["num_kv_expired_reqs"].append(1) def clone_and_reset(self) -> "NixlKVConnectorStats": old = copy.copy(self) @@ -2529,7 +2535,13 @@ class NixlKVConnectorStats(KVConnectorStats): return old def is_empty(self) -> bool: - return self.num_successful_transfers == 0 + # Do not discard metric updates that are entirely failure-related.
+ return ( + self.num_successful_transfers == 0 + and len(self.data["num_failed_transfers"]) == 0 + and len(self.data["num_failed_notifications"]) == 0 + and len(self.data["num_kv_expired_reqs"]) == 0 + ) def aggregate(self, other: KVConnectorStats) -> KVConnectorStats: if not other.is_empty(): @@ -2541,7 +2553,9 @@ class NixlKVConnectorStats(KVConnectorStats): def reduce(self) -> dict[str, int | float]: # Compute compact representative stats suitable for CLI logging - if self.is_empty(): + if self.num_successful_transfers == 0: + # CLI logging only reports successful-transfer stats. If all requests in + # the interval were unsuccessful, Prom will report failure stats instead. return { "Num successful transfers": 0, "Avg xfer time (ms)": 0, @@ -2677,6 +2691,16 @@ class NixlPromMetrics(KVConnectorPromMetrics): counter_nixl_num_failed_notifications ) + counter_nixl_num_kv_expired_reqs = self._counter_cls( + name="vllm:nixl_num_kv_expired_reqs", + documentation="Number of requests that had their KV expire. " + "NOTE: This metric is tracked on the P instance.", + labelnames=labelnames, + ) + self.counter_nixl_num_kv_expired_reqs = self.make_per_engine( + counter_nixl_num_kv_expired_reqs + ) + def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): for prom_obj, list_item_key in zip( [ @@ -2698,8 +2722,9 @@ class NixlPromMetrics(KVConnectorPromMetrics): [ self.counter_nixl_num_failed_transfers, self.counter_nixl_num_failed_notifications, + self.counter_nixl_num_kv_expired_reqs, ], - ["num_failed_transfers", "num_failed_notifications"], + ["num_failed_transfers", "num_failed_notifications", "num_kv_expired_reqs"], ): for list_item in transfer_stats_data[counter_item_key]: counter_obj[engine_idx].inc(list_item)