#!/usr/bin/env python3
"""Lumen Resto MCP - Server para reservas y horarios de restaurantes con Supabase."""
import os
import sys
import logging
import json
import re
from datetime import datetime, timedelta, time as dt_time
from dateutil import parser as date_parser
from dateutil import tz
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types
# Logging setup: every record goes to stderr (presumably because stdout is
# used by the stdio transport started in main() -- confirm).
logging.basicConfig(
    stream=sys.stderr,
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Module-level logger shared by every helper and tool in this file.
logger = logging.getLogger(__name__)
# Environment variables
# Surrounding whitespace (e.g. a newline from a secrets file) and trailing
# slashes are stripped so that f"{SUPABASE_URL}/rest/v1/<table>" never
# produces a "//rest/v1/..." URL.
SUPABASE_URL = os.getenv("SUPABASE_URL", "").strip().rstrip("/")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "").strip()
# Table whitelist: the only PostgREST tables the _supabase_* helpers accept.
ALLOWED_TABLES = {
    # restaurant configuration
    "restaurants", "restaurant_schedules", "special_dates",
    # private events and the tables they use
    "events", "event_tables",
    # floor plan, customers and bookings
    "tables", "clients", "reservations",
}
# Service types a schedule or reservation can refer to.
SERVICE_TYPES = {"cena", "almuerzo"}
# Reservation statuses that still block a table.
ACTIVE_STATUSES = {"seated", "confirmed", "pending"}
# Shared HTTP client; stays None until get_http_client() creates it.
http_client = None
def get_http_client():
    """Return the shared ``httpx.AsyncClient``, creating it on first use.

    The client is stored in the module-level ``http_client`` global so that
    every Supabase request reuses the same client.

    Returns:
        The shared ``httpx.AsyncClient`` instance.
    """
    global http_client
    if http_client is None:
        # httpx.Timeout must be given either a default or all four phases
        # (connect/read/write/pool); Timeout(connect=..., read=...) alone
        # raises ValueError. Use a 10s default and a tighter 5s connect.
        http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(10.0, connect=5.0)
        )
    return http_client
def _mask_phone(phone):
"""Mask phone number for logging, showing last 4 digits."""
if not phone or len(phone) < 4:
return "****"
return "***" + phone[-4:]
def _is_uuid_like(s):
"""Check if string looks like a UUID."""
if not s:
return False
uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
return bool(re.match(uuid_pattern, s.lower()))
def _parse_date_yyyy_mm_dd(s):
"""Parse date string in YYYY-MM-DD format, return datetime.date or None."""
if not s:
return None
try:
parts = s.split("-")
if len(parts) != 3:
return None
year, month, day = int(parts[0]), int(parts[1]), int(parts[2])
return datetime(year, month, day).date()
except (ValueError, IndexError):
return None
def _parse_time_hh_mm(s):
"""Parse time string in HH:MM format, return datetime.time or None."""
if not s:
return None
try:
parts = s.split(":")
if len(parts) != 2:
return None
hour, minute = int(parts[0]), int(parts[1])
if hour < 0 or hour > 23 or minute < 0 or minute > 59:
return None
return dt_time(hour, minute)
except (ValueError, IndexError):
return None
def _normalize_phone(s):
"""Normalize phone to E.164 format if possible, else clean it."""
if not s:
return ""
cleaned = re.sub(r'[^\d+]', '', s)
if cleaned.startswith('+'):
return cleaned
if cleaned.startswith('54') and len(cleaned) >= 10:
return '+' + cleaned
if len(cleaned) >= 10:
return '+54' + cleaned
return cleaned if cleaned else s
def _safe_int(s, default=0):
"""Convert string to int safely, return default on failure."""
if not s:
return default
try:
return int(s)
except (ValueError, TypeError):
return default
async def _supabase_get(table, params=None):
    """Run a GET against the PostgREST endpoint of *table*.

    Args:
        table: Table name; must be listed in ALLOWED_TABLES.
        params: Optional dict of query-string parameters (filters, order...).

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: for a non-whitelisted table, missing configuration,
            timeouts, HTTP error statuses and any other failure (every
            error is re-raised as ValueError with a user-facing message).
    """
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {table}")
    if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
        raise ValueError("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY")

    endpoint = f"{SUPABASE_URL}/rest/v1/{table}"
    request_headers = {
        "apikey": SUPABASE_SERVICE_ROLE_KEY,
        "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
        "Content-Type": "application/json",
    }
    query = params or {}
    try:
        # Client creation stays inside the try so its failures are wrapped too.
        response = await get_http_client().get(endpoint, headers=request_headers, params=query)
        response.raise_for_status()
        return response.json()
    except httpx.TimeoutException:
        logger.error(f"Timeout fetching {table}")
        raise ValueError("❌ Conexión: timeout consultando Supabase.")
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 401:
            logger.error(f"Supabase 401 on {table}")
            raise ValueError("❌ Configuración: credenciales Supabase inválidas.")
        logger.error(f"HTTP {status} on {table}: {exc.response.text}")
        raise ValueError(f"❌ Error consultando {table}: {status}")
    except Exception as exc:
        logger.error(f"Error fetching {table}: {exc}")
        raise ValueError(f"❌ Error inesperado: {exc}")
