from __future__ import annotations

import math
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Literal

import httpx
from pydantic import BaseModel, Field
# NOTE ABOUT AUTHENTICATION
# QWeather supports both JWT (Authorization: Bearer ...) and API KEY (X-QW-Api-Key).
# This project uses API KEY mode because it is simple to configure via environment variables
# and does not require generating short-lived JWTs on the server.
class QWeatherError(Exception):
    """Base error type for QWeather client failures."""


class QWeatherNetworkError(QWeatherError):
    """Network failure while talking to QWeather (timeout, DNS, connection reset, ...)."""


# NOTE: deliberately NOT `frozen=True`. A frozen dataclass rejects *every* Python-level
# attribute assignment, including `exc.__traceback__ = tb`, which `contextlib.contextmanager`
# and `contextlib.asynccontextmanager` perform when an exception passes through them.
# `unsafe_hash=True` keeps the field-based `__hash__` that the frozen variant provided.
@dataclass(unsafe_hash=True)
class QWeatherAPIError(QWeatherError):
    """QWeather returned an HTTP error status or a non-success business code.

    QWeather APIs typically return HTTP 200 with a JSON payload containing a string `code`.
    Therefore we must check both HTTP status and payload `code`.

    Attributes:
        code: HTTP status or QWeather business code, always as a string (e.g. "404").
        message: Short human-readable description of what failed.

    Two instances compare equal (and hash equal) when both fields match.
    """

    code: str
    message: str

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ never calls Exception.__init__, so without this
        # `args` stays empty for keyword construction: str(exc) would be "" and pickle/copy
        # could not rebuild the instance.
        super().__init__(self.code, self.message)

    def __str__(self) -> str:
        """Return ``"[<code>] <message>"`` so logs show both fields."""
        return f"[{self.code}] {self.message}"
class _QWeatherNow(BaseModel):
    """Schema for the `now` object of a real-time weather payload.

    Every field is required. Only `obsTime` is parsed into a `datetime`; the remaining
    values are declared as plain strings.
    """

    # Parsed timestamp field.
    obsTime: datetime

    # String-typed fields.
    temp: str
    text: str
    humidity: str
    windSpeed: str
    windDir: str
class QWeatherNowResponse(BaseModel):
    """Top-level schema used to validate a `/v7/weather/now` payload.

    Only `code` is mandatory; `updateTime` and `now` default to ``None`` so that a payload
    carrying nothing but a business `code` still validates.
    """

    # Business status code carried in the JSON body, as a string.
    code: str

    # Optional parts of the payload.
    updateTime: datetime | None = None
    now: _QWeatherNow | None = None
class _QWeatherGeoLocation(BaseModel):
    """Schema for one entry of the GeoAPI `location` list.

    `name` and `id` are required. `lat` / `lon` are optional strings that default to
    ``None``, so callers must check them before converting to numbers.
    """

    # Required identification fields.
    name: str
    id: str

    # Optional coordinates, kept as strings.
    lat: str | None = None
    lon: str | None = None
class QWeatherGeoLookupResponse(BaseModel):
    """Top-level schema used to validate a `/geo/v2/city/lookup` payload.

    `location` falls back to an empty list when the key is absent.
    """

    # Business status code carried in the JSON body, as a string.
    code: str

    # Matched locations; a fresh empty list is created per instance when missing.
    location: list[_QWeatherGeoLocation] = Field(default_factory=list)
class _QWeatherDaily(BaseModel):
    """Schema for one entry of the `daily` list in a daily-forecast payload.

    Every field is required and declared as a string, including `fxDate`.
    """

    # Forecast date, kept as a string (not parsed into a date object).
    fxDate: str

    # Temperature range.
    tempMax: str
    tempMin: str

    # Day / night condition text.
    textDay: str
    textNight: str

    # Humidity and daytime wind.
    humidity: str
    windSpeedDay: str
    windDirDay: str
class QWeatherDailyForecastResponse(BaseModel):
    """Top-level schema used to validate a daily-forecast (`/v7/weather/{N}d`) payload.

    Only `code` is mandatory; `daily` falls back to an empty list when the key is absent.
    """

    # Business status code carried in the JSON body, as a string.
    code: str

    updateTime: datetime | None = None

    # Forecast entries; a fresh empty list is created per instance when missing.
    daily: list[_QWeatherDaily] = Field(default_factory=list)
