Files
m3db-vke-setup/victoriametrics/backfill.py
biondizzle bf6d62b9a8 Add VictoriaMetrics for historical metrics (Mar 13+)
- Single-node VM deployment with 200Gi NVMe, 2y retention
- Traefik IngressRoute at vm.vultrlabs.dev (TLS + basic auth)
- Backfill script: pulls vLLM/DCGM metrics from Mimir, writes to VM
- Retain StorageClass so historical data survives PVC deletion
- README with deployment + Grafana mixed-datasource instructions
2026-04-09 19:29:18 +00:00

212 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""
Backfill historical metrics from Mimir to VictoriaMetrics.
Uses VictoriaMetrics /api/v1/import endpoint which happily accepts
data with any timestamp — no bufferPast gates, no block size hacks.
Usage:
# Run in-cluster (as a pod, see backfill-pod.yaml)
python3 backfill.py
# Or locally with port-forward
kubectl port-forward -n victoriametrics svc/victoriametrics 8428:8428
VM_URL=http://localhost:8428 python3 backfill.py
"""
import urllib.request
import urllib.error
import urllib.parse
import json
import ssl
import os
import time
import base64
import sys
# ── Configuration ──────────────────────────────────────────────────
# Scalar settings are read from the environment, falling back to the
# defaults shown here. METRICS and EXTRA_LABELS are edited in place.

# Source: Mimir's Prometheus-compatible API, queried with HTTP basic auth.
MIMIR_URL = os.getenv("MIMIR_URL", "https://metrics.vultrlabs.com/prometheus")
MIMIR_USER = os.getenv("MIMIR_USERNAME", "REPLACE_WITH_MIMIR_USERNAME")
MIMIR_PASS = os.getenv("MIMIR_PASSWORD", "REPLACE_WITH_MIMIR_PASSWORD")

# Target: VictoriaMetrics base URL (in-cluster service address by default).
VM_URL = os.getenv(
    "VM_URL",
    "http://victoriametrics.victoriametrics.svc.cluster.local:8428",
)

# Backfill window in Unix seconds: March 13, 2026 00:00:00 UTC → now.
START_TS = int(os.getenv("START_TS", "1773360000"))  # 2026-03-13T00:00:00Z
END_TS = int(os.getenv("END_TS", str(int(time.time()))))

# Resolution passed to query_range, and the size of each query window.
STEP = os.getenv("STEP", "10s")
CHUNK_HOURS = int(os.getenv("CHUNK_HOURS", "6"))

# Metrics to backfill.
METRICS = [
    "vllm:prompt_tokens_total",
    "vllm:generation_tokens_total",
    "DCGM_FI_DEV_GPU_UTIL",
]

# Extra labels stamped onto every imported series (tenant/cluster context).
# These win over labels of the same name coming from Mimir.
EXTRA_LABELS = {
    "tenant": "serverless-inference-cluster",
    "cluster": "serverless-inference-cluster",
}
# ── Helpers ────────────────────────────────────────────────────────
def ssl_ctx():
    """Build the TLS context passed to ``urlopen`` for Mimir requests.

    WARNING: both hostname checking and certificate verification are
    switched off, so the server's identity is NOT authenticated.

    Returns:
        An ``ssl.SSLContext`` with ``check_hostname=False`` and
        ``verify_mode=ssl.CERT_NONE``.
    """
    context = ssl.create_default_context()
    # Targets are assigned left to right: check_hostname has to be cleared
    # first, otherwise setting CERT_NONE raises ValueError.
    context.check_hostname, context.verify_mode = False, ssl.CERT_NONE
    return context
def mimir_query(path):
    """Send a GET request to the Mimir API using HTTP basic auth.

    Args:
        path: Path plus query string, appended verbatim to ``MIMIR_URL``
            (e.g. ``/api/v1/query_range?query=...``). The caller is
            responsible for URL-encoding it.

    Returns:
        The response body decoded as UTF-8 and parsed as JSON.

    Raises:
        urllib.error.URLError: On connection failure or an HTTP error
            status (``HTTPError`` is a subclass).
        ValueError: If the body is not valid UTF-8 / JSON.
    """
    credentials = f"{MIMIR_USER}:{MIMIR_PASS}".encode()
    token = base64.b64encode(credentials).decode()
    req = urllib.request.Request(f"{MIMIR_URL}{path}")
    req.add_header("Authorization", f"Basic {token}")
    # Use the response as a context manager so the connection is released
    # even when reading or JSON parsing fails; the script issues one request
    # per metric per chunk and previously never closed any of them.
    with urllib.request.urlopen(req, context=ssl_ctx(), timeout=300) as resp:
        return json.loads(resp.read().decode())
def vm_import(lines):
    """POST import records to VictoriaMetrics ``/api/v1/import``.

    NOTE(review): per the VictoriaMetrics documentation this endpoint parses
    JSON lines (``{"metric": {...}, "values": [...], "timestamps": [...]}``);
    Prometheus text format is served by ``/api/v1/import/prometheus``.
    Callers must supply records in the format the endpoint expects.

    Args:
        lines: Iterable of strings, one record each. They are joined with
            newlines and sent UTF-8 encoded in a single request.

    Returns:
        True if VictoriaMetrics accepted the request, False if it answered
        with an HTTP error status (the status and the first 200 characters
        of the body are printed).

    Raises:
        urllib.error.URLError: If the server cannot be reached at all;
            only HTTP error responses are handled here.
    """
    payload = "\n".join(lines).encode("utf-8")
    req = urllib.request.Request(
        f"{VM_URL}/api/v1/import",
        data=payload,
        method="POST",
    )
    req.add_header("Content-Type", "application/octet-stream")
    try:
        # The response body is not needed; the with-block just guarantees
        # the connection is closed.
        with urllib.request.urlopen(req, timeout=300):
            return True
    except urllib.error.HTTPError as e:
        # errors="replace" keeps a non-UTF-8 error page from raising inside
        # the error handler and masking the real HTTP failure.
        body = e.read().decode("utf-8", errors="replace")[:200]
        print(f" VM import ERROR {e.code}: {body}", flush=True)
        return False
def format_prom_metric_name(raw_name):
    """Return the metric name to use when writing to VictoriaMetrics.

    This is an identity mapping: the name coming from Mimir is handed back
    untouched. Colons (as in ``vllm:prompt_tokens_total``) are valid in
    Prometheus metric names, so no rewriting is performed.
    """
    metric_name = raw_name
    return metric_name
# ── Main ───────────────────────────────────────────────────────────
# Script body: walk every metric in METRICS over [START_TS, END_TS] in
# CHUNK_HOURS-sized windows, fetch each window from Mimir with query_range
# and push the samples to VictoriaMetrics.

_INF = float("inf")


def _fmt_ts(ts):
    """Format Unix seconds as 'YYYY-MM-DD HH:MM' in UTC."""
    return time.strftime("%Y-%m-%d %H:%M", time.gmtime(ts))


def _series_to_import_line(series, default_name, drop_from=None):
    """Turn one query_range series into a VictoriaMetrics JSON import line.

    /api/v1/import consumes JSON lines of the form
    {"metric": {...labels...}, "values": [...], "timestamps": [...ms...]},
    so one line is produced per series. Building it with json.dumps also
    escapes quotes and backslashes in label values.

    Args:
        series: One element of data.result from a query_range response
            (a dict with "metric" labels and "values" [ts, value] pairs).
        default_name: Metric name used when the series has no __name__.
        drop_from: If not None, samples with a timestamp (seconds) at or
            after this value are left out.

    Returns:
        (line, kept, skipped): the JSON line (None if no sample was kept),
        the number of samples in it, and the number of samples dropped
        because their value was unparsable or non-finite. Non-finite values
        are skipped because standard JSON has no representation for them.
    """
    labels = dict(series["metric"])
    labels.setdefault("__name__", default_name)
    labels.update(EXTRA_LABELS)

    timestamps = []
    values = []
    skipped = 0
    for ts_raw, val_raw in series["values"]:
        ts = float(ts_raw)
        if drop_from is not None and ts >= drop_from:
            continue
        try:
            val = float(val_raw)
        except (TypeError, ValueError):
            skipped += 1
            continue
        # float() accepts "NaN" and "+Inf", so check explicitly.
        if val != val or abs(val) == _INF:
            skipped += 1
            continue
        # Round instead of truncating so float error cannot shift a
        # timestamp by one millisecond.
        timestamps.append(int(round(ts * 1000)))
        values.append(val)

    if not values:
        return None, 0, skipped
    line = json.dumps(
        {"metric": labels, "values": values, "timestamps": timestamps}
    )
    return line, len(values), skipped


print("VictoriaMetrics Backfill", flush=True)
print("========================", flush=True)
print(f"Source: {MIMIR_URL}", flush=True)
print(f"Target: {VM_URL}", flush=True)
print(f"Range: {START_TS} → {END_TS} ({CHUNK_HOURS}h chunks)", flush=True)
print(f"Metrics: {', '.join(METRICS)}", flush=True)
print(f"Extra labels: {EXTRA_LABELS}", flush=True)
print(flush=True)

total_samples = 0
total_errors = 0  # chunks whose query or import failed
lost_samples = 0  # samples fetched from Mimir but rejected by VM
chunk_seconds = CHUNK_HOURS * 3600

for metric in METRICS:
    print(f"\n{'='*60}", flush=True)
    print(f"Metric: {metric}", flush=True)
    print(f"{'='*60}", flush=True)
    metric_samples = 0

    # range() raises on a zero step and yields nothing for a negative one,
    # so a bad CHUNK_HOURS can no longer spin forever.
    for chunk_start in range(START_TS, END_TS, chunk_seconds):
        chunk_end = min(chunk_start + chunk_seconds, END_TS)
        # query_range includes both start and end. The sample at chunk_end
        # is fetched again as the first sample of the next chunk, so drop it
        # here unless this is the final chunk.
        drop_from = chunk_end if chunk_end < END_TS else None
        chunk_label = f"[{_fmt_ts(chunk_start)} → {_fmt_ts(chunk_end)}]"
        print(f" {chunk_label} ...", end="", flush=True)
        try:
            query = urllib.parse.urlencode(
                {
                    "query": metric,
                    "start": chunk_start,
                    "end": chunk_end,
                    "step": STEP,
                }
            )
            data = mimir_query(f"/api/v1/query_range?{query}")
            if data.get("status") != "success":
                print(f" Mimir returned status={data.get('status')}", flush=True)
                total_errors += 1
                continue

            series_list = data["data"]["result"]
            if not series_list:
                print(" no data", flush=True)
                continue

            import_lines = []
            chunk_count = 0
            chunk_skipped = 0
            for series in series_list:
                line, kept, skipped = _series_to_import_line(
                    series, metric, drop_from
                )
                chunk_skipped += skipped
                if line is not None:
                    import_lines.append(line)
                    chunk_count += kept

            skipped_note = (
                f" ({chunk_skipped} non-finite skipped)" if chunk_skipped else ""
            )
            if not import_lines:
                print(f" 0 samples{skipped_note}", flush=True)
                continue

            if vm_import(import_lines):
                print(f" {chunk_count} samples imported{skipped_note}", flush=True)
                metric_samples += chunk_count
            else:
                print(f" IMPORT FAILED ({chunk_count} samples lost)", flush=True)
                total_errors += 1
                lost_samples += chunk_count
        except Exception as e:
            # Chunk-level boundary: report the failure and move on to the
            # next window so one bad chunk does not abort the whole run.
            print(f" ERROR: {e}", flush=True)
            total_errors += 1

    print(f" Total for {metric}: {metric_samples} samples", flush=True)
    total_samples += metric_samples

print(f"\n{'='*60}", flush=True)
print("BACKFILL COMPLETE", flush=True)
print(f"Total samples imported: {total_samples}", flush=True)
print(f"Total errors: {total_errors}", flush=True)
print(f"Samples lost on failed imports: {lost_samples}", flush=True)
print(f"{'='*60}", flush=True)

# Verify by querying VM. This only checks that the query API answers;
# it does not inspect the imported data.
print("\nVerifying import...", flush=True)
try:
    verify_path = f"/api/v1/query?query={urllib.parse.quote('count(up)')}"
    req = urllib.request.Request(f"{VM_URL}{verify_path}")
    with urllib.request.urlopen(req, timeout=30):
        pass
    print("VM is responding to queries ✓", flush=True)
except Exception as e:
    print(f"VM query check failed: {e}", flush=True)