async def _supabase_post(table, data):
    """Insert *data* into *table* through PostgREST.

    The request carries ``Prefer: return=representation`` so the response
    body contains the inserted row(s).

    Args:
        table: Table name; must be listed in ALLOWED_TABLES.
        data: JSON-serialisable payload to insert.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: for a non-whitelisted table, missing configuration,
            timeouts, HTTP error statuses and any other failure.
    """
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {table}")
    if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
        raise ValueError("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY")

    endpoint = f"{SUPABASE_URL}/rest/v1/{table}"
    request_headers = {
        "apikey": SUPABASE_SERVICE_ROLE_KEY,
        "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=representation",
    }
    try:
        # Client creation stays inside the try so its failures are wrapped too.
        response = await get_http_client().post(endpoint, headers=request_headers, json=data)
        response.raise_for_status()
        return response.json()
    except httpx.TimeoutException:
        logger.error(f"Timeout posting to {table}")
        raise ValueError("❌ Conexión: timeout consultando Supabase.")
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 401:
            logger.error(f"Supabase 401 on {table}")
            raise ValueError("❌ Configuración: credenciales Supabase inválidas.")
        logger.error(f"HTTP {status} on {table}: {exc.response.text}")
        raise ValueError(f"❌ Error insertando en {table}: {status}")
    except Exception as exc:
        logger.error(f"Error posting to {table}: {exc}")
        raise ValueError(f"❌ Error inesperado: {exc}")
async def _supabase_patch(table, params, data):
    """Update rows of *table* through PostgREST.

    The request carries ``Prefer: return=representation`` so the response
    body contains the updated row(s).

    Args:
        table: Table name; must be listed in ALLOWED_TABLES.
        params: Query-string filters selecting the rows to update.
        data: JSON-serialisable dict of column values to write.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: for a non-whitelisted table, missing configuration,
            timeouts, HTTP error statuses and any other failure.
    """
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {table}")
    if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
        raise ValueError("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY")

    endpoint = f"{SUPABASE_URL}/rest/v1/{table}"
    request_headers = {
        "apikey": SUPABASE_SERVICE_ROLE_KEY,
        "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=representation",
    }
    try:
        # Client creation stays inside the try so its failures are wrapped too.
        response = await get_http_client().patch(
            endpoint, headers=request_headers, params=params, json=data
        )
        response.raise_for_status()
        return response.json()
    except httpx.TimeoutException:
        logger.error(f"Timeout patching {table}")
        raise ValueError("❌ Conexión: timeout consultando Supabase.")
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 401:
            logger.error(f"Supabase 401 on {table}")
            raise ValueError("❌ Configuración: credenciales Supabase inválidas.")
        logger.error(f"HTTP {status} on {table}: {exc.response.text}")
        raise ValueError(f"❌ Error actualizando {table}: {status}")
    except Exception as exc:
        logger.error(f"Error patching {table}: {exc}")
        raise ValueError(f"❌ Error inesperado: {exc}")
