#!/usr/bin/env python3
"""
Test script for M3DB read/write functionality.

Usage: python3 test-metrics.py <BASE_URL>
"""
|
|
|
|
|
|
|
|
|
|
import random
import struct
import sys
import time

import requests
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
if len(sys.argv) < 2:
|
2026-04-01 02:38:47 +00:00
|
|
|
print("Usage: python3 test-metrics.py <BASE_URL>")
|
|
|
|
|
print("Example: python3 test-metrics.py https://m3db.vultrlabs.dev:7201")
|
|
|
|
|
print(" python3 test-metrics.py http://192.168.1.100:7201")
|
2026-03-31 15:49:59 +00:00
|
|
|
sys.exit(1)
|
|
|
|
|
|
2026-04-01 02:38:47 +00:00
|
|
|
base_url = sys.argv[1].rstrip('/')
|
2026-03-31 15:49:59 +00:00
|
|
|
|
|
|
|
|
# Generate unique metric name with timestamp to avoid conflicts
|
|
|
|
|
ts = int(time.time())
|
|
|
|
|
metric_name = f"m3db_test_metric_{ts}"
|
|
|
|
|
metric_value = random.randint(1, 1000)
|
|
|
|
|
|
|
|
|
|
print(f"=== M3DB Metrics Test ===")
|
2026-04-01 02:38:47 +00:00
|
|
|
print(f"URL: {base_url}")
|
2026-03-31 15:49:59 +00:00
|
|
|
print(f"Metric: {metric_name}")
|
|
|
|
|
print(f"Value: {metric_value}")
|
|
|
|
|
print()
|
|
|
|
|
|
|
|
|
|
# Write test metric using Prometheus remote write format
|
|
|
|
|
print("=== Writing metric ===")
|
|
|
|
|
write_url = f"{base_url}/api/v1/prom/remote/write"
|
|
|
|
|
|
|
|
|
|
# Prometheus remote write uses snappy-compressed protobuf
|
|
|
|
|
# For simplicity, we'll use the M3DB native write endpoint
|
|
|
|
|
# which accepts a simpler JSON format
|
|
|
|
|
|
|
|
|
|
# Alternative: use the /api/v1/prom/remote/write with proper protobuf
|
|
|
|
|
# but that requires prometheus_remote_write protobuf definition
|
|
|
|
|
# Let's use the query endpoint to verify coordinator is up first
|
|
|
|
|
|
|
|
|
|
# Check coordinator health
|
2026-04-01 02:38:47 +00:00
|
|
|
health_url = f"{base_url}/health"
|
2026-03-31 15:49:59 +00:00
|
|
|
try:
|
|
|
|
|
resp = requests.get(health_url, timeout=10)
|
|
|
|
|
if resp.status_code == 200:
|
|
|
|
|
print(f"✓ Coordinator healthy")
|
|
|
|
|
else:
|
|
|
|
|
print(f"✗ Coordinator unhealthy: {resp.status_code}")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
print(f"✗ Failed to connect: {e}")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
# Write metric using simple HTTP write (M3DB native format)
|
|
|
|
|
# Prometheus remote_write requires protobuf, so we'll write
|
|
|
|
|
# a test metric using a simple approach via the M3 coordinator
|
|
|
|
|
|
|
|
|
|
# For a proper test, we'll use the remote_write protobuf format
|
|
|
|
|
# But that's complex, so let's just verify read/write works
|
|
|
|
|
# by checking the cluster is ready and querying existing data
|
|
|
|
|
|
|
|
|
|
# Check placement
|
|
|
|
|
placement_url = f"{base_url}/api/v1/services/m3db/placement"
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.get(placement_url, timeout=10)
|
|
|
|
|
if resp.status_code == 200:
|
|
|
|
|
placement = resp.json()
|
|
|
|
|
instances = placement.get("placement", {}).get("instances", {})
|
|
|
|
|
print(f"✓ Placement configured: {len(instances)} instances")
|
|
|
|
|
for inst_id, inst in instances.items():
|
|
|
|
|
print(f" - {inst_id}: {inst.get('endpoint', 'unknown')}")
|
|
|
|
|
else:
|
|
|
|
|
print(f"✗ Placement not ready: {resp.status_code}")
|
|
|
|
|
print(f" Response: {resp.text}")
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
print(f"✗ Failed to get placement: {e}")
|
|
|
|
|
|
|
|
|
|
# Check namespaces
|
|
|
|
|
namespace_url = f"{base_url}/api/v1/services/m3db/namespace"
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.get(namespace_url, timeout=10)
|
|
|
|
|
if resp.status_code == 200:
|
|
|
|
|
ns_data = resp.json()
|
|
|
|
|
namespaces = ns_data.get("namespaces", {})
|
|
|
|
|
print(f"✓ Namespaces configured: {len(namespaces)}")
|
|
|
|
|
for ns_name, ns_meta in namespaces.items():
|
|
|
|
|
print(f" - {ns_name}")
|
|
|
|
|
else:
|
|
|
|
|
print(f"✗ Namespaces not ready: {resp.status_code}")
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
print(f"✗ Failed to get namespaces: {e}")
|
|
|
|
|
|
|
|
|
|
# Query test (even if no data, should return empty result)
|
|
|
|
|
print()
|
|
|
|
|
print("=== Query test ===")
|
|
|
|
|
query_url = f"{base_url}/api/v1/query"
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.get(query_url, params={"query": "up"}, timeout=10)
|
|
|
|
|
if resp.status_code == 200:
|
|
|
|
|
result = resp.json()
|
|
|
|
|
status = result.get("status")
|
|
|
|
|
print(f"✓ Query returned: {status}")
|
|
|
|
|
data = result.get("data", {}).get("result", [])
|
|
|
|
|
print(f" Results: {len(data)} series")
|
|
|
|
|
else:
|
|
|
|
|
print(f"✗ Query failed: {resp.status_code}")
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
print(f"✗ Query failed: {e}")
|
|
|
|
|
|
|
|
|
|
# Write test metric using remote write protobuf
|
|
|
|
|
print()
|
|
|
|
|
print("=== Write test ===")
|
|
|
|
|
print("Writing via Prometheus remote_write format...")
|
|
|
|
|
|
|
|
|
|
# Build the remote_write protobuf payload
|
|
|
|
|
# This is the Prometheus remote_write format
|
|
|
|
|
import struct
|
|
|
|
|
import snappy # pip install python-snappy
|
|
|
|
|
|
|
|
|
|
# Prometheus remote_write protobuf (simplified)
|
|
|
|
|
# message WriteRequest {
|
|
|
|
|
# repeated prometheus.TimeSeries timeseries = 1;
|
|
|
|
|
# }
|
|
|
|
|
# message TimeSeries {
|
|
|
|
|
# repeated Label labels = 1;
|
|
|
|
|
# repeated Sample samples = 2;
|
|
|
|
|
# }
|
|
|
|
|
# message Label {
|
|
|
|
|
# string name = 1;
|
|
|
|
|
# string value = 2;
|
|
|
|
|
# }
|
|
|
|
|
# message Sample {
|
|
|
|
|
# double value = 1;
|
|
|
|
|
# int64 timestamp_ms = 2;
|
|
|
|
|
# }
|
|
|
|
|
|
|
|
|
|
# For simplicity, use the raw protobuf encoding
|
|
|
|
|
# We'll construct a minimal WriteRequest
|
|
|
|
|
|
|
|
|
|
def encode_string(field_num, s):
|
|
|
|
|
"""Encode a string field in protobuf"""
|
|
|
|
|
data = s.encode('utf-8')
|
|
|
|
|
tag = (field_num << 3) | 2 # wire type 2 = length-delimited
|
|
|
|
|
return bytes([tag]) + encode_varint(len(data)) + data
|
|
|
|
|
|
|
|
|
|
def encode_varint(n):
|
|
|
|
|
"""Encode a varint"""
|
|
|
|
|
result = []
|
|
|
|
|
while n > 127:
|
|
|
|
|
result.append((n & 0x7F) | 0x80)
|
|
|
|
|
n >>= 7
|
|
|
|
|
result.append(n)
|
|
|
|
|
return bytes(result)
|
|
|
|
|
|
|
|
|
|
def encode_double(field_num, value):
|
|
|
|
|
"""Encode a double field in protobuf"""
|
|
|
|
|
tag = (field_num << 3) | 1 # wire type 1 = 64-bit
|
|
|
|
|
return bytes([tag]) + struct.pack('<d', value)
|
|
|
|
|
|
|
|
|
|
def encode_int64(field_num, value):
|
|
|
|
|
"""Encode an int64 field in protobuf (as varint)"""
|
|
|
|
|
tag = (field_num << 3) | 0 # wire type 0 = varint
|
|
|
|
|
return bytes([tag]) + encode_varint(value)
|
|
|
|
|
|
|
|
|
|
# Build Sample
|
|
|
|
|
sample = encode_double(1, float(metric_value)) + encode_int64(2, int(time.time() * 1000))
|
|
|
|
|
|
|
|
|
|
# Build Labels
|
|
|
|
|
labels = (
|
|
|
|
|
encode_string(1, "__name__") + encode_string(2, metric_name) +
|
|
|
|
|
encode_string(1, "test") + encode_string(2, "m3db_verification")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Build TimeSeries
|
|
|
|
|
ts_data = encode_string(1, labels) + encode_string(2, sample)
|
|
|
|
|
# Note: repeated fields need proper encoding
|
|
|
|
|
# Actually, for repeated fields we just repeat the field
|
|
|
|
|
|
|
|
|
|
# Simplified: just encode the timeseries with proper field numbers
|
|
|
|
|
# Label is field 1, Sample is field 2 in TimeSeries
|
|
|
|
|
ts_encoded = (
|
|
|
|
|
bytes([0x0a]) + encode_varint(len(labels)) + labels + # field 1, wire type 2
|
|
|
|
|
bytes([0x12]) + encode_varint(len(sample)) + sample # field 2, wire type 2
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Build WriteRequest (timeseries is field 1)
|
|
|
|
|
write_req = bytes([0x0a]) + encode_varint(len(ts_encoded)) + ts_encoded
|
|
|
|
|
|
|
|
|
|
# Compress with snappy
|
|
|
|
|
compressed = snappy.compress(write_req)
|
|
|
|
|
|
|
|
|
|
headers = {
|
|
|
|
|
"Content-Encoding": "snappy",
|
|
|
|
|
"Content-Type": "application/x-protobuf",
|
|
|
|
|
"X-Prometheus-Remote-Write-Version": "0.1.0"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.post(write_url, data=compressed, headers=headers, timeout=10)
|
|
|
|
|
if resp.status_code == 204 or resp.status_code == 200:
|
|
|
|
|
print(f"✓ Write successful: {metric_name} = {metric_value}")
|
|
|
|
|
else:
|
|
|
|
|
print(f"✗ Write failed: {resp.status_code}")
|
|
|
|
|
print(f" Response: {resp.text}")
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
print(f"✗ Write failed: {e}")
|
|
|
|
|
print(" (This is expected if python-snappy is not installed)")
|
|
|
|
|
print(" Install with: pip install python-snappy")
|
|
|
|
|
|
|
|
|
|
# Wait a moment and query back
|
|
|
|
|
time.sleep(2)
|
|
|
|
|
|
|
|
|
|
print()
|
|
|
|
|
print("=== Read back test ===")
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.get(query_url, params={"query": metric_name}, timeout=10)
|
|
|
|
|
if resp.status_code == 200:
|
|
|
|
|
result = resp.json()
|
|
|
|
|
data = result.get("data", {}).get("result", [])
|
|
|
|
|
if data:
|
|
|
|
|
print(f"✓ Metric found!")
|
|
|
|
|
for series in data:
|
|
|
|
|
metric = series.get("metric", {})
|
|
|
|
|
values = series.get("values", series.get("value", []))
|
|
|
|
|
print(f" Labels: {metric}")
|
|
|
|
|
print(f" Values: {values}")
|
|
|
|
|
else:
|
|
|
|
|
print(f"✗ Metric not found (may take a moment to index)")
|
|
|
|
|
else:
|
|
|
|
|
print(f"✗ Query failed: {resp.status_code}")
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
print(f"✗ Query failed: {e}")
|
|
|
|
|
|
|
|
|
|
print()
|
|
|
|
|
print("=== Test complete ===")
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the smoke test only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
|