#!/usr/bin/env python3
"""
MCP Server for Traffic Prediction API with SSE Transport
This server exposes tools to interact with a traffic prediction REST API:
1. get_traffic_stations - Get available traffic stations
2. get_actual_traffic - Get current traffic data and historical sequence for a station
3. predict_traffic_spi - Predict Speed Performance Index using LSTM model (requires historical data from get_actual_traffic)
4. suggest_routes - Suggest optimal routes between stations
5. geocode_location - Convert location names to coordinates
6. get_traffic_at_location - Get traffic stations near a specific location
Supports remote deployment via SSE (Server-Sent Events) transport.
IMPORTANT: Traffic predictions now require real historical data.
Use get_actual_traffic first to obtain the historical_sequence, then pass it to predict_traffic_spi.
"""
import os
import json
import logging
from typing import Any, Dict, Optional, List, Tuple
from math import radians, sin, cos, sqrt, atan2
import httpx
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import Response
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# API Configuration
# NOTE(review): no default value — TRAFFIC_API_URL is None when the env var
# is unset, which would produce "None/..." request URLs in make_api_request;
# confirm deployment always sets it.
TRAFFIC_API_URL = os.getenv(
    "TRAFFIC_API_URL"
)
# Port for the Starlette/SSE app (default 8080)
PORT = int(os.getenv("PORT", 8080))
# NOTE(review): also None when unset; Mapbox requests would then go out
# without a valid access token and fail — confirm it is configured.
MAPBOX_ACCESS_TOKEN = os.getenv(
    "MAPBOX_ACCESS_TOKEN"
)
# Initialize MCP server
mcp_server = Server("traffic-mcp")
# Shared HTTP timeout policy: 30s overall, 10s to establish a connection
client_config = httpx.Timeout(30.0, connect=10.0)
# ========== GEOCODING HELPER FUNCTIONS ==========
def haversine_distance(coord1: Tuple[float, float], coord2: Tuple[float, float]) -> float:
    """
    Great-circle distance in kilometers between two (lat, lon) points.

    Uses the haversine formula with a mean Earth radius of 6371 km.

    Args:
        coord1: (latitude, longitude) of the first point, in degrees.
        coord2: (latitude, longitude) of the second point, in degrees.

    Returns:
        Distance between the two points in kilometers.
    """
    earth_radius_km = 6371
    phi1, lam1 = radians(coord1[0]), radians(coord1[1])
    phi2, lam2 = radians(coord2[0]), radians(coord2[1])
    # Haversine of the central angle between the two points
    half_chord = (
        sin((phi2 - phi1) / 2) ** 2
        + cos(phi1) * cos(phi2) * sin((lam2 - lam1) / 2) ** 2
    )
    central_angle = 2 * atan2(sqrt(half_chord), sqrt(1 - half_chord))
    return earth_radius_km * central_angle
async def geocode_location(location: str, city: str = "Los Angeles", state: str = "California") -> Optional[Tuple[float, float]]:
    """
    Resolve a place name to coordinates via the OpenStreetMap Nominatim API.

    Args:
        location: Place name (neighborhood, address, point of interest).
        city: City that scopes the search (default: Los Angeles).
        state: State that scopes the search (default: California).

    Returns:
        (latitude, longitude) tuple, or None when no match is found or the
        request fails for any reason.
    """
    endpoint = "https://nominatim.openstreetmap.org/search"
    query = {
        "q": f"{location}, {city}, {state}",
        "format": "json",
        "limit": 1
    }
    # Nominatim's usage policy requires an identifying User-Agent
    request_headers = {"User-Agent": "TrafficMCPServer/1.0"}
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            resp = await http.get(endpoint, params=query, headers=request_headers)
            resp.raise_for_status()
            matches = resp.json()
            if matches:
                top = matches[0]
                return float(top["lat"]), float(top["lon"])
            return None
    except Exception as e:
        # Best-effort helper: any failure is logged and reported as "not found"
        logger.error(f"Error geocoding location: {e}")
        return None
def find_nearest_stations(lat: float, lon: float, stations: List[Dict], max_results: int = 5) -> List[Dict]:
    """
    Find the stations closest to a given coordinate.

    Args:
        lat: Latitude
        lon: Longitude
        stations: List of station dictionaries (need 'Latitude'/'Longitude')
        max_results: Maximum number of stations to return

    Returns:
        Up to max_results station copies ordered nearest-first, each
        annotated with a 'distance_km' field. Stations without usable
        coordinates are skipped with a warning.
    """
    if not stations:
        return []
    origin = (lat, lon)
    annotated = []
    for candidate in stations:
        try:
            km = haversine_distance(
                origin,
                (candidate['Latitude'], candidate['Longitude'])
            )
            entry = candidate.copy()
            entry['distance_km'] = round(km, 2)
            annotated.append(entry)
        except (KeyError, TypeError) as e:
            logger.warning(f"Skipping station due to missing coordinates: {e}")
    # Nearest first, then truncate to the requested count
    annotated.sort(key=lambda s: s['distance_km'])
    return annotated[:max_results]
def generate_map_urls(lat: float, lon: float, location_name: str = "", zoom: int = 13) -> Dict[str, str]:
    """
    Build URLs pointing at the given coordinate on several map services.

    Args:
        lat: Latitude
        lon: Longitude
        location_name: Optional label for the location (accepted for API
            compatibility; not currently embedded in any URL)
        zoom: Zoom level for the map (default: 13)

    Returns:
        Dictionary with keys "google_maps", "openstreetmap", "static_map"
        and "apple_maps", each mapping to a URL string.
    """
    point = f"{lat},{lon}"
    # ~0.01-degree bounding box around the point for the embeddable map
    west, south, east, north = lon - 0.01, lat - 0.01, lon + 0.01, lat + 0.01
    return {
        "google_maps": f"https://www.google.com/maps?q={point}&z={zoom}",
        "openstreetmap": f"https://www.openstreetmap.org/?mlat={lat}&mlon={lon}&zoom={zoom}",
        "static_map": f"https://www.openstreetmap.org/export/embed.html?bbox={west},{south},{east},{north}&layer=mapnik&marker={point}",
        "apple_maps": f"https://maps.apple.com/?q={point}&z={zoom}",
    }
