"""Geolocation MCP Server implementation."""
import os
import asyncio
import json
from typing import Annotated, Optional, List, Dict, Any
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
ErrorData,
TextContent,
Tool,
INVALID_PARAMS,
INTERNAL_ERROR,
)
from pydantic import BaseModel, Field
class LocationRequest(BaseModel):
"""Parameters for location-based requests."""
lat: float = Field(description="Latitude coordinate (-90 to 90)")
lon: float = Field(description="Longitude coordinate (-180 to 180)")
address: Optional[str] = Field(None, description="Optional address for context")
class TransitStop(BaseModel):
"""Transit stop information from WalkScore API."""
id: str
lat: float
lon: float
name: str
distance: float
route_summary: List[Dict[str, Any]]
summary_text: str
summary_html: str
class WalkScoreClient:
"""Client for WalkScore API."""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.walkscore.com"
self.transit_url = "https://transit.walkscore.com/transit/search/stops/"
async def get_transit_stops(
self,
lat: float,
lon: float
) -> List[TransitStop]:
"""Get nearby transit stops for a location."""
params = {
"lat": lat,
"lon": lon,
"wsapikey": self.api_key,
}
async with httpx.AsyncClient() as client:
response = await client.get(self.transit_url, params=params)
response.raise_for_status()
data = response.json()
stops = []
for stop_data in data:
stops.append(TransitStop(
id=stop_data["id"],
lat=stop_data["lat"],
lon=stop_data["lon"],
name=stop_data["name"],
distance=stop_data["distance"],
route_summary=stop_data["route_summary"],
summary_text=stop_data["summary_text"],
summary_html=stop_data["summary_html"],
))
return stops
async def get_walkscore(
self,
address: Optional[str] = None,
lat: Optional[float] = None,
lon: Optional[float] = None
) -> Dict[str, Any]:
"""Get WalkScore for a location."""
if not address and (lat is None or lon is None):
raise ValueError("Either address or both lat/lon must be provided")
params = {
"format": "json",
"wsapikey": self.api_key,
}
if address:
params["address"] = address
if lat is not None:
params["lat"] = lat
if lon is not None:
params["lon"] = lon
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.base_url}/score", params=params)
response.raise_for_status()
return response.json()
def create_server() -> Server:
"""Create and configure the Geolocation MCP server."""
server = Server("mcp-geolocation")
# Get API key from environment
api_key = os.getenv("WALKSCORE_API_KEY")
if not api_key:
raise ValueError("WALKSCORE_API_KEY environment variable is required")
client = WalkScoreClient(api_key)
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="get_transit_stops",
description="Get nearby transit stops for a location using WalkScore API.",
inputSchema=LocationRequest.model_json_schema(),
),
Tool(
name="get_walkscore",
description="Get WalkScore, TransitScore, and BikeScore for a location.",
inputSchema=LocationRequest.model_json_schema(),
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
args = LocationRequest(**arguments)
except ValueError as e:
raise ErrorData(code=INVALID_PARAMS, message=str(e))
try:
if name == "get_transit_stops":
stops = await client.get_transit_stops(args.lat, args.lon)
if not stops:
content = f"No transit stops found near coordinates {args.lat}, {args.lon}"
else:
content = f"Found {len(stops)} transit stops near {args.lat}, {args.lon}:\n\n"
for i, stop in enumerate(stops[:10], 1): # Limit to first 10 stops
content += f"{i}. {stop.name}\n"
content += f" Distance: {stop.distance:.3f} miles\n"
content += f" Routes: {stop.summary_text}\n\n"
if len(stops) > 10:
content += f"... and {len(stops) - 10} more stops"
elif name == "get_walkscore":
walkscore_data = await client.get_walkscore(
address=args.address,
lat=args.lat,
lon=args.lon
)
content = "WalkScore Data:\n\n"
if walkscore_data.get("status") == 1:
if walkscore_data.get("walkscore"):
content += f"WalkScore: {walkscore_data['walkscore']}/100\n"
if walkscore_data.get("transit", {}).get("score"):
content += f"TransitScore: {walkscore_data['transit']['score']}/100\n"
if walkscore_data.get("bike", {}).get("score"):
content += f"BikeScore: {walkscore_data['bike']['score']}/100\n"
if walkscore_data.get("description"):
content += f"Description: {walkscore_data['description']}\n"
if walkscore_data.get("snapped_lat") and walkscore_data.get("snapped_lon"):
content += f"Location: {walkscore_data['snapped_lat']}, {walkscore_data['snapped_lon']}\n"
else:
content += f"Error: {walkscore_data.get('description', 'Unknown error')}"
else:
raise ErrorData(code=INVALID_PARAMS, message=f"Unknown tool: {name}")
except httpx.HTTPError as e:
raise ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch data from WalkScore API: {e}")
except ValueError as e:
raise ErrorData(code=INVALID_PARAMS, message=str(e))
return [TextContent(type="text", text=content)]
return server
def main():
"""Main entry point for the Geolocation MCP server."""
try:
server = create_server()
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
async def run_server():
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
asyncio.run(run_server())
if __name__ == "__main__":
import sys
main()