#!/usr/bin/env python3
"""
Test script for SSE MCP connection.
This script tests connecting to the Looker MCP server in SSE mode.
"""
import json
import requests
import sseclient
import time
def main(base_url: str = "http://localhost:8000", timeout_seconds: float = 10) -> int:
    """Test the SSE MCP connection.

    Probes ``{base_url}/health`` (best effort), then POSTs a JSON-RPC
    ``initialize`` request to ``{base_url}/jsonrpc/sse`` and reads the
    resulting event stream until a message carrying a ``result`` or
    ``error`` key arrives.

    Args:
        base_url: Root URL of the server under test.
        timeout_seconds: Limit, in seconds, used both as the HTTP
            connect/read timeout of each request and as the overall time
            allowed for a response to show up on the event stream.

    Returns:
        0 if a ``result`` message was received, 1 in every other case
        (HTTP error, error message, invalid JSON, timeout, exception).
    """
    print("Testing SSE MCP connection...")

    # First, check if the server is up. This probe is informational only:
    # a failure is reported and the SSE test is still attempted.
    try:
        health_check = requests.get(f"{base_url}/health", timeout=timeout_seconds)
        print(f"Health check status: {health_check.status_code}")
    except Exception as e:
        print(f"Server health check failed: {e}")

    # Initialize request - this should be sent as JSON data
    init_request = {
        "jsonrpc": "2.0",
        "method": "initialize",
        "params": {
            "protocolVersion": "0.2.0",
            "clientInfo": {
                "name": "test-sse-client",
            },
        },
        "id": 1,
    }

    # SSE endpoint for events
    sse_url = f"{base_url}/jsonrpc/sse"

    try:
        print(f"Sending initialization request to {sse_url}")
        # The timeout bounds the connect phase and every wait for more
        # bytes on the stream, so a server that goes silent cannot hang the
        # script. The `with` block releases the streamed connection on
        # every exit path.
        with requests.post(
            sse_url,
            json=init_request,
            stream=True,
            headers={
                "Accept": "text/event-stream",
                "Content-Type": "application/json",
            },
            timeout=timeout_seconds,
        ) as response:
            # Check if connection was successful
            if response.status_code != 200:
                print(f"❌ Failed to connect: HTTP {response.status_code}")
                print(response.text)
                return 1

            print("Connection established, waiting for events...")
            client = sseclient.SSEClient(response)

            # Monotonic clock: elapsed time must not be affected by
            # wall-clock adjustments.
            start_time = time.monotonic()

            for event in client.events():
                print(f"Received event: {event.data}")
                try:
                    data = json.loads(event.data)
                except json.JSONDecodeError:
                    print(f"\n❌ Received invalid JSON: {event.data}")
                    return 1

                # Only a JSON object can be a JSON-RPC response; any other
                # JSON value is treated like an unrelated message.
                if isinstance(data, dict):
                    if "result" in data:
                        print("\n✅ Successfully connected to SSE MCP server!")
                        print(f"Result: {json.dumps(data['result'], indent=2)}")
                        return 0
                    if "error" in data:
                        print(f"\n❌ Received error: {data['error']}")
                        return 1

                # Events keep arriving but none is a response: give up once
                # the overall budget is spent.
                if time.monotonic() - start_time > timeout_seconds:
                    print(f"\n❌ Timeout after {timeout_seconds} seconds waiting for response")
                    return 1

            print("\n❌ No events received")
            return 1
    except Exception as e:
        # Top-level boundary of a test script: turn any failure (including
        # connect/read timeouts) into a non-zero exit status.
        print(f"❌ Error connecting to SSE server: {e}")
        return 1
if __name__ == "__main__":
    # Run the check and hand main()'s return value to the interpreter as
    # the process exit status.
    raise SystemExit(main())