#!/usr/bin/env python3
"""
测试AQICN API连接
"""
import asyncio
import os

import httpx
# API token for the AQICN feed. An ``AQICN_TOKEN`` environment variable takes
# precedence; the literal below is kept only as a backward-compatible fallback.
# NOTE(review): a credential committed to source control should be rotated and
# removed from the repository.
AQICN_TOKEN = os.environ.get("AQICN_TOKEN") or "65e4e8b2fb19f19faf501419e83b6f1fa14d85bc"
# Use HTTPS: the token travels in the query string, so a plain-HTTP endpoint
# would expose it in cleartext on the wire.
API_BASE_URL = "https://api.waqi.info/feed"
async def test_city(city: str):
    """Query the air-quality feed for one city and print a report.

    Sends a GET request to ``{API_BASE_URL}/{city}/`` with the API token as a
    query parameter, then prints the response status. When the status is
    ``"ok"`` the station name, AQI, update time and per-pollutant values are
    printed; otherwise the raw payload is printed.

    Any exception raised while fetching or reporting is caught and printed,
    so this coroutine does not raise for request or parsing failures.

    Args:
        city: City name (or other feed identifier) placed in the URL path.
    """
    # The token is passed via ``params`` rather than baked into the URL so the
    # URL echoed below does not leak the credential into console output/logs.
    url = f"{API_BASE_URL}/{city}/"
    print(f"\n{'=' * 60}")
    print(f"正在查询: {city}")
    print(f"URL: {url}?token=***")
    print('=' * 60)
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params={"token": AQICN_TOKEN})
            response.raise_for_status()
            data = response.json()
        _print_aqi_report(data)
    except Exception as e:
        # Deliberately broad: this is a best-effort diagnostic script and one
        # failing city must not abort the remaining queries.
        # httpx error messages embed the full request URL, so scrub the token.
        print(f"❌ 错误: {str(e).replace(AQICN_TOKEN, '***')}")


def _print_aqi_report(data: dict) -> None:
    """Print a decoded feed payload: a summary on success, raw data otherwise."""
    print(f"状态: {data.get('status')}")
    if data.get("status") != "ok":
        print("❌ 查询失败")
        print(f"数据: {data}")
        return

    # ``or {}`` also covers keys that are present but null in the payload.
    aqi_data = data.get("data") or {}
    city_name = (aqi_data.get("city") or {}).get("name", "N/A")
    time_info = (aqi_data.get("time") or {}).get("iso", "N/A")
    print(f"城市: {city_name}")
    print(f"AQI: {aqi_data.get('aqi', 'N/A')}")
    print(f"更新时间: {time_info}")

    # Per-pollutant readings; pollutants missing from the payload are skipped.
    print("\n污染物详情:")
    iaqi = aqi_data.get("iaqi") or {}
    for pollutant in ("pm25", "pm10", "so2", "no2", "o3", "co"):
        reading = iaqi.get(pollutant)
        if reading:
            print(f" {pollutant}: {reading.get('v', 'N/A')}")
async def main():
    """Run ``test_city`` sequentially for a fixed list of cities.

    Waits one second between consecutive requests to avoid sending them too
    quickly; no delay is added after the final city.
    """
    cities = ["beijing", "shanghai", "guangzhou", "shenzhen", "chengdu"]
    for index, city in enumerate(cities):
        if index:
            # Throttle only between requests; a trailing sleep after the last
            # city would just delay program exit.
            await asyncio.sleep(1)
        await test_city(city)
if __name__ == "__main__":
    # Script entry point: drive the async test run to completion.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)