"""KNMI Weather MCP — station management for KNMI 10-minute observation data."""

import asyncio
import logging
import os
import tempfile
from datetime import datetime
from math import atan2, cos, radians, sin, sqrt
from pathlib import Path
from typing import Any, Dict, Optional

import httpx
import numpy as np
import pandas as pd
import xarray as xr
from dotenv import load_dotenv
from fastmcp import Context

from knmi_weather_mcp.models import Coordinates, WeatherStation

load_dotenv()

# Get logger for this module
logger = logging.getLogger("knmi_weather.station")


class StationManager:
    """Manages KNMI weather stations.

    Process-wide singleton that loads station metadata from the KNMI Open
    Data API, answers nearest-station queries, and fetches raw 10-minute
    observations for a single station.
    """

    _instance = None
    _initialized = False

    # API endpoints
    BASE_URL = "https://api.dataplatform.knmi.nl/open-data/v1"
    DATASET_NAME = "Actuele10mindataKNMIstations"
    DATASET_VERSION = "2"

    # Netherlands bounding box (mainland Netherlands)
    NL_BOUNDS = {
        "min_lat": 50.7,  # Southernmost point
        "max_lat": 53.7,  # Northernmost point
        "min_lon": 3.3,  # Westernmost point
        "max_lon": 7.2,  # Easternmost point
    }

    # Parameter mapping for 10-minute data
    PARAMETER_MAPPING = {
        "T": "temperature",  # Air temperature
        "RH": "relative_humidity",  # Relative humidity
        "FF": "wind_speed",  # 10-min mean wind speed
        "DD": "wind_direction",  # Wind direction
        "VIS": "visibility",  # Visibility
        "P": "air_pressure",  # Air pressure
        "RR": "precipitation_amount",  # Precipitation amount
        "DR": "precipitation_duration",  # Precipitation duration
    }

    def __new__(cls):
        # Classic singleton: reuse the one instance across the process.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Guard so repeated construction of the singleton does not reset state.
        if not StationManager._initialized:
            self._stations: Dict[str, WeatherStation] = {}
            self._last_update: Optional[datetime] = None
            self._lock = asyncio.Lock()
            self._api_key = os.getenv("KNMI_API_KEY")
            if not self._api_key:
                logger.error("KNMI_API_KEY environment variable is missing")
                raise ValueError("KNMI_API_KEY environment variable is required")
            # Strip "Bearer" prefix if it exists in the env var
            self._api_key = self._api_key.replace("Bearer ", "")
            logger.info("StationManager initialized successfully")
            StationManager._initialized = True

    @property
    def stations(self) -> Dict[str, WeatherStation]:
        """Currently loaded stations, keyed by station id (without '06' prefix)."""
        return self._stations

    async def _latest_file_metadata(
        self,
        client: httpx.AsyncClient,
        headers: Dict[str, str],
        params: Dict[str, str],
    ) -> Optional[Dict[str, Any]]:
        """Return metadata of the newest dataset file matching *params*.

        Returns None when the dataset listing is empty so callers can raise
        their own context-specific error. Raises httpx.HTTPStatusError on a
        non-2xx response.
        """
        list_url = f"{self.BASE_URL}/datasets/{self.DATASET_NAME}/versions/{self.DATASET_VERSION}/files"
        response = await client.get(list_url, headers=headers, params=params)
        response.raise_for_status()
        files_data = response.json()
        if not files_data.get("files"):
            return None
        return files_data["files"][0]

    async def _temporary_download_url(
        self,
        client: httpx.AsyncClient,
        headers: Dict[str, str],
        filename: str,
    ) -> str:
        """Resolve the short-lived download URL for *filename*.

        Raises ValueError when the API response contains no URL.
        """
        # NOTE: the filename must be interpolated into the endpoint path;
        # previously this path contained a garbled placeholder.
        url_endpoint = (
            f"{self.BASE_URL}/datasets/{self.DATASET_NAME}/versions/{self.DATASET_VERSION}/files/{filename}/url"
        )
        url_response = await client.get(url_endpoint, headers=headers)
        url_response.raise_for_status()
        download_url = url_response.json().get("temporaryDownloadUrl")
        if not download_url:
            raise ValueError("No download URL available")
        return download_url

    async def refresh_stations(self, ctx: Optional[Context] = None) -> None:
        """Fetch current stations from KNMI Open Data API.

        Downloads the most recent 10-minute NetCDF file and rebuilds the
        station registry from it. On any failure, keeps existing stations or
        falls back to FALLBACK_STATIONS when none are loaded yet.
        """
        async with self._lock:
            try:
                async with httpx.AsyncClient() as client:
                    headers = {"Authorization": self._api_key}

                    # Get the latest file to extract station information
                    params = {"maxKeys": "1", "sorting": "desc", "order_by": "lastModified"}
                    logger.info("Fetching latest file for station information")
                    latest_file = await self._latest_file_metadata(client, headers, params)
                    if latest_file is None:
                        raise ValueError("No data files available")
                    filename = latest_file["filename"]

                    # Get download URL for the file
                    download_url = await self._temporary_download_url(client, headers, filename)

                    # Download and read the NetCDF file to get station information
                    with tempfile.TemporaryDirectory() as temp_dir:
                        temp_file = Path(temp_dir) / filename
                        file_response = await client.get(download_url)
                        file_response.raise_for_status()
                        temp_file.write_bytes(file_response.content)

                        with xr.open_dataset(temp_file) as ds:
                            new_stations = self._parse_stations(ds)

                    if not new_stations:
                        logger.warning("No valid stations found in Netherlands, using fallback stations")
                        new_stations = FALLBACK_STATIONS

                    self._stations = new_stations
                    self._last_update = datetime.now()
                    logger.info(f"Successfully loaded {len(self._stations)} stations")

            except Exception as e:
                # Best-effort refresh: never leave the manager without stations.
                logger.error(f"Failed to refresh stations: {str(e)}")
                if not self._stations:
                    self._stations = FALLBACK_STATIONS

    def _parse_stations(self, ds: xr.Dataset) -> Dict[str, WeatherStation]:
        """Parse station information from NetCDF file.

        Returns a dict keyed by station id (prefix '06' stripped), containing
        only stations whose coordinates fall inside NL_BOUNDS.
        """
        new_stations: Dict[str, WeatherStation] = {}

        # Get station IDs and coordinates
        if "station" in ds.dims:
            station_ids = ds["station"].values
            lats = ds["lat"].values
            lons = ds["lon"].values

            # Log the raw data found
            logger.debug(f"Raw station IDs found: {station_ids}")
            logger.debug(f"Raw latitudes found: {lats}")
            logger.debug(f"Raw longitudes found: {lons}")
            logger.debug(f"Dataset structure: {ds}")
            logger.debug(f"Dataset dimensions: {ds.dims}")
            logger.debug(f"Dataset variables: {list(ds.variables)}")
            logger.debug(f"Dataset attributes: {ds.attrs}")

            # Convert station IDs to our format (remove '06' prefix).
            # Slice instead of str.replace: replace("06", "") would also drop
            # a later "06" inside the id (e.g. "06206" would become "2").
            station_ids = [str(sid)[2:] if str(sid).startswith("06") else str(sid) for sid in station_ids]
            logger.debug(f"Converted station IDs: {station_ids}")

            # Try to get station names from the dataset
            station_names = None
            name_variables = [
                "name",
                "station_name",
                "stn_name",
                "stationname",
                "station_names",
                "names",
                "stn_names",
                "stationnames",
                "NAMES",
                "NAME",
                "STN_NAME",
                "STATION_NAME",
            ]
            for name_var in name_variables:
                if name_var in ds.variables:
                    try:
                        station_names = ds[name_var].values
                        logger.info(f"Found station names in variable: {name_var}")
                        logger.debug(f"Raw station names: {station_names}")
                        break
                    except Exception as e:
                        logger.debug(f"Could not read station names from {name_var}: {e}")

            if station_names is None:
                # Try to find station names in dataset attributes
                for attr_name in ds.attrs:
                    if "name" in attr_name.lower() or "station" in attr_name.lower():
                        logger.debug(f"Found potential station name attribute: {attr_name}")
                        try:
                            attr_value = ds.attrs[attr_name]
                            if isinstance(attr_value, (str, bytes)):
                                logger.info(f"Found station names in attribute: {attr_name}")
                                station_names = [attr_value]
                            elif isinstance(attr_value, (list, np.ndarray)):
                                logger.info(f"Found station names array in attribute: {attr_name}")
                                station_names = attr_value
                            # Stop only once names were actually captured, so an
                            # attribute of an unexpected type doesn't end the search.
                            if station_names is not None:
                                break
                        except Exception as e:
                            logger.debug(f"Could not read station names from attribute {attr_name}: {e}")

            logger.info(f"Found {len(station_ids)} stations in NetCDF file")
            logger.debug(f"Available dimensions: {ds.dims}")
            logger.debug(f"Available variables: {list(ds.variables)}")

            for i, station_id in enumerate(station_ids):
                station_id = str(station_id)
                coords = Coordinates(latitude=float(lats[i]), longitude=float(lons[i]))

                # Get station name from dataset
                station_name = None
                if station_names is not None:
                    try:
                        name = station_names[i] if i < len(station_names) else None
                        if isinstance(name, bytes):
                            name = name.decode("utf-8")
                        if isinstance(name, str) and name.strip():
                            station_name = name.strip()
                            logger.debug(f"Found name '{station_name}' for station {station_id}")
                    except Exception as e:
                        logger.debug(f"Could not decode station name for {station_id}: {e}")

                if not station_name:
                    station_name = f"Station {station_id}"
                    logger.debug(f"Using default name '{station_name}' for station {station_id}")

                # Log coordinates before validation
                logger.debug(f"Station {station_id} coordinates: lat={coords.latitude}, lon={coords.longitude}")

                # Only add station if coordinates are within mainland Netherlands
                if self._validate_coordinates(coords):
                    new_stations[station_id] = WeatherStation(
                        id=station_id,
                        name=station_name,
                        coordinates=coords,
                        elevation=0.0,  # Could be extracted if available
                        station_type="Weather",
                        region="Netherlands",
                    )
                    logger.debug(f"Added station {station_id}: {new_stations[station_id]}")
                else:
                    logger.debug(f"Station {station_id} coordinates outside Netherlands bounds")

        return new_stations

    def find_nearest_station(self, coords: Coordinates) -> WeatherStation:
        """Find the nearest weather station to given coordinates.

        Raises ValueError when no stations have been loaded yet.
        """
        if not self._stations:
            raise ValueError("No stations available. Call refresh_stations first.")

        def calculate_distance(station: WeatherStation) -> float:
            """Calculate distance using Haversine formula"""
            # Convert coordinates to radians
            lat1, lon1 = radians(coords.latitude), radians(coords.longitude)
            lat2, lon2 = (
                radians(station.coordinates.latitude),
                radians(station.coordinates.longitude),
            )

            # Haversine formula
            dlat = lat2 - lat1
            dlon = lon2 - lon1
            a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
            c = 2 * atan2(sqrt(a), sqrt(1 - a))

            # Earth's radius in kilometers
            R = 6371.0
            return R * c

        # Find station with minimum distance
        nearest_station = min(self._stations.values(), key=calculate_distance)
        logger.info(f"Found nearest station: {nearest_station.name} ({nearest_station.id})")
        return nearest_station

    def _validate_coordinates(self, coords: Coordinates) -> bool:
        """Check if coordinates are within the Netherlands"""
        return (
            self.NL_BOUNDS["min_lat"] <= coords.latitude <= self.NL_BOUNDS["max_lat"]
            and self.NL_BOUNDS["min_lon"] <= coords.longitude <= self.NL_BOUNDS["max_lon"]
        )

    async def get_raw_station_data(self, station_id: str, ctx: Optional[Context] = None) -> Dict[str, Any]:
        """Get raw data for a specific station using KNMI Open Data API.

        Downloads the latest 10-minute NetCDF file for the station and returns
        a dict with "measurements" (mapped parameter values) and "metadata"
        (station id/name, timestamp, filename, available variables).
        Raises ValueError for unknown stations, out-of-bounds coordinates,
        missing files, or unreadable NetCDF content.
        """
        logger.info(f"Starting data fetch for station {station_id}")

        async with httpx.AsyncClient() as client:
            try:
                headers = {"Authorization": self._api_key}

                # Get station coordinates
                station = self._stations.get(station_id)
                if not station:
                    raise ValueError(f"Station {station_id} not found")

                # Validate coordinates are within bounds
                if not self._validate_coordinates(station.coordinates):
                    error_msg = (
                        f"Coordinates ({station.coordinates.latitude}, {station.coordinates.longitude}) are outside "
                        "valid bounds"
                    )
                    logger.error(error_msg)
                    raise ValueError(error_msg)

                # Get the latest file for this station (sort by lastModified in descending order)
                params = {
                    "maxKeys": "1",
                    "sorting": "desc",
                    "order_by": "lastModified",
                    "station": station_id,  # Filter for specific station
                }
                logger.info(f"Requesting latest 10-minute data for station {station_id}")
                logger.debug(f"Query parameters: {params}")

                # Get the latest file metadata
                latest_file = await self._latest_file_metadata(client, headers, params)
                if latest_file is None:
                    raise ValueError(f"No data files available for station {station_id}")
                filename = latest_file["filename"]

                # Get download URL for the file
                download_url = await self._temporary_download_url(client, headers, filename)

                # Create a temporary directory to store the NetCDF file
                with tempfile.TemporaryDirectory() as temp_dir:
                    temp_file = Path(temp_dir) / filename

                    # Download the file
                    logger.info(f"Downloading file: {filename}")
                    file_response = await client.get(download_url)
                    file_response.raise_for_status()

                    # Save the binary content
                    temp_file.write_bytes(file_response.content)

                    # Open and read the NetCDF file
                    logger.info("Reading NetCDF file")
                    try:
                        with xr.open_dataset(temp_file) as ds:
                            # Log the structure of the dataset
                            logger.debug(f"NetCDF structure: {ds}")
                            logger.debug(f"Available variables: {list(ds.variables)}")
                            logger.debug(f"Dimensions: {ds.dims}")

                            # Verify we have the correct station data
                            if "station" in ds.dims:
                                stations_in_file = ds["station"].values
                                logger.debug(f"Stations in file: {stations_in_file}")

                                # Convert our station_id to match KNMI format (add '06' prefix if needed)
                                knmi_station_id = f"06{station_id}" if not station_id.startswith("06") else station_id
                                logger.debug(f"Looking for station ID {knmi_station_id} (original: {station_id})")

                                # Convert to the same type as in the file
                                file_station_type = type(stations_in_file[0])
                                comparable_station_id = file_station_type(knmi_station_id)

                                if comparable_station_id not in stations_in_file:
                                    raise ValueError(
                                        f"Station {station_id} (as {knmi_station_id}) not found in file. "
                                        f"Available stations: {stations_in_file}"
                                    )

                                # Find the index of our station
                                station_idx = np.where(stations_in_file == comparable_station_id)[0]
                                if len(station_idx) == 0:
                                    raise ValueError(f"Could not find index for station {knmi_station_id}")
                                station_idx = station_idx[0]
                                logger.debug(f"Found station {knmi_station_id} at index {station_idx}")
                            else:
                                raise ValueError("No station dimension found in file")

                            # Create a dictionary to store the measurements
                            measurements: Dict[str, float] = {}

                            # Map NetCDF variables to our model fields
                            param_mapping = {
                                "ta": "temperature",  # Air temperature (°C)
                                "rh": "relative_humidity",  # Relative humidity (%)
                                "ff": "wind_speed",  # Wind speed (m/s)
                                "dd": "wind_direction",  # Wind direction (degrees)
                                "vis": "visibility",  # Visibility (meters)
                                "pp": "air_pressure",  # Air pressure (hPa)
                                "rr": "precipitation_amount",  # Precipitation amount (mm)
                                "dr": "precipitation_duration",  # Precipitation duration (minutes)
                            }

                            # Extract values for each parameter
                            for nc_param, model_field in param_mapping.items():
                                if nc_param in ds.variables:
                                    var = ds[nc_param]
                                    logger.debug(f"Found variable {nc_param} with dimensions {var.dims}")
                                    try:
                                        # Get the value for our specific station
                                        if "station" in var.dims:
                                            # Select our station first
                                            value = var.isel(station=station_idx)
                                            # Then get the latest time if it exists
                                            if "time" in value.dims:
                                                value = value.isel(time=-1)
                                            # Handle any remaining dimensions
                                            while len(value.dims) > 0:
                                                value = value.isel({value.dims[0]: 0})
                                            value = float(value.values)
                                        else:
                                            logger.warning(f"Variable {nc_param} does not have a station dimension")
                                            continue

                                        # Skip NaN fill values
                                        if not np.isnan(value):
                                            measurements[model_field] = value
                                            logger.debug(f"Got {model_field} = {value} for station {station_id}")
                                    except Exception as e:
                                        logger.warning(f"Could not extract value for {nc_param}: {e}")

                            # Get the timestamp from the time variable if it exists
                            timestamp = latest_file.get("lastModified")
                            if "time" in ds.variables:
                                try:
                                    time_var = ds["time"]
                                    # Get the latest timestamp
                                    time_value = time_var.isel(time=-1)
                                    # If multi-dimensional, take the first value
                                    while len(time_value.dims) > 0:
                                        time_value = time_value.isel({time_value.dims[0]: 0})
                                    timestamp = pd.Timestamp(time_value.values).isoformat()
                                except Exception as e:
                                    logger.warning(f"Could not parse time variable: {e}")
                                    logger.debug(f"Time variable structure: {time_var}")

                            return {
                                "measurements": measurements,
                                "metadata": {
                                    "station_id": station_id,
                                    "station_name": station.name,
                                    "timestamp": timestamp,
                                    "filename": filename,
                                    "variables": list(
                                        ds.variables.keys()
                                    ),  # Add list of available variables for debugging
                                },
                            }
                    except Exception as e:
                        logger.error(f"Error reading NetCDF file: {e}")
                        # Try to read the file content to debug
                        logger.debug(f"File content (first 1000 bytes): {file_response.content[:1000]}")
                        raise ValueError(f"Failed to read NetCDF file: {str(e)}") from e

            except Exception as e:
                logger.error(f"Error in get_raw_station_data: {str(e)}")
                if isinstance(e, httpx.HTTPError):
                    logger.error(f"HTTP Error details: {str(e)}")
                    logger.error(f"Request URL: {e.request.url if e.request else 'Unknown'}")
                raise


# Default fallback stations
FALLBACK_STATIONS = {
    "260": WeatherStation(
        id="260",
        name="De Bilt",
        coordinates=Coordinates(latitude=52.101, longitude=5.177),
        elevation=2.0,
        station_type="Main",
        region="Utrecht",
    ),
    "240": WeatherStation(
        id="240",
        name="Amsterdam Schiphol",
        coordinates=Coordinates(latitude=52.318, longitude=4.790),
        elevation=-3.0,
        station_type="Airport",
        region="Noord-Holland",
    ),
}