Skip to main content
Glama

Weather MCP Server

complete_test.py•6.21 kB
""" Complete Working Test for Weather MCP Server Uses SSE streaming connection as per MCP protocol """ import httpx import json import asyncio async def parse_sse_message(text): """Parse Server-Sent Events format""" lines = text.strip().split('\n') for line in lines: if line.startswith('data: '): return json.loads(line[6:]) return None async def test_weather_mcp(): """Test the Weather MCP server with proper SSE handling""" base_url = "http://localhost:8000/mcp" headers = { "Accept": "application/json, text/event-stream", "Content-Type": "application/json" } print("šŸŒ¤ļø Weather MCP Server - Complete Test\n") print("="*50) async with httpx.AsyncClient(timeout=30.0) as client: # Test 1: Initialize (this creates the session) print("\n1ļøāƒ£ INITIALIZING SESSION") print("-" * 50) init_response = await client.post( base_url, headers=headers, json={ "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "weather-test", "version": "1.0.0" } }, "id": 1 } ) print(f"Status: {init_response.status_code}") if init_response.status_code != 200: print(f"āŒ Initialization failed!") print(f"Response: {init_response.text}") return init_data = await parse_sse_message(init_response.text) if init_data and 'result' in init_data: print("āœ… Session initialized successfully!") print(f"Server: {init_data['result']['serverInfo']['name']}") print(f"Version: {init_data['result']['serverInfo']['version']}") print(f"Protocol: {init_data['result']['protocolVersion']}") # Get session ID from response headers or Set-Cookie session_id = None if 'set-cookie' in init_response.headers: cookie_header = init_response.headers['set-cookie'] print(f"Cookie: {cookie_header}") # Extract session from cookie if present # Test 2: Send initialized notification print("\n2ļøāƒ£ SENDING INITIALIZED NOTIFICATION") print("-" * 50) notif_response = await client.post( base_url, headers=headers, json={ "jsonrpc": "2.0", "method": "notifications/initialized" } ) print(f"Status: {notif_response.status_code}") print("āœ… Notification sent") # Test 3: Call weather tool directly (simpler approach) print("\n3ļøāƒ£ TESTING WEATHER TOOL") print("-" * 50) test_cities = ["London", "Paris", "Tokyo"] for idx, city in enumerate(test_cities, start=2): print(f"\nšŸŒ Getting weather for {city}...") # Each request is independent in stateless HTTP mode weather_response = await client.post( base_url, headers=headers, json={ "jsonrpc": "2.0", "method": "tools/call", "params": { "name": "weather", "arguments": {"city": city} }, "id": idx } ) if weather_response.status_code == 200: weather_data = await parse_sse_message(weather_response.text) if weather_data and 'result' in weather_data: content = weather_data['result']['content'][0]['text'] weather_info = json.loads(content) if 'error' in weather_info: print(f" āš ļø Error: {weather_info['error']}") else: print(f" āœ… Success!") print(f" Temperature: {weather_info['temperature_c']}°C") print(f" Wind Speed: {weather_info['windspeed_kmh']} km/h") print(f" Location: ({weather_info['latitude']:.2f}, {weather_info['longitude']:.2f})") elif weather_data and 'error' in weather_data: print(f" āŒ Error: {weather_data['error']['message']}") else: print(f" āŒ Request failed: {weather_response.status_code}") print(f" Response: {weather_response.text[:200]}") print("\n" + "="*50) print("✨ Test completed!\n") print("āœ… Your MCP server is working correctly!") print("🐳 Docker Hub: https://hub.docker.com/r/125478963/weather-mcp") print("šŸ“¦ GitHub: https://github.com/rajeevchandra/weather-mcp-docker\n") if __name__ == "__main__": print(""" ╔════════════════════════════════════════════════╗ ā•‘ Weather MCP Server - Working Test ā•‘ ā•‘ ā•‘ ā•‘ This test demonstrates how to interact with ā•‘ ā•‘ your MCP server using the proper protocol. ā•‘ ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā• """) try: asyncio.run(test_weather_mcp()) except KeyboardInterrupt: print("\nāš ļø Test interrupted") except httpx.ConnectError: print("\nāŒ Cannot connect to server!") print(" Make sure the Docker container is running:") print(" docker run -d -p 8000:8000 125478963/weather-mcp:latest") except Exception as e: print(f"\nāŒ Error: {e}") import traceback traceback.print_exc()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rajeevchandra/weather-mcp-docker'

If you have feedback or need assistance with the MCP directory API, please join our Discord server