search_stops_tool
Find bus stops by name or ID to get location coordinates and details for planning trips with CATA Bus services in State College, PA.
Instructions
Search for stops by name or ID.
Args: query: Search query string to match against stop names, IDs, or descriptions
Returns: List of matching stops with their ID, name, latitude, and longitude
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes |
Implementation Reference
- src/catabus_mcp/server.py:95-108 (handler)Handler function decorated with @mcp.tool, serving as both registration and execution entrypoint. Initializes data and delegates to search_stops helper.@mcp.tool async def search_stops_tool(query: str) -> List[Dict[str, Any]]: """Search for stops by name or ID. Args: query: Search query string to match against stop names, IDs, or descriptions Returns: List of matching stops with their ID, name, latitude, and longitude """ await ensure_initialized() if not gtfs_data or not gtfs_data.stops: return [] return await search_stops(gtfs_data, query)
- Core implementation performing fuzzy search on stop names, IDs, codes, and descriptions within GTFS data, returning sorted list of matches.async def search_stops(gtfs_data: GTFSData, query: str) -> List[Dict[str, Any]]: """ Search for stops by name or ID. Args: gtfs_data: The GTFS static data. query: Search query string. Returns: List of matching stops with id, name, latitude, and longitude. """ query_lower = query.lower() results = [] for stop_id, stop in gtfs_data.stops.items(): # Search in stop ID, name, code, and description if (query_lower in stop.stop_id.lower() or query_lower in stop.stop_name.lower() or (stop.stop_code and query_lower in stop.stop_code.lower()) or (stop.stop_desc and query_lower in stop.stop_desc.lower())): results.append({ "stop_id": stop.stop_id, "name": stop.stop_name, "lat": stop.stop_lat, "lon": stop.stop_lon, }) # Sort by name for consistency results.sort(key=lambda x: x["name"]) return results
- src/catabus_mcp/server.py:15-95 (registration)Import of the search_stops helper function used by the handler.from .tools.search_stops import search_stops from .tools.trip_alerts import trip_alerts from .tools.vehicle_positions import vehicle_positions # Configure logging for cloud environment logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Initialize FastMCP server mcp = FastMCP("catabus-mcp", version="0.1.0") # Global data stores static_loader = StaticGTFSLoader() realtime_poller = RealtimeGTFSPoller() gtfs_data = None initialized = False async def ensure_initialized(): """Lazy initialization of GTFS data with comprehensive error handling.""" global gtfs_data, initialized if not initialized: logger.info("Starting GTFS data initialization...") # Initialize with empty data first to ensure server always works from .ingest.static_loader import GTFSData from .ingest.realtime_poll import RealtimeData try: # Try to load static GTFS data with aggressive timeout gtfs_data = await asyncio.wait_for( static_loader.load_feed(force_refresh=False, timeout_seconds=10), timeout=15.0 # Maximum 15 seconds for cloud environments ) logger.info(f"GTFS data loaded: {len(gtfs_data.routes)} routes, {len(gtfs_data.stops)} stops") except asyncio.TimeoutError: logger.warning("GTFS data loading timed out - using empty dataset") gtfs_data = GTFSData() except Exception as e: logger.warning(f"GTFS data loading failed: {e} - using empty dataset") gtfs_data = GTFSData() # Start realtime polling (non-blocking now) try: await realtime_poller.start() logger.info("Real-time polling started successfully") except Exception as e: logger.warning(f"Real-time polling failed to start: {e} - continuing without real-time data") realtime_poller.data = RealtimeData() initialized = True logger.info("Server initialization completed") def _is_cloud_environment() -> bool: """Detect if running in FastMCP Cloud or similar environment.""" return ( os.environ.get('FASTMCP_CLOUD') or os.environ.get('LAMBDA_RUNTIME_DIR') or os.environ.get('AWS_LAMBDA_FUNCTION_NAME') or (os.path.exists('/tmp') and not os.path.exists(os.path.expanduser('~'))) ) @mcp.tool async def list_routes_tool() -> List[Dict[str, Any]]: """List all available bus routes. Returns a list of routes with their ID, short name, long name, and color. """ await ensure_initialized() if not gtfs_data or not gtfs_data.routes: return [] return await list_routes(gtfs_data) @mcp.tool