import sys
from mcp.server.fastmcp import FastMCP
import httpx
import os
import datetime
from dotenv import load_dotenv
# Put this file's own directory at the front of sys.path so the sibling
# `utils` module can be imported no matter how the server is launched.
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.insert(0, current_dir)

# Resolve the two helpers from `utils`, trying in order:
#   1. a plain top-level import (script execution),
#   2. a package-relative import (module loaded as part of a package),
#   3. importing the module object and binding the attributes by hand.
try:
    from utils import convert_latlon_to_grid, get_base_date_time
except ImportError:
    try:
        from .utils import convert_latlon_to_grid, get_base_date_time
    except ImportError:
        import utils

        convert_latlon_to_grid, get_base_date_time = (
            utils.convert_latlon_to_grid,
            utils.get_base_date_time,
        )
# Load environment variables (python-dotenv) before any setting is read.
load_dotenv()

# The MCP server instance; the resources and tools in this file register on it.
mcp = FastMCP("kma-weather")

# Root URL shared by every forecast operation requested by this module.
# NOTE(review): this is plain HTTP, so the service key travels unencrypted;
# consider switching to https if the endpoint supports it -- confirm first.
API_BASE_URL = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0"

# Read the DECODED service key: httpx performs the URL-encoding itself when the
# key is passed as a query parameter.
API_KEY = os.environ.get("KMA_API_KEY_DECODED")

# A missing key is reported on stderr only; the module still finishes loading.
if not API_KEY:
    sys.stderr.write("Warning: KMA_API_KEY_DECODED not found in environment variables.\n")
# --- Resources ---
@mcp.resource("weather://seoul/now")
async def get_seoul_weather_now() -> str:
    """Get current weather for Seoul (Grid: 60, 127)."""
    # Seoul is addressed by a fixed grid cell, so no lat/lon conversion is needed.
    seoul_nx, seoul_ny = 60, 127
    live_report = await fetch_ultra_srt_ncst(seoul_nx, seoul_ny)
    return live_report
@mcp.resource("weather://{latitude}/{longitude}/now")
async def get_local_weather_now(latitude: float, longitude: float) -> str:
    """Get current weather for a specific location (Latitude, Longitude)."""
    # Translate the geographic coordinates into the grid cell the API expects,
    # then delegate to the shared live-observation helper.
    grid_x, grid_y = convert_latlon_to_grid(latitude, longitude)
    live_report = await fetch_ultra_srt_ncst(grid_x, grid_y)
    return live_report
# --- Tools ---
@mcp.tool()
async def get_ultra_short_term_forecast(latitude: float, longitude: float) -> str:
    """
    Get Ultra Short Term Forecast (Next 6 hours).
    useful for: checking immediate weather changes, near-future rain/snow probability.
    """
    # Geographic coordinates -> forecast grid cell.
    grid_x, grid_y = convert_latlon_to_grid(latitude, longitude)

    # 45 is the minute offset handed to the helper; the forecast is treated as
    # available 45 minutes into each hour.
    release_date, release_time = get_base_date_time(45)

    # Key order is kept so the generated query string is unchanged.
    query = dict(
        serviceKey=API_KEY,
        pageNo="1",
        numOfRows="60",  # enough rows to cover the next 6 hours
        dataType="JSON",
        base_date=release_date,
        base_time=release_time,
        nx=grid_x,
        ny=grid_y,
    )
    return await make_api_request("getUltraSrtFcst", query)
def _village_base_date_time(now):
    """Return ``(base_date, base_time)`` of the newest usable Village Forecast release.

    Releases are issued at 02, 05, 08, 11, 14, 17, 20 and 23 o'clock. A release
    is treated as usable once 15 minutes have passed since its nominal hour.

    Args:
        now: The current time as a ``datetime.datetime`` (expected in KST).

    Returns:
        A tuple ``(YYYYMMDD, HH00)``. Before 02:15 this is the previous day's
        ``2300`` release.
    """
    release_hours = (2, 5, 8, 11, 14, 17, 20, 23)
    # Moving the clock back by the 15-minute margin turns "issued at least 15
    # minutes ago" into a plain hour comparison, and lets datetime arithmetic
    # take care of the rollover across midnight.
    effective = now - datetime.timedelta(minutes=15)
    issued = [hour for hour in release_hours if hour <= effective.hour]
    if issued:
        return effective.strftime("%Y%m%d"), f"{issued[-1]:02d}00"
    # Nothing has been issued yet on this calendar day: use yesterday's last release.
    previous_day = effective - datetime.timedelta(days=1)
    return previous_day.strftime("%Y%m%d"), "2300"


