#!/usr/bin/env python3
"""
Test Weather MCP Server - SSE Transport
This script demonstrates connecting to the weather MCP server using SSE
(Server-Sent Events) transport over HTTP. Unlike STDIO, the server runs
as a standalone HTTP process and clients connect via URL.
The test:
1. Starts server/weather_sse.py as a subprocess (listens on port 8000)
2. Waits for the server to be ready
3. Connects via http://127.0.0.1:8000/sse
4. Runs a query and prints the result
5. Shuts down the server process
"""
import asyncio
import os
import signal
import subprocess
import time
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from mcp_use import MCPAgent, MCPClient
# SSE server configuration
SSE_SERVER_PORT = 8000
SSE_SERVER_URL = f"http://127.0.0.1:{SSE_SERVER_PORT}/sse"
async def wait_for_server(url: str, timeout: float = 15.0) -> bool:
"""
Poll the SSE endpoint until the server responds with 200 or timeout.
Args:
url: The SSE endpoint URL (e.g., http://127.0.0.1:8000/sse)
timeout: Maximum seconds to wait
Returns:
True if server responded with 200, False if timeout
"""
import httpx
start = time.time()
while time.time() - start < timeout:
try:
async with httpx.AsyncClient() as client:
# GET to /sse - for SSE, server may hold connection open
# A successful connection indicates server is ready
response = await client.get(url, timeout=2.0)
if response.status_code == 200:
return True
except Exception:
pass
await asyncio.sleep(0.3)
return False
async def test_sse():
"""
Start SSE server, connect, run query, then shutdown server.
Uses MCPClient.from_dict() with url config instead of command/args.
The HttpConnector connects via HTTP and uses SSE for the MCP protocol.
"""
load_dotenv()
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
print("=== Weather MCP Test: SSE Transport ===\n")
# Project root for spawning server with correct cwd
project_root = os.path.dirname(os.path.abspath(__file__))
# Start the SSE server as a subprocess
# It runs weather_sse.py which uses mcp.run(transport="sse")
print("Starting weather server in SSE mode on port 8000...")
server_process = subprocess.Popen(
["uv", "run", "server/weather_sse.py"],
cwd=project_root,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
# os.setsid creates new process group for clean shutdown on Unix
preexec_fn=os.setsid if hasattr(os, "setsid") else None,
)
try:
# Wait for server to accept connections
if not await wait_for_server(SSE_SERVER_URL):
# Fallback: give server a few more seconds to start
print("Waiting for server...")
await asyncio.sleep(3)
else:
print("Server is ready.")
print("Connecting to weather server via SSE (HTTP)...")
# Config for HTTP/SSE: "url" key tells MCPClient to use HttpConnector
# The client will GET /sse for event stream and POST to /messages/ for requests
config = {"mcpServers": {"weather": {"url": SSE_SERVER_URL}}}
client = MCPClient.from_dict(config)
llm = ChatGroq(model="llama-3.3-70b-versatile")
agent = MCPAgent(llm=llm, client=client, max_steps=5, memory_enabled=False)
print("Querying: What are the weather alerts for California?")
response = await agent.run("What are the weather alerts for California?")
print("\nResponse:", response)
await client.close_all_sessions()
finally:
# Shutdown server: kill process group (Unix) or terminate (Windows)
try:
if hasattr(os, "setsid"):
os.killpg(os.getpgid(server_process.pid), signal.SIGTERM)
else:
server_process.terminate()
server_process.wait(timeout=5)
except Exception:
server_process.kill()
print("\nDone.")
if __name__ == "__main__":
# Ensure we run from project root so relative paths (server/weather_sse.py) work
project_root = os.path.dirname(os.path.abspath(__file__))
os.chdir(project_root)
asyncio.run(test_sse())