125 lines
4.0 KiB
Python
125 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
"""M3DB Backfill - Pull vLLM/DCGM metrics from Mimir and write to M3DB"""
|
|
import struct
|
|
import urllib.request
|
|
import urllib.error
|
|
import urllib.parse
|
|
import json
|
|
import ssl
|
|
import snappy
|
|
import base64
|
|
import sys
|
|
|
|
print("Starting backfill script...", flush=True)

# Read credentials from environment (see .env)
import os

# Source: Mimir's Prometheus-compatible query API (queried with HTTP Basic auth).
MIMIR_URL = "https://metrics.vultrlabs.com/prometheus"
MIMIR_USER = os.environ.get("MIMIR_USERNAME", "REPLACE_WITH_MIMIR_USERNAME")
MIMIR_PASS = os.environ.get("MIMIR_PASSWORD", "REPLACE_WITH_MIMIR_PASSWORD")

# Destination: M3 coordinator remote-write endpoint (cluster-internal DNS name).
M3DB_URL = "http://m3coordinator.m3db.svc.cluster.local:7201"

if MIMIR_USER.startswith("REPLACE_WITH_") or MIMIR_PASS.startswith("REPLACE_WITH_"):
    # The placeholders are only fallbacks; Mimir will presumably reject them,
    # so surface the misconfiguration once instead of as per-chunk errors.
    print(
        "WARNING: MIMIR_USERNAME / MIMIR_PASSWORD not set; using placeholder credentials",
        file=sys.stderr,
        flush=True,
    )

# Backfill window as Unix timestamps in seconds (UTC).
START_TS = 1773187200  # 2026-03-11T00:00:00Z
# NOTE(review): 1775040000 is 2026-04-01T10:40:00Z; an earlier comment said
# 11:40Z. Confirm which cutoff ("just before node restart") was intended.
END_TS = 1775040000  # 2026-04-01T10:40:00Z (just before node restart)

# Resolution passed as `step` to query_range, and the span fetched per request.
STEP = "10s"
CHUNK_HOURS = 6

# Metric names copied from Mimir into M3DB.
METRICS = [
    "vllm:prompt_tokens_total",
    "vllm:generation_tokens_total",
    "DCGM_FI_DEV_GPU_UTIL",
]
|
|
|
|
def enc(v):
    """Encode an integer as a protobuf base-128 varint.

    Non-negative values use the minimal little-endian 7-bit-group encoding.
    Negative values are encoded as 64-bit two's complement (always 10 bytes),
    which is how protobuf serialises negative ``int64`` fields.

    Args:
        v: Integer to encode.

    Returns:
        The varint as ``bytes``.

    Raises:
        ValueError: if ``v`` is below the signed 64-bit range.
    """
    if v < 0:
        # Python's ``>>`` on a negative int converges to -1, never 0, so the
        # loop below would never terminate without this conversion.
        if v < -(1 << 63):
            raise ValueError(f"varint out of int64 range: {v}")
        v += 1 << 64
    out = bytearray()
    while v > 0x7F:
        # Low 7 bits first, with the continuation bit set.
        out.append(0x80 | (v & 0x7F))
        v >>= 7
    out.append(v)
    return bytes(out)
|
|
|
|
def es(f, d):
    """Encode bytes ``d`` as length-delimited (wire type 2) protobuf field ``f``.

    Output is the varint tag, the varint byte length of ``d``, then ``d``.
    """
    tag = (f << 3) | 2
    return b"".join((enc(tag), enc(len(d)), d))
|
|
def ed(f, v):
    """Encode float ``v`` as a 64-bit (wire type 1) protobuf ``double`` field ``f``."""
    header = enc((f << 3) | 1)
    payload = struct.pack("<d", v)  # little-endian IEEE-754 double
    return header + payload
