import os
import json
import httpx
import logging
import asyncio
from typing import Any, Dict, List, Optional
from mcp.server.models import InitializationOptions
from mcp.server import Server
from mcp.server.stdio import stdio_server
import mcp.types as types
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
AMAP_API_KEY = os.getenv("AMAP_API_KEY")
CITY_CODE_FILE = "city_codes.json"
server = Server("amap-city-codes-mcp-server")
def load_city_codes() -> List[Dict[str, str]]:
"""Load city codes from JSON file"""
if not os.path.exists(CITY_CODE_FILE):
logger.error(f"City codes file not found: {CITY_CODE_FILE}")
return []
try:
with open(CITY_CODE_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading city codes: {e}")
return []
def build_city_index(city_codes: List[Dict[str, str]]) -> Dict[str, Dict[str, str]]:
"""Build indexes for efficient city lookup"""
name_index = {}
adcode_index = {}
for city in city_codes:
name = city.get("name", "")
adcode = city.get("adcode", "")
if name and adcode:
# Index by exact name
name_index[name] = city
# Index by adcode
adcode_index[adcode] = city
return {"name": name_index, "adcode": adcode_index}
# Global variables for city data and indexes
_city_codes = []
_city_indexes = {"name": {}, "adcode": {}}
def initialize_city_data():
"""Initialize city data and build indexes"""
global _city_codes, _city_indexes
_city_codes = load_city_codes()
_city_indexes = build_city_index(_city_codes)
logger.info(f"Loaded {len(_city_codes)} city codes")
# Initialize city data on module import
initialize_city_data()
async def fetch_weather(city_code: str, city_name: str) -> dict:
"""
Fetch weather data from AMap API
Supports both adcode and city code
Requires AMAP_API_KEY environment variable
"""
if not AMAP_API_KEY:
return {
"status": "error",
"message": "AMAP_API_KEY environment variable is not set"
}
base_url = "https://restapi.amap.com/v3/weather/weatherInfo"
params = {
"key": AMAP_API_KEY,
"city": city_code or city_name,
"extensions": "all"
}
headers = {
"User-Agent": "Mozilla/5.0"
}
try:
async with httpx.AsyncClient() as client:
response = await client.get(
base_url,
params=params,
headers=headers,
timeout=10
)
if response.status_code == 200:
data = response.json()
return {
"status": "success",
"data": data
}
else:
return {
"status": "error",
"message": f"API returned status {response.status_code}"
}
except httpx.TimeoutException:
return {
"status": "error",
"message": "Request timeout"
}
except Exception as e:
logger.error(f"Error fetching weather: {e}")
return {
"status": "error",
"message": str(e)
}
def search_city_by_name(city_name: str) -> List[Dict[str, str]]:
"""Search for cities by name (exact or partial match)"""
results = []
# First try exact match
if city_name in _city_indexes["name"]:
results.append(_city_indexes["name"][city_name])
# Then try partial matches
for city in _city_codes:
name = city.get("name", "")
if city_name in name and city not in results:
results.append(city)
return results
def get_city_by_adcode(adcode: str) -> Optional[Dict[str, str]]:
"""Get city by adcode"""
return _city_indexes["adcode"].get(adcode)
@server.list_tools()
async def list_tools() -> list[types.Tool]:
"""List available tools"""
return [
types.Tool(
name="get_weather_by_city_name",
description="Get real-time weather information by city name. Returns current weather conditions and 4-day forecast.",
inputSchema={
"type": "object",
"properties": {
"city_name": {
"type": "string",
"description": "City name in Chinese, e.g., '杭州市', '北京市'"
}
},
"required": ["city_name"]
}
),
types.Tool(
name="get_weather_by_city_code",
description="Get real-time weather information by city administrative code. Returns current weather conditions and 4-day forecast.",
inputSchema={
"type": "object",
"properties": {
"city_code": {
"type": "string",
"description": "City administrative code (adcode), e.g., '110100' for Beijing"
}
},
"required": ["city_code"]
}
),
types.Tool(
name="search_city",
description="Search for city code by city name (supports exact and partial matches)",
inputSchema={
"type": "object",
"properties": {
"city_name": {
"type": "string",
"description": "City name to search"
}
},
"required": ["city_name"]
}
),
types.Tool(
name="get_city_by_adcode",
description="Get city information by administrative code (adcode)",
inputSchema={
"type": "object",
"properties": {
"adcode": {
"type": "string",
"description": "Administrative code (adcode) to look up"
}
},
"required": ["adcode"]
}
),
types.Tool(
name="list_all_cities",
description="List all available cities with their codes (returns first 50 results)",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum number of results to return (default: 50, max: 200)",
"default": 50
}
}
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Any:
"""Handle tool calls"""
try:
if name == "get_weather_by_city_name":
city_name = arguments.get("city_name")
if not city_name:
return [types.TextContent(type="text", text=json.dumps({"status": "error", "message": "city_name is required"}, ensure_ascii=False))]
result = await fetch_weather("", city_name)
return [types.TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
elif name == "get_weather_by_city_code":
city_code = arguments.get("city_code")
if not city_code:
return [types.TextContent(type="text", text=json.dumps({"status": "error", "message": "city_code is required"}, ensure_ascii=False))]
result = await fetch_weather(city_code, "")
return [types.TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
elif name == "search_city":
city_name = arguments.get("city_name")
if not city_name:
return [types.TextContent(type="text", text=json.dumps({"status": "error", "message": "city_name is required"}, ensure_ascii=False))]
matches = search_city_by_name(city_name)
if matches:
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"count": len(matches),
"results": matches
}, ensure_ascii=False, indent=2))]
else:
return [types.TextContent(type="text", text=json.dumps({
"status": "not_found",
"message": f"City '{city_name}' not found"
}, ensure_ascii=False))]
elif name == "get_city_by_adcode":
adcode = arguments.get("adcode")
if not adcode:
return [types.TextContent(type="text", text=json.dumps({"status": "error", "message": "adcode is required"}, ensure_ascii=False))]
city = get_city_by_adcode(adcode)
if city:
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"result": city
}, ensure_ascii=False, indent=2))]
else:
return [types.TextContent(type="text", text=json.dumps({
"status": "not_found",
"message": f"City with adcode '{adcode}' not found"
}, ensure_ascii=False))]
elif name == "list_all_cities":
limit = min(arguments.get("limit", 50), 200) # Cap at 200 to prevent large responses
cities = _city_codes[:limit]
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"count": len(cities),
"total_available": len(_city_codes),
"results": cities
}, ensure_ascii=False, indent=2))]
else:
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"message": f"Unknown tool: {name}"
}, ensure_ascii=False))]
except Exception as e:
logger.error(f"Error handling tool call {name}: {e}")
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"message": f"Internal error: {str(e)}"
}, ensure_ascii=False))]
async def main():
"""Run the MCP server using stdio"""
logger.info("Starting MCP server for AMap city codes and weather")
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream=read_stream,
write_stream=write_stream,
initialization_options=InitializationOptions(
server_name="amap-city-codes-mcp-server",
server_version="0.1.0",
capabilities={
"tools": {}
}
)
)
if __name__ == "__main__":
asyncio.run(main())