#!/usr/bin/env python3
"""
SSE Client Example
Demonstrates connecting to MCP-MAF server via SSE.
Useful for testing SSE transport layer.
"""
import asyncio
import httpx
import json
async def main() -> None:
    """Connect to the local MCP-MAF SSE endpoint and print events as they arrive.

    Opens a streaming GET request to ``http://localhost:8000/sse``, decodes
    every ``data:`` line as JSON and prints a short description of the event.
    The loop ends when the server sends a ``disconnected`` event, the stream
    closes, an error occurs, or the user presses Ctrl+C.

    Connection and HTTP errors are reported on stdout instead of being raised,
    and the closing banner is printed after normal completion, handled errors
    and interruption alike.
    """
    # NOTE(review): the status glyphs in this block were mis-decoded in the
    # source; they are best-effort restorations -- confirm the intended emoji.
    print("=" * 60)
    print(" MCP-MAF SSE Client Example")
    print("=" * 60)
    print()

    server_url = "http://localhost:8000"
    print(f"📡 Connecting to SSE endpoint: {server_url}/sse")
    print("🛑 Press Ctrl+C to stop")
    print()

    # An SSE stream may stay silent between events, so httpx's default 5 s
    # read timeout would abort a healthy connection. Keep a bound on
    # connect/write/pool, but wait indefinitely for incoming data.
    stream_timeout = httpx.Timeout(10.0, read=None)

    try:
        async with httpx.AsyncClient(timeout=stream_timeout) as client:
            async with client.stream('GET', f'{server_url}/sse') as response:
                # Fail fast on a non-2xx response instead of announcing a
                # successful connection; the resulting httpx.HTTPStatusError
                # is reported by the generic handler below.
                response.raise_for_status()
                print(f"✅ Connected! Status: {response.status_code}")
                print()

                async for line in response.aiter_lines():
                    if not line.startswith(_SSE_DATA_PREFIX):
                        continue
                    if _handle_data_line(line):
                        break
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() (Python 3.11+) Ctrl+C cancels this task, so the
        # interruption surfaces here as CancelledError rather than
        # KeyboardInterrupt. Swallowing it is intentional: this coroutine is
        # the program's entry point and is about to finish anyway.
        print("\n\n👋 Interrupted by user")
    except httpx.ConnectError:
        print(f"❌ Failed to connect to {server_url}")
        print(" Make sure the server is running:")
        print(" $ make run")
    except Exception as e:
        # Top-level boundary of an example script: report and fall through to
        # the closing banner instead of dumping a traceback.
        print(f"❌ Error: {e}")

    print()
    print("=" * 60)
    print("✅ SSE Client Example Complete")
    print("=" * 60)


# Field name that introduces an event payload line in the SSE stream.
_SSE_DATA_PREFIX = 'data:'


def _handle_data_line(line: str) -> bool:
    """Decode one SSE ``data:`` line and print it.

    Returns True when the payload is a ``disconnected`` event (the caller
    should stop reading), False otherwise -- including when the payload is not
    valid JSON, which is reported and skipped.
    """
    data_str = line[len(_SSE_DATA_PREFIX):].strip()
    try:
        data = json.loads(data_str)
    except json.JSONDecodeError as e:
        print(f"⚠️ Failed to parse JSON: {e}")
        print(f" Raw data: {data_str}")
        print()
        return False
    return _print_event(data)


def _print_event(data) -> bool:
    """Print a description of a decoded event; return True if it is a disconnect."""
    # Valid JSON is not necessarily an object (it may be a list, number, ...);
    # such payloads have no 'type' field and are shown as unknown events.
    event_type = data.get('type', 'unknown') if isinstance(data, dict) else 'unknown'

    if event_type == 'connected':
        server = data.get('server')
        if not isinstance(server, dict):
            server = {}
        print("🎉 Connected to server!")
        print(f" Client ID: {data.get('client_id')}")
        print(f" Server: {server.get('name')} v{server.get('version')}")
        print()
        return False

    if event_type == 'ping':
        print(f"💓 Heartbeat (ping) - {data.get('timestamp')}")
        return False

    if event_type == 'disconnected':
        print("🔌 Disconnected from server")
        print(f" Timestamp: {data.get('timestamp')}")
        return True

    print(f"📨 Event: {event_type}")
    print(f" Data: {json.dumps(data, indent=2)}")
    print()
    return False
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() cancels main() on Ctrl+C and may then raise
        # KeyboardInterrupt here. Ctrl+C is the advertised way to stop this
        # example, so exit quietly instead of dumping a traceback.
        pass