#!/usr/bin/env python3
"""
AQICN MCP Server
用于查询中国城市空气质量信息的MCP服务器
"""
import os
from typing import Any
import httpx
from mcp.server import Server
from mcp.types import Tool, TextContent
import asyncio
# 初始化服务器
app = Server("aqicn-server")
# API配置
AQICN_TOKEN = os.getenv("AQICN_TOKEN", "")
if not AQICN_TOKEN:
raise ValueError("AQICN_TOKEN environment variable is required. Please set it in your MCP configuration.")
API_BASE_URL = "http://api.waqi.info/feed"
def get_aqi_description(aqi: int) -> str:
"""
根据AQI值返回空气质量描述和健康建议
Args:
aqi: 空气质量指数
Returns:
空气质量等级描述和健康建议
"""
if aqi <= 50:
return "优 (0-50): 空气质量令人满意,基本无空气污染。"
elif aqi <= 100:
return "良 (51-100): 空气质量可接受,但某些污染物可能对极少数异常敏感人群健康有较弱影响。"
elif aqi <= 150:
return "轻度污染 (101-150): 易感人群症状有轻度加剧,健康人群出现刺激症状。"
elif aqi <= 200:
return "中度污染 (151-200): 进一步加剧易感人群症状,可能对健康人群心脏、呼吸系统有影响。"
elif aqi <= 300:
return "重度污染 (201-300): 心脏病和肺病患者症状显著加剧,运动耐受力降低,健康人群普遍出现症状。"
else:
return "严重污染 (300+): 健康人运动耐受力降低,有明显强烈症状,提前出现某些疾病。"
def format_air_quality_data(data: dict[str, Any], city: str) -> str:
"""
格式化空气质量数据为易读的文本
Args:
data: API返回的JSON数据
city: 城市名称
Returns:
格式化后的文本
"""
status = data.get("status", "unknown")
if status != "ok":
return f"❌ 查询失败: {data.get('data', '未知错误')}"
aqi_data = data.get("data", {})
if not aqi_data:
return "❌ 未找到该城市的空气质量数据"
# 基础信息
aqi = aqi_data.get("aqi", 0)
idx = aqi_data.get("idx", "N/A")
city_name = aqi_data.get("city", {}).get("name", city)
# 坐标信息
geo = aqi_data.get("city", {}).get("geo", [])
if geo and len(geo) == 2:
lat, lon = geo
else:
lat, lon = "N/A", "N/A"
# 时间戳
time_info = aqi_data.get("time", {})
tz = time_info.get("tz", "N/A")
stamp = time_info.get("stamp", "N/A")
iso = time_info.get("iso", "N/A")
# 污染物数据
iaqi = aqi_data.get("iaqi", {})
# 构建输出
output = f"""📍 {city_name} 空气质量信息
{'=' * 50}
🌡️ AQI指数: {aqi}
📊 空气质量等级: {get_aqi_description(aqi)}
🕐 时区: {tz}
📅 更新时间: {iso}
🔬 污染物详情:
"""
# 添加各个污染物的数据
pollutants = {
"pm25": "PM2.5",
"pm10": "PM10",
"so2": "二氧化硫",
"no2": "二氧化氮",
"o3": "臭氧",
"co": "一氧化碳",
"p": "颗粒物",
}
for key, name_zh in pollutants.items():
if key in iaqi and iaqi[key] is not None:
value = iaqi[key].get("v", "N/A")
output += f" • {name_zh}: {value}\n"
return output
@app.list_tools()
async def list_tools() -> list[Tool]:
"""
列出可用的工具
"""
return [
Tool(
name="get_air_quality",
description="根据城市拼音查询空气质量信息。例如: beijing, shanghai, guangzhou, shenzhen等",
inputSchema={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称的拼音,例如: beijing, shanghai, guangzhou, shenzhen, chengdu等"
}
},
"required": ["city"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""
处理工具调用
"""
if name == "get_air_quality":
city = arguments.get("city", "")
if not city:
return [TextContent(type="text", text="❌ 错误: 请提供城市名称拼音")]
# 构建API URL
url = f"{API_BASE_URL}/{city}/?token={AQICN_TOKEN}"
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
formatted_output = format_air_quality_data(data, city)
return [TextContent(type="text", text=formatted_output)]
except httpx.TimeoutException:
return [TextContent(type="text", text="❌ 错误: 请求超时,请稍后重试")]
except httpx.HTTPStatusError as e:
return [TextContent(type="text", text=f"❌ HTTP错误: {e.response.status_code}")]
except Exception as e:
return [TextContent(type="text", text=f"❌ 错误: {str(e)}")]
else:
raise ValueError(f"未知工具: {name}")
async def main():
"""启动MCP服务器"""
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())