# server.py • 1.6 kB
from typing import Any
import httpx
import json
from mcp.server.fastmcp import FastMCP
# FastMCP server instance created with the name 'weather'; tools in this file
# are registered on it through the @mcp.tool() decorator.
mcp = FastMCP('weather')
# Constants
# Base URL from which Open-Meteo request URLs in this file are built.
OPENMETEO_API_BASE = "https://api.open-meteo.com/v1"
# Value sent as the User-Agent header on outgoing HTTP requests.
USER_AGENT = "weather-app/1.0"
# Helper function to make a request to the Open-Meteo API
async def make_openmeteo_request(url: str) -> dict[str, Any] | None:
    """Fetch ``url`` with an HTTP GET and return the decoded JSON body.

    Args:
        url: Fully-formed request URL.

    Returns:
        The decoded JSON payload, or ``None`` when sending the request,
        the HTTP status check, or JSON decoding raised an exception.
    """
    request_headers = {"User-Agent": USER_AGENT, "Accept": "application/json"}
    async with httpx.AsyncClient() as http:
        try:
            reply = await http.get(url, headers=request_headers, timeout=30.0)
            # Turn 4xx/5xx responses into exceptions so they are handled below.
            reply.raise_for_status()
            payload = reply.json()
        except Exception:
            # Best-effort contract: any failure is reported to the caller as None.
            return None
        return payload
@mcp.tool()
async def get_current_weather(latitude: float, longitude: float) -> str:
    """Get current weather for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location

    Returns:
        The fetched response serialized as a JSON string, or a plain-text
        error message when no data could be fetched.
    """
    # The requested variables must be introduced by "&current=". The earlier
    # text had this mis-decoded as "¤t=" (the HTML entity "&curren" rendered
    # as "¤"), which dropped the parameter and glued the variable list onto
    # the longitude value.
    url = (
        f"{OPENMETEO_API_BASE}/forecast"
        f"?latitude={latitude}&longitude={longitude}"
        "&current=temperature_2m,is_day,showers,cloud_cover,wind_speed_10m,"
        "wind_direction_10m,pressure_msl,snowfall,precipitation,"
        "relative_humidity_2m,apparent_temperature,rain,weather_code,"
        "surface_pressure,wind_gusts_10m"
    )
    data = await make_openmeteo_request(url)
    # The helper returns None on any failure; an empty payload is treated the same.
    if not data:
        return "Unable to fetch current weather data for this location."
    return json.dumps(data)
def main() -> None:
    """Print a startup greeting to stderr.

    This file runs the server with the stdio transport, where stdout carries
    the protocol messages exchanged with the client. The greeting is written
    to stderr so it cannot corrupt that stream.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    import sys

    print("Hello from mcp-server-weather!", file=sys.stderr)
# Script entry point: runs only when the file is executed directly.
# NOTE(review): the original indentation was lost; mcp.run(...) is assumed to
# belong inside this guard (so that importing the module does not start the
# server) — confirm against the upstream file.
if __name__ == "__main__":
    main()
    # Start the FastMCP server using the stdio transport.
    mcp.run(transport='stdio')