#!/usr/bin/env python3
"""
SSE Client Example
Demonstrates connecting to MCP-MAF server via SSE.
Useful for testing SSE transport layer.
"""
import asyncio
import httpx
import json
async def main():
"""Main function"""
print("=" * 60)
print(" MCP-MAF SSE Client Example")
print("=" * 60)
print()
server_url = "http://localhost:8000"
print(f"š” Connecting to SSE endpoint: {server_url}/sse")
print("š Press Ctrl+C to stop")
print()
try:
async with httpx.AsyncClient() as client:
async with client.stream('GET', f'{server_url}/sse') as response:
print(f"ā
Connected! Status: {response.status_code}")
print()
async for line in response.aiter_lines():
if line.startswith('data:'):
# Parse SSE data
data_str = line[5:].strip() # Remove 'data:' prefix
try:
data = json.loads(data_str)
event_type = data.get('type', 'unknown')
if event_type == 'connected':
print(f"š Connected to server!")
print(f" Client ID: {data.get('client_id')}")
print(f" Server: {data.get('server', {}).get('name')} v{data.get('server', {}).get('version')}")
print()
elif event_type == 'ping':
print(f"š Heartbeat (ping) - {data.get('timestamp')}")
elif event_type == 'disconnected':
print(f"š Disconnected from server")
print(f" Timestamp: {data.get('timestamp')}")
break
else:
print(f"šØ Event: {event_type}")
print(f" Data: {json.dumps(data, indent=2)}")
print()
except json.JSONDecodeError as e:
print(f"ā ļø Failed to parse JSON: {e}")
print(f" Raw data: {data_str}")
print()
except KeyboardInterrupt:
print("\n\nš Interrupted by user")
except httpx.ConnectError:
print(f"ā Failed to connect to {server_url}")
print(" Make sure the server is running:")
print(" $ make run")
except Exception as e:
print(f"ā Error: {e}")
print()
print("=" * 60)
print("ā
SSE Client Example Complete")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())