# mcp_server.py (7.07 kB)
"""
MCP Server for Weather Information
Exposes weather API as MCP tools for Gemini CLI
"""
import asyncio
import json
import os
from typing import Optional

import httpx
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Initialize MCP server
server = Server("weather-info-mcp")

# Weather API base URL. Defaults to the local server on port 8000, but can be
# overridden with the WEATHER_API_URL environment variable (an unset or empty
# variable falls back to the default). A trailing slash is stripped so that
# f"{WEATHER_API_URL}/weather" never produces a double slash.
WEATHER_API_URL = (
    os.environ.get("WEATHER_API_URL") or "http://localhost:8000"
).rstrip("/")
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tool definitions exposed by this server.

    Three tools are declared: ``get_weather`` (one city, optional country),
    ``get_weather_batch`` (comma-separated cities) and ``check_api_health``
    (no arguments). Each ``inputSchema`` is a JSON Schema object.
    """
    # Schema for get_weather: "city" is mandatory, "country" is optional.
    single_city_schema = {
        "type": "object",
        "properties": {
            "city": {
                "type": "string",
                "description": "The name of the city to get weather for",
            },
            "country": {
                "type": "string",
                "description": "Optional country name for more accurate results",
            },
        },
        "required": ["city"],
    }

    # Schema for get_weather_batch: a single comma-separated string.
    batch_schema = {
        "type": "object",
        "properties": {
            "cities": {
                "type": "string",
                "description": "Comma-separated list of city names (e.g., 'London,Paris,Tokyo')",
            },
        },
        "required": ["cities"],
    }

    # Schema for check_api_health: the tool takes no arguments.
    health_schema = {
        "type": "object",
        "properties": {},
    }

    weather_tool = Tool(
        name="get_weather",
        description="Get current weather information for a specified city",
        inputSchema=single_city_schema,
    )
    batch_tool = Tool(
        name="get_weather_batch",
        description="Get weather information for multiple cities at once",
        inputSchema=batch_schema,
    )
    health_tool = Tool(
        name="check_api_health",
        description="Check if the weather API is running and healthy",
        inputSchema=health_schema,
    )
    return [weather_tool, batch_tool, health_tool]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch a tool invocation to the matching handler coroutine.

    Args:
        name: Tool name as declared by ``list_tools``.
        arguments: Tool arguments; only the keys relevant to the selected
            tool are read, and absent keys are passed on as ``None``.

    Returns:
        Whatever the selected handler returns (a list of ``TextContent``).

    Raises:
        ValueError: If ``name`` does not match any known tool.
    """
    if name == "check_api_health":
        return await check_api_health()

    if name == "get_weather_batch":
        return await get_weather_batch(arguments.get("cities"))

    if name == "get_weather":
        requested_city = arguments.get("city")
        requested_country = arguments.get("country")
        return await get_weather(requested_city, requested_country)

    raise ValueError(f"Unknown tool: {name}")
async def get_weather(city: str, country: Optional[str] = None) -> list[TextContent]:
    """Get current weather information for a specified city.

    Args:
        city: Name of the city to look up; must be a non-blank string.
        country: Optional country name, sent as the ``country`` query
            parameter when truthy.

    Returns:
        A one-element list holding a ``TextContent``. On success its text is a
        human-readable summary followed by the raw JSON payload; on failure it
        is a JSON object with an ``"error"`` key. This function does not raise
        for request or parsing failures.
    """
    # call_tool forwards arguments.get("city"), which is None when the caller
    # omitted it; reject that (and blank strings) before touching the network.
    if not isinstance(city, str) or not city.strip():
        error_msg = json.dumps({"error": "A non-empty 'city' argument is required"}, indent=2)
        return [TextContent(type="text", text=error_msg)]

    try:
        params = {"city": city}
        if country:
            params["country"] = country

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{WEATHER_API_URL}/weather", params=params, timeout=10.0
            )
            response.raise_for_status()
            weather_data = response.json()

        # Summary lines first, then the full payload for callers that want
        # every field. A missing key raises KeyError, handled below.
        summary_lines = [
            f"Weather Information for {weather_data['city']}, {weather_data['country']}:",
            f"- Temperature: {weather_data['temperature']}°C",
            f"- Condition: {weather_data['condition']}",
            f"- Humidity: {weather_data['humidity']}%",
            f"- Wind Speed: {weather_data['wind_speed']} km/h",
            f"- Description: {weather_data['description']}",
            f"- Last Updated: {weather_data['timestamp']}",
            "Full JSON:",
            json.dumps(weather_data, indent=2),
        ]
        return [TextContent(type="text", text="\n".join(summary_lines))]
    except httpx.ConnectError:
        # Report the URL actually in use rather than a hard-coded address, so
        # the hint stays accurate if WEATHER_API_URL is changed.
        error_msg = json.dumps({
            "error": f"Cannot connect to weather API. Make sure the FastAPI server is running on {WEATHER_API_URL}"
        }, indent=2)
        return [TextContent(type="text", text=error_msg)]
    except Exception as e:
        # Any other failure (HTTP error status, timeout, bad JSON, missing
        # field) is reported to the caller as text instead of propagating.
        error_msg = json.dumps({"error": f"Failed to fetch weather: {str(e)}"}, indent=2)
        return [TextContent(type="text", text=error_msg)]