def generate_multi_point_map_url(points: List[Tuple[float, float, str]]) -> str:
    """
    Generate a Google Maps directions URL through multiple points.

    Args:
        points: List of (lat, lon, label) tuples. Labels are currently
            ignored (kept in the signature for future marker labelling).

    Returns:
        Google Maps URL string, or "" when no points are given.
    """
    # BUGFIX/cleanup: the original computed an URL-encoded label and an
    # unused "center" point per call but never used either; both removed.
    if not points:
        return ""
    # NOTE(review): Google's /maps/dir/ endpoint conventionally separates
    # waypoints with "/"; the "|" separator is preserved here to keep the
    # existing output byte-identical — confirm against consumers before
    # changing it.
    markers_str = "|".join(f"{lat},{lon}" for lat, lon, _label in points)
    return f"https://www.google.com/maps/dir/{markers_str}"
async def get_mapbox_route(origin: Tuple[float, float], destination: Tuple[float, float]) -> Optional[Dict[str, Any]]:
    """
    Fetch driving routes between two points from the Mapbox Directions API.

    Args:
        origin: (latitude, longitude) of the start point.
        destination: (latitude, longitude) of the end point.

    Returns:
        Parsed Mapbox response dictionary when the API answers with code
        "Ok", otherwise None (errors are logged).
    """
    # Mapbox expects "lon,lat" ordering — the reverse of our tuples
    waypoints = ";".join(f"{pt[1]},{pt[0]}" for pt in (origin, destination))
    request_url = f"https://api.mapbox.com/directions/v5/mapbox/driving/{waypoints}"
    query = {
        "alternatives": "true",
        "geometries": "geojson",
        "overview": "simplified",
        "steps": "false",
        "access_token": MAPBOX_ACCESS_TOKEN
    }
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            resp = await http.get(request_url, params=query)
            resp.raise_for_status()
            payload = resp.json()
            if payload.get("code") == "Ok":
                return payload
            logger.error(f"Mapbox API returned error code: {payload.get('code')}")
            return None
    except Exception as e:
        logger.error(f"Error calling Mapbox API: {e}")
        return None
