#!/usr/bin/env python3 """ Test script for M3DB read/write functionality. Usage: python3 test-metrics.py """ import sys import time import random import requests def main(): if len(sys.argv) < 2: print("Usage: python3 test-metrics.py ") print("Example: python3 test-metrics.py 192.168.1.100") sys.exit(1) host = sys.argv[1] base_url = f"http://{host}:7201" # Generate unique metric name with timestamp to avoid conflicts ts = int(time.time()) metric_name = f"m3db_test_metric_{ts}" metric_value = random.randint(1, 1000) print(f"=== M3DB Metrics Test ===") print(f"Host: {host}") print(f"Metric: {metric_name}") print(f"Value: {metric_value}") print() # Write test metric using Prometheus remote write format print("=== Writing metric ===") write_url = f"{base_url}/api/v1/prom/remote/write" # Prometheus remote write uses snappy-compressed protobuf # For simplicity, we'll use the M3DB native write endpoint # which accepts a simpler JSON format # Alternative: use the /api/v1/prom/remote/write with proper protobuf # but that requires prometheus_remote_write protobuf definition # Let's use the query endpoint to verify coordinator is up first # Check coordinator health health_url = f"{base_url}/api/v1/services/m3db/health" try: resp = requests.get(health_url, timeout=10) if resp.status_code == 200: print(f"✓ Coordinator healthy") else: print(f"✗ Coordinator unhealthy: {resp.status_code}") sys.exit(1) except requests.exceptions.RequestException as e: print(f"✗ Failed to connect: {e}") sys.exit(1) # Write metric using simple HTTP write (M3DB native format) # Prometheus remote_write requires protobuf, so we'll write # a test metric using a simple approach via the M3 coordinator # For a proper test, we'll use the remote_write protobuf format # But that's complex, so let's just verify read/write works # by checking the cluster is ready and querying existing data # Check placement placement_url = f"{base_url}/api/v1/services/m3db/placement" try: resp = requests.get(placement_url, timeout=10) if resp.status_code == 200: placement = resp.json() instances = placement.get("placement", {}).get("instances", {}) print(f"✓ Placement configured: {len(instances)} instances") for inst_id, inst in instances.items(): print(f" - {inst_id}: {inst.get('endpoint', 'unknown')}") else: print(f"✗ Placement not ready: {resp.status_code}") print(f" Response: {resp.text}") except requests.exceptions.RequestException as e: print(f"✗ Failed to get placement: {e}") # Check namespaces namespace_url = f"{base_url}/api/v1/services/m3db/namespace" try: resp = requests.get(namespace_url, timeout=10) if resp.status_code == 200: ns_data = resp.json() namespaces = ns_data.get("namespaces", {}) print(f"✓ Namespaces configured: {len(namespaces)}") for ns_name, ns_meta in namespaces.items(): print(f" - {ns_name}") else: print(f"✗ Namespaces not ready: {resp.status_code}") except requests.exceptions.RequestException as e: print(f"✗ Failed to get namespaces: {e}") # Query test (even if no data, should return empty result) print() print("=== Query test ===") query_url = f"{base_url}/api/v1/query" try: resp = requests.get(query_url, params={"query": "up"}, timeout=10) if resp.status_code == 200: result = resp.json() status = result.get("status") print(f"✓ Query returned: {status}") data = result.get("data", {}).get("result", []) print(f" Results: {len(data)} series") else: print(f"✗ Query failed: {resp.status_code}") except requests.exceptions.RequestException as e: print(f"✗ Query failed: {e}") # Write test metric using remote write protobuf print() print("=== Write test ===") print("Writing via Prometheus remote_write format...") # Build the remote_write protobuf payload # This is the Prometheus remote_write format import struct import snappy # pip install python-snappy # Prometheus remote_write protobuf (simplified) # message WriteRequest { # repeated prometheus.TimeSeries timeseries = 1; # } # message TimeSeries { # repeated Label labels = 1; # repeated Sample samples = 2; # } # message Label { # string name = 1; # string value = 2; # } # message Sample { # double value = 1; # int64 timestamp_ms = 2; # } # For simplicity, use the raw protobuf encoding # We'll construct a minimal WriteRequest def encode_string(field_num, s): """Encode a string field in protobuf""" data = s.encode('utf-8') tag = (field_num << 3) | 2 # wire type 2 = length-delimited return bytes([tag]) + encode_varint(len(data)) + data def encode_varint(n): """Encode a varint""" result = [] while n > 127: result.append((n & 0x7F) | 0x80) n >>= 7 result.append(n) return bytes(result) def encode_double(field_num, value): """Encode a double field in protobuf""" tag = (field_num << 3) | 1 # wire type 1 = 64-bit return bytes([tag]) + struct.pack('