def _get_day_of_week(date_obj):
"""Get day of week (0=Sunday) from date."""
weekday = date_obj.weekday()
return (weekday + 1) % 7
def _infer_service_type(time_obj):
"""Infer service type from time (almuerzo < 17:00, cena >= 17:00)."""
if time_obj.hour < 17:
return "almuerzo"
return "cena"
def _times_overlap(start1, end1, start2, end2):
"""Check if two time ranges overlap."""
return start1 < end2 and start2 < end1
async def _get_restaurant(restaurant_id):
    """Load the restaurant row whose id equals *restaurant_id*.

    Returns:
        The first matching row returned by Supabase.

    Raises:
        ValueError: when no row matches (or when the lookup itself fails).
    """
    rows = await _supabase_get("restaurants", {"id": f"eq.{restaurant_id}"})
    if rows:
        return rows[0]
    raise ValueError(f"❌ Error: restaurante {restaurant_id} no encontrado.")
def _time_in_window(moment, opens, closes):
    """Return True when *moment* lies in the half-open window [opens, closes).

    A window whose *closes* is earlier than *opens* is treated as running
    past midnight (e.g. 20:00-01:00).
    NOTE(review): a post-midnight time is matched against the window of the
    requested date itself -- confirm that is the intended convention.
    """
    if opens <= closes:
        return opens <= moment < closes
    return moment >= opens or moment < closes


async def _check_schedule_open(restaurant_id, date_obj, time_obj, service_type, restaurant):
    """Decide whether the restaurant serves *service_type* on *date_obj*.

    Checks, in order: confirmed/pending events that close the restaurant,
    a special date for that day and service, the restaurant's regular
    closed days, and finally the weekly schedule.

    Args:
        restaurant_id: Restaurant UUID used in the Supabase filters.
        date_obj: ``datetime.date`` being checked.
        time_obj: ``datetime.time`` to check, or None to skip the
            time-of-day checks (closing events are then ignored).
        service_type: "almuerzo" or "cena".
        restaurant: Restaurant row (dict); ``regular_closed_days`` is read.

    Returns:
        Tuple ``(is_open, reason, hours_window)`` where *hours_window* is
        ``(start_time, end_time)`` or None when no window applies.

    Raises:
        ValueError: propagated from the Supabase helpers or from parsing
            stored ``HH:MM:SS`` times.
    """
    dow = _get_day_of_week(date_obj)
    date_str = date_obj.isoformat()
    has_time = time_obj is not None

    # 1. Events that close the restaurant while they are running.
    events = await _supabase_get("events", {
        "restaurant_id": f"eq.{restaurant_id}",
        "event_date": f"eq.{date_str}",
        "closes_restaurant": "eq.true",
        "status": "in.(confirmed,pending)"
    })
    for event in events:
        event_start = datetime.strptime(event["start_time"], "%H:%M:%S").time()
        event_end = datetime.strptime(event["end_time"], "%H:%M:%S").time()
        # Inclusive start: a booking exactly at the event's start is blocked.
        if has_time and _time_in_window(time_obj, event_start, event_end):
            return False, f"Evento privado: {event['name']}", None

    # 2. A special date for this date + service overrides the weekly schedule.
    special_dates = await _supabase_get("special_dates", {
        "restaurant_id": f"eq.{restaurant_id}",
        "date": f"eq.{date_str}",
        "service_type": f"eq.{service_type}"
    })
    if special_dates:
        sd = special_dates[0]
        if not sd["is_enabled"]:
            return False, f"Cerrado ({sd.get('reason', 'fecha especial')})", None
        # Special date enabled - use its times
        start_time = datetime.strptime(sd["start_time"], "%H:%M:%S").time()
        end_time = datetime.strptime(sd["end_time"], "%H:%M:%S").time()
        if has_time and not _time_in_window(time_obj, start_time, end_time):
            return False, f"Fuera del horario especial ({start_time.strftime('%H:%M')}-{end_time.strftime('%H:%M')})", (start_time, end_time)
        return True, "Horario especial", (start_time, end_time)

    # 3. Regular closed days ("or []" tolerates a null column value).
    regular_closed = restaurant.get("regular_closed_days") or []
    if dow in regular_closed:
        return False, "Día de cierre regular", None

    # 4. Weekly schedule for this weekday + service.
    schedules = await _supabase_get("restaurant_schedules", {
        "restaurant_id": f"eq.{restaurant_id}",
        "day_of_week": f"eq.{dow}",
        "service_type": f"eq.{service_type}",
        "is_enabled": "eq.true"
    })
    if not schedules:
        return False, "Sin horario configurado", None
    schedule = schedules[0]
    start_time = datetime.strptime(schedule["start_time"], "%H:%M:%S").time()
    end_time = datetime.strptime(schedule["end_time"], "%H:%M:%S").time()
    # Opening time itself is bookable; closing time is not.
    if has_time and not _time_in_window(time_obj, start_time, end_time):
        return False, f"Fuera del horario ({start_time.strftime('%H:%M')}-{end_time.strftime('%H:%M')})", (start_time, end_time)
    return True, "Abierto", (start_time, end_time)