class _QWeatherHourly(BaseModel):
    """Schema for one entry of the `hourly` list in an hourly-forecast payload.

    Every field is required. Only `fxTime` is parsed into a `datetime`; the remaining
    values are declared as plain strings.
    """

    # Parsed timestamp field.
    fxTime: datetime

    # String-typed fields.
    temp: str
    text: str
    windSpeed: str
    windDir: str
    humidity: str
class QWeatherHourlyForecastResponse(BaseModel):
    """Top-level schema used to validate an hourly-forecast (`/v7/weather/{N}h`) payload.

    Only `code` is mandatory; `hourly` falls back to an empty list when the key is absent.
    """

    # Business status code carried in the JSON body, as a string.
    code: str

    updateTime: datetime | None = None

    # Forecast entries; a fresh empty list is created per instance when missing.
    hourly: list[_QWeatherHourly] = Field(default_factory=list)
class _QWeatherWarning(BaseModel):
    """Schema for one entry of the `warning` list in a weather-warning payload.

    `typeName`, `title` and `text` are required; severity details and the validity window
    are optional and default to ``None``.
    """

    typeName: str

    # Optional severity details.
    severity: str | None = None
    severityColor: str | None = None

    # Required headline and body text.
    title: str
    text: str

    # Optional validity window, parsed into datetimes when present.
    startTime: datetime | None = None
    endTime: datetime | None = None
class QWeatherWarningNowResponse(BaseModel):
    """Top-level schema used to validate a `/v7/warning/now` payload.

    Only `code` is mandatory; `warning` falls back to an empty list when the key is absent.
    """

    # Business status code carried in the JSON body, as a string.
    code: str

    updateTime: datetime | None = None

    # Warning entries; a fresh empty list is created per instance when missing.
    warning: list[_QWeatherWarning] = Field(default_factory=list)
