Skip to main content
Glama

MVG Störung MCP Server (inofficial)

by rmoriz
Creative Commons Zero v1.0 Universal
mvg_mcp_server.py12.8 kB
#!/usr/bin/env python3 """ MCP Server for MVG Störung (Munich Public Transport Disruptions) Provides cached access to MVG incident data with at least 10-minute caching Disclaimer: This project is not an official MVG project, not endorsed or recommended. People should ask MVG for permission prior using it. """ import asyncio import json import logging import sys from datetime import datetime, timedelta from typing import Any, Dict, List, Optional import httpx from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions from mcp.server.stdio import stdio_server from mcp.types import ( Resource, Tool, TextContent, ) from pydantic import BaseModel # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CachedData(BaseModel): """Model for cached MVG data""" data: List[Dict[str, Any]] timestamp: datetime expires_at: datetime class MVGCache: """Simple in-memory cache for MVG data with 10+ minute expiration""" def __init__(self, cache_duration_minutes: int = 10): self.cache_duration = timedelta(minutes=cache_duration_minutes) self._cached_data: Optional[CachedData] = None def is_expired(self) -> bool: """Check if cached data is expired""" if self._cached_data is None: return True return datetime.now() >= self._cached_data.expires_at def get(self) -> Optional[List[Dict[str, Any]]]: """Get cached data if not expired""" if self.is_expired(): return None return self._cached_data.data def set(self, data: List[Dict[str, Any]]) -> None: """Cache new data with expiration""" now = datetime.now() self._cached_data = CachedData( data=data, timestamp=now, expires_at=now + self.cache_duration ) logger.info(f"Cached {len(data)} incidents, expires at {self._cached_data.expires_at}") def get_cache_info(self) -> Dict[str, Any]: """Get information about cache status""" if self._cached_data is None: return {"status": "empty", "cached_items": 0} return { "status": "expired" if self.is_expired() else "valid", "cached_items": len(self._cached_data.data), "cached_at": self._cached_data.timestamp.isoformat(), "expires_at": self._cached_data.expires_at.isoformat(), "cache_duration_minutes": self.cache_duration.total_seconds() / 60 } class MVGDataFetcher: """Fetches and processes MVG disruption data""" MVG_API_URL = "https://www.mvg.de/api/bgw-pt/v3/messages" def __init__(self): self.client = httpx.AsyncClient(timeout=30.0) async def fetch_raw_data(self) -> Dict[str, Any]: """Fetch raw data from MVG API""" try: response = await self.client.get(self.MVG_API_URL) response.raise_for_status() return response.json() except httpx.RequestError as e: logger.error(f"Error fetching data from MVG API: {e}") raise except json.JSONDecodeError as e: logger.error(f"Error parsing JSON response: {e}") raise def filter_incidents(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: """Filter messages to return only INCIDENT type elements""" incidents = [] # Handle different possible data structures messages = [] if isinstance(data, list): messages = data elif isinstance(data, dict): # Try common keys where messages might be stored for key in ["messages", "data", "items", "results"]: if key in data and isinstance(data[key], list): messages = data[key] break else: # If no common key found, check if the dict itself contains type field if "type" in data: messages = [data] # Filter for INCIDENT type for message in messages: if isinstance(message, dict) and message.get("type") == "INCIDENT": incidents.append(message) return incidents def format_timestamp(self, timestamp: int) -> str: """Convert Unix timestamp (milliseconds) to readable format""" try: dt = datetime.fromtimestamp(timestamp / 1000) return dt.strftime("%d.%m.%Y %H:%M") except (ValueError, OSError): return str(timestamp) def enhance_incident_data(self, incident: Dict[str, Any]) -> Dict[str, Any]: """Add human-readable fields to incident data""" enhanced = incident.copy() # Add readable timestamps if "publication" in enhanced and isinstance(enhanced["publication"], int): enhanced["publication_readable"] = self.format_timestamp(enhanced["publication"]) if "validFrom" in enhanced and isinstance(enhanced["validFrom"], int): enhanced["validFrom_readable"] = self.format_timestamp(enhanced["validFrom"]) if "validTo" in enhanced and isinstance(enhanced["validTo"], int): enhanced["validTo_readable"] = self.format_timestamp(enhanced["validTo"]) return enhanced async def fetch_incidents(self) -> List[Dict[str, Any]]: """Fetch and process incident data""" raw_data = await self.fetch_raw_data() incidents = self.filter_incidents(raw_data) # Enhance each incident with readable timestamps enhanced_incidents = [self.enhance_incident_data(incident) for incident in incidents] logger.info(f"Fetched {len(enhanced_incidents)} incidents from MVG API") return enhanced_incidents async def close(self): """Close the HTTP client""" await self.client.aclose() # Create server instance server = Server("mvg-stoerung") # Initialize components cache = MVGCache(cache_duration_minutes=10) fetcher = MVGDataFetcher() async def get_incidents(force_refresh: bool = False) -> List[Dict[str, Any]]: """Get incidents with caching""" if not force_refresh: cached_data = cache.get() if cached_data is not None: logger.info(f"Returning {len(cached_data)} cached incidents") return cached_data # Fetch fresh data logger.info("Fetching fresh data from MVG API") incidents = await fetcher.fetch_incidents() cache.set(incidents) return incidents @server.list_resources() async def handle_list_resources() -> list[Resource]: """List available resources""" return [ Resource( uri="mvg://incidents", name="MVG Incidents", description="Current incidents from Munich Public Transport (MVG)", mimeType="application/json", ), Resource( uri="mvg://cache-info", name="Cache Information", description="Information about the current cache status", mimeType="application/json", ), ] @server.read_resource() async def handle_read_resource(uri: str) -> str: """Read resource content""" if uri == "mvg://incidents": incidents = await get_incidents() return json.dumps(incidents, indent=2, ensure_ascii=False) elif uri == "mvg://cache-info": cache_info = cache.get_cache_info() return json.dumps(cache_info, indent=2, ensure_ascii=False) else: raise ValueError(f"Unknown resource: {uri}") @server.list_tools() async def handle_list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="get_mvg_incidents", description="Get current MVG incidents (cached for 10+ minutes)", inputSchema={ "type": "object", "properties": { "force_refresh": { "type": "boolean", "description": "Force refresh cache even if not expired", "default": False } } }, ), Tool( name="get_cache_status", description="Get information about the cache status", inputSchema={ "type": "object", "properties": {} }, ), Tool( name="search_incidents", description="Search incidents by line, title, or description", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query (searches in title, description, and line labels)" }, "line": { "type": "string", "description": "Filter by specific line (e.g., 'U6', 'S1', 'Bus 100')" } }, "required": ["query"] }, ), ] @server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" if name == "get_mvg_incidents": force_refresh = arguments.get("force_refresh", False) incidents = await get_incidents(force_refresh=force_refresh) result = { "incidents": incidents, "count": len(incidents), "cache_info": cache.get_cache_info() } return [TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) )] elif name == "get_cache_status": cache_info = cache.get_cache_info() return [TextContent( type="text", text=json.dumps(cache_info, indent=2, ensure_ascii=False) )] elif name == "search_incidents": query = arguments.get("query", "").lower() line_filter = arguments.get("line", "").upper() incidents = await get_incidents() filtered_incidents = [] for incident in incidents: # Search in title and description title_match = query in incident.get("title", "").lower() desc_match = query in incident.get("description", "").lower() # Search in line labels line_match = False if "lines" in incident: for line in incident["lines"]: if isinstance(line, dict) and "label" in line: if query in line["label"].lower(): line_match = True break # Apply line filter if specified line_filter_match = True if line_filter: line_filter_match = False if "lines" in incident: for line in incident["lines"]: if isinstance(line, dict) and "label" in line: if line_filter in line["label"].upper(): line_filter_match = True break if (title_match or desc_match or line_match) and line_filter_match: filtered_incidents.append(incident) result = { "incidents": filtered_incidents, "count": len(filtered_incidents), "query": arguments.get("query"), "line_filter": arguments.get("line"), "total_incidents": len(incidents) } return [TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) )] else: raise ValueError(f"Unknown tool: {name}") async def main(): """Main entry point""" try: async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="mvg-stoerung", server_version="1.0.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) except KeyboardInterrupt: logger.info("Server interrupted") finally: await fetcher.close() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rmoriz/mvg_stoerung_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server