# Leading HH:MM[:SS] of a stored time; fractions and UTC offsets are ignored.
_DB_CLOCK_RE = re.compile(r"\s*(\d{1,2}):(\d{2})(?::(\d{2}))?")


def _parse_db_clock(value):
    """Parse the HH:MM[:SS] prefix of a time string returned by PostgREST.

    Accepts forms such as "20:00:00", "20:00:00-03", "20:00:00-0300" and
    "20:00:00.5+00:00". Any offset is dropped, so the result is the stored
    wall-clock time as a naive ``datetime.time``.

    Raises:
        ValueError: when *value* does not start with a valid clock time.
    """
    match = _DB_CLOCK_RE.match(value or "")
    if not match:
        raise ValueError(f"❌ Error: hora inválida en base de datos: {value!r}")
    hour, minute, second = (int(group) if group else 0 for group in match.groups())
    return dt_time(hour, minute, second)


def _clock_span(start, end):
    """Return (start, end) as seconds since midnight.

    An *end* earlier than *start* is taken to fall on the next day, so a
    slot such as 23:00-00:30 keeps a positive length.
    """
    begin = start.hour * 3600 + start.minute * 60 + start.second
    finish = end.hour * 3600 + end.minute * 60 + end.second
    if finish < begin:
        finish += 24 * 3600
    return begin, finish


async def _find_available_table(restaurant_id, date_obj, start_time_obj, end_time_obj, party_size):
    """Find the smallest free table that seats *party_size* for the slot.

    Candidate tables are active, have enough capacity and are tried in
    ascending capacity order. A table is skipped when it is assigned to a
    confirmed/pending event overlapping the slot, or when it has a
    pending/confirmed/seated reservation on *date_obj* overlapping the slot.

    Args:
        restaurant_id: Restaurant UUID used in the Supabase filters.
        date_obj: ``datetime.date`` of the reservation.
        start_time_obj: Slot start (``datetime.time``).
        end_time_obj: Slot end (``datetime.time``); may be earlier than the
            start when the slot runs past midnight.
        party_size: Number of guests.

    Returns:
        The chosen table row (dict), or None when nothing is free.

    Raises:
        ValueError: propagated from the Supabase helpers or from parsing
            stored times.
    """
    # Get all active tables sorted by capacity
    tables = await _supabase_get("tables", {
        "restaurant_id": f"eq.{restaurant_id}",
        "is_active": "eq.true",
        "capacity": f"gte.{party_size}",
        "order": "capacity.asc"
    })
    if not tables:
        return None

    date_str = date_obj.isoformat()
    wanted_start, wanted_end = _clock_span(start_time_obj, end_time_obj)

    def clashes(row):
        """True when the row's start_time/end_time overlap the requested slot."""
        other_start, other_end = _clock_span(
            _parse_db_clock(row["start_time"]), _parse_db_clock(row["end_time"])
        )
        return _times_overlap(wanted_start, wanted_end, other_start, other_end)

    # Tables taken by events that overlap the requested slot
    events = await _supabase_get("events", {
        "restaurant_id": f"eq.{restaurant_id}",
        "event_date": f"eq.{date_str}",
        "status": "in.(confirmed,pending)"
    })
    blocked_table_ids = set()
    for event in events:
        if clashes(event):
            event_tables = await _supabase_get("event_tables", {"event_id": f"eq.{event['id']}"})
            blocked_table_ids.update(et["table_id"] for et in event_tables)

    # Existing reservations, grouped per table so each table scans only its own
    reservations = await _supabase_get("reservations", {
        "restaurant_id": f"eq.{restaurant_id}",
        "date": f"eq.{date_str}",
        "status": "in.(pending,confirmed,seated)"
    })
    reservations_by_table = {}
    for res in reservations:
        reservations_by_table.setdefault(res.get("table_id"), []).append(res)

    for table in tables:
        if table["id"] in blocked_table_ids:
            continue
        if not any(clashes(res) for res in reservations_by_table.get(table["id"], ())):
            return table
    return None
