"""Test script to verify the full Redis → Gateway → Satellite → Prometheus MCP flow."""
import asyncio
import json
import uuid
import redis.asyncio as aioredis
async def test_list_labels():
"""Test list_labels tool through Redis."""
redis_url = "redis://localhost:6379" # Will be port-forwarded
redis_client = aioredis.from_url(redis_url, decode_responses=True)
team_id = "test-team"
request_channel = f"sat:req:{team_id}"
request_id = str(uuid.uuid4())
response_channel = f"sat:resp:{request_id}"
print(f"\n{'='*60}")
print(f"Testing list_labels")
print(f"{'='*60}")
print(f"Request ID: {request_id}")
print(f"Request Channel: {request_channel}")
print(f"Response Channel: {response_channel}")
# Subscribe to response channel FIRST
pubsub = redis_client.pubsub()
await pubsub.subscribe(response_channel)
print(f"Subscribed to {response_channel}")
# Build MCP tool call request for list_labels
request = {
"request_id": request_id,
"type": "mcp_tool_call",
"mcp_name": "prometheusMCP",
"tool_name": "list_labels",
"arguments": {},
"investigation_id": "test-investigation",
}
print(f"\nSending request: {json.dumps(request, indent=2)}")
# Publish request
await redis_client.publish(request_channel, json.dumps(request))
print(f"Published to {request_channel}")
# Wait for response with timeout
print("\nWaiting for response...")
try:
async for message in pubsub.listen():
if message["type"] == "message":
response = json.loads(message["data"])
print(f"\n✅ RESPONSE RECEIVED:")
if response.get("success"):
mcp_response = response.get("response", {}).get("mcp_response", {})
result_json = mcp_response.get("result_json", "{}")
result = json.loads(result_json)
print(f"Success: {mcp_response.get('success')}")
print(f"Duration: {mcp_response.get('duration_ms')}ms")
labels = result.get("labels", [])
print(f"Labels found: {len(labels)}")
if labels:
print(f"Sample labels: {labels[:10]}...")
else:
print(f"Error: {response.get('error')}")
break
elif message["type"] == "subscribe":
continue
except asyncio.TimeoutError:
print("❌ TIMEOUT: No response received")
# Cleanup
await pubsub.unsubscribe(response_channel)
await pubsub.close()
await redis_client.close()
async def test_list_metrics():
"""Test list_metrics tool through Redis."""
redis_url = "redis://localhost:6379"
redis_client = aioredis.from_url(redis_url, decode_responses=True)
team_id = "test-team"
request_channel = f"sat:req:{team_id}"
request_id = str(uuid.uuid4())
response_channel = f"sat:resp:{request_id}"
print(f"\n{'='*60}")
print(f"Testing list_metrics")
print(f"{'='*60}")
print(f"Request ID: {request_id}")
pubsub = redis_client.pubsub()
await pubsub.subscribe(response_channel)
request = {
"request_id": request_id,
"type": "mcp_tool_call",
"mcp_name": "prometheusMCP",
"tool_name": "list_metrics",
"arguments": {
"with_metadata": False
},
"investigation_id": "test-investigation",
}
print(f"Sending: {json.dumps(request, indent=2)}")
await redis_client.publish(request_channel, json.dumps(request))
print("Waiting for response...")
try:
async for message in pubsub.listen():
if message["type"] == "message":
response = json.loads(message["data"])
print(f"\n✅ RESPONSE:")
if response.get("success"):
mcp_response = response.get("response", {}).get("mcp_response", {})
result_json = mcp_response.get("result_json", "{}")
result = json.loads(result_json)
print(f"Success: {mcp_response.get('success')}")
print(f"Duration: {mcp_response.get('duration_ms')}ms")
metrics = result.get("metrics", [])
print(f"Metrics found: {len(metrics)}")
if metrics:
print(f"Sample metrics: {metrics[:5]}...")
else:
print(f"Error: {response.get('error')}")
break
elif message["type"] == "subscribe":
continue
except asyncio.TimeoutError:
print("❌ TIMEOUT")
await pubsub.unsubscribe(response_channel)
await pubsub.close()
await redis_client.close()
async def test_query_instant():
"""Test query_instant tool through Redis."""
redis_url = "redis://localhost:6379"
redis_client = aioredis.from_url(redis_url, decode_responses=True)
team_id = "test-team"
request_channel = f"sat:req:{team_id}"
request_id = str(uuid.uuid4())
response_channel = f"sat:resp:{request_id}"
print(f"\n{'='*60}")
print(f"Testing query_instant (query='up')")
print(f"{'='*60}")
print(f"Request ID: {request_id}")
pubsub = redis_client.pubsub()
await pubsub.subscribe(response_channel)
request = {
"request_id": request_id,
"type": "mcp_tool_call",
"mcp_name": "prometheusMCP",
"tool_name": "query_instant",
"arguments": {
"query": "up"
},
"investigation_id": "test-investigation",
}
print(f"Sending: {json.dumps(request, indent=2)}")
await redis_client.publish(request_channel, json.dumps(request))
print("Waiting for response...")
try:
async for message in pubsub.listen():
if message["type"] == "message":
response = json.loads(message["data"])
print(f"\n✅ RESPONSE:")
if response.get("success"):
mcp_response = response.get("response", {}).get("mcp_response", {})
result_json = mcp_response.get("result_json", "{}")
result = json.loads(result_json)
print(f"Success: {mcp_response.get('success')}")
print(f"Duration: {mcp_response.get('duration_ms')}ms")
print(f"Status: {result.get('status')}")
print(f"Result type: {result.get('result_type')}")
series = result.get("series", [])
print(f"Series count: {len(series)}")
if series:
# Show first series
first = series[0]
print(f"Sample series: {first.get('metric', {})} = {first.get('value')}")
else:
print(f"Error: {response.get('error')}")
break
elif message["type"] == "subscribe":
continue
except asyncio.TimeoutError:
print("❌ TIMEOUT")
await pubsub.unsubscribe(response_channel)
await pubsub.close()
await redis_client.close()
async def test_get_label_values():
"""Test get_label_values tool through Redis."""
redis_url = "redis://localhost:6379"
redis_client = aioredis.from_url(redis_url, decode_responses=True)
team_id = "test-team"
request_channel = f"sat:req:{team_id}"
request_id = str(uuid.uuid4())
response_channel = f"sat:resp:{request_id}"
print(f"\n{'='*60}")
print(f"Testing get_label_values (label_name='job')")
print(f"{'='*60}")
print(f"Request ID: {request_id}")
pubsub = redis_client.pubsub()
await pubsub.subscribe(response_channel)
request = {
"request_id": request_id,
"type": "mcp_tool_call",
"mcp_name": "prometheusMCP",
"tool_name": "get_label_values",
"arguments": {
"label_name": "job"
},
"investigation_id": "test-investigation",
}
print(f"Sending: {json.dumps(request, indent=2)}")
await redis_client.publish(request_channel, json.dumps(request))
print("Waiting for response...")
try:
async for message in pubsub.listen():
if message["type"] == "message":
response = json.loads(message["data"])
print(f"\n✅ RESPONSE:")
if response.get("success"):
mcp_response = response.get("response", {}).get("mcp_response", {})
result_json = mcp_response.get("result_json", "{}")
result = json.loads(result_json)
print(f"Success: {mcp_response.get('success')}")
print(f"Duration: {mcp_response.get('duration_ms')}ms")
print(f"Label: {result.get('label')}")
values = result.get("values", [])
print(f"Values count: {len(values)}")
if values:
print(f"Values: {values[:10]}...")
else:
print(f"Error: {response.get('error')}")
break
elif message["type"] == "subscribe":
continue
except asyncio.TimeoutError:
print("❌ TIMEOUT")
await pubsub.unsubscribe(response_channel)
await pubsub.close()
await redis_client.close()
async def main():
"""Run all tests."""
print("=" * 60)
print("FULL FLOW TEST: Redis → Gateway → Satellite → Prometheus MCP")
print("=" * 60)
print("\nPrerequisites:")
print(" 1. kubectl port-forward svc/redis 6379:6379 -n deeptrace")
print(" 2. Satellite running with prometheus-mcp enabled")
print(" 3. prometheus-mcp deployed and healthy")
await test_list_labels()
await test_list_metrics()
await test_query_instant()
await test_get_label_values()
print("\n" + "=" * 60)
print("ALL TESTS COMPLETE")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())