|
|
|
|
def build_ts(labels, samples):
    """Serialise the body of one Prometheus remote-write ``TimeSeries`` message.

    Field 1 carries repeated ``Label`` sub-messages (name=1, value=2, both
    strings). Field 2 carries repeated ``Sample`` sub-messages (value=1 as a
    double, timestamp=2 as a varint).

    Args:
        labels: Mapping of label name to label value (both ``str``).
        samples: Iterable of ``(timestamp_ms, value)`` pairs.

    Returns:
        The encoded message bytes, without an outer tag or length prefix.
    """
    parts = []
    # Remote write requires label names in lexicographic order; callers may
    # pass them unsorted (e.g. with an extra label appended last).
    for name, value in sorted(labels.items()):
        label = es(1, name.encode()) + es(2, value.encode())
        parts.append(es(1, label))
    for t_ms, val in samples:
        sample = ed(1, val) + enc((2 << 3) | 0) + enc(t_ms)
        parts.append(es(2, sample))
    # One join instead of repeated ``+=`` on immutable bytes (quadratic copy).
    return b"".join(parts)
|
|
|
|
def ssl_ctx(insecure=None):
    """Return the TLS context used for Mimir requests.

    Certificate and hostname verification are enabled by default, because
    the Mimir request carries HTTP Basic credentials. Verification can be
    disabled explicitly (e.g. for a self-signed endpoint) by passing
    ``insecure=True``, or by setting ``MIMIR_INSECURE_SKIP_VERIFY`` to
    ``1``/``true``/``yes``.

    Args:
        insecure: ``True``/``False`` forces the choice. ``None`` (the
            default) defers to the ``MIMIR_INSECURE_SKIP_VERIFY`` variable.

    Returns:
        An ``ssl.SSLContext``.
    """
    import os  # local import keeps this helper self-contained

    if insecure is None:
        flag = os.environ.get("MIMIR_INSECURE_SKIP_VERIFY", "")
        insecure = flag.strip().lower() in ("1", "true", "yes")
    ctx = ssl.create_default_context()
    if insecure:
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx
|
|
|
|
def mimir_req(path):
    """GET ``MIMIR_URL + path`` with HTTP Basic auth and parse the JSON body.

    Args:
        path: Request path plus an already URL-encoded query string, e.g.
            ``/api/v1/query_range?query=...``.

    Returns:
        The decoded JSON response.

    Raises:
        urllib.error.HTTPError: if the server answers with an error status.
        urllib.error.URLError: on connection or TLS failures.
        ValueError: if the body is not valid JSON.
    """
    token = base64.b64encode(f"{MIMIR_USER}:{MIMIR_PASS}".encode()).decode()
    req = urllib.request.Request(f"{MIMIR_URL}{path}")
    req.add_header("Authorization", f"Basic {token}")
    # ``with`` closes the response (and its socket) even if read/parse fails.
    with urllib.request.urlopen(req, context=ssl_ctx(), timeout=300) as resp:
        return json.loads(resp.read().decode())
|
|
|
|
def write_m3db(data):
    """POST an encoded remote-write ``WriteRequest`` to the M3 coordinator.

    The payload is snappy-compressed and sent with the Prometheus
    remote-write 0.1.0 headers.

    Args:
        data: Uncompressed protobuf ``WriteRequest`` bytes.

    Returns:
        ``True`` if the request succeeded. ``False`` if the coordinator
        answered with an HTTP error status; in that case the status and the
        first 100 characters of the body are printed.

    Other errors (e.g. ``urllib.error.URLError`` on connection failure)
    propagate to the caller.
    """
    req = urllib.request.Request(
        f"{M3DB_URL}/api/v1/prom/remote/write",
        snappy.compress(data),
        method="POST",
    )
    req.add_header("Content-Type", "application/x-protobuf")
    req.add_header("X-Prometheus-Remote-Write-Version", "0.1.0")
    req.add_header("Content-Encoding", "snappy")
    try:
        # Close the response promptly; its body is not needed on success.
        with urllib.request.urlopen(req, timeout=300):
            return True
    except urllib.error.HTTPError as e:
        # Error bodies are not guaranteed to be UTF-8; the diagnostic itself
        # must not raise.
        body = e.read().decode("utf-8", errors="replace")
        print(f" ERROR {e.code}: {body[:100]}", flush=True)
        return False