async def _generate_alternatives(restaurant_id, date_obj, original_time_obj, party_size, service_type, restaurant):
    """Suggest up to 2 nearby start times that are open and have a free table.

    Candidates are tried at +30, +60, -30 and -60 minutes from the
    requested time, in that order. Each slot uses the restaurant's
    ``default_reservation_duration`` (90 minutes when missing or null).

    Args:
        restaurant_id: Restaurant UUID.
        date_obj: ``datetime.date`` of the requested reservation.
        original_time_obj: Requested start (``datetime.time``).
        party_size: Number of guests.
        service_type: "almuerzo" or "cena".
        restaurant: Restaurant row (dict).

    Returns:
        List of "HH:MM" strings (possibly empty).
    """
    alternatives = []
    # Loop-invariant; "or 90" also covers a null column value, which would
    # otherwise make timedelta(minutes=None) raise TypeError.
    duration = restaurant.get("default_reservation_duration") or 90
    requested = datetime.combine(date_obj, original_time_obj)
    for offset in (30, 60, -30, -60):
        if len(alternatives) >= 2:
            break
        candidate = requested + timedelta(minutes=offset)
        # An offset that rolls over to another calendar day would still be
        # checked against date_obj, so such candidates are skipped.
        if candidate.date() != date_obj:
            continue
        alt_time_obj = candidate.time()
        # Check if this time is open
        is_open, _, _ = await _check_schedule_open(restaurant_id, date_obj, alt_time_obj, service_type, restaurant)
        if not is_open:
            continue
        # Check if there's a table available
        alt_end_time_obj = (candidate + timedelta(minutes=duration)).time()
        table = await _find_available_table(restaurant_id, date_obj, alt_time_obj, alt_end_time_obj, party_size)
        if table:
            alternatives.append(alt_time_obj.strftime("%H:%M"))
    return alternatives
class _ToolServer(Server):
    """Low-level MCP ``Server`` plus a FastMCP-style ``tool()`` decorator.

    The low-level ``mcp.server.Server`` only offers ``list_tools()`` and
    ``call_tool()`` handler decorators; it has no ``tool()``. This subclass
    keeps a registry of decorated coroutine functions and serves them
    through those two handlers, so ``@mcp.tool()`` works while the server
    can still be driven with ``run(read_stream, write_stream, options)``.
    """

    def __init__(self, name):
        super().__init__(name)
        # tool name -> (coroutine function, types.Tool description)
        self._registered_tools = {}

        @self.list_tools()
        async def _list_registered_tools():
            return [spec for _, spec in self._registered_tools.values()]

        @self.call_tool()
        async def _call_registered_tool(name, arguments):
            entry = self._registered_tools.get(name)
            if entry is None:
                raise ValueError(f"Unknown tool: {name}")
            func, spec = entry
            accepted = spec.inputSchema["properties"]
            # Tools declare every parameter as str; drop unknown or null
            # arguments and coerce the rest so numeric JSON values still work.
            kwargs = {
                key: str(value)
                for key, value in (arguments or {}).items()
                if key in accepted and value is not None
            }
            result = await func(**kwargs)
            return [types.TextContent(type="text", text=str(result))]

    def tool(self):
        """Return a decorator that registers a coroutine function as a tool.

        The tool name is the function name, the description is its
        docstring, and every parameter is exposed as a string property.
        """
        def register(func):
            import inspect

            parameters = inspect.signature(func).parameters
            schema = {
                "type": "object",
                "properties": {param: {"type": "string"} for param in parameters},
                "required": [
                    param for param, info in parameters.items()
                    if info.default is inspect.Parameter.empty
                ],
            }
            spec = types.Tool(
                name=func.__name__,
                description=(func.__doc__ or "").strip(),
                inputSchema=schema,
            )
            self._registered_tools[func.__name__] = (func, spec)
            return func
        return register


