import asyncio
import sys
from typing import Any

import httpx
from fastmcp import FastMCP
# MCP server instance; tools are registered on it with @mcp.tool().
mcp = FastMCP("weather")

# Base URL that request paths are appended to.
API = "https://api.weather.gov"
# Value sent as the User-Agent header on outgoing requests.
USER_AGENT = "weather-app/1.0"
async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Fetch *url* and return the decoded JSON body, or ``None`` on failure.

    The request carries the module's ``USER_AGENT``, asks for
    ``application/json`` and uses a 30 second timeout.

    Args:
        url: Absolute URL to GET.

    Returns:
        The parsed JSON document, or ``None`` if sending the request, the
        HTTP status check, or JSON decoding raised.
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json",
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Deliberately best-effort: callers treat None as "no data".
            # The diagnostic goes to stderr because an MCP server on the
            # stdio transport uses stdout for protocol messages, and stray
            # text there would corrupt the stream.
            print(f"Error making request: {e}", file=sys.stderr)
            return None
def format_alert(feature: dict) -> str:
    """Render one alert feature as ``"<headline>: <description>"``.

    Args:
        feature: Alert mapping; the text is read from the ``"headline"`` and
            ``"description"`` keys of its ``"properties"`` entry.

    Returns:
        The formatted line. A field that is missing, ``None`` or empty is
        replaced by ``"No headline"`` / ``"No description"``.
    """
    # `or` rather than a .get() default: a key that is present but null would
    # bypass the default and be rendered as the literal text "None".
    properties = feature.get("properties") or {}
    headline = properties.get("headline") or "No headline"
    description = properties.get("description") or "No description"
    return f"{headline}: {description}"
async def get_alerts_impl(state: str) -> str:
    """Return the active alerts for *state* as display text.

    Args:
        state: Two-letter code such as ``"TX"``; surrounding whitespace and
            letter case are ignored.

    Returns:
        The formatted alerts separated by blank lines, or a short message
        when the code is invalid, the request failed, or nothing is active.
    """
    code = state.strip().upper()
    # The code is interpolated into the query string, so anything that is not
    # exactly two ASCII letters is rejected instead of being sent to the API.
    if not (len(code) == 2 and code.isascii() and code.isalpha()):
        return f"Invalid state code {state!r}: expected a two-letter code such as 'TX'."

    url = f"{API}/alerts/active?area={code}"
    data = await make_nws_request(url)
    if not data:
        return "No alerts found or unable to fetch data."

    alerts = data.get("features", [])
    if not alerts:
        return "No active alerts found."

    return "\n\n".join(format_alert(alert) for alert in alerts)
@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get the active weather alerts for a state.

    Args:
        state: Two-letter state code, e.g. "TX".

    Returns:
        The alerts as text, or a short message when none are available.
    """
    # Thin wrapper: the logic lives in get_alerts_impl so it can be called
    # directly without going through the tool decorator.
    return await get_alerts_impl(state)
# NOTE: alert lookups take a two-letter state code, e.g. "TX".
async def test():
    """Manual smoke test: fetch the alerts for "TX" and print them."""
    print("Testing weather alerts...")
    # Other codes that were listed here for manual runs:
    #   AK AN AR AZ CA CO FL GM GU IA ID IN KS LA MD MN MO MS NC ND NE NM NV
    #   OK OR PK PZ SD TN TX WA WI
    alerts_text = await get_alerts_impl("TX")
    print(f"Result: {alerts_text}")
if __name__ == "__main__":
    # Running the file directly performs one manual smoke test and exits.
    smoke_test = test()
    asyncio.run(smoke_test)
    # To run the MCP server instead, swap the two lines above for:
    # mcp.run()