|
|
|
|
def _encode_write_request(series):
    """Encode Mimir ``query_range`` matrix results as a remote-write ``WriteRequest``.

    Each series gets ``cluster="serverless-inference-cluster"``, overriding
    any existing ``cluster`` label. Each ``[ts, "value"]`` pair becomes an
    integer-millisecond timestamp and a float value.
    """
    parts = []
    for s in series:
        labels = dict(s["metric"])
        labels["cluster"] = "serverless-inference-cluster"
        # round(), not int(): int() truncates fractional timestamps, e.g.
        # 1.001 * 1000 == 1000.9999... would become 1000.
        pts = [(round(float(t) * 1000), float(v)) for t, v in s["values"]]
        # WriteRequest field 1: repeated TimeSeries (length-delimited).
        parts.append(es(1, build_ts(labels, pts)))
    return b"".join(parts)


def _process_chunk(metric, start, query_end):
    """Fetch ``metric`` over [start, query_end] from Mimir and write it to M3DB.

    Returns:
        ``(samples, None)`` when nothing needs retrying (``samples`` is 0 if
        Mimir returned no data), or ``(0, reason)`` when the chunk was not
        written.
    """
    path = (
        f"/api/v1/query_range?query={urllib.parse.quote(metric)}"
        f"&start={start}&end={query_end}&step={STEP}"
    )
    data = mimir_req(path)
    if data.get("status") != "success":
        reason = f"query status {data.get('status')!r}: {data.get('error')}"
        print(f" Query failed: {reason}", flush=True)
        return 0, reason
    series = data["data"]["result"]
    samples = sum(len(s["values"]) for s in series)
    if samples == 0:
        return 0, None
    if not write_m3db(_encode_write_request(series)):
        return 0, "M3DB write rejected"
    return samples, None


def _backfill_metric(metric):
    """Copy ``metric`` from Mimir to M3DB over [START_TS, END_TS] in chunks.

    Returns:
        ``(samples_written, failed)``, where ``failed`` lists
        ``(chunk_start, chunk_end, reason)`` for chunks that need a re-run.
    """
    metric_total = 0
    chunks_done = 0
    failed = []
    chunk_start = START_TS
    while chunk_start < END_TS:
        chunk_end = min(chunk_start + CHUNK_HOURS * 3600, END_TS)
        # query_range's `end` is inclusive, so a step-aligned sample at
        # chunk_end would also be the first sample of the next chunk.
        # Stop one second short on every chunk except the last.
        query_end = chunk_end if chunk_end >= END_TS else chunk_end - 1
        try:
            samples, reason = _process_chunk(metric, chunk_start, query_end)
        except Exception as e:  # per-chunk boundary: log, record, continue
            print(f" Chunk error: {e}", flush=True)
            samples, reason = 0, str(e)
        if reason is not None:
            failed.append((chunk_start, chunk_end, reason))
        elif samples > 0:
            metric_total += samples
            chunks_done += 1
            if chunks_done % 10 == 0:
                print(f" {chunks_done} chunks, {metric_total} samples...", flush=True)
        chunk_start = chunk_end
    return metric_total, failed


def main():
    """Backfill every metric in METRICS and print per-metric and overall totals.

    If any chunk could not be fetched or written, the affected ranges are
    printed and the process exits with status 1, so a wrapping job can
    detect a partial backfill.
    """
    print(f"Time range: {START_TS} to {END_TS}", flush=True)
    total = 0
    failures = []
    for metric in METRICS:
        print(f"\n{metric}...", flush=True)
        metric_total, failed = _backfill_metric(metric)
        print(f" Done: {metric_total} samples", flush=True)
        total += metric_total
        failures.extend((metric, start, end, reason) for start, end, reason in failed)
    print(f"\nBackfill complete! Total: {total} samples", flush=True)
    if failures:
        print(f"{len(failures)} chunk(s) failed; re-run these ranges:", flush=True)
        for metric, start, end, reason in failures:
            print(f" {metric} {start}-{end}: {reason}", flush=True)
        sys.exit(1)


if __name__ == "__main__":
    main()
|