get_location_weather
Retrieve current weather data for a location in the Netherlands, including temperature, humidity, and wind speed from KNMI stations.
Instructions
Get current weather data for a location
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| location | Yes | Name of a location in the Netherlands (geocoded via OpenStreetMap Nominatim) | — |
Implementation Reference
- src/knmi_weather_mcp/server.py:79-112 (handler)The main handler function for the get_location_weather tool. It refreshes stations, gets coordinates via Nominatim, validates they're in the Netherlands, finds the nearest KNMI station, and fetches raw weather data.
@mcp.tool() async def get_location_weather(location: str, ctx: Context) -> Dict[str, Any]: """Get current weather data for a location""" logger.info(f"Starting weather request for {location}") try: # Log each step logger.info("Step 1: Refreshing stations") await station_manager.refresh_stations(ctx) logger.info("Step 2: Getting coordinates") coords = await get_coordinates(location) logger.debug(f"Coordinates found: {coords}") # Check if coordinates are within Netherlands if not station_manager._validate_coordinates(coords): raise ValueError( f"Location '{location}' ({coords.latitude}, {coords.longitude}) is outside the " "Netherlands. This tool only works for locations within the Netherlands." ) logger.info("Step 3: Finding nearest station") station = station_manager.find_nearest_station(coords) logger.info(f"Using station: {station.name} ({station.id})") logger.info("Step 4: Getting weather data") weather_data = await station_manager.get_raw_station_data(station.id, ctx) logger.info("Weather data retrieved successfully") return weather_data except Exception as e: logger.error(f"Error getting weather: {str(e)}") return f"Error: Unable to get weather data for {location}. {str(e)}" - src/knmi_weather_mcp/server.py:79-79 (registration)The tool is registered via the @mcp.tool() decorator on the FastMCP instance created earlier in the file.
@mcp.tool() - src/knmi_weather_mcp/models.py:7-46 (schema)Pydantic models defining the input/output schemas used by the tool: Coordinates (lat/lon validation), WeatherStation, WeatherData, and StationData.
class Coordinates(BaseModel): """Geographic coordinates""" latitude: float = Field(..., ge=-90, le=90) longitude: float = Field(..., ge=-180, le=180) class WeatherStation(BaseModel): """KNMI weather station""" id: str name: str coordinates: Coordinates elevation: float station_type: Optional[str] = None region: Optional[str] = None class WeatherData(BaseModel): """Weather measurement data""" temperature: float humidity: float timestamp: datetime station_name: str station_id: str wind_speed: Optional[float] = None wind_direction: Optional[float] = None precipitation: Optional[float] = None visibility: Optional[float] = None pressure: Optional[float] = None class StationData(BaseModel): """Raw station measurement data""" measurements: Dict[str, float] metadata: Dict[str, str] timestamp: datetime - Helper function that geocodes a location name to Coordinates using OpenStreetMap Nominatim API, restricting search to the Netherlands.
async def get_coordinates(location: str) -> Coordinates: """Get coordinates for a location using OpenStreetMap Nominatim""" try: async with httpx.AsyncClient() as client: response = await client.get( "https://nominatim.openstreetmap.org/search", params={ "q": f"{location}, Netherlands", # Force Netherlands search "format": "json", "limit": 1, "countrycodes": "nl", # Restrict to Netherlands }, headers={"User-Agent": "KNMI_Weather_MCP/1.0"}, ) response.raise_for_status() results = response.json() if not results: raise ValueError(f"Location '{location}' not found in the Netherlands") place = results[0] return Coordinates(latitude=float(place["lat"]), longitude=float(place["lon"])) except Exception as e: logger.error(f"Error getting coordinates for {location}: {str(e)}") raise ValueError(f"Failed to get coordinates for location: {str(e)}") from e - StationManager class providing helper methods used by the tool: refresh_stations(), find_nearest_station(), _validate_coordinates(), and get_raw_station_data().
class StationManager:
    """Manages KNMI weather stations.

    Singleton: station list, API key, and lock are shared process-wide.
    """

    _instance = None
    _initialized = False

    # API endpoints
    BASE_URL = "https://api.dataplatform.knmi.nl/open-data/v1"
    DATASET_NAME = "Actuele10mindataKNMIstations"
    DATASET_VERSION = "2"

    # Netherlands bounding box (mainland Netherlands)
    NL_BOUNDS = {
        "min_lat": 50.7,  # Southernmost point
        "max_lat": 53.7,  # Northernmost point
        "min_lon": 3.3,  # Westernmost point
        "max_lon": 7.2,  # Easternmost point
    }

    # Parameter mapping for 10-minute data
    PARAMETER_MAPPING = {
        "T": "temperature",  # Air temperature
        "RH": "relative_humidity",  # Relative humidity
        "FF": "wind_speed",  # 10-min mean wind speed
        "DD": "wind_direction",  # Wind direction
        "VIS": "visibility",  # Visibility
        "P": "air_pressure",  # Air pressure
        "RR": "precipitation_amount",  # Precipitation amount
        "DR": "precipitation_duration",  # Precipitation duration
    }

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Guard so the shared singleton state is only built once.
        if not StationManager._initialized:
            self._stations: Dict[str, WeatherStation] = {}
            self._last_update: Optional[datetime] = None
            self._lock = asyncio.Lock()
            self._api_key = os.getenv("KNMI_API_KEY")
            if not self._api_key:
                logger.error("KNMI_API_KEY environment variable is missing")
                raise ValueError("KNMI_API_KEY environment variable is required")
            # Strip "Bearer" prefix if it exists in the env var
            self._api_key = self._api_key.replace("Bearer ", "")
            logger.info("StationManager initialized successfully")
            StationManager._initialized = True

    @property
    def stations(self) -> Dict[str, WeatherStation]:
        return self._stations

    async def refresh_stations(self, ctx: Optional[Context] = None) -> None:
        """Fetch current stations from KNMI Open Data API."""
        async with self._lock:
            try:
                async with httpx.AsyncClient() as client:
                    headers = {"Authorization": self._api_key}

                    # Get the latest file to extract station information
                    list_url = (
                        f"{self.BASE_URL}/datasets/{self.DATASET_NAME}"
                        f"/versions/{self.DATASET_VERSION}/files"
                    )
                    params = {"maxKeys": "1", "sorting": "desc", "order_by": "lastModified"}
                    logger.info("Fetching latest file for station information")
                    response = await client.get(list_url, headers=headers, params=params)
                    response.raise_for_status()
                    files_data = response.json()
                    if not files_data.get("files"):
                        raise ValueError("No data files available")
                    latest_file = files_data["files"][0]
                    filename = latest_file["filename"]

                    # Get download URL for the file.
                    # GARBLING FIX: the extracted source showed "(unknown)" in this
                    # URL; the endpoint takes the file name, so interpolate it.
                    url_endpoint = (
                        f"{self.BASE_URL}/datasets/{self.DATASET_NAME}"
                        f"/versions/{self.DATASET_VERSION}/files/{filename}/url"
                    )
                    url_response = await client.get(url_endpoint, headers=headers)
                    url_response.raise_for_status()
                    download_url = url_response.json().get("temporaryDownloadUrl")
                    if not download_url:
                        raise ValueError("No download URL available")

                    # Download and read the NetCDF file to get station information
                    with tempfile.TemporaryDirectory() as temp_dir:
                        temp_file = Path(temp_dir) / filename
                        file_response = await client.get(download_url)
                        file_response.raise_for_status()
                        temp_file.write_bytes(file_response.content)
                        with xr.open_dataset(temp_file) as ds:
                            new_stations = self._parse_stations(ds)

                    if not new_stations:
                        logger.warning(
                            "No valid stations found in Netherlands, using fallback stations"
                        )
                        new_stations = FALLBACK_STATIONS
                    self._stations = new_stations
                    self._last_update = datetime.now()
                    logger.info(f"Successfully loaded {len(self._stations)} stations")
            except Exception as e:
                # Best-effort: keep previously loaded stations; fall back only if empty.
                logger.error(f"Failed to refresh stations: {str(e)}")
                if not self._stations:
                    self._stations = FALLBACK_STATIONS

    def _parse_stations(self, ds: xr.Dataset) -> Dict[str, WeatherStation]:
        """Parse station information from NetCDF file."""
        new_stations = {}
        # Get station IDs and coordinates
        if "station" in ds.dims:
            station_ids = ds["station"].values
            lats = ds["lat"].values
            lons = ds["lon"].values

            # Log the raw data found
            logger.debug(f"Raw station IDs found: {station_ids}")
            logger.debug(f"Raw latitudes found: {lats}")
            logger.debug(f"Raw longitudes found: {lons}")
            logger.debug(f"Dataset structure: {ds}")
            logger.debug(f"Dataset dimensions: {ds.dims}")
            logger.debug(f"Dataset variables: {list(ds.variables)}")
            logger.debug(f"Dataset attributes: {ds.attrs}")

            # Convert station IDs to our format (remove '06' prefix).
            # BUG FIX: str.replace("06", "") removed EVERY "06" occurrence
            # (e.g. "06206" -> "2"); slice off only the leading prefix.
            station_ids = [
                str(sid)[2:] if str(sid).startswith("06") else str(sid)
                for sid in station_ids
            ]
            logger.debug(f"Converted station IDs: {station_ids}")

            # Try to get station names from the dataset
            station_names = None
            name_variables = [
                "name",
                "station_name",
                "stn_name",
                "stationname",
                "station_names",
                "names",
                "stn_names",
                "stationnames",
                "NAMES",
                "NAME",
                "STN_NAME",
                "STATION_NAME",
            ]
            for name_var in name_variables:
                if name_var in ds.variables:
                    try:
                        station_names = ds[name_var].values
                        logger.info(f"Found station names in variable: {name_var}")
                        logger.debug(f"Raw station names: {station_names}")
                        break
                    except Exception as e:
                        logger.debug(f"Could not read station names from {name_var}: {e}")

            if station_names is None:
                # Try to find station names in dataset attributes
                for attr_name in ds.attrs:
                    if "name" in attr_name.lower() or "station" in attr_name.lower():
                        logger.debug(f"Found potential station name attribute: {attr_name}")
                        try:
                            attr_value = ds.attrs[attr_name]
                            if isinstance(attr_value, (str, bytes)):
                                logger.info(f"Found station names in attribute: {attr_name}")
                                station_names = [attr_value]
                            elif isinstance(attr_value, (list, np.ndarray)):
                                logger.info(f"Found station names array in attribute: {attr_name}")
                                station_names = attr_value
                            break
                        except Exception as e:
                            logger.debug(
                                f"Could not read station names from attribute {attr_name}: {e}"
                            )

            logger.info(f"Found {len(station_ids)} stations in NetCDF file")
            logger.debug(f"Available dimensions: {ds.dims}")
            logger.debug(f"Available variables: {list(ds.variables)}")

            for i, station_id in enumerate(station_ids):
                station_id = str(station_id)
                coords = Coordinates(latitude=float(lats[i]), longitude=float(lons[i]))

                # Get station name from dataset
                station_name = None
                if station_names is not None:
                    try:
                        name = station_names[i] if i < len(station_names) else None
                        if isinstance(name, bytes):
                            name = name.decode("utf-8")
                        if isinstance(name, str) and name.strip():
                            station_name = name.strip()
                            logger.debug(f"Found name '{station_name}' for station {station_id}")
                    except Exception as e:
                        logger.debug(f"Could not decode station name for {station_id}: {e}")
                if not station_name:
                    station_name = f"Station {station_id}"
                    logger.debug(f"Using default name '{station_name}' for station {station_id}")

                # Log coordinates before validation
                logger.debug(
                    f"Station {station_id} coordinates: lat={coords.latitude}, lon={coords.longitude}"
                )

                # Only add station if coordinates are within mainland Netherlands
                if self._validate_coordinates(coords):
                    new_stations[station_id] = WeatherStation(
                        id=station_id,
                        name=station_name,
                        coordinates=coords,
                        elevation=0.0,  # Could be extracted if available
                        station_type="Weather",
                        region="Netherlands",
                    )
                    logger.debug(f"Added station {station_id}: {new_stations[station_id]}")
                else:
                    logger.debug(f"Station {station_id} coordinates outside Netherlands bounds")

        return new_stations

    def find_nearest_station(self, coords: Coordinates) -> WeatherStation:
        """Find the nearest weather station to given coordinates."""
        if not self._stations:
            raise ValueError("No stations available. Call refresh_stations first.")

        def calculate_distance(station: WeatherStation) -> float:
            """Calculate distance using Haversine formula."""
            # Convert coordinates to radians
            lat1, lon1 = radians(coords.latitude), radians(coords.longitude)
            lat2, lon2 = (
                radians(station.coordinates.latitude),
                radians(station.coordinates.longitude),
            )
            # Haversine formula
            dlat = lat2 - lat1
            dlon = lon2 - lon1
            a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
            c = 2 * atan2(sqrt(a), sqrt(1 - a))
            # Earth's radius in kilometers
            R = 6371.0
            return R * c

        # Find station with minimum distance
        nearest_station = min(self._stations.values(), key=calculate_distance)
        logger.info(f"Found nearest station: {nearest_station.name} ({nearest_station.id})")
        return nearest_station

    def _validate_coordinates(self, coords: Coordinates) -> bool:
        """Check if coordinates are within the Netherlands."""
        return (
            self.NL_BOUNDS["min_lat"] <= coords.latitude <= self.NL_BOUNDS["max_lat"]
            and self.NL_BOUNDS["min_lon"] <= coords.longitude <= self.NL_BOUNDS["max_lon"]
        )

    async def get_raw_station_data(
        self, station_id: str, ctx: Optional[Context] = None
    ) -> Dict[str, Any]:
        """Get raw data for a specific station using KNMI Open Data API."""
        logger.info(f"Starting data fetch for station {station_id}")
        async with httpx.AsyncClient() as client:
            try:
                headers = {"Authorization": self._api_key}

                # Get station coordinates
                station = self._stations.get(station_id)
                if not station:
                    raise ValueError(f"Station {station_id} not found")

                # Validate coordinates are within bounds
                if not self._validate_coordinates(station.coordinates):
                    error_msg = (
                        f"Coordinates ({station.coordinates.latitude}, "
                        f"{station.coordinates.longitude}) are outside "
                        "valid bounds"
                    )
                    logger.error(error_msg)
                    raise ValueError(error_msg)

                # List files endpoint with station filter
                list_url = (
                    f"{self.BASE_URL}/datasets/{self.DATASET_NAME}"
                    f"/versions/{self.DATASET_VERSION}/files"
                )

                # Get the latest file for this station (sort by lastModified in descending order)
                params = {
                    "maxKeys": "1",
                    "sorting": "desc",
                    "order_by": "lastModified",
                    "station": station_id,  # Filter for specific station
                }
                logger.info(f"Requesting latest 10-minute data for station {station_id}")
                logger.debug(f"Query parameters: {params}")

                # Get the latest file metadata
                response = await client.get(list_url, headers=headers, params=params)
                response.raise_for_status()
                files_data = response.json()
                if not files_data.get("files"):
                    raise ValueError(f"No data files available for station {station_id}")
                latest_file = files_data["files"][0]
                filename = latest_file["filename"]

                # Get download URL for the file.
                # GARBLING FIX: "(unknown)" in the extracted source restored to
                # the file name path segment.
                url_endpoint = (
                    f"{self.BASE_URL}/datasets/{self.DATASET_NAME}"
                    f"/versions/{self.DATASET_VERSION}/files/{filename}/url"
                )
                url_response = await client.get(url_endpoint, headers=headers)
                url_response.raise_for_status()
                download_url = url_response.json().get("temporaryDownloadUrl")
                if not download_url:
                    raise ValueError("No download URL available")

                # Create a temporary directory to store the NetCDF file
                with tempfile.TemporaryDirectory() as temp_dir:
                    temp_file = Path(temp_dir) / filename

                    # Download the file
                    logger.info(f"Downloading file: {filename}")
                    file_response = await client.get(download_url)
                    file_response.raise_for_status()

                    # Save the binary content
                    temp_file.write_bytes(file_response.content)

                    # Open and read the NetCDF file
                    logger.info("Reading NetCDF file")
                    try:
                        with xr.open_dataset(temp_file) as ds:
                            # Log the structure of the dataset
                            logger.debug(f"NetCDF structure: {ds}")
                            logger.debug(f"Available variables: {list(ds.variables)}")
                            logger.debug(f"Dimensions: {ds.dims}")

                            # Verify we have the correct station data
                            if "station" in ds.dims:
                                stations_in_file = ds["station"].values
                                logger.debug(f"Stations in file: {stations_in_file}")

                                # Convert our station_id to match KNMI format
                                # (add '06' prefix if needed)
                                knmi_station_id = (
                                    f"06{station_id}"
                                    if not station_id.startswith("06")
                                    else station_id
                                )
                                logger.debug(
                                    f"Looking for station ID {knmi_station_id} "
                                    f"(original: {station_id})"
                                )

                                # Convert to the same type as in the file
                                file_station_type = type(stations_in_file[0])
                                comparable_station_id = file_station_type(knmi_station_id)

                                if comparable_station_id not in stations_in_file:
                                    raise ValueError(
                                        f"Station {station_id} (as {knmi_station_id}) not found in file. "
                                        f"Available stations: {stations_in_file}"
                                    )

                                # Find the index of our station
                                station_idx = np.where(stations_in_file == comparable_station_id)[0]
                                if len(station_idx) == 0:
                                    raise ValueError(
                                        f"Could not find index for station {knmi_station_id}"
                                    )
                                station_idx = station_idx[0]
                                logger.debug(
                                    f"Found station {knmi_station_id} at index {station_idx}"
                                )
                            else:
                                raise ValueError("No station dimension found in file")

                            # Create a dictionary to store the measurements
                            measurements = {}

                            # Map NetCDF variables to our model fields
                            param_mapping = {
                                "ta": "temperature",  # Air temperature (°C)
                                "rh": "relative_humidity",  # Relative humidity (%)
                                "ff": "wind_speed",  # Wind speed (m/s)
                                "dd": "wind_direction",  # Wind direction (degrees)
                                "vis": "visibility",  # Visibility (meters)
                                "pp": "air_pressure",  # Air pressure (hPa)
                                "rr": "precipitation_amount",  # Precipitation amount (mm)
                                "dr": "precipitation_duration",  # Precipitation duration (minutes)
                            }

                            # Extract values for each parameter
                            for nc_param, model_field in param_mapping.items():
                                if nc_param in ds.variables:
                                    var = ds[nc_param]
                                    logger.debug(
                                        f"Found variable {nc_param} with dimensions {var.dims}"
                                    )
                                    try:
                                        # Get the value for our specific station
                                        if "station" in var.dims:
                                            # Select our station first
                                            value = var.isel(station=station_idx)
                                            # Then get the latest time if it exists
                                            if "time" in value.dims:
                                                value = value.isel(time=-1)
                                            # Handle any remaining dimensions
                                            while len(value.dims) > 0:
                                                value = value.isel({value.dims[0]: 0})
                                            value = float(value.values)
                                        else:
                                            logger.warning(
                                                f"Variable {nc_param} does not have a station dimension"
                                            )
                                            continue
                                        if not np.isnan(value):
                                            measurements[model_field] = value
                                            logger.debug(
                                                f"Got {model_field} = {value} for station {station_id}"
                                            )
                                    except Exception as e:
                                        logger.warning(
                                            f"Could not extract value for {nc_param}: {e}"
                                        )

                            # Get the timestamp from the time variable if it exists
                            timestamp = latest_file.get("lastModified")
                            if "time" in ds.variables:
                                try:
                                    time_var = ds["time"]
                                    # Get the latest timestamp
                                    time_value = time_var.isel(time=-1)
                                    # If multi-dimensional, take the first value
                                    while len(time_value.dims) > 0:
                                        time_value = time_value.isel({time_value.dims[0]: 0})
                                    timestamp = pd.Timestamp(time_value.values).isoformat()
                                except Exception as e:
                                    logger.warning(f"Could not parse time variable: {e}")
                                    logger.debug(f"Time variable structure: {time_var}")

                            return {
                                "measurements": measurements,
                                "metadata": {
                                    "station_id": station_id,
                                    "station_name": station.name,
                                    "timestamp": timestamp,
                                    "filename": filename,
                                    "variables": list(
                                        ds.variables.keys()
                                    ),  # Add list of available variables for debugging
                                },
                            }
                    except Exception as e:
                        logger.error(f"Error reading NetCDF file: {e}")
                        # Try to read the file content to debug
                        logger.debug(
                            f"File content (first 1000 bytes): {file_response.content[:1000]}"
                        )
                        raise ValueError(f"Failed to read NetCDF file: {str(e)}") from e
            except Exception as e:
                logger.error(f"Error in get_raw_station_data: {str(e)}")
                if isinstance(e, httpx.HTTPError):
                    logger.error(f"HTTP Error details: {str(e)}")
                    logger.error(f"Request URL: {e.request.url if e.request else 'Unknown'}")
                raise


# Default fallback stations
FALLBACK_STATIONS = {
    "260": WeatherStation(
        id="260",
        name="De Bilt",
        coordinates=Coordinates(latitude=52.101, longitude=5.177),
        elevation=2.0,
        station_type="Main",
        region="Utrecht",
    ),
    "240": WeatherStation(
        id="240",
        name="Amsterdam Schiphol",
        coordinates=Coordinates(latitude=52.318, longitude=4.790),
        elevation=-3.0,
        station_type="Airport",
        region="Noord-Holland",
    ),
}