Skip to main content
Glama
wolkwork

KNMI Weather MCP

by wolkwork

get_location_weather

Retrieve current weather data for any location in the Netherlands, including temperature, humidity, and wind speed from KNMI weather stations.

Instructions

Get current weather data for a location

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
| --- | --- | --- | --- |
| location | Yes | | |

Implementation Reference

  • The core handler function for the 'get_location_weather' tool, decorated with @mcp.tool() for registration. It orchestrates weather data retrieval by coordinating station refresh, coordinate lookup, nearest station selection, and raw data fetching.
    @mcp.tool()
    async def get_location_weather(location: str, ctx: Context) -> Dict[str, Any]:
        """Get current weather data for a location"""
        # NOTE: the docstring above is left untouched on purpose; the tool
        # registration uses it as the description shown to clients.
        #
        # Flow: refresh the station list, geocode the location, reject points
        # that fail the station manager's coordinate validation, pick the
        # nearest station, then fetch that station's raw data.
        # On any failure the error is logged and an "Error: ..." string is
        # returned instead of the data dictionary.
        logger.info(f"Starting weather request for {location}")

        try:
            # Step 1: refresh the station list before any lookup.
            logger.info("Step 1: Refreshing stations")
            await station_manager.refresh_stations(ctx)

            # Step 2: resolve the free-text location to coordinates.
            logger.info("Step 2: Getting coordinates")
            point = await get_coordinates(location)
            logger.debug(f"Coordinates found: {point}")

            # Only locations inside the Netherlands are supported.
            inside_bounds = station_manager._validate_coordinates(point)
            if not inside_bounds:
                message = (
                    f"Location '{location}' ({point.latitude}, {point.longitude}) is outside the "
                    "Netherlands. This tool only works for locations within the Netherlands."
                )
                raise ValueError(message)

            # Step 3: choose the station closest to the resolved point.
            logger.info("Step 3: Finding nearest station")
            nearest = station_manager.find_nearest_station(point)
            logger.info(f"Using station: {nearest.name} ({nearest.id})")

            # Step 4: fetch the data for that station.
            logger.info("Step 4: Getting weather data")
            result = await station_manager.get_raw_station_data(nearest.id, ctx)

            logger.info("Weather data retrieved successfully")
            return result

        except Exception as e:
            # Every failure is reported to the caller as a plain message.
            reason = str(e)
            logger.error(f"Error getting weather: {reason}")
            return f"Error: Unable to get weather data for {location}. {reason}"
  • Helper function called by the handler to resolve the input location string to geographic Coordinates using the Nominatim geocoding API.
    async def get_coordinates(location: str) -> Coordinates:
        """Resolve a place name to coordinates via OpenStreetMap Nominatim.

        The search is restricted to the Netherlands and only the first hit is
        used.

        Args:
            location: Free-text place name to look up.

        Returns:
            Coordinates built from the first result's "lat" and "lon" fields.

        Raises:
            ValueError: If the request fails, the response cannot be used, or
                no result is returned. The original error is chained as the
                cause.
        """
        try:
            search_url = "https://nominatim.openstreetmap.org/search"
            query = {
                "q": f"{location}, Netherlands",  # Force Netherlands search
                "format": "json",
                "limit": 1,
                "countrycodes": "nl",  # Restrict to Netherlands
            }
            request_headers = {"User-Agent": "KNMI_Weather_MCP/1.0"}

            async with httpx.AsyncClient() as client:
                response = await client.get(search_url, params=query, headers=request_headers)
                response.raise_for_status()

            # The body was fully read by client.get(), so it can be parsed
            # after the client has been closed.
            matches = response.json()
            if not matches:
                raise ValueError(f"Location '{location}' not found in the Netherlands")

            first = matches[0]
            lat = float(first["lat"])
            lon = float(first["lon"])
            return Coordinates(latitude=lat, longitude=lon)

        except Exception as e:
            # Includes the "not found" ValueError raised above: every failure
            # is logged and re-raised as a ValueError.
            logger.error(f"Error getting coordinates for {location}: {str(e)}")
            raise ValueError(f"Failed to get coordinates for location: {str(e)}") from e
  • Helper method in StationManager used by the handler to find the nearest weather station to the location's coordinates using the Haversine distance formula.
    def find_nearest_station(self, coords: Coordinates) -> WeatherStation:
        """Find the nearest weather station to given coordinates.

        Distances are great-circle distances in kilometers computed with the
        Haversine formula, using an Earth radius of 6371.0 km.

        Args:
            coords: Query point; ``latitude`` and ``longitude`` are read in
                degrees.

        Returns:
            The station from ``self._stations`` with the smallest distance to
            ``coords``.

        Raises:
            ValueError: If no stations are loaded.
        """
        if not self._stations:
            raise ValueError("No stations available. Call refresh_stations first.")

        # Earth's radius in kilometers
        earth_radius_km = 6371.0

        # The query point is identical for every station, so convert it (and
        # take its cosine) once instead of once per station.
        lat1, lon1 = radians(coords.latitude), radians(coords.longitude)
        cos_lat1 = cos(lat1)

        def calculate_distance(station: WeatherStation) -> float:
            """Calculate distance using Haversine formula"""
            lat2 = radians(station.coordinates.latitude)
            lon2 = radians(station.coordinates.longitude)

            dlat = lat2 - lat1
            dlon = lon2 - lon1
            a = sin(dlat / 2) ** 2 + cos_lat1 * cos(lat2) * sin(dlon / 2) ** 2

            # Rounding can push `a` marginally outside [0, 1] for (near-)
            # antipodal points, which would make sqrt(1 - a) raise a math
            # domain error. Clamping leaves in-range values untouched.
            a = min(1.0, max(0.0, a))
            c = 2 * atan2(sqrt(a), sqrt(1 - a))

            return earth_radius_km * c

        # Find station with minimum distance
        nearest_station = min(self._stations.values(), key=calculate_distance)

        logger.info(f"Found nearest station: {nearest_station.name} ({nearest_station.id})")
        return nearest_station
  • Helper method in StationManager used by the handler to fetch and parse raw weather data from the KNMI API for the selected station, returning a dictionary of measurements and metadata.
    async def get_raw_station_data(self, station_id: str, ctx: Optional[Context] = None) -> Dict[str, Any]:
        """Get raw data for a specific station using KNMI Open Data API"""
        logger.info(f"Starting data fetch for station {station_id}")
    
        async with httpx.AsyncClient() as client:
            try:
                headers = {"Authorization": self._api_key}
    
                # Get station coordinates
                station = self._stations.get(station_id)
                if not station:
                    raise ValueError(f"Station {station_id} not found")
    
                # Validate coordinates are within bounds
                if not self._validate_coordinates(station.coordinates):
                    error_msg = (
                        f"Coordinates ({station.coordinates.latitude}, {station.coordinates.longitude}) are outside "
                        "valid bounds"
                    )
                    logger.error(error_msg)
                    raise ValueError(error_msg)
    
                # List files endpoint with station filter
                list_url = f"{self.BASE_URL}/datasets/{self.DATASET_NAME}/versions/{self.DATASET_VERSION}/files"
    
                # Get the latest file for this station (sort by lastModified in descending order)
                params = {
                    "maxKeys": "1",
                    "sorting": "desc",
                    "order_by": "lastModified",
                    "station": station_id,  # Filter for specific station
                }
    
                logger.info(f"Requesting latest 10-minute data for station {station_id}")
                logger.debug(f"Query parameters: {params}")
    
                # Get the latest file metadata
                response = await client.get(list_url, headers=headers, params=params)
                response.raise_for_status()
    
                files_data = response.json()
                if not files_data.get("files"):
                    raise ValueError(f"No data files available for station {station_id}")
    
                latest_file = files_data["files"][0]
                filename = latest_file["filename"]
    
                # Get download URL for the file
                url_endpoint = (
                    f"{self.BASE_URL}/datasets/{self.DATASET_NAME}/versions/{self.DATASET_VERSION}/files/{filename}/url"
                )
                url_response = await client.get(url_endpoint, headers=headers)
                url_response.raise_for_status()
    
                download_url = url_response.json().get("temporaryDownloadUrl")
                if not download_url:
                    raise ValueError("No download URL available")
    
                # Create a temporary directory to store the NetCDF file
                with tempfile.TemporaryDirectory() as temp_dir:
                    temp_file = Path(temp_dir) / filename
    
                    # Download the file
                    logger.info(f"Downloading file: {filename}")
                    file_response = await client.get(download_url)
                    file_response.raise_for_status()
    
                    # Save the binary content
                    temp_file.write_bytes(file_response.content)
    
                    # Open and read the NetCDF file
                    logger.info("Reading NetCDF file")
                    try:
                        with xr.open_dataset(temp_file) as ds:
                            # Log the structure of the dataset
                            logger.debug(f"NetCDF structure: {ds}")
                            logger.debug(f"Available variables: {list(ds.variables)}")
                            logger.debug(f"Dimensions: {ds.dims}")
    
                            # Verify we have the correct station data
                            if "station" in ds.dims:
                                stations_in_file = ds["station"].values
                                logger.debug(f"Stations in file: {stations_in_file}")
    
                                # Convert our station_id to match KNMI format (add '06' prefix if needed)
                                knmi_station_id = f"06{station_id}" if not station_id.startswith("06") else station_id
                                logger.debug(f"Looking for station ID {knmi_station_id} (original: {station_id})")
    
                                # Convert to the same type as in the file
                                file_station_type = type(stations_in_file[0])
                                comparable_station_id = file_station_type(knmi_station_id)
    
                                if comparable_station_id not in stations_in_file:
                                    raise ValueError(
                                        f"Station {station_id} (as {knmi_station_id}) not found in file. "
                                        f"Available stations: {stations_in_file}"
                                    )
    
                                # Find the index of our station
                                station_idx = np.where(stations_in_file == comparable_station_id)[0]
                                if len(station_idx) == 0:
                                    raise ValueError(f"Could not find index for station {knmi_station_id}")
                                station_idx = station_idx[0]
                                logger.debug(f"Found station {knmi_station_id} at index {station_idx}")
                            else:
                                raise ValueError("No station dimension found in file")
    
                            # Create a dictionary to store the measurements
                            measurements = {}
    
                            # Map NetCDF variables to our model fields
                            param_mapping = {
                                "ta": "temperature",  # Air temperature (°C)
                                "rh": "relative_humidity",  # Relative humidity (%)
                                "ff": "wind_speed",  # Wind speed (m/s)
                                "dd": "wind_direction",  # Wind direction (degrees)
                                "vis": "visibility",  # Visibility (meters)
                                "pp": "air_pressure",  # Air pressure (hPa)
                                "rr": "precipitation_amount",  # Precipitation amount (mm)
                                "dr": "precipitation_duration",  # Precipitation duration (minutes)
                            }
    
                            # Extract values for each parameter
                            for nc_param, model_field in param_mapping.items():
                                if nc_param in ds.variables:
                                    var = ds[nc_param]
                                    logger.debug(f"Found variable {nc_param} with dimensions {var.dims}")
    
                                    try:
                                        # Get the value for our specific station
                                        if "station" in var.dims:
                                            # Select our station first
                                            value = var.isel(station=station_idx)
    
                                            # Then get the latest time if it exists
                                            if "time" in value.dims:
                                                value = value.isel(time=-1)
    
                                            # Handle any remaining dimensions
                                            while len(value.dims) > 0:
                                                value = value.isel({value.dims[0]: 0})
    
                                            value = float(value.values)
                                        else:
                                            logger.warning(f"Variable {nc_param} does not have a station dimension")
                                            continue
    
                                        if not np.isnan(value):
                                            measurements[model_field] = value
                                            logger.debug(f"Got {model_field} = {value} for station {station_id}")
                                    except Exception as e:
                                        logger.warning(f"Could not extract value for {nc_param}: {e}")
    
                            # Get the timestamp from the time variable if it exists
                            timestamp = latest_file.get("lastModified")
                            if "time" in ds.variables:
                                try:
                                    time_var = ds["time"]
                                    # Get the latest timestamp
                                    time_value = time_var.isel(time=-1)
                                    # If multi-dimensional, take the first value
                                    while len(time_value.dims) > 0:
                                        time_value = time_value.isel({time_value.dims[0]: 0})
                                    timestamp = pd.Timestamp(time_value.values).isoformat()
                                except Exception as e:
                                    logger.warning(f"Could not parse time variable: {e}")
                                    logger.debug(f"Time variable structure: {time_var}")
    
                            return {
                                "measurements": measurements,
                                "metadata": {
                                    "station_id": station_id,
                                    "station_name": station.name,
                                    "timestamp": timestamp,
                                    "filename": filename,
                                    "variables": list(
                                        ds.variables.keys()
                                    ),  # Add list of available variables for debugging
                                },
                            }
  • The @mcp.tool() decorator on the get_location_weather function registers it as an MCP tool with the name 'get_location_weather', using the function signature and docstring for input/output schema.
    @mcp.tool()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wolkwork/knmi-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.