# Allowed values for the `unit` query parameter sent to the weather endpoints.
# NOTE(review): presumably "m" = metric and "i" = imperial — confirm against the QWeather docs.
Unit = Literal["m", "i"]
def _looks_like_coordinates(value: str) -> bool:
parts = [p.strip() for p in value.split(",")]
if len(parts) != 2:
return False
try:
float(parts[0])
float(parts[1])
except ValueError:
return False
return True
def _looks_like_location_id(value: str) -> bool:
# LocationID is usually a numeric string like 101010100.
return value.strip().isdigit()
def _parse_lon_lat(value: str) -> tuple[float, float]:
parts = [p.strip() for p in value.split(",")]
if len(parts) != 2:
raise ValueError("invalid coordinates format")
lon = float(parts[0])
lat = float(parts[1])
return lon, lat
class QWeatherClient:
    """A minimal async HTTP client for QWeather Web API.

    Why we handle parameters this way:
    - `location` supports multiple formats in QWeather's spec (LocationID / lon,lat).
    - Additionally, developers often have *city names* ("北京", "Beijing").
      The `/v7/weather/now` endpoint does not guarantee it can resolve free-form names in all
      cases, so we optionally resolve names via GeoAPI (`/geo/v2/city/lookup`) and then query
      weather by returned LocationID.

    Error contract:
    - Transport failures (`httpx.HTTPError`) are raised as `QWeatherNetworkError`.
    - HTTP status >= 400, a payload `code` other than "200", and response bodies that are
      not valid JSON or do not match the expected schema are raised as `QWeatherAPIError`.
    - Invalid arguments are raised as `ValueError`.

    Security notes:
    - API Host and API Key MUST be provided via environment variables (see Settings).
    - Do not hardcode secrets into source code or commit them into Git.
    """

    def __init__(
        self,
        *,
        api_host: str,
        api_key: str,
        timeout_seconds: float = 5.0,
        transport: httpx.AsyncBaseTransport | None = None,
    ) -> None:
        """Create the client and its underlying `httpx.AsyncClient`.

        Args:
            api_host: Base URL of the QWeather API. Surrounding whitespace and trailing
                slashes are removed; a bare host name without a scheme is assumed to be
                HTTPS.
            api_key: API key sent in the `X-QW-Api-Key` header on every request.
            timeout_seconds: Timeout applied to every request.
            transport: Optional custom transport (useful for tests).

        Raises:
            ValueError: If `api_host` or `api_key` is empty or whitespace only.
        """
        api_host = api_host.strip().rstrip("/")
        api_key = api_key.strip()
        if not api_host:
            raise ValueError("api_host is required")
        if not api_key:
            raise ValueError("api_key is required")
        # httpx cannot send requests to a base URL without a scheme; default to HTTPS so
        # that a bare host name (e.g. "abc.example.com") works.
        if "://" not in api_host:
            api_host = f"https://{api_host}"
        self._api_host = api_host
        self._api_key = api_key
        self._client = httpx.AsyncClient(
            base_url=self._api_host,
            timeout=httpx.Timeout(timeout_seconds),
            headers={"X-QW-Api-Key": self._api_key},
            transport=transport,
        )

    async def __aenter__(self) -> "QWeatherClient":
        """Return the client itself so it can be used with `async with`."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object | None,
    ) -> None:
        """Close the underlying HTTP client when leaving the `async with` block."""
        await self.aclose()

    async def aclose(self) -> None:
        """Close the underlying `httpx.AsyncClient`."""
        await self._client.aclose()

    async def _get_json(
        self,
        path: str,
        *,
        api_name: str,
        params: dict[str, Any] | None = None,
        allow_empty: bool = False,
    ) -> dict[str, Any]:
        """Send a GET request and return the decoded JSON object.

        Args:
            path: Request path relative to the API host.
            api_name: Short endpoint label used in error messages (e.g. "weather").
            params: Optional query parameters.
            allow_empty: When True, an empty response body yields ``{}`` instead of an error.

        Raises:
            QWeatherNetworkError: On any `httpx.HTTPError`.
            QWeatherAPIError: On HTTP status >= 400 (code = the status), or with code "500"
                when the body is not a JSON object.
        """
        try:
            resp = await self._client.get(path, params=params)
        except httpx.HTTPError as e:
            raise QWeatherNetworkError(str(e)) from e
        if resp.status_code >= 400:
            raise QWeatherAPIError(
                code=str(resp.status_code), message=f"{api_name} api http error")
        if allow_empty and not resp.content:
            return {}
        try:
            payload = resp.json()
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            raise QWeatherAPIError(
                code="500", message=f"{api_name} api returned invalid JSON") from e
        if not isinstance(payload, dict):
            raise QWeatherAPIError(
                code="500", message=f"{api_name} api returned unexpected payload")
        return payload

    async def _fetch_validated(
        self,
        path: str,
        *,
        model: type[BaseModel],
        api_name: str,
        params: dict[str, Any] | None = None,
        failure_message: str | None = None,
    ) -> tuple[dict[str, Any], Any]:
        """GET `path`, validate the payload against `model` and check its business code.

        Args:
            path: Request path relative to the API host.
            model: Response model exposing a string `code` field.
            api_name: Short endpoint label used in error messages.
            params: Optional query parameters.
            failure_message: Message used when `code` is not "200"; defaults to
                ``"<api_name> api returned error"``.

        Returns:
            ``(raw, parsed)``: the raw JSON object and the validated model instance.

        Raises:
            QWeatherNetworkError: On transport failure.
            QWeatherAPIError: On HTTP error, malformed payload, or non-"200" code.
        """
        raw = await self._get_json(path, api_name=api_name, params=params)
        try:
            parsed: Any = model.model_validate(raw)
        except ValueError as e:
            # pydantic's ValidationError is a ValueError subclass.
            raise QWeatherAPIError(
                code="500", message=f"{api_name} api returned unexpected payload") from e
        if parsed.code != "200":
            # QWeather uses string codes in the JSON body; preserve it for callers.
            raise QWeatherAPIError(
                code=parsed.code,
                message=failure_message or f"{api_name} api returned error",
            )
        return raw, parsed

    async def geo_lookup_city(self, *, location: str, lang: str = "zh") -> _QWeatherGeoLocation:
        """Resolve a city name into a QWeather LocationID.

        We only return the top-1 match for now (number=1) to keep behavior deterministic.

        Args:
            location: Text passed to GeoAPI as the `location` parameter.
            lang: Language of the returned location name.

        Returns:
            The first location returned by GeoAPI.

        Raises:
            QWeatherNetworkError: On transport failure.
            QWeatherAPIError: On HTTP error, malformed payload, non-"200" code, or (with
                code "404") when GeoAPI returns no location.
        """
        _, data = await self._fetch_validated(
            "/geo/v2/city/lookup",
            model=QWeatherGeoLookupResponse,
            api_name="geo",
            params={"location": location, "lang": lang, "number": 1},
            failure_message="geo lookup failed",
        )
        if not data.location:
            raise QWeatherAPIError(code="404", message="location not found")
        return data.location[0]

    async def resolve_location(
        self,
        *,
        location: str,
        geo_lang: str = "en",
        resolve_city_name: bool = True,
    ) -> tuple[str, str, _QWeatherGeoLocation | None]:
        """Resolve a user-provided `location` into the best upstream query value.

        Args:
            location: Coordinates ("lon,lat"), a numeric LocationID, or a city name.
            geo_lang: Language used for the GeoAPI lookup (affects the display name).
            resolve_city_name: When False, city names are passed upstream unchanged.

        Returns:
            - display_location: best-effort canonical location name (for MCP output)
            - query_location: value used in QWeather v7 weather API (LocationID or lon,lat)
            - geo: GeoAPI lookup result if performed

        Raises:
            ValueError: If `location` is empty or whitespace only.
            QWeatherError: If a city-name lookup fails (LocationID lookups are best-effort).
        """
        location = location.strip()
        if not location:
            raise ValueError("location is required")

        # Coordinates can be used directly in QWeather weather APIs.
        if _looks_like_coordinates(location):
            return location, location, None

        # For LocationID, try GeoAPI once to standardize name. This is best-effort: the ID
        # itself is already a valid query value, so a failed lookup only costs the name.
        if _looks_like_location_id(location):
            try:
                geo = await self.geo_lookup_city(location=location, lang=geo_lang)
            except QWeatherError:
                return location, location, None
            return geo.name, location, geo

        if not resolve_city_name:
            return location, location, None

        geo = await self.geo_lookup_city(location=location, lang=geo_lang)
        return geo.name, geo.id, geo

    async def get_weather_now(
        self,
        *,
        location: str,
        lang: str = "zh",
        geo_lang: str = "en",
        unit: Unit = "m",
        resolve_city_name: bool = True,
    ) -> tuple[str, dict[str, Any]]:
        """Call `/v7/weather/now`.

        Args:
            location: Coordinates, LocationID, or city name (see `resolve_location`).
            lang: Language of the weather text.
            geo_lang: Language of the display location name.
            unit: Value of the `unit` query parameter.
            resolve_city_name: Whether city names are resolved via GeoAPI first.

        Returns:
            - a display location name (best-effort)
            - raw JSON payload (for debugging / traceability)

        Raises:
            ValueError: If `location` is empty.
            QWeatherNetworkError: On transport failure.
            QWeatherAPIError: On HTTP error, malformed payload, non-"200" code, or (with
                code "500") when the payload has no `now` object.
        """
        # Why use `geo_lang`:
        # - QWeather weather API returns no canonical location name.
        # - We want `location` in our MCP output to be a stable/standardized display name
        #   (e.g. "Beijing") while still allowing weather text to be localized (e.g. "多云").
        display_location, query_location, _ = await self.resolve_location(
            location=location,
            geo_lang=geo_lang,
            resolve_city_name=resolve_city_name,
        )
        raw, parsed = await self._fetch_validated(
            "/v7/weather/now",
            model=QWeatherNowResponse,
            api_name="weather",
            params={"location": query_location, "lang": lang, "unit": unit},
        )
        if parsed.now is None:
            raise QWeatherAPIError(
                code="500", message="missing now in response")
        return display_location, raw

    async def get_forecast_daily(
        self,
        *,
        location: str,
        days: int,
        lang: str = "zh",
        geo_lang: str = "en",
        unit: Unit = "m",
        resolve_city_name: bool = True,
    ) -> tuple[str, dict[str, Any]]:
        """Call `/v7/weather/{days}d`.

        Args:
            location: Coordinates, LocationID, or city name (see `resolve_location`).
            days: Forecast length; must be 3, 7, 10 or 15.
            lang: Language of the weather text.
            geo_lang: Language of the display location name.
            unit: Value of the `unit` query parameter.
            resolve_city_name: Whether city names are resolved via GeoAPI first.

        Returns:
            ``(display_location, raw_payload)``.

        Raises:
            ValueError: If `days` is not supported or `location` is empty.
            QWeatherNetworkError: On transport failure.
            QWeatherAPIError: On HTTP error, malformed payload, or non-"200" code.
        """
        if days not in {3, 7, 10, 15}:
            raise ValueError("days must be one of 3/7/10/15")
        display_location, query_location, _ = await self.resolve_location(
            location=location,
            geo_lang=geo_lang,
            resolve_city_name=resolve_city_name,
        )
        raw, _parsed = await self._fetch_validated(
            f"/v7/weather/{days}d",
            model=QWeatherDailyForecastResponse,
            api_name="daily",
            params={"location": query_location, "lang": lang, "unit": unit},
        )
        return display_location, raw

    async def get_forecast_hourly(
        self,
        *,
        location: str,
        hours: int,
        lang: str = "zh",
        geo_lang: str = "en",
        unit: Unit = "m",
        resolve_city_name: bool = True,
    ) -> tuple[str, dict[str, Any]]:
        """Call `/v7/weather/{hours}h`.

        Args:
            location: Coordinates, LocationID, or city name (see `resolve_location`).
            hours: Forecast length; must be 24, 72 or 168.
            lang: Language of the weather text.
            geo_lang: Language of the display location name.
            unit: Value of the `unit` query parameter.
            resolve_city_name: Whether city names are resolved via GeoAPI first.

        Returns:
            ``(display_location, raw_payload)``.

        Raises:
            ValueError: If `hours` is not supported or `location` is empty.
            QWeatherNetworkError: On transport failure.
            QWeatherAPIError: On HTTP error, malformed payload, or non-"200" code.
        """
        if hours not in {24, 72, 168}:
            raise ValueError("hours must be one of 24/72/168")
        display_location, query_location, _ = await self.resolve_location(
            location=location,
            geo_lang=geo_lang,
            resolve_city_name=resolve_city_name,
        )
        raw, _parsed = await self._fetch_validated(
            f"/v7/weather/{hours}h",
            model=QWeatherHourlyForecastResponse,
            api_name="hourly",
            params={"location": query_location, "lang": lang, "unit": unit},
        )
        return display_location, raw

    async def get_weather_alerts(
        self,
        *,
        location: str,
        lang: str = "zh",
        geo_lang: str = "en",
        resolve_city_name: bool = True,
    ) -> tuple[str, dict[str, Any]]:
        """Call `/v7/warning/now`.

        Args:
            location: Coordinates, LocationID, or city name (see `resolve_location`).
            lang: Language of the warning text.
            geo_lang: Language of the display location name.
            resolve_city_name: Whether city names are resolved via GeoAPI first.

        Returns:
            ``(display_location, raw_payload)``.

        Raises:
            ValueError: If `location` is empty.
            QWeatherNetworkError: On transport failure.
            QWeatherAPIError: On HTTP error, malformed payload, or non-"200" code.
        """
        display_location, query_location, _ = await self.resolve_location(
            location=location,
            geo_lang=geo_lang,
            resolve_city_name=resolve_city_name,
        )
        raw, _parsed = await self._fetch_validated(
            "/v7/warning/now",
            model=QWeatherWarningNowResponse,
            api_name="warning",
            params={"location": query_location, "lang": lang},
        )
        return display_location, raw

    async def get_air_quality_current(
        self,
        *,
        location: str,
        geo_lang: str = "en",
        resolve_city_name: bool = True,
    ) -> tuple[str, dict[str, Any]]:
        """Fetch current air quality.

        QWeather Air Quality v1 uses a different endpoint style:
        `/airquality/v1/current/{latitude}/{longitude}`.
        Therefore we need coordinates; for city names / LocationID, we resolve via GeoAPI.

        Args:
            location: Coordinates ("lon,lat"), LocationID, or city name.
            geo_lang: Language of the display location name.
            resolve_city_name: Accepted for signature parity with the other methods but
                ignored: anything that is not coordinates is always resolved via GeoAPI,
                because this endpoint cannot be queried without coordinates.

        Returns:
            ``(display_location, raw_payload)``; the payload is ``{}`` when the response
            body is empty. The payload's content is not validated.

        Raises:
            ValueError: If `location` is empty.
            QWeatherNetworkError: On transport failure.
            QWeatherAPIError: On HTTP error, a body that is not a JSON object, a failed
                GeoAPI lookup, or (with code "500") unusable lat/lon from GeoAPI.
        """
        location = location.strip()
        if not location:
            raise ValueError("location is required")

        if _looks_like_coordinates(location):
            display_location = location
            lon, lat = _parse_lon_lat(location)
        else:
            geo = await self.geo_lookup_city(location=location, lang=geo_lang)
            display_location = geo.name
            if geo.lat is None or geo.lon is None:
                raise QWeatherAPIError(
                    code="500", message="geo lookup missing lat/lon")
            try:
                lat = float(geo.lat)
                lon = float(geo.lon)
            except ValueError as e:
                raise QWeatherAPIError(
                    code="500", message="geo lookup returned invalid lat/lon") from e

        # API requires latitude/longitude in path, 2 decimal places recommended.
        raw = await self._get_json(
            f"/airquality/v1/current/{lat:.2f}/{lon:.2f}",
            api_name="air",
            allow_empty=True,
        )
        return display_location, raw