Skip to main content
Glama
server.py59.2 kB
from mcp.server.fastmcp import FastMCP, Context from dataclasses import dataclass from typing import AsyncIterator, List, Dict, Optional, Tuple, Any, Union import aiohttp import json import asyncio from contextlib import asynccontextmanager import math from datetime import datetime class OSMClient: def __init__(self, base_url="https://api.openstreetmap.org/api/0.6"): self.base_url = base_url self.session = None self.cache = {} # Simple in-memory cache async def connect(self): self.session = aiohttp.ClientSession() async def disconnect(self): if self.session: await self.session.close() async def geocode(self, query: str) -> List[Dict]: """Geocode an address or place name""" if not self.session: raise RuntimeError("OSM client not connected") nominatim_url = "https://nominatim.openstreetmap.org/search" async with self.session.get( nominatim_url, params={ "q": query, "format": "json", "limit": 5 }, headers={"User-Agent": "OSM-MCP-Server/1.0"} ) as response: if response.status == 200: return await response.json() else: raise Exception(f"Failed to geocode '{query}': {response.status}") async def reverse_geocode(self, lat: float, lon: float) -> Dict: """Reverse geocode coordinates to address""" if not self.session: raise RuntimeError("OSM client not connected") nominatim_url = "https://nominatim.openstreetmap.org/reverse" async with self.session.get( nominatim_url, params={ "lat": lat, "lon": lon, "format": "json" }, headers={"User-Agent": "OSM-MCP-Server/1.0"} ) as response: if response.status == 200: return await response.json() else: raise Exception(f"Failed to reverse geocode ({lat}, {lon}): {response.status}") async def get_route(self, from_lat: float, from_lon: float, to_lat: float, to_lon: float, mode: str = "car", steps: bool = False, overview: str = "overview", annotations: bool = True) -> Dict: """Get routing information between two points""" if not self.session: raise RuntimeError("OSM client not connected") # Use OSRM for routing osrm_url = f"http://router.project-osrm.org/route/v1/{mode}/{from_lon},{from_lat};{to_lon},{to_lat}" params = { "overview": overview, "geometries": "geojson", "steps": str(steps).lower(), "annotations": str(annotations).lower() } async with self.session.get(osrm_url, params=params) as response: if response.status == 200: return await response.json() else: raise Exception(f"Failed to get route: {response.status}") async def get_nearby_pois(self, lat: float, lon: float, radius: float = 1000, categories: List[str] = None) -> List[Dict]: """Get points of interest near a location""" if not self.session: raise RuntimeError("OSM client not connected") # Convert radius to bounding box (approximate) # 1 degree latitude ~= 111km # 1 degree longitude ~= 111km * cos(latitude) lat_delta = radius / 111000 lon_delta = radius / (111000 * math.cos(math.radians(lat))) bbox = ( lon - lon_delta, lat - lat_delta, lon + lon_delta, lat + lat_delta ) # Build Overpass query overpass_url = "https://overpass-api.de/api/interpreter" # Default to common POI types if none specified if not categories: categories = ["amenity", "shop", "tourism", "leisure"] # Build tag filters tag_filters = [] for category in categories: tag_filters.append(f'node["{category}"]({{bbox}});') query = f""" [out:json]; ( {" ".join(tag_filters)} ); out body; """ query = query.replace("{bbox}", f"{bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}") async with self.session.post(overpass_url, data={"data": query}) as response: if response.status == 200: data = await response.json() return data.get("elements", []) else: raise Exception(f"Failed to get nearby POIs: {response.status}") async def search_features_by_category(self, bbox: Tuple[float, float, float, float], category: str, subcategories: List[str] = None) -> List[Dict]: """Search for OSM features by category and subcategories""" if not self.session: raise RuntimeError("OSM client not connected") overpass_url = "https://overpass-api.de/api/interpreter" # Build query for specified category and subcategories if subcategories: subcategory_filters = " or ".join([f'"{category}"="{sub}"' for sub in subcategories]) query_filter = f'({subcategory_filters})' else: query_filter = f'"{category}"' query = f""" [out:json]; ( node[{query_filter}]({bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}); way[{query_filter}]({bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}); relation[{query_filter}]({bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}); ); out body; """ async with self.session.post(overpass_url, data={"data": query}) as response: if response.status == 200: data = await response.json() return data.get("elements", []) else: raise Exception(f"Failed to search features by category: {response.status}") # Create application context @dataclass class AppContext: osm_client: OSMClient # Define lifespan manager @asynccontextmanager async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: """Manage OSM client lifecycle""" osm_client = OSMClient() try: await osm_client.connect() yield AppContext(osm_client=osm_client) finally: await osm_client.disconnect() # Create the MCP server mcp = FastMCP( "Location-Based App MCP Server", dependencies=["aiohttp", "geojson", "shapely", "haversine"], lifespan=app_lifespan ) @mcp.tool() async def geocode_address(address: str, ctx: Context) -> List[Dict]: """ Convert an address or place name to geographic coordinates with detailed location information. This tool takes a text description of a location (such as an address, landmark name, or place of interest) and returns its precise geographic coordinates along with rich metadata. The results can be used for mapping, navigation, location-based analysis, and as input to other geospatial tools. Args: address: The address, place name, landmark, or description to geocode (e.g., "Empire State Building", "123 Main St, Springfield", "Golden Gate Park, San Francisco") Returns: List of matching locations with: - Geographic coordinates (latitude/longitude) - Formatted address - Administrative boundaries (city, state, country) - OSM type and ID - Bounding box (if applicable) - Importance ranking """ osm_client = ctx.request_context.lifespan_context.osm_client results = await osm_client.geocode(address) # Enhance results with additional context for result in results: if "lat" in result and "lon" in result: result["coordinates"] = { "latitude": float(result["lat"]), "longitude": float(result["lon"]) } return results @mcp.tool() async def reverse_geocode(latitude: float, longitude: float, ctx: Context) -> Dict: """ Convert geographic coordinates to a detailed address and location description. This tool takes a specific point on Earth (latitude and longitude) and returns comprehensive information about that location, including its address, nearby landmarks, administrative boundaries, and other contextual information. Useful for translating GPS coordinates into human-readable locations. Args: latitude: The latitude coordinate (decimal degrees, WGS84) longitude: The longitude coordinate (decimal degrees, WGS84) Returns: Detailed address and location information including: - Formatted address - Building, street, city, state, country - Administrative hierarchy - OSM metadata - Postal code and other relevant identifiers """ osm_client = ctx.request_context.lifespan_context.osm_client return await osm_client.reverse_geocode(latitude, longitude) @mcp.tool() async def find_nearby_places( latitude: float, longitude: float, ctx: Context, radius: float = 1000, # meters categories: List[str] = None, limit: int = 20 ) -> Dict[str, Any]: """ Discover points of interest and amenities near a specific location. This tool performs a comprehensive search around a geographic point to identify nearby establishments, amenities, and points of interest. Results are organized by category and subcategory, making it easy to find specific types of places. Essential for location-based recommendations, neighborhood analysis, and proximity-based decision making. Args: latitude: Center point latitude (decimal degrees) longitude: Center point longitude (decimal degrees) radius: Search radius in meters (defaults to 1000m/1km) categories: List of OSM categories to search for (e.g., ["amenity", "shop", "tourism"]). If omitted, searches common categories. limit: Maximum number of total results to return Returns: Structured dictionary containing: - Original query parameters - Total count of places found - Results grouped by category and subcategory - Each place includes name, coordinates, and associated tags """ osm_client = ctx.request_context.lifespan_context.osm_client # Set default categories if not provided if not categories: categories = ["amenity", "shop", "tourism", "leisure"] ctx.info(f"Searching for places within {radius}m of ({latitude}, {longitude})") places = await osm_client.get_nearby_pois(latitude, longitude, radius, categories) # Group results by category results_by_category = {} for place in places[:limit]: tags = place.get("tags", {}) # Find the matching category for category in categories: if category in tags: subcategory = tags[category] if category not in results_by_category: results_by_category[category] = {} if subcategory not in results_by_category[category]: results_by_category[category][subcategory] = [] # Add place to appropriate category and subcategory place_info = { "id": place.get("id"), "name": tags.get("name", "Unnamed"), "latitude": place.get("lat"), "longitude": place.get("lon"), "tags": tags } results_by_category[category][subcategory].append(place_info) # Calculate total count total_count = sum( len(places) for category_data in results_by_category.values() for places in category_data.values() ) return { "query": { "latitude": latitude, "longitude": longitude, "radius": radius }, "categories": results_by_category, "total_count": total_count } @mcp.tool() async def get_route_directions( from_latitude: float, from_longitude: float, to_latitude: float, to_longitude: float, ctx: Context, mode: str = "car", steps: bool = False, overview: str = "simplified", annotations: bool = False ) -> Dict[str, Any]: """ Calculate detailed route directions between two geographic points. This tool provides comprehensive routing information between two locations using OpenStreetMap/OSRM. The output can be minimized using the steps, overview, and annotations parameters to reduce the response size. Args: from_latitude: Starting point latitude (decimal degrees) from_longitude: Starting point longitude (decimal degrees) to_latitude: Destination latitude (decimal degrees) to_longitude: Destination longitude (decimal degrees) ctx: Context (provided internally by MCP) mode: Transportation mode ("car", "bike", "foot") steps: Turn-by-turn instructions (True/False, Default: False) overview: Geometry output ("full", "simplified", "false"; Default: "simplified") annotations: Additional segment info (True/False, Default: False) Returns: Dictionary with routing information (summary, directions, geometry, waypoints) Example: { "from_latitude": 51.3334193, "from_longitude": 9.4540423, "to_latitude": 51.3295516, "to_longitude": 9.4576721, "mode": "car", "steps": false, "overview": "simplified", "annotations": false } """ osm_client = ctx.request_context.lifespan_context.osm_client # Validate transportation mode valid_modes = ["car", "bike", "foot"] if mode not in valid_modes: ctx.warning(f"Invalid mode '{mode}'. Using 'car' instead.") mode = "car" ctx.info(f"Calculating {mode} route from ({from_latitude}, {from_longitude}) to ({to_latitude}, {to_longitude})") # Get route from OSRM route_data = await osm_client.get_route( from_latitude, from_longitude, to_latitude, to_longitude, mode, steps=steps, overview=overview, annotations=annotations ) # Process and simplify the response if "routes" in route_data and len(route_data["routes"]) > 0: route = route_data["routes"][0] # Extract turn-by-turn directions steps_list = [] if "legs" in route: for leg in route["legs"]: for step in leg.get("steps", []): steps_list.append({ "instruction": step.get("maneuver", {}).get("instruction", ""), "distance": step.get("distance"), "duration": step.get("duration"), "name": step.get("name", "") }) return { "summary": { "distance": route.get("distance"), # meters "duration": route.get("duration"), # seconds "mode": mode }, "directions": steps_list, "geometry": route.get("geometry"), "waypoints": route_data.get("waypoints", []) } else: raise Exception("No route found") @mcp.tool() async def search_category( category: str, min_latitude: float, min_longitude: float, max_latitude: float, max_longitude: float, ctx: Context, subcategories: List[str] = None ) -> Dict[str, Any]: """ Search for specific types of places within a defined geographic area. This tool allows targeted searches for places matching specific categories within a rectangular geographic region. It's particularly useful for filtering places by type (restaurants, schools, parks, etc.) within a neighborhood or city district. Results include complete location details and metadata about each matching place. Args: category: Main OSM category to search for (e.g., "amenity", "shop", "tourism", "building") min_latitude: Southern boundary of search area (decimal degrees) min_longitude: Western boundary of search area (decimal degrees) max_latitude: Northern boundary of search area (decimal degrees) max_longitude: Eastern boundary of search area (decimal degrees) subcategories: Optional list of specific subcategories to filter by (e.g., ["restaurant", "cafe"]) Returns: Structured results including: - Query parameters - Count of matching places - List of matching places with coordinates, names, and metadata """ osm_client = ctx.request_context.lifespan_context.osm_client bbox = (min_longitude, min_latitude, max_longitude, max_latitude) ctx.info(f"Searching for {category} in bounding box") features = await osm_client.search_features_by_category(bbox, category, subcategories) # Process results results = [] for feature in features: tags = feature.get("tags", {}) # Get coordinates based on feature type coords = {} if feature.get("type") == "node": coords = { "latitude": feature.get("lat"), "longitude": feature.get("lon") } # For ways and relations, use center coordinates if available elif "center" in feature: coords = { "latitude": feature.get("center", {}).get("lat"), "longitude": feature.get("center", {}).get("lon") } # Only include features with valid coordinates if coords: results.append({ "id": feature.get("id"), "type": feature.get("type"), "name": tags.get("name", "Unnamed"), "coordinates": coords, "category": category, "subcategory": tags.get(category), "tags": tags }) return { "query": { "category": category, "subcategories": subcategories, "bbox": { "min_latitude": min_latitude, "min_longitude": min_longitude, "max_latitude": max_latitude, "max_longitude": max_longitude } }, "results": results, "count": len(results) } @mcp.tool() async def suggest_meeting_point( locations: List[Dict[str, float]], ctx: Context, venue_type: str = "cafe" ) -> Dict[str, Any]: """ Find the optimal meeting place for multiple people coming from different locations. This tool calculates a central meeting point based on the locations of multiple individuals, then recommends suitable venues near that central point. Ideal for planning social gatherings, business meetings, or any situation where multiple people need to converge from different starting points. Args: locations: List of dictionaries, each containing the latitude and longitude of a person's location Example: [{"latitude": 37.7749, "longitude": -122.4194}, {"latitude": 37.3352, "longitude": -121.8811}] venue_type: Type of venue to suggest as a meeting point. Options include: "cafe", "restaurant", "bar", "library", "park", etc. Returns: Meeting point recommendations including: - Calculated center point coordinates - List of suggested venues with names and details - Total number of matching venues in the area """ osm_client = ctx.request_context.lifespan_context.osm_client if len(locations) < 2: raise ValueError("Need at least two locations to suggest a meeting point") # Calculate the center point (simple average) avg_lat = sum(loc.get("latitude", 0) for loc in locations) / len(locations) avg_lon = sum(loc.get("longitude", 0) for loc in locations) / len(locations) ctx.info(f"Calculating center point for {len(locations)} locations: ({avg_lat}, {avg_lon})") # Search for venues around this center point venues = await osm_client.get_nearby_pois( avg_lat, avg_lon, radius=500, # Search within 500m of center categories=["amenity"] ) # Filter venues by type matching_venues = [] for venue in venues: tags = venue.get("tags", {}) if tags.get("amenity") == venue_type: matching_venues.append({ "id": venue.get("id"), "name": tags.get("name", "Unnamed Venue"), "latitude": venue.get("lat"), "longitude": venue.get("lon"), "tags": tags }) # If no venues found, expand search if not matching_venues: ctx.info(f"No {venue_type} found within 500m, expanding search to 1000m") venues = await osm_client.get_nearby_pois( avg_lat, avg_lon, radius=1000, categories=["amenity"] ) for venue in venues: tags = venue.get("tags", {}) if tags.get("amenity") == venue_type: matching_venues.append({ "id": venue.get("id"), "name": tags.get("name", "Unnamed Venue"), "latitude": venue.get("lat"), "longitude": venue.get("lon"), "tags": tags }) # Return the result return { "center_point": { "latitude": avg_lat, "longitude": avg_lon }, "suggested_venues": matching_venues[:5], # Top 5 venues "venue_type": venue_type, "total_options": len(matching_venues) } @mcp.tool() async def explore_area( latitude: float, longitude: float, ctx: Context, radius: float = 500 ) -> Dict[str, Any]: """ Generate a comprehensive profile of an area including all amenities and features. This powerful analysis tool creates a detailed overview of a neighborhood or area by identifying and categorizing all geographic features, amenities, and points of interest. Results are organized by category for easy analysis. Excellent for neighborhood research, area comparisons, and location-based decision making. Args: latitude: Center point latitude (decimal degrees) longitude: Center point longitude (decimal degrees) radius: Search radius in meters (defaults to 500m) Returns: In-depth area profile including: - Address and location context - Total feature count - Features organized by category and subcategory - Each feature includes name, coordinates, and detailed metadata """ osm_client = ctx.request_context.lifespan_context.osm_client # Categories to search for categories = [ "amenity", "shop", "tourism", "leisure", "natural", "historic", "public_transport" ] results = {} for i, category in enumerate(categories): await ctx.report_progress(i, len(categories)) ctx.info(f"Exploring {category} features...") try: # Convert radius to bounding box lat_delta = radius / 111000 lon_delta = radius / (111000 * math.cos(math.radians(latitude))) bbox = ( longitude - lon_delta, latitude - lat_delta, longitude + lon_delta, latitude + lat_delta ) features = await osm_client.search_features_by_category(bbox, category) # Group by subcategory subcategories = {} for feature in features: tags = feature.get("tags", {}) subcategory = tags.get(category) if subcategory: if subcategory not in subcategories: subcategories[subcategory] = [] # Get coordinates based on feature type coords = {} if feature.get("type") == "node": coords = { "latitude": feature.get("lat"), "longitude": feature.get("lon") } elif "center" in feature: coords = { "latitude": feature.get("center", {}).get("lat"), "longitude": feature.get("center", {}).get("lon") } subcategories[subcategory].append({ "id": feature.get("id"), "name": tags.get("name", "Unnamed"), "coordinates": coords, "type": feature.get("type"), "tags": tags }) results[category] = subcategories except Exception as e: ctx.warning(f"Error fetching {category} features: {str(e)}") results[category] = {} # Get address information for the center point try: address_info = await osm_client.reverse_geocode(latitude, longitude) except Exception: address_info = {"error": "Could not retrieve address information"} # Report completion await ctx.report_progress(len(categories), len(categories)) # Count total features total_features = sum( len(places) for category_data in results.values() for places in category_data.values() ) return { "query": { "latitude": latitude, "longitude": longitude, "radius": radius }, "address": address_info, "categories": results, "total_features": total_features, "timestamp": datetime.now().isoformat() } # Add resource endpoints for common location-based app needs @mcp.resource("location://place/{query}") async def get_place_resource(query: str) -> str: """ Get information about a place by name. Args: query: Place name or address to look up Returns: JSON string with place information """ async with aiohttp.ClientSession() as session: nominatim_url = "https://nominatim.openstreetmap.org/search" async with session.get( nominatim_url, params={ "q": query, "format": "json", "limit": 1 }, headers={"User-Agent": "LocationApp-MCP-Server/1.0"} ) as response: if response.status == 200: data = await response.json() return json.dumps(data) else: raise Exception(f"Failed to get place info for {query}: {response.status}") @mcp.resource("location://map/{style}/{z}/{x}/{y}") async def get_map_style(style: str, z: int, x: int, y: int) -> Tuple[bytes, str]: """ Get a styled map tile at the specified coordinates. Args: style: Map style (standard, cycle, transport, etc.) z: Zoom level x: X coordinate y: Y coordinate Returns: Tuple of (tile image bytes, mime type) """ # Map styles to their respective tile servers tile_servers = { "standard": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", "cycle": "https://tile.thunderforest.com/cycle/{z}/{x}/{y}.png", "transport": "https://tile.thunderforest.com/transport/{z}/{x}/{y}.png", "landscape": "https://tile.thunderforest.com/landscape/{z}/{x}/{y}.png", "outdoor": "https://tile.thunderforest.com/outdoors/{z}/{x}/{y}.png" } if style not in tile_servers: style = "standard" tile_url = tile_servers[style].replace("{z}", str(z)).replace("{x}", str(x)).replace("{y}", str(y)) async with aiohttp.ClientSession() as session: async with session.get(tile_url) as response: if response.status == 200: tile_data = await response.read() return tile_data, "image/png" else: raise Exception(f"Failed to get {style} tile at {z}/{x}/{y}: {response.status}") @mcp.tool() async def find_schools_nearby( latitude: float, longitude: float, ctx: Context, radius: float = 2000, education_levels: List[str] = None ) -> Dict[str, Any]: """ Locate educational institutions near a specific location, filtered by education level. This specialized search tool identifies schools, colleges, and other educational institutions within a specified distance from a location. Results can be filtered by education level (elementary, middle, high school, university, etc.). Essential for families evaluating neighborhoods or real estate purchases with education considerations. Args: latitude: Center point latitude (decimal degrees) longitude: Center point longitude (decimal degrees) radius: Search radius in meters (defaults to 2000m/2km) education_levels: Optional list of specific education levels to filter by (e.g., ["elementary", "secondary", "university"]) Returns: List of educational institutions with: - Name and type - Distance from search point - Education levels offered - Contact information if available - Other relevant metadata """ osm_client = ctx.request_context.lifespan_context.osm_client # Convert radius to bounding box (approximate) lat_delta = radius / 111000 lon_delta = radius / (111000 * math.cos(math.radians(latitude))) bbox = ( longitude - lon_delta, latitude - lat_delta, longitude + lon_delta, latitude + lat_delta ) # Build Overpass query for educational institutions overpass_url = "https://overpass-api.de/api/interpreter" # Create query for amenity=school and other education-related tags education_filters = [ 'node["amenity"="school"]({{bbox}});', 'way["amenity"="school"]({{bbox}});', 'node["amenity"="university"]({{bbox}});', 'way["amenity"="university"]({{bbox}});', 'node["amenity"="kindergarten"]({{bbox}});', 'way["amenity"="kindergarten"]({{bbox}});', 'node["amenity"="college"]({{bbox}});', 'way["amenity"="college"]({{bbox}});' ] query = f""" [out:json]; ( {" ".join(education_filters)} ); out body; """ query = query.replace("{bbox}", f"{bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}") async with aiohttp.ClientSession() as session: async with session.post(overpass_url, data={"data": query}) as response: if response.status == 200: data = await response.json() schools = data.get("elements", []) else: raise Exception(f"Failed to find schools: {response.status}") # Process and filter results results = [] for school in schools: tags = school.get("tags", {}) school_type = tags.get("school", "") # Filter by education level if specified if education_levels and school_type and school_type not in education_levels: continue # Get coordinates based on feature type coords = {} if school.get("type") == "node": coords = { "latitude": school.get("lat"), "longitude": school.get("lon") } elif "center" in school: coords = { "latitude": school.get("center", {}).get("lat"), "longitude": school.get("center", {}).get("lon") } # Skip if no valid coordinates if not coords: continue # Calculate distance from search point # Using Haversine formula for quick distance calculation from math import radians, sin, cos, sqrt, asin def haversine(lat1, lon1, lat2, lon2): R = 6371000 # Earth radius in meters dLat = radians(lat2 - lat1) dLon = radians(lon2 - lon1) a = sin(dLat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dLon/2)**2 c = 2 * asin(sqrt(a)) return R * c distance = haversine(latitude, longitude, coords["latitude"], coords["longitude"]) results.append({ "id": school.get("id"), "name": tags.get("name", "Unnamed School"), "amenity_type": tags.get("amenity", ""), "school_type": school_type, "education_level": tags.get("isced", ""), "coordinates": coords, "distance": round(distance, 1), "address": { "street": tags.get("addr:street", ""), "housenumber": tags.get("addr:housenumber", ""), "city": tags.get("addr:city", ""), "postcode": tags.get("addr:postcode", "") }, "tags": tags }) # Sort by distance results.sort(key=lambda x: x["distance"]) return { "query": { "latitude": latitude, "longitude": longitude, "radius": radius, "education_levels": education_levels }, "schools": results, "count": len(results) } @mcp.tool() async def analyze_commute( home_latitude: float, home_longitude: float, work_latitude: float, work_longitude: float, ctx: Context, modes: List[str] = ["car", "foot", "bike"], depart_at: str = None # Time in HH:MM format, e.g. "08:30" ) -> Dict[str, Any]: """ Perform a detailed commute analysis between home and work locations. This advanced tool analyzes commute options between two locations (typically home and work), comparing multiple transportation modes and providing detailed metrics for each option. Includes estimated travel times, distances, turn-by-turn directions, and other commute-relevant data. Essential for real estate decisions, lifestyle planning, and workplace relocation analysis. Args: home_latitude: Home location latitude (decimal degrees) home_longitude: Home location longitude (decimal degrees) work_latitude: Workplace location latitude (decimal degrees) work_longitude: Workplace location longitude (decimal degrees) modes: List of transportation modes to analyze (options: "car", "foot", "bike") depart_at: Optional departure time (format: "HH:MM") for time-sensitive routing Returns: Comprehensive commute analysis with: - Summary comparing all transportation modes - Detailed route information for each mode - Total distance and duration for each option - Turn-by-turn directions """ osm_client = ctx.request_context.lifespan_context.osm_client # Get address information for both locations home_info = await osm_client.reverse_geocode(home_latitude, home_longitude) work_info = await osm_client.reverse_geocode(work_latitude, work_longitude) # Get commute information for each mode commute_options = [] for mode in modes: ctx.info(f"Calculating {mode} route for commute analysis") # Get route from OSRM try: route_data = await osm_client.get_route( home_latitude, home_longitude, work_latitude, work_longitude, mode ) if "routes" in route_data and len(route_data["routes"]) > 0: route = route_data["routes"][0] # Extract directions steps = [] if "legs" in route: for leg in route["legs"]: for step in leg.get("steps", []): steps.append({ "instruction": step.get("maneuver", {}).get("instruction", ""), "distance": step.get("distance"), "duration": step.get("duration"), "name": step.get("name", "") }) commute_options.append({ "mode": mode, "distance_km": round(route.get("distance", 0) / 1000, 2), "duration_minutes": round(route.get("duration", 0) / 60, 1), "directions": steps }) except Exception as e: ctx.warning(f"Error getting {mode} route: {str(e)}") commute_options.append({ "mode": mode, "error": str(e) }) # Sort by duration (fastest first) commute_options.sort(key=lambda x: x.get("duration_minutes", float("inf"))) return { "home": { "coordinates": { "latitude": home_latitude, "longitude": home_longitude }, "address": home_info.get("display_name", "Unknown location") }, "work": { "coordinates": { "latitude": work_latitude, "longitude": work_longitude }, "address": work_info.get("display_name", "Unknown location") }, "commute_options": commute_options, "fastest_option": commute_options[0]["mode"] if commute_options else None, "depart_at": depart_at } @mcp.tool() async def find_ev_charging_stations( latitude: float, longitude: float, ctx: Context, radius: float = 5000, connector_types: List[str] = None, min_power: float = None ) -> Dict[str, Any]: """ Locate electric vehicle charging stations near a specific location. This specialized search tool identifies EV charging infrastructure within a specified distance from a location. Results can be filtered by connector type (Tesla, CCS, CHAdeMO, etc.) and minimum power delivery. Essential for EV owners planning trips or evaluating potential charging stops. Args: latitude: Center point latitude (decimal degrees) longitude: Center point longitude (decimal degrees) radius: Search radius in meters (defaults to 5000m/5km) connector_types: Optional list of specific connector types to filter by (e.g., ["type2", "ccs", "tesla"]) min_power: Minimum charging power in kW Returns: List of charging stations with: - Location name and operator - Available connector types - Charging speeds - Number of charging points - Access restrictions - Other relevant metadata """ osm_client = ctx.request_context.lifespan_context.osm_client # Convert radius to bounding box lat_delta = radius / 111000 lon_delta = radius / (111000 * math.cos(math.radians(latitude))) bbox = ( longitude - lon_delta, latitude - lat_delta, longitude + lon_delta, latitude + lat_delta ) # Build Overpass query for EV charging stations overpass_url = "https://overpass-api.de/api/interpreter" query = f""" [out:json]; ( node["amenity"="charging_station"]({{bbox}}); way["amenity"="charging_station"]({{bbox}}); ); out body; """ query = query.replace("{bbox}", f"{bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}") async with aiohttp.ClientSession() as session: async with session.post(overpass_url, data={"data": query}) as response: if response.status == 200: data = await response.json() stations = data.get("elements", []) else: raise Exception(f"Failed to find charging stations: {response.status}") # Process and filter results results = [] for station in stations: tags = station.get("tags", {}) # Get coordinates based on feature type coords = {} if station.get("type") == "node": coords = { "latitude": station.get("lat"), "longitude": station.get("lon") } elif "center" in station: coords = { "latitude": station.get("center", {}).get("lat"), "longitude": station.get("center", {}).get("lon") } # Skip if no valid coordinates if not coords: continue # Extract connector information connectors = [] for key, value in tags.items(): if key.startswith("socket:"): connector_type = key.split(":", 1)[1] connectors.append({ "type": connector_type, "count": value if value.isdigit() else 1 }) # Filter by connector type if specified if connector_types: has_matching_connector = False for connector in connectors: if connector["type"] in connector_types: has_matching_connector = True break if not has_matching_connector: continue # Extract power information power = None if "maxpower" in tags: try: power = float(tags["maxpower"]) except ValueError: pass # Filter by minimum power if specified if min_power is not None and (power is None or power < min_power): continue # Calculate distance from search point from math import radians, sin, cos, sqrt, asin def haversine(lat1, lon1, lat2, lon2): R = 6371000 # Earth radius in meters dLat = radians(lat2 - lat1) dLon = radians(lon2 - lon1) a = sin(dLat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dLon/2)**2 c = 2 * asin(sqrt(a)) return R * c distance = haversine(latitude, longitude, coords["latitude"], coords["longitude"]) results.append({ "id": station.get("id"), "name": tags.get("name", "Unnamed Charging Station"), "operator": tags.get("operator", "Unknown"), "coordinates": coords, "distance": round(distance, 1), "connectors": connectors, "capacity": tags.get("capacity", "Unknown"), "power": power, "fee": tags.get("fee", "Unknown"), "access": tags.get("access", "public"), "opening_hours": tags.get("opening_hours", "Unknown"), "address": { "street": tags.get("addr:street", ""), "housenumber": tags.get("addr:housenumber", ""), "city": tags.get("addr:city", ""), "postcode": tags.get("addr:postcode", "") }, "tags": tags }) # Sort by distance results.sort(key=lambda x: x["distance"]) return { "query": { "latitude": latitude, "longitude": longitude, "radius": radius, "connector_types": connector_types, "min_power": min_power }, "stations": results, "count": len(results) } @mcp.tool() async def analyze_neighborhood( latitude: float, longitude: float, ctx: Context, radius: float = 1000 ) -> Dict[str, Any]: """ Generate a comprehensive neighborhood analysis focused on livability factors. This advanced analysis tool evaluates a neighborhood based on multiple livability factors, including amenities, transportation options, green spaces, and services. Results include counts and proximity scores for various categories, helping to assess the overall quality and convenience of a residential area. Invaluable for real estate decisions, relocation planning, and neighborhood comparisons. Args: latitude: Center point latitude (decimal degrees) longitude: Center point longitude (decimal degrees) radius: Analysis radius in meters (defaults to 1000m/1km) Returns: Comprehensive neighborhood profile including: - Overall neighborhood score - Walkability assessment - Public transportation access - Nearby amenities (shops, restaurants, services) - Green spaces and recreation - Education and healthcare facilities - Detailed counts and distance metrics for each category """ osm_client = ctx.request_context.lifespan_context.osm_client # Get address information for the center point address_info = await osm_client.reverse_geocode(latitude, longitude) # Categories to analyze for neighborhood quality categories = [ # Essential services {"name": "groceries", "tags": ["shop=supermarket", "shop=convenience", "shop=grocery"]}, {"name": "restaurants", "tags": ["amenity=restaurant", "amenity=cafe", "amenity=fast_food"]}, {"name": "healthcare", "tags": ["amenity=hospital", "amenity=doctors", "amenity=pharmacy"]}, {"name": "education", "tags": ["amenity=school", "amenity=kindergarten", "amenity=university"]}, # Transportation {"name": "public_transport", "tags": ["public_transport=stop_position", "railway=station", "amenity=bus_station"]}, # Recreation {"name": "parks", "tags": ["leisure=park", "leisure=garden", "leisure=playground"]}, {"name": "sports", "tags": ["leisure=sports_centre", "leisure=fitness_centre", "leisure=swimming_pool"]}, # Culture and entertainment {"name": "entertainment", "tags": ["amenity=theatre", "amenity=cinema", "amenity=arts_centre"]}, # Other amenities {"name": "shopping", "tags": ["shop=mall", "shop=department_store", "shop=clothes"]}, {"name": "services", "tags": ["amenity=bank", "amenity=post_office", "amenity=atm"]} ] # Build overpass queries and collect results results = {} scores = {} for i, category in enumerate(categories): await ctx.report_progress(i, len(categories)) ctx.info(f"Analyzing {category['name']} in neighborhood...") # Convert radius to bounding box lat_delta = radius / 111000 lon_delta = radius / (111000 * math.cos(math.radians(latitude))) bbox = ( longitude - lon_delta, latitude - lat_delta, longitude + lon_delta, latitude + lat_delta ) # Build Overpass query overpass_url = "https://overpass-api.de/api/interpreter" # Create query for category tags tag_filters = [] for tag in category["tags"]: key, value = tag.split("=") tag_filters.append(f'node["{key}"="{value}"]({{bbox}});') tag_filters.append(f'way["{key}"="{value}"]({{bbox}});') query = f""" [out:json]; ( {" ".join(tag_filters)} ); out body; """ query = query.replace("{bbox}", f"{bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}") try: async with aiohttp.ClientSession() as session: async with session.post(overpass_url, data={"data": query}) as response: if response.status == 200: data = await response.json() features = data.get("elements", []) else: ctx.warning(f"Failed to analyze {category['name']}: {response.status}") features = [] # Process and calculate metrics feature_list = [] distances = [] for feature in features: tags = feature.get("tags", {}) # Get coordinates based on feature type coords = {} if feature.get("type") == "node": coords = { "latitude": feature.get("lat"), "longitude": feature.get("lon") } elif "center" in feature: coords = { "latitude": feature.get("center", {}).get("lat"), "longitude": feature.get("center", {}).get("lon") } # Skip if no valid coordinates if not coords: continue # Calculate distance from center point from math import radians, sin, cos, sqrt, asin def haversine(lat1, lon1, lat2, lon2): R = 6371000 # Earth radius in meters dLat = radians(lat2 - lat1) dLon = radians(lon2 - lon1) a = sin(dLat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dLon/2)**2 c = 2 * asin(sqrt(a)) return R * c distance = haversine(latitude, longitude, coords["latitude"], coords["longitude"]) distances.append(distance) feature_list.append({ "id": feature.get("id"), "name": tags.get("name", "Unnamed"), "type": feature.get("type"), "coordinates": coords, "distance": round(distance, 1), "tags": tags }) # Sort by distance feature_list.sort(key=lambda x: x["distance"]) # Calculate metrics count = len(feature_list) avg_distance = sum(distances) / count if count > 0 else None min_distance = min(distances) if count > 0 else None # Score this category (0-10) # Higher score for more amenities and closer proximity if count == 0: category_score = 0 else: # Base score on count and proximity count_score = min(count / 5, 1) * 5 # Up to 5 points for count proximity_score = 5 - min(min_distance / radius, 1) * 5 # Up to 5 points for proximity category_score = count_score + proximity_score # Store results results[category["name"]] = { "count": count, "features": feature_list[:10], # Limit to top 10 "metrics": { "total_count": count, "avg_distance": round(avg_distance, 1) if avg_distance else None, "min_distance": round(min_distance, 1) if min_distance else None } } scores[category["name"]] = category_score except Exception as e: ctx.warning(f"Error analyzing {category['name']}: {str(e)}") results[category["name"]] = {"error": str(e)} scores[category["name"]] = 0 # Calculate overall neighborhood score if scores: overall_score = sum(scores.values()) / len(scores) else: overall_score = 0 # Calculate walkability score based on amenities within walking distance (500m) walkable_amenities = 0 walkable_categories = 0 for category_name, category_data in results.items(): if "metrics" in category_data: # Count amenities within walking distance walking_count = sum(1 for feature in category_data.get("features", []) if feature.get("distance", float("inf")) <= 500) if walking_count > 0: walkable_amenities += walking_count walkable_categories += 1 walkability_score = min(walkable_amenities + walkable_categories, 10) # Report completion await ctx.report_progress(len(categories), len(categories)) return { "location": { "coordinates": { "latitude": latitude, "longitude": longitude }, "address": address_info.get("display_name", "Unknown location") }, "scores": { "overall": round(overall_score, 1), "walkability": walkability_score, "categories": {k: round(v, 1) for k, v in scores.items()} }, "categories": results, "analysis_radius": radius, "timestamp": datetime.now().isoformat() } @mcp.tool() async def find_parking_facilities( latitude: float, longitude: float, ctx: Context, radius: float = 1000, parking_type: str = None # e.g., "surface", "underground", "multi-storey" ) -> Dict[str, Any]: """ Locate parking facilities near a specific location. This tool finds parking options (lots, garages, street parking) near a specified location. Results can be filtered by parking type and include capacity information where available. Useful for trip planning, city navigation, and evaluating parking availability in urban areas. Args: latitude: Center point latitude (decimal degrees) longitude: Center point longitude (decimal degrees) radius: Search radius in meters (defaults to 1000m/1km) parking_type: Optional filter for specific types of parking facilities ("surface", "underground", "multi-storey", etc.) Returns: List of parking facilities with: - Name and type - Capacity information if available - Fee structure if available - Access restrictions - Distance from search point """ osm_client = ctx.request_context.lifespan_context.osm_client # Convert radius to bounding box lat_delta = radius / 111000 lon_delta = radius / (111000 * math.cos(math.radians(latitude))) bbox = ( longitude - lon_delta, latitude - lat_delta, longitude + lon_delta, latitude + lat_delta ) # Build Overpass query for parking facilities overpass_url = "https://overpass-api.de/api/interpreter" query = f""" [out:json]; ( node["amenity"="parking"]({{bbox}}); way["amenity"="parking"]({{bbox}}); relation["amenity"="parking"]({{bbox}}); ); out body; """ query = query.replace("{bbox}", f"{bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}") async with aiohttp.ClientSession() as session: async with session.post(overpass_url, data={"data": query}) as response: if response.status == 200: data = await response.json() parking_facilities = data.get("elements", []) else: raise Exception(f"Failed to find parking facilities: {response.status}") # Process and filter results results = [] for facility in parking_facilities: tags = facility.get("tags", {}) # Filter by parking type if specified if parking_type and tags.get("parking", "") != parking_type: continue # Get coordinates based on feature type coords = {} if facility.get("type") == "node": coords = { "latitude": facility.get("lat"), "longitude": facility.get("lon") } elif "center" in facility: coords = { "latitude": facility.get("center", {}).get("lat"), "longitude": facility.get("center", {}).get("lon") } # Skip if no valid coordinates if not coords: continue # Calculate distance from search point from math import radians, sin, cos, sqrt, asin def haversine(lat1, lon1, lat2, lon2): R = 6371000 # Earth radius in meters dLat = radians(lat2 - lat1) dLon = radians(lon2 - lon1) a = sin(dLat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dLon/2)**2 c = 2 * asin(sqrt(a)) return R * c distance = haversine(latitude, longitude, coords["latitude"], coords["longitude"]) results.append({ "id": facility.get("id"), "name": tags.get("name", "Unnamed Parking"), "type": tags.get("parking", "surface"), "coordinates": coords, "distance": round(distance, 1), "capacity": tags.get("capacity", "Unknown"), "fee": tags.get("fee", "Unknown"), "access": tags.get("access", "public"), "opening_hours": tags.get("opening_hours", "Unknown"), "levels": tags.get("levels", "1"), "address": { "street": tags.get("addr:street", ""), "housenumber": tags.get("addr:housenumber", ""), "city": tags.get("addr:city", ""), "postcode": tags.get("addr:postcode", "") }, "tags": tags }) # Sort by distance results.sort(key=lambda x: x["distance"]) return { "query": { "latitude": latitude, "longitude": longitude, "radius": radius, "parking_type": parking_type }, "parking_facilities": results, "count": len(results) } if __name__ == "__main__": mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jagan-shanmugam/open-streetmap-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server