Skip to main content
Glama
weather.py (15.4 kB)
"""Weather data fetching from JMA APIs.""" from datetime import datetime, timedelta, timezone from typing import Any, Optional import httpx # Japan Standard Time (UTC+9) JST = timezone(timedelta(hours=9)) # Wind direction mapping (16 directions) WIND_DIRECTIONS = { 1: "NNE", 2: "NE", 3: "ENE", 4: "E", 5: "ESE", 6: "SE", 7: "SSE", 8: "S", 9: "SSW", 10: "SW", 11: "WSW", 12: "W", 13: "WNW", 14: "NW", 15: "NNW", 16: "N" } WIND_DIRECTIONS_JA = { 1: "北北東", 2: "北東", 3: "東北東", 4: "東", 5: "東南東", 6: "南東", 7: "南南東", 8: "南", 9: "南南西", 10: "南西", 11: "西南西", 12: "西", 13: "西北西", 14: "北西", 15: "北北西", 16: "北" } def get_latest_data_time() -> datetime: """Get the latest available data time (approximately 30 minutes behind current time).""" now = datetime.now(JST) # Data is available about 30-40 minutes after observation data_time = now - timedelta(minutes=40) # Round down to 10-minute intervals data_time = data_time.replace(minute=(data_time.minute // 10) * 10, second=0, microsecond=0) return data_time def format_time_for_api(dt: datetime) -> str: """Format datetime for JMA API URL.""" return dt.strftime('%Y%m%d%H%M00') def parse_observation_value(value: Optional[list[float | int | None]]) -> Optional[float]: """Parse observation value from [value, quality_flag] format.""" if value is None or len(value) < 2: return None val, quality = value if val is None or quality is None: return None return float(val) if val is not None else None async def fetch_amedas_data( station_code: Optional[str] = None, data_time: Optional[datetime] = None ) -> dict[str, Any]: """ Fetch AMeDAS observation data from JMA API. 
Args: station_code: Optional station code to filter (returns all if None) data_time: Optional datetime for historical data (uses latest if None) Returns: Dictionary with observation data """ if data_time is None: data_time = get_latest_data_time() time_str = format_time_for_api(data_time) url = f'https://www.jma.go.jp/bosai/amedas/data/map/{time_str}.json' async with httpx.AsyncClient() as client: response = await client.get(url, timeout=30.0) response.raise_for_status() raw_data = response.json() # Parse and format the data result: dict[str, Any] = { "observation_time": data_time.isoformat(), "observation_time_jst": data_time.strftime('%Y-%m-%d %H:%M JST'), "stations": {} } stations_to_process = {station_code: raw_data.get(station_code)} if station_code else raw_data for code, data in stations_to_process.items(): if data is None: continue station_data: dict[str, Any] = {"code": code} # Temperature (℃) if "temp" in data: station_data["temperature"] = { "value": parse_observation_value(data["temp"]), "unit": "℃" } # Humidity (%) if "humidity" in data: station_data["humidity"] = { "value": parse_observation_value(data["humidity"]), "unit": "%" } # Pressure (hPa) if "pressure" in data: station_data["pressure"] = { "value": parse_observation_value(data["pressure"]), "unit": "hPa" } # Sea level pressure (hPa) if "normalPressure" in data: station_data["sea_level_pressure"] = { "value": parse_observation_value(data["normalPressure"]), "unit": "hPa" } # Wind if "wind" in data: wind_speed = parse_observation_value(data["wind"]) wind_dir_code = parse_observation_value(data.get("windDirection", [None, None])) wind_dir = None wind_dir_ja = None if wind_dir_code is not None: wind_dir_code = int(wind_dir_code) wind_dir = WIND_DIRECTIONS.get(wind_dir_code) wind_dir_ja = WIND_DIRECTIONS_JA.get(wind_dir_code) station_data["wind"] = { "speed": wind_speed, "speed_unit": "m/s", "direction": wind_dir, "direction_ja": wind_dir_ja, "direction_code": wind_dir_code } # Precipitation 
precipitation = {} if "precipitation10m" in data: precipitation["10min"] = parse_observation_value(data["precipitation10m"]) if "precipitation1h" in data: precipitation["1h"] = parse_observation_value(data["precipitation1h"]) if "precipitation3h" in data: precipitation["3h"] = parse_observation_value(data["precipitation3h"]) if "precipitation24h" in data: precipitation["24h"] = parse_observation_value(data["precipitation24h"]) if precipitation: station_data["precipitation"] = { **precipitation, "unit": "mm" } # Sunshine if "sun1h" in data: station_data["sunshine"] = { "1h": parse_observation_value(data["sun1h"]), "unit": "hours" } # Snow snow = {} if "snow" in data: snow["depth"] = parse_observation_value(data["snow"]) if "snow1h" in data: snow["1h"] = parse_observation_value(data["snow1h"]) if "snow6h" in data: snow["6h"] = parse_observation_value(data["snow6h"]) if "snow12h" in data: snow["12h"] = parse_observation_value(data["snow12h"]) if "snow24h" in data: snow["24h"] = parse_observation_value(data["snow24h"]) if snow: station_data["snow"] = { **snow, "unit": "cm" } result["stations"][code] = station_data return result async def fetch_weather_warnings() -> dict[str, Any]: """ Fetch current weather warnings/advisories from JMA. Returns: Dictionary with warning data """ url = "https://www.jma.go.jp/bosai/warning/data/warning/000000.json" async with httpx.AsyncClient() as client: response = await client.get(url, timeout=30.0) response.raise_for_status() data: dict[str, Any] = response.json() return data async def fetch_forecast(area_code: str) -> list[dict[str, Any]]: """ Fetch weather forecast for a specific area. 
Args: area_code: JMA area code (e.g., "130000" for Tokyo) Returns: List of forecast data """ url = f"https://www.jma.go.jp/bosai/forecast/data/forecast/{area_code}.json" async with httpx.AsyncClient() as client: response = await client.get(url, timeout=30.0) response.raise_for_status() data: list[dict[str, Any]] = response.json() return data async def fetch_historical_amedas_data( station_code: str, target_time: datetime ) -> dict[str, Any]: """ Fetch historical AMeDAS data for a specific time. Args: station_code: Station code (e.g., '44132' for Tokyo) target_time: Target datetime (available for approximately past 1-2 weeks) Returns: Dictionary with observation data for the specified time """ # Round to 10-minute intervals target_time = target_time.replace( minute=(target_time.minute // 10) * 10, second=0, microsecond=0 ) time_str = format_time_for_api(target_time) url = f'https://www.jma.go.jp/bosai/amedas/data/map/{time_str}.json' async with httpx.AsyncClient() as client: response = await client.get(url, timeout=30.0) response.raise_for_status() raw_data = response.json() station_data = raw_data.get(station_code) if station_data is None: return { "error": f"No data found for station {station_code} at {target_time.isoformat()}", "observation_time": target_time.isoformat() } result = { "observation_time": target_time.isoformat(), "observation_time_jst": target_time.strftime('%Y-%m-%d %H:%M JST'), "station_code": station_code, "data": _parse_station_data(station_code, station_data) } return result async def fetch_time_series_data( station_code: str, hours: int = 24, interval_minutes: int = 60 ) -> dict[str, Any]: """ Fetch time series data for a station. 
Args: station_code: Station code (e.g., '44132' for Tokyo) hours: Number of hours to fetch (default: 24, max recommended: 168 for ~1 week) interval_minutes: Interval between data points in minutes (10, 30, or 60) Returns: Dictionary with time series data """ if interval_minutes not in [10, 30, 60]: interval_minutes = 60 # Limit hours to prevent too many requests hours = min(hours, 168) # Max 1 week start_time = get_latest_data_time() data_points = [] errors = [] # Calculate number of points num_points = (hours * 60) // interval_minutes async with httpx.AsyncClient() as client: for i in range(num_points): target_time = start_time - timedelta(minutes=i * interval_minutes) time_str = format_time_for_api(target_time) url = f'https://www.jma.go.jp/bosai/amedas/data/map/{time_str}.json' try: response = await client.get(url, timeout=10.0) response.raise_for_status() raw_data = response.json() station_data = raw_data.get(station_code) if station_data: parsed = _parse_station_data(station_code, station_data) parsed["observation_time"] = target_time.isoformat() parsed["observation_time_jst"] = target_time.strftime('%Y-%m-%d %H:%M') data_points.append(parsed) except httpx.HTTPStatusError as e: if e.response.status_code == 404: errors.append({ "time": target_time.isoformat(), "error": "Data not available (past retention period)" }) break # Stop if we hit the retention limit else: errors.append({ "time": target_time.isoformat(), "error": str(e) }) except Exception as e: errors.append({ "time": target_time.isoformat(), "error": str(e) }) # Reverse to chronological order data_points.reverse() return { "station_code": station_code, "requested_hours": hours, "interval_minutes": interval_minutes, "data_points": len(data_points), "time_series": data_points, "errors": errors if errors else None } def _parse_station_data(code: str, data: dict[str, Any]) -> dict[str, Any]: """Parse raw station data into structured format.""" station_data: dict[str, Any] = {"code": code} # Temperature 
(℃) if "temp" in data: station_data["temperature"] = { "value": parse_observation_value(data["temp"]), "unit": "℃" } # Humidity (%) if "humidity" in data: station_data["humidity"] = { "value": parse_observation_value(data["humidity"]), "unit": "%" } # Pressure (hPa) if "pressure" in data: station_data["pressure"] = { "value": parse_observation_value(data["pressure"]), "unit": "hPa" } # Sea level pressure (hPa) if "normalPressure" in data: station_data["sea_level_pressure"] = { "value": parse_observation_value(data["normalPressure"]), "unit": "hPa" } # Wind if "wind" in data: wind_speed = parse_observation_value(data["wind"]) wind_dir_code = parse_observation_value(data.get("windDirection", [None, None])) wind_dir = None wind_dir_ja = None if wind_dir_code is not None: wind_dir_code = int(wind_dir_code) wind_dir = WIND_DIRECTIONS.get(wind_dir_code) wind_dir_ja = WIND_DIRECTIONS_JA.get(wind_dir_code) station_data["wind"] = { "speed": wind_speed, "speed_unit": "m/s", "direction": wind_dir, "direction_ja": wind_dir_ja, "direction_code": wind_dir_code } # Precipitation precipitation = {} if "precipitation10m" in data: precipitation["10min"] = parse_observation_value(data["precipitation10m"]) if "precipitation1h" in data: precipitation["1h"] = parse_observation_value(data["precipitation1h"]) if "precipitation3h" in data: precipitation["3h"] = parse_observation_value(data["precipitation3h"]) if "precipitation24h" in data: precipitation["24h"] = parse_observation_value(data["precipitation24h"]) if precipitation: station_data["precipitation"] = { **precipitation, "unit": "mm" } # Sunshine if "sun1h" in data: station_data["sunshine"] = { "1h": parse_observation_value(data["sun1h"]), "unit": "hours" } # Snow snow = {} if "snow" in data: snow["depth"] = parse_observation_value(data["snow"]) if "snow1h" in data: snow["1h"] = parse_observation_value(data["snow1h"]) if "snow6h" in data: snow["6h"] = parse_observation_value(data["snow6h"]) if "snow12h" in data: snow["12h"] = 
parse_observation_value(data["snow12h"]) if "snow24h" in data: snow["24h"] = parse_observation_value(data["snow24h"]) if snow: station_data["snow"] = { **snow, "unit": "cm" } return station_data # Area codes for major prefectures AREA_CODES = { "hokkaido_sapporo": "016000", "aomori": "020000", "iwate": "030000", "miyagi": "040000", "akita": "050000", "yamagata": "060000", "fukushima": "070000", "ibaraki": "080000", "tochigi": "090000", "gunma": "100000", "saitama": "110000", "chiba": "120000", "tokyo": "130000", "kanagawa": "140000", "niigata": "150000", "toyama": "160000", "ishikawa": "170000", "fukui": "180000", "yamanashi": "190000", "nagano": "200000", "gifu": "210000", "shizuoka": "220000", "aichi": "230000", "mie": "240000", "shiga": "250000", "kyoto": "260000", "osaka": "270000", "hyogo": "280000", "nara": "290000", "wakayama": "300000", "tottori": "310000", "shimane": "320000", "okayama": "330000", "hiroshima": "340000", "yamaguchi": "350000", "tokushima": "360000", "kagawa": "370000", "ehime": "380000", "kochi": "390000", "fukuoka": "400000", "saga": "410000", "nagasaki": "420000", "kumamoto": "430000", "oita": "440000", "miyazaki": "450000", "kagoshima": "460000", "okinawa": "470000", }

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/koizumikento/jma-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.