def find_nearest_stations_to_route(route_coords: List[Tuple[float, float]], stations: List[Dict], max_distance_km: float = 2.0) -> Dict[int, List[Dict]]:
    """
    Find nearest stations to sample points along a route.

    The route is divided into roughly 10-20 sample segments; for each
    segment's representative coordinate, stations within max_distance_km
    are collected and only the three closest are kept.

    Args:
        route_coords: List of (latitude, longitude) tuples along the route
        stations: List of station dictionaries (need 'Latitude'/'Longitude')
        max_distance_km: Maximum distance in km to consider a station (default: 2km)

    Returns:
        Dictionary mapping segment index to a list of nearby stations,
        each annotated with 'distance_km' and sorted nearest-first.
        Empty route input yields an empty dictionary.
    """
    # Aim for 10-20 segments regardless of how finely the route is sampled
    num_segments = min(20, max(10, len(route_coords) // 5))
    # Guard against a zero step for very short routes
    segment_size = max(1, len(route_coords) // num_segments)
    segment_stations: Dict[int, List[Dict]] = {}
    for i in range(0, len(route_coords), segment_size):
        segment_idx = i // segment_size
        segment_coord = route_coords[i]
        # Find stations within max_distance_km of this segment point
        nearby = []
        for station in stations:
            try:
                distance = haversine_distance(
                    segment_coord,
                    (station['Latitude'], station['Longitude'])
                )
                if distance <= max_distance_km:
                    station_copy = station.copy()
                    station_copy['distance_km'] = round(distance, 2)
                    nearby.append(station_copy)
            except (KeyError, TypeError):
                # Cleanup: dropped the unused "as e" binding — stations
                # without usable coordinates are skipped silently by design
                continue
        # Sort by distance and keep the three closest stations per segment
        nearby.sort(key=lambda x: x['distance_km'])
        segment_stations[segment_idx] = nearby[:3]
    return segment_stations
async def predict_traffic_for_route(segment_stations: Dict[int, List[Dict]], timestamp: Optional[str] = None) -> Dict[int, Dict]:
    """
    Predict traffic for route segments based on nearby stations.

    For each segment, queries /traffic/actual for every nearby station to
    obtain its real historical sequence, feeds that sequence to /predict,
    and aggregates the per-station SPI values into a distance-weighted
    average (closer stations weigh more).

    Args:
        segment_stations: Dictionary mapping segment index to list of nearby stations
        timestamp: Timestamp for prediction in ISO 8601 format (required for real data)

    Returns:
        Dictionary mapping segment index to aggregated traffic prediction.
        Every entry carries a "status" field: "success",
        "no_nearby_stations", "prediction_failed" or "missing_timestamp".
    """
    segment_predictions = {}
    # Validate timestamp is provided
    if not timestamp:
        logger.warning("No timestamp provided for route traffic prediction")
        # Return error for all segments
        for segment_idx, stations in segment_stations.items():
            segment_predictions[segment_idx] = {
                "status": "missing_timestamp",
                "spi": None,
                "congestion_level": "unknown",
                "message": "Timestamp is required for traffic predictions. Please provide a timestamp in ISO 8601 format."
            }
        return segment_predictions
    for segment_idx, stations in segment_stations.items():
        if not stations:
            segment_predictions[segment_idx] = {
                "status": "no_nearby_stations",
                "spi": None,
                "congestion_level": "unknown"
            }
            continue
        # Get predictions for each nearby station (two API calls per station)
        station_predictions = []
        for station in stations:
            try:
                station_id = station.get('ID')
                # Get actual traffic data with historical sequence
                actual_traffic_response = await make_api_request(
                    "POST",
                    "/traffic/actual",
                    data={
                        "station_id": station_id,
                        "timestamp": timestamp
                    }
                )
                # Extract historical sequence
                historical_sequence = actual_traffic_response.get("historical_sequence")
                if not historical_sequence:
                    logger.warning(f"No historical sequence returned for station {station_id}")
                    continue
                # Get prediction from API using the historical sequence
                prediction_response = await make_api_request(
                    "POST",
                    "/predict",
                    data={"sequence": historical_sequence}
                )
                station_predictions.append({
                    "station_id": station_id,
                    "station_name": station.get('Name'),
                    "distance_km": station.get('distance_km'),
                    "spi": prediction_response.get("spi_predicted"),
                    "congestion_level": prediction_response.get("congestion_level"),
                    "congestion_label": prediction_response.get("congestion_label"),
                    "traffic_state": prediction_response.get("traffic_state")
                })
            except Exception as e:
                # A failing station is skipped; the segment can still succeed
                # with the remaining stations
                logger.error(f"Error getting prediction for station {station.get('ID')}: {e}")
                continue
        # Aggregate predictions (weighted by distance - closer stations have more weight)
        if station_predictions:
            # Weight by inverse distance; the 0.1 km floor prevents division
            # by zero for stations sitting directly on the route
            total_weight = sum(1 / max(p['distance_km'], 0.1) for p in station_predictions)
            # NOTE(review): assumes every /predict response includes a numeric
            # 'spi_predicted'; if the API omits it, 'spi' is None and this
            # multiplication raises TypeError — confirm the API contract.
            weighted_spi = sum(p['spi'] * (1 / max(p['distance_km'], 0.1)) for p in station_predictions) / total_weight
            # Use the prediction from the closest station for categorical values
            # (stations arrive pre-sorted nearest-first from the route matcher)
            closest_prediction = station_predictions[0]
            segment_predictions[segment_idx] = {
                "status": "success",
                "spi": round(weighted_spi, 2),
                "congestion_level": closest_prediction['congestion_level'],
                "congestion_label": closest_prediction['congestion_label'],
                "traffic_state": closest_prediction['traffic_state'],
                "stations_used": len(station_predictions),
                "station_details": station_predictions
            }
        else:
            segment_predictions[segment_idx] = {
                "status": "prediction_failed",
                "spi": None,
                "congestion_level": "unknown"
            }
    return segment_predictions
async def make_api_request(
    method: str,
    endpoint: str,
    data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Make an API request with error handling.

    Args:
        method: HTTP method ("GET" or "POST", case-insensitive)
        endpoint: API endpoint path, appended to TRAFFIC_API_URL
        data: JSON data for POST requests
        params: Query parameters for GET requests

    Returns:
        API response as dictionary

    Raises:
        ValueError: If the HTTP method is not supported.
        Exception: If the request fails (HTTP error, connection error, or
            any unexpected error).
    """
    # BUGFIX: validate the method up front. Previously the ValueError was
    # raised inside the try block, swallowed by the generic
    # "except Exception" handler and re-wrapped as a misleading
    # "Unexpected error occurred" message.
    http_method = method.upper()
    if http_method not in ("GET", "POST"):
        raise ValueError(f"Unsupported HTTP method: {method}")
    url = f"{TRAFFIC_API_URL}{endpoint}"
    try:
        async with httpx.AsyncClient(timeout=client_config) as client:
            if http_method == "GET":
                response = await client.get(url, params=params)
            else:  # POST
                response = await client.post(url, json=data)
            response.raise_for_status()
            return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}")
        # "from e" preserves the original traceback for debugging
        raise Exception(f"API request failed with status {e.response.status_code}: {e.response.text}") from e
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        raise Exception(f"Failed to connect to API: {str(e)}") from e
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise Exception(f"Unexpected error occurred: {str(e)}") from e
@mcp_server.list_tools()
async def list_tools() -> list[Tool]:
    """
    List all available tools.

    Tool descriptions are in Spanish because they are user-facing runtime
    payload sent to MCP clients; they must not be edited casually.

    Returns:
        List of Tool objects defining the available MCP tools
    """
    return [
        # Tool: station catalogue with optional freeway/direction filters
        # and a result-size limit.
        Tool(
            name="get_traffic_stations",
            description=(
                "Obtiene la lista de estaciones de tráfico disponibles en el área de Los Ángeles con su información "
                "geográfica y características. Permite filtrar por autopista y dirección. "
                "IMPORTANTE: Usa 'limit' para controlar cuántas estaciones retornar (máximo 50 por defecto). "
                "Para consultas generales, usa limit=10-20. Para búsquedas específicas, usa filtros. "
                "Las estaciones cubren el condado de Los Ángeles y áreas metropolitanas cercanas."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "freeway": {
                        "type": "integer",
                        "description": "Filtrar por número de autopista (opcional)"
                    },
                    "direction": {
                        "type": "string",
                        "description": "Filtrar por dirección: N, S, E, W (opcional)",
                        "enum": ["N", "S", "E", "W"]
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Número máximo de estaciones a retornar (1-100, default: 50)",
                        "minimum": 1,
                        "maximum": 100,
                        "default": 50
                    }
                },
                "required": []
            }
        ),
        # Tool: current conditions + 12-interval historical sequence for a
        # station; must be called before predict_traffic_spi.
        Tool(
            name="get_actual_traffic",
            description=(
                "Obtiene los datos actuales de tráfico para una estación específica en un timestamp dado. "
                "Retorna: condiciones actuales (flujo, ocupancy, velocidad), metadata de la estación, "
                "secuencia histórica de los últimos 12 intervalos (1 hora), y contexto temporal. "
                "Esta herramienta DEBE llamarse ANTES de predict_traffic_spi para obtener datos históricos reales."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "station_id": {
                        "type": "integer",
                        "description": "ID de la estación de tráfico"
                    },
                    "timestamp": {
                        "type": "string",
                        "description": "Timestamp en formato ISO 8601 (ej: '2025-09-15T08:30:00')"
                    }
                },
                "required": ["station_id", "timestamp"]
            }
        ),
        # Tool: LSTM SPI prediction; consumes the 12x7 historical_sequence
        # produced by get_actual_traffic.
        Tool(
            name="predict_traffic_spi",
            description=(
                "Predice el índice de rendimiento de velocidad (SPI) del tráfico usando una secuencia histórica de 12 intervalos. "
                "IMPORTANTE: Debes obtener primero la secuencia usando get_actual_traffic, luego extraer el campo 'historical_sequence' y pasarlo aquí. "
                "Ya NO se generan datos automáticamente - DEBES proporcionar la secuencia real."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "sequence": {
                        "type": "array",
                        "description": "Lista de 12 intervalos de 5 minutos con datos históricos. DEBE obtenerse del campo 'historical_sequence' de get_actual_traffic. Cada intervalo contiene [Total_Flow, Avg_Occupancy, Avg_Speed, Hour, Day_of_Week, Lanes, Lane_Type_encoded]",
                        "items": {
                            "type": "array",
                            "description": "[Total_Flow, Avg_Occupancy, Avg_Speed, Hour, Day_of_Week, Lanes, Lane_Type_encoded]",
                            "minItems": 7,
                            "maxItems": 7,
                            "items": {
                                "type": "number"
                            }
                        },
                        "minItems": 12,
                        "maxItems": 12
                    }
                },
                "required": ["sequence"]
            }
        ),
        # Tool: Mapbox-based route alternatives enriched with per-segment
        # traffic predictions; requires a timestamp.
        Tool(
            name="suggest_routes",
            description=(
                "Sugiere rutas óptimas entre dos ubicaciones usando Mapbox Directions API y combina "
                "estas rutas con predicciones de tráfico de las estaciones más cercanas a los puntos "
                "de ruta. Devuelve múltiples rutas alternativas con información de tráfico para cada segmento. "
                "IMPORTANTE: Ahora requiere un timestamp para obtener predicciones de tráfico reales. "
                "Útil para consultas como '¿Cuál es la mejor ruta de X a Y considerando tráfico?'"
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "origin_coords": {
                        "type": "array",
                        "description": "Coordenadas de origen [latitud, longitud]",
                        "items": {
                            "type": "number"
                        },
                        "minItems": 2,
                        "maxItems": 2
                    },
                    "destination_coords": {
                        "type": "array",
                        "description": "Coordenadas de destino [latitud, longitud]",
                        "items": {
                            "type": "number"
                        },
                        "minItems": 2,
                        "maxItems": 2
                    },
                    "timestamp": {
                        "type": "string",
                        "description": "Timestamp para la predicción de tráfico en formato ISO 8601 (ej: '2025-09-15T08:30:00'). REQUERIDO para obtener predicciones de tráfico."
                    },
                    "max_distance_km": {
                        "type": "number",
                        "description": "Radio máximo en km para considerar estaciones cercanas (default: 2.0)",
                        "minimum": 0.5,
                        "maximum": 10.0,
                        "default": 2.0
                    }
                },
                "required": ["origin_coords", "destination_coords", "timestamp"]
            }
        ),
        # Tool: pure geocoding (Nominatim); no traffic data involved.
        Tool(
            name="geocode_location",
            description=(
                "Convierte el nombre de una ubicación en el área de Los Ángeles (vecindario, dirección, punto de interés) en "
                "coordenadas geográficas (latitud, longitud). Esta herramienta SOLO geocodifica, "
                "NO consulta información de tráfico. Usar esta herramienta primero para obtener "
                "coordenadas, luego usar get_traffic_at_location para consultar tráfico. "
                "Por defecto busca en Los Ángeles, California. "
                "Los resultados pueden ser mostrados en un mapa interactivo por el cliente."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "Nombre del lugar en Los Ángeles (vecindario, dirección, punto de interés). Ejemplos: 'Downtown', 'Santa Monica', 'Hollywood', 'Venice Beach'"
                    },
                    "city": {
                        "type": "string",
                        "description": "Ciudad para la búsqueda (default: Los Angeles)",
                        "default": "Los Angeles"
                    },
                    "state": {
                        "type": "string",
                        "description": "Estado para la búsqueda (default: California)",
                        "default": "California"
                    }
                },
                "required": ["location"]
            }
        ),
        # Tool: nearest stations (with optional predictions) around a
        # coordinate; intended to follow geocode_location.
        Tool(
            name="get_traffic_at_location",
            description=(
                "Obtiene información de tráfico para unas coordenadas específicas. Encuentra las "
                "estaciones de monitoreo más cercanas y sus predicciones de tráfico. Los datos "
                "retornados incluyen coordenadas de cada estación para que el cliente pueda "
                "renderizarlas en un mapa interactivo con React Leaflet. "
                "IMPORTANTE: Ahora requiere un timestamp para obtener predicciones de tráfico reales. "
                "Usar después de geocode_location."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "latitude": {
                        "type": "number",
                        "description": "Latitud de la ubicación"
                    },
                    "longitude": {
                        "type": "number",
                        "description": "Longitud de la ubicación"
                    },
                    "location_name": {
                        "type": "string",
                        "description": "Nombre de la ubicación (para referencia)"
                    },
                    "timestamp": {
                        "type": "string",
                        "description": "Timestamp para las predicciones de tráfico en formato ISO 8601 (ej: '2025-09-15T08:30:00'). REQUERIDO para obtener predicciones de tráfico."
                    },
                    "max_stations": {
                        "type": "integer",
                        "description": "Número máximo de estaciones cercanas a retornar (1-20, default: 5)",
                        "minimum": 1,
                        "maximum": 20,
                        "default": 5
                    },
                    "include_predictions": {
                        "type": "boolean",
                        "description": "Incluir predicciones de tráfico para cada estación (default: true)",
                        "default": True
                    }
                },
                "required": ["latitude", "longitude", "timestamp"]
            }
        )
    ]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """
    Handle tool execution requests by dispatching to the matching handler.

    Args:
        name: Name of the tool to execute
        arguments: Tool arguments as dictionary

    Returns:
        List of TextContent with the tool execution results; any failure
        is logged and reported back as an "Error: ..." text payload.
    """
    handlers = {
        "get_traffic_stations": handle_get_stations,
        "get_actual_traffic": handle_get_actual_traffic,
        "predict_traffic_spi": handle_predict_spi,
        "suggest_routes": handle_suggest_routes,
        "geocode_location": handle_geocode_simple,
        "get_traffic_at_location": handle_traffic_at_location,
    }
    try:
        handler = handlers.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return await handler(arguments)
    except Exception as e:
        logger.error(f"Error executing tool {name}: {str(e)}")
        return [TextContent(
            type="text",
            text=f"Error: {str(e)}"
        )]
async def handle_get_stations(arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Handle the get_traffic_stations tool.

    Fetches the full station catalogue from the REST API, applies the
    optional freeway/direction filters plus the result limit, and returns
    the matching stations as a JSON document.

    Args:
        arguments: Tool arguments; may contain "freeway" (int),
            "direction" (str) and "limit" (int, default 50)

    Returns:
        Single-element list of TextContent with the JSON payload
    """
    api_response = await make_api_request("GET", "/stations")
    matching = api_response.get("stations", [])
    total_available = api_response.get("total", 0)

    freeway = arguments.get("freeway")
    direction = arguments.get("direction")
    limit = arguments.get("limit", 50)  # keep payloads bounded by default

    if freeway is not None:
        matching = [s for s in matching if s.get("Fwy") == freeway]
    if direction:
        wanted_dir = direction.upper()
        matching = [s for s in matching if s.get("Dir") == wanted_dir]

    filtered_count = len(matching)
    matching = matching[:limit]

    payload = {
        "returned_stations": len(matching),
        "filtered_count": filtered_count,
        "total_available": total_available,
        "filters_applied": {
            "freeway": freeway,
            "direction": direction,
            "limit": limit
        },
        "stations": matching
    }
    # Tell the caller when the limit truncated the result set
    if filtered_count > limit:
        payload["note"] = f"Mostrando {limit} de {filtered_count} estaciones. Usa filtros más específicos o aumenta el 'limit' si necesitas más."
    return [TextContent(
        type="text",
        text=json.dumps(payload, indent=2)
    )]
async def handle_get_actual_traffic(arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Handle the get_actual_traffic tool.

    Validates the station/timestamp pair and forwards it to the
    /traffic/actual endpoint, returning the API's response verbatim
    (current conditions plus the 12-interval historical sequence).

    Args:
        arguments: Tool arguments containing station_id and timestamp

    Returns:
        Single-element list of TextContent with the JSON payload

    Raises:
        ValueError: If station_id or timestamp is missing.
    """
    station_id = arguments.get("station_id")
    timestamp = arguments.get("timestamp")
    if station_id is None:
        raise ValueError("station_id parameter is required")
    if not timestamp:
        raise ValueError("timestamp parameter is required")
    logger.info(f"Getting actual traffic for station {station_id} at {timestamp}")
    request_body = {
        "station_id": station_id,
        "timestamp": timestamp
    }
    api_response = await make_api_request("POST", "/traffic/actual", data=request_body)
    # Pass the complete API response straight through to the client
    return [TextContent(
        type="text",
        text=json.dumps(api_response, indent=2)
    )]
async def handle_predict_spi(arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Handle the predict_traffic_spi tool.

    Validates the 12x7 historical sequence (obtained via
    get_actual_traffic) and forwards it to the /predict endpoint.

    Args:
        arguments: Tool arguments containing "sequence"

    Returns:
        Single-element list of TextContent with the prediction JSON

    Raises:
        ValueError: If the sequence is missing or has the wrong shape.
    """
    sequence = arguments.get("sequence")
    if not sequence:
        raise ValueError(
            "sequence parameter is required. "
            "You must first call get_actual_traffic to obtain the 'historical_sequence' field, "
            "then pass it to this function."
        )
    # Shape check: exactly 12 intervals of 7 features each
    if len(sequence) != 12:
        raise ValueError(f"sequence must contain exactly 12 intervals, got {len(sequence)}")
    for i, interval in enumerate(sequence):
        if len(interval) != 7:
            raise ValueError(f"Interval {i} must contain exactly 7 values, got {len(interval)}")
    api_response = await make_api_request("POST", "/predict", data={"sequence": sequence})
    # Summarize the prediction plus its fuzzy classification details
    summary = {
        "prediction": {
            "spi": api_response.get("spi_predicted"),
            "congestion_level": api_response.get("congestion_level"),
            "congestion_label": api_response.get("congestion_label"),
            "traffic_state": api_response.get("traffic_state"),
            "confidence_level": api_response.get("confidence_level")
        },
        "fuzzy_classification": api_response.get("fuzzy_classification", {}),
        "status": api_response.get("status")
    }
    return [TextContent(
        type="text",
        text=json.dumps(summary, indent=2)
    )]
async def handle_suggest_routes(arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Handle suggest_routes tool execution using Mapbox Directions API.

    Fetches alternative driving routes from Mapbox, finds traffic stations
    near each route, predicts per-segment traffic, scores each route with
    a weighted mix of predicted SPI (60%) and duration (40%), and returns
    the routes sorted best-first.

    Args:
        arguments: Tool arguments containing origin_coords,
            destination_coords, timestamp, and max_distance_km

    Returns:
        List of TextContent with route suggestions enriched with traffic
        predictions

    Raises:
        ValueError: If origin_coords or destination_coords are missing or
            are not [latitude, longitude] pairs.
    """
    origin_coords = arguments.get("origin_coords")
    destination_coords = arguments.get("destination_coords")
    timestamp = arguments.get("timestamp")
    max_distance_km = arguments.get("max_distance_km", 2.0)
    # Validate required parameters
    if not origin_coords or len(origin_coords) != 2:
        raise ValueError("origin_coords must be an array of [latitude, longitude]")
    if not destination_coords or len(destination_coords) != 2:
        raise ValueError("destination_coords must be an array of [latitude, longitude]")
    origin_lat, origin_lon = origin_coords
    dest_lat, dest_lon = destination_coords
    logger.info(f"Getting routes from ({origin_lat}, {origin_lon}) to ({dest_lat}, {dest_lon})")
    # Get candidate routes from Mapbox
    mapbox_response = await get_mapbox_route(
        (origin_lat, origin_lon),
        (dest_lat, dest_lon)
    )
    if not mapbox_response:
        return [TextContent(
            type="text",
            text=json.dumps({
                "status": "error",
                "message": "No se pudo obtener rutas de Mapbox API"
            }, indent=2)
        )]
    # Get all traffic stations
    stations_response = await make_api_request("GET", "/stations")
    all_stations = stations_response.get("stations", [])
    if not all_stations:
        return [TextContent(
            type="text",
            text=json.dumps({
                "status": "error",
                "message": "No se pudieron obtener las estaciones de tráfico"
            }, indent=2)
        )]
    # Process each route alternative
    routes_with_traffic = []
    for route_idx, route in enumerate(mapbox_response.get("routes", [])):
        geometry = route.get("geometry", {})
        coordinates = geometry.get("coordinates", [])
        # Mapbox returns GeoJSON [lon, lat]; convert to (lat, lon)
        route_coords = [(coord[1], coord[0]) for coord in coordinates]
        logger.info(f"Processing route {route_idx + 1} with {len(route_coords)} coordinates")
        # Find nearest stations to route segments
        segment_stations = find_nearest_stations_to_route(
            route_coords,
            all_stations,
            max_distance_km
        )
        # Predict traffic for each segment
        segment_predictions = await predict_traffic_for_route(
            segment_stations,
            timestamp
        )
        # Calculate overall route metrics
        valid_predictions = [p for p in segment_predictions.values() if p.get('spi') is not None]
        if valid_predictions:
            avg_spi = sum(p['spi'] for p in valid_predictions) / len(valid_predictions)
            avg_congestion_level = sum(p['congestion_level'] for p in valid_predictions) / len(valid_predictions)
            # Determine overall traffic state from the mean congestion level
            if avg_congestion_level < 2:
                overall_state = "Fluido"
            elif avg_congestion_level < 4:
                overall_state = "Moderado"
            else:
                overall_state = "Congestionado"
        else:
            avg_spi = None
            avg_congestion_level = None
            overall_state = "Desconocido"
        # Flatten all stations from all segments into a unique list the
        # frontend can render directly on a map
        station_details_map = {}  # keyed by station id to avoid duplicates
        for segment_idx, prediction in segment_predictions.items():
            if prediction.get('status') == 'success' and prediction.get('station_details'):
                for station_detail in prediction['station_details']:
                    station_id = station_detail.get('station_id')
                    if station_id and station_id not in station_details_map:
                        # Find the full station data from segment_stations
                        for stations_list in segment_stations.values():
                            matching_station = next(
                                (s for s in stations_list if s.get('ID') == station_id),
                                None
                            )
                            if matching_station:
                                station_details_map[station_id] = {
                                    'id': station_id,
                                    'name': matching_station.get('Name', ''),
                                    'latitude': matching_station.get('Latitude'),
                                    'longitude': matching_station.get('Longitude'),
                                    'freeway': matching_station.get('Fwy'),
                                    'direction': matching_station.get('Dir'),
                                    'distance_km': station_detail.get('distance_km'),
                                    'spi': station_detail.get('spi'),
                                    'congestion_level': station_detail.get('congestion_level'),
                                    'traffic_state': station_detail.get('traffic_state')
                                }
                                break
        station_details = list(station_details_map.values())
        routes_with_traffic.append({
            "route_index": route_idx + 1,
            "distance_meters": route.get("distance"),
            "distance_km": round(route.get("distance", 0) / 1000, 2),
            "duration_seconds": route.get("duration"),
            "duration_minutes": round(route.get("duration", 0) / 60, 1),
            "summary": route.get("legs", [{}])[0].get("summary", ""),
            "geometry": geometry,
            "station_details": station_details,  # Flat list for frontend
            "traffic_analysis": {
                # BUGFIX: use "is not None" — a legitimate SPI or congestion
                # level of 0 is falsy and was previously reported as None.
                "average_spi": round(avg_spi, 2) if avg_spi is not None else None,
                "average_congestion_level": round(avg_congestion_level, 1) if avg_congestion_level is not None else None,
                "overall_traffic_state": overall_state,
                "segments_analyzed": len(segment_predictions),
                "segments_with_data": len(valid_predictions),
                "total_stations": len(station_details)
            },
            "segment_predictions": segment_predictions
        })
    # Rank routes: lower SPI and shorter duration are better
    best_route_number = 1
    if routes_with_traffic:
        for route in routes_with_traffic:
            avg_spi = route["traffic_analysis"]["average_spi"]
            if avg_spi is None:
                continue
            route["score"] = (avg_spi * 0.6) + (route["duration_minutes"] * 0.4)
        # Sort best-first; routes without a score sink to the end
        routes_with_traffic.sort(key=lambda r: r.get("score", float('inf')))
        # BUGFIX: report the ORIGINAL Mapbox route number of the best-ranked
        # route. Previously this was hardcoded to 1, which pointed at the
        # wrong route whenever sorting reordered the list.
        best_route_number = routes_with_traffic[0]["route_index"]
    # Build final result
    result = {
        "status": "success",
        "origin": {
            "latitude": origin_lat,
            "longitude": origin_lon
        },
        "destination": {
            "latitude": dest_lat,
            "longitude": dest_lon
        },
        "routes_count": len(routes_with_traffic),
        "routes": routes_with_traffic,
        "recommendation": {
            "best_route_index": best_route_number,
            "reason": "Ruta con mejor combinación de tráfico y tiempo de viaje"
        },
        "waypoints": mapbox_response.get("waypoints", []),
        "parameters": {
            "max_distance_km": max_distance_km,
            "timestamp": timestamp
        }
    }
    return [TextContent(
        type="text",
        text=json.dumps(result, indent=2)
    )]
async def handle_geocode_simple(arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Handle the geocode_location tool — geocoding only, no traffic data.

    Args:
        arguments: Tool arguments containing location, city, and state

    Returns:
        Single-element list of TextContent with either the coordinates
        (status "success") or a "not_found" payload

    Raises:
        ValueError: If the location argument is missing.
    """
    location = arguments.get("location")
    city = arguments.get("city", "Los Angeles")
    state = arguments.get("state", "California")
    if not location:
        raise ValueError("location parameter is required")
    logger.info(f"Geocoding location: {location}, {city}, {state}")
    coords = await geocode_location(location, city, state)
    if coords is None:
        not_found = {
            "status": "not_found",
            "location": location,
            "city": city,
            "state": state,
            "message": f"No se pudo encontrar la ubicación '{location}, {city}, {state}'"
        }
        return [TextContent(
            type="text",
            text=json.dumps(not_found, indent=2)
        )]
    lat, lon = coords
    logger.info(f"Coordinates found: {lat}, {lon}")
    found = {
        "status": "success",
        "location": location,
        "city": city,
        "state": state,
        "coordinates": {
            "latitude": lat,
            "longitude": lon
        }
    }
    return [TextContent(
        type="text",
        text=json.dumps(found, indent=2)
    )]
async def _build_station_entry(station: Dict[str, Any], timestamp: str,
                               include_predictions: bool) -> Dict[str, Any]:
    """
    Build one serializable station record for map rendering.

    Copies identifying fields from the raw station dict and, when
    include_predictions is True, attaches a "traffic" sub-dict obtained by:
      1. POST /traffic/actual to fetch the station's historical sequence, then
      2. POST /predict with that sequence to get the SPI prediction.

    Prediction failures are reported in-band on the station record
    (best-effort) so that one bad station does not abort the whole query.

    Args:
        station: Raw station dict as returned by the /stations endpoint
                 (keys like 'ID', 'Name', 'Latitude'; 'distance_km' is
                 expected to have been added by find_nearest_stations).
        timestamp: Timestamp string forwarded to the traffic API.
        include_predictions: Whether to fetch and attach prediction data.

    Returns:
        Dict with station metadata and, optionally, a "traffic" entry.
    """
    station_info = {
        "id": station.get('ID'),
        "name": station.get('Name'),
        "latitude": station.get('Latitude'),
        "longitude": station.get('Longitude'),
        "freeway": station.get('Fwy'),
        "direction": station.get('Dir'),
        "lanes": station.get('Lanes'),
        "type": station.get('Type'),
        "distance_km": station.get('distance_km')
    }
    if not include_predictions:
        return station_info

    station_id = station.get('ID')
    try:
        # Predictions require real measured data: fetch the historical
        # sequence for this station/timestamp first.
        actual_traffic_response = await make_api_request(
            "POST",
            "/traffic/actual",
            data={
                "station_id": station_id,
                "timestamp": timestamp
            }
        )
        historical_sequence = actual_traffic_response.get("historical_sequence")
        if not historical_sequence:
            logger.warning(f"No historical sequence returned for station {station_id}")
            station_info["traffic"] = {
                "status": "no_historical_data",
                "message": "No historical sequence available for this station and timestamp"
            }
        else:
            # Feed the real sequence to the LSTM prediction endpoint.
            prediction_response = await make_api_request(
                "POST",
                "/predict",
                data={"sequence": historical_sequence}
            )
            station_info["traffic"] = {
                "spi": prediction_response.get("spi_predicted"),
                "congestion_level": prediction_response.get("congestion_level"),
                "congestion_label": prediction_response.get("congestion_label"),
                "traffic_state": prediction_response.get("traffic_state"),
                "confidence_level": prediction_response.get("confidence_level"),
                "fuzzy_classification": prediction_response.get("fuzzy_classification")
            }
    except Exception as e:
        # Best-effort: record the failure on this station instead of raising,
        # so the remaining stations are still returned to the client.
        logger.error(f"Error getting prediction for station {station.get('ID')}: {e}")
        station_info["traffic"] = {
            "status": "error",
            "message": str(e)
        }
    return station_info


async def handle_traffic_at_location(arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Handle get_traffic_at_location tool execution.

    Args:
        arguments: Tool arguments containing latitude, longitude, timestamp,
                   and options (location_name, max_stations, include_predictions)

    Returns:
        List of TextContent with traffic stations and predictions in JSON
        format for map rendering

    Raises:
        ValueError: If latitude/longitude or timestamp are missing.
    """
    lat = arguments.get("latitude")
    lon = arguments.get("longitude")
    location_name = arguments.get("location_name", "Unknown Location")
    timestamp = arguments.get("timestamp")
    max_stations = arguments.get("max_stations", 5)
    include_predictions = arguments.get("include_predictions", True)

    # Validate required parameters
    if lat is None or lon is None:
        raise ValueError("latitude and longitude parameters are required")
    if not timestamp:
        raise ValueError("timestamp parameter is required for traffic predictions")

    logger.info(f"Getting traffic at location: {lat}, {lon} ({location_name})")

    # Get all stations from the API
    stations_response = await make_api_request("GET", "/stations")
    all_stations = stations_response.get("stations", [])
    if not all_stations:
        return [TextContent(
            type="text",
            text=json.dumps({
                "status": "no_stations",
                "message": "No se pudieron obtener las estaciones de tráfico"
            }, indent=2)
        )]

    # Find nearest stations
    nearest_stations = find_nearest_stations(lat, lon, all_stations, max_stations)
    if not nearest_stations:
        return [TextContent(
            type="text",
            text=json.dumps({
                "status": "no_nearby_stations",
                "location": {
                    "name": location_name,
                    "latitude": lat,
                    "longitude": lon
                },
                "message": "No se encontraron estaciones cercanas"
            }, indent=2)
        )]

    # Build one record per station; predictions fetched sequentially so a
    # failing station is isolated to its own record.
    stations_data = [
        await _build_station_entry(station, timestamp, include_predictions)
        for station in nearest_stations
    ]

    # Build response for map rendering (center the map on the query point)
    result = {
        "status": "success",
        "query_location": {
            "name": location_name,
            "latitude": lat,
            "longitude": lon
        },
        "stations_count": len(stations_data),
        "stations": stations_data,
        "map_center": {
            "latitude": lat,
            "longitude": lon
        },
        "map_zoom": 12
    }
    return [TextContent(
        type="text",
        text=json.dumps(result, indent=2)
    )]
# SSE Transport Configuration
# Clients POST their follow-up MCP messages to this path; it must match the
# Mount("/messages/", ...) route registered on the Starlette app below.
sse_transport = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> Response:
    """
    Handle SSE connections.

    Runs the MCP server over the connected stream pair until the client
    disconnects.

    Args:
        request: Starlette request object

    Returns:
        Starlette response object
    """
    logger.info("SSE connection established")
    # NOTE(review): request._send is a private Starlette attribute; this is
    # the usual pattern for mcp's SseServerTransport but may break on
    # Starlette upgrades — confirm against the pinned versions.
    async with sse_transport.connect_sse(
        request.scope, request.receive, request._send
    ) as (read_stream, write_stream):
        init_options = mcp_server.create_initialization_options()
        await mcp_server.run(read_stream, write_stream, init_options)
    return Response("SSE connection closed", status_code=200)
async def handle_health(request: Request) -> Response:
    """
    Health check endpoint.

    Args:
        request: Starlette request object

    Returns:
        Health status response
    """
    # Echo the configured upstream API URL so probes can verify configuration.
    payload = {
        "status": "healthy",
        "service": "traffic-mcp-server",
        "traffic_api_url": TRAFFIC_API_URL
    }
    return Response(json.dumps(payload), media_type="application/json")
# Create Starlette application
app = Starlette(
    routes=[
        # SSE handshake endpoint: clients open a long-lived GET stream here.
        Route("/sse", endpoint=handle_sse, methods=["GET"]),
        # Clients POST subsequent MCP messages here; path must match the
        # SseServerTransport("/messages/") configured above.
        Mount("/messages/", app=sse_transport.handle_post_message),
        # Liveness probe for deployment platforms.
        Route("/health", endpoint=handle_health),
    ],
    middleware=[
        # NOTE(review): wildcard CORS is wide open; consider restricting
        # allow_origins for production deployments.
        Middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_methods=["*"],
            allow_headers=["*"],
        )
    ]
)
if __name__ == "__main__":
    # Imported here so importing this module (e.g. for tests) does not
    # require uvicorn.
    import uvicorn

    # Use lazy %-style logging args (not f-strings) so formatting only
    # happens when the record is actually emitted; the first message had an
    # f-prefix with no placeholders at all.
    base_url = f"http://0.0.0.0:{PORT}"
    logger.info("Starting Traffic MCP Server with SSE transport")
    logger.info("Traffic API URL: %s", TRAFFIC_API_URL)
    logger.info("Server will listen on 0.0.0.0:%d", PORT)
    logger.info("SSE endpoint: %s/sse", base_url)
    logger.info("Messages endpoint: %s/messages/", base_url)
    logger.info("Health endpoint: %s/health", base_url)
    uvicorn.run(app, host="0.0.0.0", port=PORT)