@mcp.tool()
async def get_village_forecast(latitude: float, longitude: float) -> str:
    """
    Get Village Forecast (Short Term, up to 3 days).
    useful for: planning for tomorrow or the day after, checking daily highs/lows.
    Note: Can be slow as it returns a lot of data.
    """
    # The docstring above is left untouched on purpose: it is presumably what
    # FastMCP publishes as the tool description -- confirm before rewording.
    nx, ny = convert_latlon_to_grid(latitude, longitude)

    # The release schedule is defined in Korea Standard Time (UTC+9). Using an
    # explicit fixed offset keeps base_date/base_time correct even when the
    # server's local timezone is not KST.
    kst = datetime.timezone(datetime.timedelta(hours=9))
    base_date, base_time = _village_base_date_time(datetime.datetime.now(kst))

    params = {
        "serviceKey": API_KEY,
        "pageNo": "1",
        "numOfRows": "1000",
        "dataType": "JSON",
        "base_date": base_date,
        "base_time": base_time,
        "nx": nx,
        "ny": ny,
    }
    return await make_api_request("getVilageFcst", params)
# --- Helpers ---
async def fetch_ultra_srt_ncst(nx: int, ny: int) -> str:
    """Fetch the Ultra Short Term live observation for one grid cell.

    Args:
        nx: Grid X coordinate.
        ny: Grid Y coordinate.

    Returns:
        Whatever ``make_api_request`` returns for the ``getUltraSrtNcst``
        operation.
    """
    # 40 is the minute offset handed to the helper; live data is treated as
    # available 40 minutes into each hour.
    observed_date, observed_time = get_base_date_time(40)

    # Fixed paging/format settings first, then the request-specific values;
    # the resulting key order matches the query string sent before.
    query = {
        "serviceKey": API_KEY,
        "pageNo": "1",
        "numOfRows": "100",
        "dataType": "JSON",
    }
    query.update(base_date=observed_date, base_time=observed_time, nx=nx, ny=ny)

    live_report = await make_api_request("getUltraSrtNcst", query)
    return live_report
async def make_api_request(endpoint: str, params: dict) -> str:
    """Send a GET request to one forecast operation and return the raw body.

    Args:
        endpoint: Operation name appended to ``API_BASE_URL``.
        params: Query parameters for the request, passed to httpx unchanged.

    Returns:
        The response body text on success. On failure, a string that starts
        with ``"Error calling <endpoint>: "``; failures are reported through
        the return value instead of being raised.
    """
    url = f"{API_BASE_URL}/{endpoint}"
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, params=params, timeout=10.0)
            response.raise_for_status()
            return response.text
    except httpx.HTTPStatusError as exc:
        # The exception's own message embeds the full request URL, query string
        # included, which would hand the service key to the caller. Report only
        # the status line instead.
        failed = exc.response
        return f"Error calling {endpoint}: HTTP {failed.status_code} {failed.reason_phrase}"
    except Exception as exc:
        # Deliberate catch-all: callers expect an error string, never an
        # exception. Some httpx errors (timeouts in particular) carry an empty
        # message, so the exception type is always included.
        detail = str(exc)
        kind = type(exc).__name__
        if detail:
            return f"Error calling {endpoint}: {kind}: {detail}"
        return f"Error calling {endpoint}: {kind}"
# Start the MCP server only when this file is executed directly, not when it is
# imported. The transport is whatever FastMCP.run() uses by default.
if __name__ == "__main__":
    mcp.run()