# Initialize MCP server
mcp = _ToolServer("lumen_resto_mcp")
@mcp.tool()
async def check_restaurant_schedule(
    restaurant_id: str = "",
    date: str = "",
    time: str = "",
    service_type: str = "",
    channel: str = ""
) -> str:
    """Consulta horarios de un restaurante para una fecha y hora específica."""
    # The docstring above is left untouched: the tool decorator may expose it
    # to clients as the tool description.
    #
    # Reports whether the restaurant is open on `date` (YYYY-MM-DD) for a
    # service, optionally at a specific `time` (HH:MM). When `service_type`
    # is empty it is inferred from `time`. `channel` is accepted but unused.
    # Always returns a user-facing string; errors are returned, not raised.
    try:
        # Input validation (no network access yet).
        if not (restaurant_id and _is_uuid_like(restaurant_id)):
            return "❌ Error: falta restaurant_id válido."
        if not date:
            return "❌ Error: falta fecha (YYYY-MM-DD)."
        date_obj = _parse_date_yyyy_mm_dd(date)
        if not date_obj:
            return "❌ Error: fecha inválida (formato YYYY-MM-DD requerido)."
        time_obj = _parse_time_hh_mm(time) if time else None
        if time and not time_obj:
            return "❌ Error: hora inválida (formato HH:MM requerido)."

        restaurant = await _get_restaurant(restaurant_id)
        timezone_str = restaurant.get("timezone", "America/Argentina/Buenos_Aires")

        # Without an explicit service, a time is required to infer one.
        if not service_type:
            if not time_obj:
                return "❌ Error: falta service_type (almuerzo o cena) o time para inferirlo."
            service_type = _infer_service_type(time_obj)
        if service_type not in SERVICE_TYPES:
            return "❌ Error: service_type inválido (debe ser almuerzo o cena)."

        logger.info(f"Checking schedule for restaurant {restaurant_id}, date {date}, service {service_type}")
        is_open, reason, hours = await _check_schedule_open(
            restaurant_id, date_obj, time_obj, service_type, restaurant
        )

        # Spanish weekday names indexed with Sunday=0.
        day_names = ("Domingo", "Lunes", "Martes", "Miércoles", "Jueves", "Viernes", "Sábado")
        day_name = day_names[_get_day_of_week(date_obj)]

        if is_open:
            if hours:
                hours_str = f"{hours[0].strftime('%H:%M')}-{hours[1].strftime('%H:%M')}"
            else:
                hours_str = "horario completo"
            time_check = f", hora {time}" if time else ""
            return "\n".join([
                "🕒 Abierto",
                f"📅 {day_name} {date}, {service_type}",
                f"⏰ Horario: {hours_str}",
                f"🌍 Timezone: {timezone_str}{time_check}",
                f"✅ {reason}",
            ])

        time_check = f" a las {time}" if time else ""
        report = [
            "❌ Cerrado",
            f"📅 {day_name} {date}, {service_type}{time_check}",
            f"🌍 Timezone: {timezone_str}",
            f"⚠️ Motivo: {reason}",
        ]
        # When a window is known, show it as a hint.
        if hours:
            report.append(f"💡 Horario habitual: {hours[0].strftime('%H:%M')}-{hours[1].strftime('%H:%M')}")
        return "\n".join(report)
    except ValueError as e:
        logger.error(f"Schedule check error: {e}")
        return str(e)
    except Exception as e:
        logger.error(f"Unexpected error in check_restaurant_schedule: {e}")
        return f"❌ Error inesperado: {e}"