async def get_weather_batch(cities: str) -> list[TextContent]:
    """Get weather information for multiple cities at once.

    Args:
        cities: Comma-separated city names (e.g. ``"London,Paris,Tokyo"``).
            Surrounding whitespace is stripped and blank entries are ignored.

    Returns:
        A one-element list holding a ``TextContent`` whose text is a JSON
        array with one entry per requested city, in input order: either the
        API's JSON payload or ``{"city": ..., "error": ...}`` for that city.
        If no usable city name is given, or an unexpected failure occurs, the
        text is a JSON object with an ``"error"`` key instead.
    """
    try:
        # Drop blank entries so inputs such as "London,,Paris", a trailing
        # comma, or an empty/None argument do not produce requests for an
        # empty city name.
        city_list = [
            name.strip() for name in (cities or "").split(",") if name.strip()
        ]
        if not city_list:
            error_msg = json.dumps(
                {"error": "No city names provided; expected a comma-separated list"},
                indent=2,
            )
            return [TextContent(type="text", text=error_msg)]

        url = f"{WEATHER_API_URL}/weather"
        async with httpx.AsyncClient() as client:
            # Issue all requests concurrently; return_exceptions=True keeps
            # one failing city from discarding the other results.
            responses = await asyncio.gather(
                *(
                    client.get(url, params={"city": city}, timeout=10.0)
                    for city in city_list
                ),
                return_exceptions=True,
            )

        results = []
        # gather() preserves argument order, so responses line up with cities.
        for city, response in zip(city_list, responses):
            if isinstance(response, Exception):
                results.append({"city": city, "error": str(response)})
                continue
            try:
                response.raise_for_status()
                results.append(response.json())
            except Exception as e:
                # HTTP error status or undecodable body for this city only.
                results.append({"city": city, "error": str(e)})

        result_text = json.dumps(results, indent=2)
        return [TextContent(type="text", text=result_text)]
    except Exception as e:
        error_msg = json.dumps({"error": f"Failed to fetch batch weather: {str(e)}"}, indent=2)
        return [TextContent(type="text", text=error_msg)]
async def check_api_health() -> list[TextContent]:
    """Check if the weather API is running and healthy.

    Sends a GET request to ``{WEATHER_API_URL}/health`` with a 5 second
    timeout.

    Returns:
        A one-element list holding a ``TextContent``. On success its text is
        the health endpoint's JSON payload, pretty-printed. If the connection
        fails it is a JSON object with ``"status": "unhealthy"``; for any
        other failure it is a JSON object with ``"status": "error"``. Both
        failure objects carry an ``"error"`` message.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{WEATHER_API_URL}/health", timeout=5.0)
            response.raise_for_status()
            result_text = json.dumps(response.json(), indent=2)
        return [TextContent(type="text", text=result_text)]
    except httpx.ConnectError:
        # Report the URL actually in use rather than a hard-coded address, so
        # the hint stays accurate if WEATHER_API_URL is changed.
        error_msg = json.dumps({
            "status": "unhealthy",
            "error": f"Cannot connect to weather API. Make sure it's running on {WEATHER_API_URL}"
        }, indent=2)
        return [TextContent(type="text", text=error_msg)]
    except Exception as e:
        # HTTP error status, timeout, or a non-JSON body.
        error_msg = json.dumps({
            "status": "error",
            "error": str(e)
        }, indent=2)
        return [TextContent(type="text", text=error_msg)]
async def main():
    """Serve MCP requests over the stdio transport."""
    async with stdio_server() as streams:
        reader, writer = streams

        # Capabilities are derived from the handlers registered on `server`;
        # no experimental capabilities are declared.
        capabilities = server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="weather-info-mcp",
            server_version="1.0.0",
            capabilities=capabilities,
        )
        await server.run(reader, writer, init_options)
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())