Skip to main content
Glama
wolkwork

KNMI Weather MCP

by wolkwork

get_location_weather

Retrieve current weather data for any location in the Netherlands, including temperature, humidity, and wind speed from KNMI weather stations.

Instructions

Get current weather data for a location

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
locationYes

Implementation Reference

  • The core handler function for the 'get_location_weather' tool, decorated with @mcp.tool() for registration. It orchestrates weather data retrieval by coordinating station refresh, coordinate lookup, nearest station selection, and raw data fetching.
    @mcp.tool()
    async def get_location_weather(location: str, ctx: Context) -> Dict[str, Any]:
        """Get current weather data for a location"""
        logger.info(f"Starting weather request for {location}")
    
        try:
            # Log each step
            logger.info("Step 1: Refreshing stations")
            await station_manager.refresh_stations(ctx)
    
            logger.info("Step 2: Getting coordinates")
            coords = await get_coordinates(location)
            logger.debug(f"Coordinates found: {coords}")
    
            # Check if coordinates are within Netherlands
            if not station_manager._validate_coordinates(coords):
                raise ValueError(
                    f"Location '{location}' ({coords.latitude}, {coords.longitude}) is outside the "
                    "Netherlands. This tool only works for locations within the Netherlands."
                )
    
            logger.info("Step 3: Finding nearest station")
            station = station_manager.find_nearest_station(coords)
            logger.info(f"Using station: {station.name} ({station.id})")
    
            logger.info("Step 4: Getting weather data")
            weather_data = await station_manager.get_raw_station_data(station.id, ctx)
    
            logger.info("Weather data retrieved successfully")
            return weather_data
    
        except Exception as e:
            logger.error(f"Error getting weather: {str(e)}")
            return f"Error: Unable to get weather data for {location}. {str(e)}"
  • Helper function called by the handler to resolve the input location string to geographic Coordinates using the Nominatim geocoding API.
    async def get_coordinates(location: str) -> Coordinates:
        """Get coordinates for a location using OpenStreetMap Nominatim"""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://nominatim.openstreetmap.org/search",
                    params={
                        "q": f"{location}, Netherlands",  # Force Netherlands search
                        "format": "json",
                        "limit": 1,
                        "countrycodes": "nl",  # Restrict to Netherlands
                    },
                    headers={"User-Agent": "KNMI_Weather_MCP/1.0"},
                )
                response.raise_for_status()
    
                results = response.json()
                if not results:
                    raise ValueError(f"Location '{location}' not found in the Netherlands")
    
                place = results[0]
                return Coordinates(latitude=float(place["lat"]), longitude=float(place["lon"]))
    
        except Exception as e:
            logger.error(f"Error getting coordinates for {location}: {str(e)}")
            raise ValueError(f"Failed to get coordinates for location: {str(e)}") from e
  • Helper method in StationManager used by the handler to find the nearest weather station to the location's coordinates using the Haversine distance formula.
    def find_nearest_station(self, coords: Coordinates) -> WeatherStation:
        """Find the nearest weather station to given coordinates"""
        if not self._stations:
            raise ValueError("No stations available. Call refresh_stations first.")
    
        def calculate_distance(station: WeatherStation) -> float:
            """Calculate distance using Haversine formula"""
            # Convert coordinates to radians
            lat1, lon1 = radians(coords.latitude), radians(coords.longitude)
            lat2, lon2 = (
                radians(station.coordinates.latitude),
                radians(station.coordinates.longitude),
            )
    
            # Haversine formula
            dlat = lat2 - lat1
            dlon = lon2 - lon1
            a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
            c = 2 * atan2(sqrt(a), sqrt(1 - a))
    
            # Earth's radius in kilometers
            R = 6371.0
    
            return R * c
    
        # Find station with minimum distance
        nearest_station = min(self._stations.values(), key=calculate_distance)
    
        logger.info(f"Found nearest station: {nearest_station.name} ({nearest_station.id})")
        return nearest_station
  • Helper method in StationManager used by the handler to fetch and parse raw weather data from the KNMI API for the selected station, returning a dictionary of measurements and metadata.
    async def get_raw_station_data(self, station_id: str, ctx: Optional[Context] = None) -> Dict[str, Any]:
        """Get raw data for a specific station using KNMI Open Data API"""
        logger.info(f"Starting data fetch for station {station_id}")
    
        async with httpx.AsyncClient() as client:
            try:
                headers = {"Authorization": self._api_key}
    
                # Get station coordinates
                station = self._stations.get(station_id)
                if not station:
                    raise ValueError(f"Station {station_id} not found")
    
                # Validate coordinates are within bounds
                if not self._validate_coordinates(station.coordinates):
                    error_msg = (
                        f"Coordinates ({station.coordinates.latitude}, {station.coordinates.longitude}) are outside "
                        "valid bounds"
                    )
                    logger.error(error_msg)
                    raise ValueError(error_msg)
    
                # List files endpoint with station filter
                list_url = f"{self.BASE_URL}/datasets/{self.DATASET_NAME}/versions/{self.DATASET_VERSION}/files"
    
                # Get the latest file for this station (sort by lastModified in descending order)
                params = {
                    "maxKeys": "1",
                    "sorting": "desc",
                    "order_by": "lastModified",
                    "station": station_id,  # Filter for specific station
                }
    
                logger.info(f"Requesting latest 10-minute data for station {station_id}")
                logger.debug(f"Query parameters: {params}")
    
                # Get the latest file metadata
                response = await client.get(list_url, headers=headers, params=params)
                response.raise_for_status()
    
                files_data = response.json()
                if not files_data.get("files"):
                    raise ValueError(f"No data files available for station {station_id}")
    
                latest_file = files_data["files"][0]
                filename = latest_file["filename"]
    
                # Get download URL for the file
                url_endpoint = (
                    f"{self.BASE_URL}/datasets/{self.DATASET_NAME}/versions/{self.DATASET_VERSION}/files/{filename}/url"
                )
                url_response = await client.get(url_endpoint, headers=headers)
                url_response.raise_for_status()
    
                download_url = url_response.json().get("temporaryDownloadUrl")
                if not download_url:
                    raise ValueError("No download URL available")
    
                # Create a temporary directory to store the NetCDF file
                with tempfile.TemporaryDirectory() as temp_dir:
                    temp_file = Path(temp_dir) / filename
    
                    # Download the file
                    logger.info(f"Downloading file: {filename}")
                    file_response = await client.get(download_url)
                    file_response.raise_for_status()
    
                    # Save the binary content
                    temp_file.write_bytes(file_response.content)
    
                    # Open and read the NetCDF file
                    logger.info("Reading NetCDF file")
                    try:
                        with xr.open_dataset(temp_file) as ds:
                            # Log the structure of the dataset
                            logger.debug(f"NetCDF structure: {ds}")
                            logger.debug(f"Available variables: {list(ds.variables)}")
                            logger.debug(f"Dimensions: {ds.dims}")
    
                            # Verify we have the correct station data
                            if "station" in ds.dims:
                                stations_in_file = ds["station"].values
                                logger.debug(f"Stations in file: {stations_in_file}")
    
                                # Convert our station_id to match KNMI format (add '06' prefix if needed)
                                knmi_station_id = f"06{station_id}" if not station_id.startswith("06") else station_id
                                logger.debug(f"Looking for station ID {knmi_station_id} (original: {station_id})")
    
                                # Convert to the same type as in the file
                                file_station_type = type(stations_in_file[0])
                                comparable_station_id = file_station_type(knmi_station_id)
    
                                if comparable_station_id not in stations_in_file:
                                    raise ValueError(
                                        f"Station {station_id} (as {knmi_station_id}) not found in file. "
                                        f"Available stations: {stations_in_file}"
                                    )
    
                                # Find the index of our station
                                station_idx = np.where(stations_in_file == comparable_station_id)[0]
                                if len(station_idx) == 0:
                                    raise ValueError(f"Could not find index for station {knmi_station_id}")
                                station_idx = station_idx[0]
                                logger.debug(f"Found station {knmi_station_id} at index {station_idx}")
                            else:
                                raise ValueError("No station dimension found in file")
    
                            # Create a dictionary to store the measurements
                            measurements = {}
    
                            # Map NetCDF variables to our model fields
                            param_mapping = {
                                "ta": "temperature",  # Air temperature (°C)
                                "rh": "relative_humidity",  # Relative humidity (%)
                                "ff": "wind_speed",  # Wind speed (m/s)
                                "dd": "wind_direction",  # Wind direction (degrees)
                                "vis": "visibility",  # Visibility (meters)
                                "pp": "air_pressure",  # Air pressure (hPa)
                                "rr": "precipitation_amount",  # Precipitation amount (mm)
                                "dr": "precipitation_duration",  # Precipitation duration (minutes)
                            }
    
                            # Extract values for each parameter
                            for nc_param, model_field in param_mapping.items():
                                if nc_param in ds.variables:
                                    var = ds[nc_param]
                                    logger.debug(f"Found variable {nc_param} with dimensions {var.dims}")
    
                                    try:
                                        # Get the value for our specific station
                                        if "station" in var.dims:
                                            # Select our station first
                                            value = var.isel(station=station_idx)
    
                                            # Then get the latest time if it exists
                                            if "time" in value.dims:
                                                value = value.isel(time=-1)
    
                                            # Handle any remaining dimensions
                                            while len(value.dims) > 0:
                                                value = value.isel({value.dims[0]: 0})
    
                                            value = float(value.values)
                                        else:
                                            logger.warning(f"Variable {nc_param} does not have a station dimension")
                                            continue
    
                                        if not np.isnan(value):
                                            measurements[model_field] = value
                                            logger.debug(f"Got {model_field} = {value} for station {station_id}")
                                    except Exception as e:
                                        logger.warning(f"Could not extract value for {nc_param}: {e}")
    
                            # Get the timestamp from the time variable if it exists
                            timestamp = latest_file.get("lastModified")
                            if "time" in ds.variables:
                                try:
                                    time_var = ds["time"]
                                    # Get the latest timestamp
                                    time_value = time_var.isel(time=-1)
                                    # If multi-dimensional, take the first value
                                    while len(time_value.dims) > 0:
                                        time_value = time_value.isel({time_value.dims[0]: 0})
                                    timestamp = pd.Timestamp(time_value.values).isoformat()
                                except Exception as e:
                                    logger.warning(f"Could not parse time variable: {e}")
                                    logger.debug(f"Time variable structure: {time_var}")
    
                            return {
                                "measurements": measurements,
                                "metadata": {
                                    "station_id": station_id,
                                    "station_name": station.name,
                                    "timestamp": timestamp,
                                    "filename": filename,
                                    "variables": list(
                                        ds.variables.keys()
                                    ),  # Add list of available variables for debugging
                                },
                            }
  • The @mcp.tool() decorator on the get_location_weather function registers it as an MCP tool with the name 'get_location_weather', using the function signature and docstring for input/output schema.
    @mcp.tool()

Tool Definition Quality

Score is being calculated. Check back soon.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wolkwork/knmi-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server