@mcp.tool()
async def create_reservation(
    restaurant_id: str = "",
    phone: str = "",
    full_name: str = "",
    language: str = "",
    date: str = "",
    time: str = "",
    party_size: str = "",
    service_type: str = "",
    duration_minutes: str = "",
    notes: str = "",
    occasion: str = "",
    special_needs: str = "",
    channel: str = "",
    idempotency_hint: str = ""
) -> str:
    """Crea una nueva reserva con asignación automática de mesa."""
    # The docstring above is left untouched: the tool decorator may expose it
    # to clients as the tool description.
    #
    # Flow: validate inputs -> load restaurant -> check it is open -> find or
    # create the client by phone -> return the existing reservation if the
    # same client already holds that slot -> pick the smallest free table ->
    # insert the reservation as "confirmed".
    # All parameters arrive as strings; `idempotency_hint` is accepted but
    # unused. Always returns a user-facing string; errors are returned, not
    # raised.
    # NOTE(review): availability is checked and then inserted in separate
    # requests, so two concurrent calls could pick the same table unless the
    # database enforces a constraint -- confirm.
    try:
        # Validate restaurant_id
        if not restaurant_id or not _is_uuid_like(restaurant_id):
            return "❌ Error: falta restaurant_id válido."
        # Validate phone
        if not phone:
            return "❌ Error: falta phone."
        normalized_phone = _normalize_phone(phone)
        masked_phone = _mask_phone(normalized_phone)
        # Validate date
        if not date:
            return "❌ Error: falta fecha (YYYY-MM-DD)."
        date_obj = _parse_date_yyyy_mm_dd(date)
        if not date_obj:
            return "❌ Error: fecha inválida (formato YYYY-MM-DD requerido)."
        # Validate time
        if not time:
            return "❌ Error: falta hora (HH:MM)."
        time_obj = _parse_time_hh_mm(time)
        if not time_obj:
            return "❌ Error: hora inválida (formato HH:MM requerido)."
        # Validate party_size
        if not party_size:
            return "❌ Error: falta party_size."
        party_size_int = _safe_int(party_size, 0)
        if party_size_int <= 0:
            return "❌ Error: party_size inválido (debe ser > 0)."

        # Get restaurant
        restaurant = await _get_restaurant(restaurant_id)
        timezone_str = restaurant.get("timezone", "America/Argentina/Buenos_Aires")

        # Infer service_type if not provided
        if not service_type:
            service_type = _infer_service_type(time_obj)
        if service_type not in SERVICE_TYPES:
            return "❌ Error: service_type inválido (debe ser almuerzo o cena)."

        # Duration: explicit value, else restaurant default, else 90 minutes.
        # "or 90" also covers a null column, which would otherwise make
        # timedelta(minutes=None) raise TypeError.
        duration = _safe_int(duration_minutes, 0)
        if duration <= 0:
            duration = restaurant.get("default_reservation_duration") or 90

        logger.info(f"Creating reservation for restaurant {restaurant_id}, phone {masked_phone}, date {date}, time {time}, party {party_size_int}")

        # Check if restaurant is open
        is_open, reason, hours = await _check_schedule_open(restaurant_id, date_obj, time_obj, service_type, restaurant)
        if not is_open:
            return f"❌ Cerrado: el restaurante no atiende en ese horario.\n⚠️ Motivo: {reason}\n📅 {date} a las {time} ({service_type})"

        # Calculate start_time and end_time with timezone
        tz_obj = tz.gettz(timezone_str)
        start_datetime = datetime.combine(date_obj, time_obj).replace(tzinfo=tz_obj)
        end_datetime = start_datetime + timedelta(minutes=duration)
        start_time_str = start_datetime.strftime("%H:%M:%S%z")
        end_time_str = end_datetime.strftime("%H:%M:%S%z")

        # Upsert client (looked up by restaurant + normalized phone)
        existing_clients = await _supabase_get("clients", {
            "restaurant_id": f"eq.{restaurant_id}",
            "phone": f"eq.{normalized_phone}"
        })
        if existing_clients:
            client = existing_clients[0]
            client_id = client["id"]
            # Update client if name or language provided
            update_data = {}
            if full_name and full_name != client.get("full_name"):
                update_data["full_name"] = full_name
            if language and language != client.get("language"):
                update_data["language"] = language
            if update_data:
                # Timezone-aware UTC timestamp; datetime.utcnow() is naive
                # and deprecated.
                update_data["updated_at"] = datetime.now(tz.tzutc()).isoformat()
                await _supabase_patch("clients", {"id": f"eq.{client_id}"}, update_data)
        else:
            # Create new client
            client_data = {
                "restaurant_id": restaurant_id,
                "phone": normalized_phone,
                "full_name": full_name or None,
                "language": language or None
            }
            created_clients = await _supabase_post("clients", client_data)
            if not created_clients:
                raise ValueError("❌ Error: Supabase no devolvió el cliente creado.")
            client_id = created_clients[0]["id"]

        # Check for duplicate reservation (idempotency)
        existing_reservations = await _supabase_get("reservations", {
            "restaurant_id": f"eq.{restaurant_id}",
            "client_id": f"eq.{client_id}",
            "date": f"eq.{date_obj.isoformat()}",
            "start_time": f"eq.{start_time_str}",
            "status": "in.(pending,confirmed,seated)"
        })
        if existing_reservations:
            existing = existing_reservations[0]
            table_code = "sin mesa"
            if existing.get("table_id"):
                tables = await _supabase_get("tables", {"id": f"eq.{existing['table_id']}"})
                if tables:
                    table_code = tables[0]["table_code"]
            # Report what is actually stored, not what this call asked for.
            existing_party = existing.get("party_size") or party_size_int
            return f"✅ Reserva ya existe\n🍽️ ID: {existing['id']}\n📅 {date} a las {time}\n👥 {existing_party} personas\n📍 Mesa: {table_code}\n⚠️ Reserva duplicada detectada"

        # Find available table
        table = await _find_available_table(restaurant_id, date_obj, time_obj, end_datetime.time(), party_size_int)
        if not table:
            # No table available - generate alternatives
            alternatives = await _generate_alternatives(restaurant_id, date_obj, time_obj, party_size_int, service_type, restaurant)
            alt_str = ""
            if alternatives:
                alt_str = f"\n💡 Horarios disponibles: {', '.join(alternatives)}"
            return f"❌ Sin disponibilidad: no hay mesas para {party_size_int} personas.\n📅 {date} a las {time}\n🍽️ {service_type}{alt_str}"

        # Create reservation
        reservation_data = {
            "restaurant_id": restaurant_id,
            "client_id": client_id,
            "date": date_obj.isoformat(),
            "start_time": start_time_str,
            "end_time": end_time_str,
            "party_size": party_size_int,
            "duration_minutes": duration,
            "table_id": table["id"],
            "status": "confirmed",
            "notes": notes or None,
            "occasion": occasion or None,
            "special_needs": special_needs or None,
            "channel": channel or None
        }
        created_reservations = await _supabase_post("reservations", reservation_data)
        if not created_reservations:
            raise ValueError("❌ Error: Supabase no devolvió la reserva creada.")
        reservation = created_reservations[0]
        logger.info(f"Reservation created: {reservation['id']}, table {table['table_code']}")
        return f"✅ Reserva confirmada\n🍽️ ID: {reservation['id']}\n📅 {date} a las {time}\n👥 {party_size_int} personas\n📍 Mesa: {table['table_code']}\n⏱️ Duración: {duration} min\n🌍 Timezone: {timezone_str}\n📱 Canal: {channel or 'no especificado'}"
    except ValueError as e:
        logger.error(f"Reservation creation error: {e}")
        return str(e)
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Unexpected error in create_reservation: {e}")
        return f"❌ Error inesperado: {str(e)}"
async def main():
    """Run the MCP server over stdio until the transport closes.

    Exits with status 1 when SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY is
    not configured. On shutdown the shared HTTP client, if one was
    created, is closed so its connections are released.
    """
    logger.info("Starting Lumen Resto MCP server")
    if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
        logger.error("Missing required environment variables")
        sys.exit(1)
    try:
        async with stdio_server() as (read_stream, write_stream):
            await mcp.run(
                read_stream,
                write_stream,
                mcp.create_initialization_options()
            )
    finally:
        # http_client is only set once get_http_client() has been called.
        if http_client is not None:
            await http_client.aclose()


if __name__ == "__main__":
    # asyncio is only needed when the file is launched as a script.
    import asyncio
    asyncio.run(main())