#server.py
import asyncio
import inspect
import requests
import json
import sys
import os
import mimetypes
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List
from uuid import uuid4
from pathlib import Path
from fastapi import FastAPI, HTTPException, Request, Depends, Query
from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import ollama
from tavily import TavilyClient
from dotenv import load_dotenv
# Cargar variables de entorno desde .env ANTES de importar módulos locales
load_dotenv()
# Importar módulos locales
try:
from .logger import get_logger
from .auth_models import (
User, TokenRequest, TokenResponse, UserInfo, AuthorizeRequest,
OAuthProvider, AuthError
)
from .auth_service import auth_service
from .auth_middleware import (
auth_middleware, get_current_active_user, get_user_with_mcp_read,
get_user_with_mcp_write, get_user_with_mcp_read_from_request
)
except ImportError:
# Fallback para ejecución directa
from logger import get_logger
from auth_models import (
User, TokenRequest, TokenResponse, UserInfo, AuthorizeRequest,
OAuthProvider, AuthError
)
from auth_service import auth_service
from auth_middleware import (
auth_middleware, get_current_active_user, get_user_with_mcp_read,
get_user_with_mcp_write, get_user_with_mcp_read_from_request
)
# Configurar logger
logger = get_logger("mcp-server")
# Debug: verificar que las variables se cargan
logger.info("Verificando carga de variables de entorno...")
logger.info(f"GOOGLE_CLIENT_ID: {'✓' if os.getenv('GOOGLE_CLIENT_ID') else '✗'}")
logger.info(f"GOOGLE_CLIENT_SECRET: {'✓' if os.getenv('GOOGLE_CLIENT_SECRET') else '✗'}")
logger.info(f"GITHUB_CLIENT_ID: {'✓' if os.getenv('GITHUB_CLIENT_ID') else '✗'}")
logger.info(f"GITHUB_CLIENT_SECRET: {'✓' if os.getenv('GITHUB_CLIENT_SECRET') else '✗'}")
logger.info(f"TAVILY_API_KEY: {'✓' if os.getenv('TAVILY_API_KEY') else '✗'}")
# Instancia principal del servidor FastAPI
app = FastAPI(
title="MCP HTTP Server",
description="Servidor MCP con transporte HTTP en lugar de STDIO",
version="1.0.0"
)
# Configurar CORS para permitir acceso desde cualquier origen
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Agregar middleware de autenticación
app.middleware("http")(auth_middleware)
# Modelos Pydantic para las peticiones MCP
class MCPRequest(BaseModel):
jsonrpc: str = "2.0"
id: Optional[str] = None
method: str
params: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
jsonrpc: str = "2.0"
id: Optional[str] = None
result: Optional[Dict[str, Any]] = None
error: Optional[Dict[str, Any]] = None
class InitializeParams(BaseModel):
protocolVersion: str
capabilities: Dict[str, Any] = {}
clientInfo: Dict[str, Any] = {}
class ToolCallParams(BaseModel):
name: str
arguments: Dict[str, Any] = {}
# Estado del servidor
server_state = {
"initialized": False,
"client_info": None,
"capabilities": {}
}
logger.info("Servidor MCP HTTP inicializado")
def get_current_datetime_iso() -> str:
"""Devuelve la fecha y hora actuales en formato ISO-8601 (UTC)."""
logger.debug("Obteniendo fecha/hora actual")
current_timestamp_iso: str = datetime.now(timezone.utc).isoformat()
logger.debug(f"Fecha/hora obtenida: {current_timestamp_iso}")
return current_timestamp_iso
# Función datetime_now eliminada
def search_web_tavily(query: str, num_results: int = 3) -> str:
"""
Realiza una búsqueda web usando TAVILY API (optimizada para LLMs).
Devuelve resultados estructurados y relevantes.
"""
logger.debug(f"Iniciando búsqueda TAVILY para: {query}")
try:
# Obtener API key de variable de entorno
api_key = os.getenv("TAVILY_API_KEY")
if not api_key:
logger.warning("TAVILY_API_KEY no configurada, usando DuckDuckGo como fallback")
return search_web_simple(query, num_results)
logger.debug(f"TAVILY_API_KEY encontrada: {api_key[:10]}...{api_key[-4:] if len(api_key) > 14 else '***'}")
# Crear cliente TAVILY
client = TavilyClient(api_key=api_key)
# Realizar búsqueda con TAVILY
response = client.search(
query=query,
search_depth="basic", # Usar búsqueda básica para ahorrar créditos
max_results=num_results,
include_answer=True, # Incluir respuesta directa si está disponible
include_raw_content=False # No incluir contenido raw para ahorrar tokens
)
results = []
# Procesar respuesta directa si está disponible
if response.get("answer"):
results.append(f"Respuesta directa: {response['answer']}")
# Procesar resultados de búsqueda
if response.get("results"):
for result in response["results"][:num_results]:
title = result.get("title", "Sin título")
content = result.get("content", "Sin contenido")
url = result.get("url", "")
results.append(f"- {title}")
results.append(f" {content}")
if url:
results.append(f" Fuente: {url}")
results.append("") # Línea en blanco para separar
if results:
logger.debug(f"TAVILY completada: {len(results)} elementos")
return "\n".join(results)
else:
logger.warning("TAVILY no devolvió resultados, usando DuckDuckGo")
return search_web_simple(query, num_results)
except Exception as e:
logger.error(f"Error en TAVILY: {e}", exc_info=True)
if "Unauthorized" in str(e) or "invalid API key" in str(e).lower():
logger.error("TAVILY_API_KEY es inválida o ha expirado. Verifica la configuración en Railway.")
logger.info("Fallback a DuckDuckGo")
return search_web_simple(query, num_results)
def search_web_simple(query: str, num_results: int = 3) -> str:
"""
Realiza una búsqueda web simple usando DuckDuckGo (sin API key requerida).
Devuelve los primeros resultados como texto plano.
"""
logger.debug(f"Iniciando búsqueda web para: {query}")
try:
# Usar DuckDuckGo Instant Answer API (gratuita, sin API key)
url = "https://api.duckduckgo.com/"
params = {
"q": query,
"format": "json",
"no_html": "1",
"skip_disambig": "1",
"t": "web" # Forzar búsqueda web en lugar de instant answer
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
# Extraer información relevante
results = []
# Abstract (resumen)
if data.get("Abstract"):
results.append(f"Resumen: {data['Abstract']}")
# Definición
if data.get("Definition"):
results.append(f"Definición: {data['Definition']}")
# Resultados relacionados
if data.get("RelatedTopics"):
for topic in data["RelatedTopics"][:num_results]:
if isinstance(topic, dict) and topic.get("Text"):
results.append(f"- {topic['Text']}")
# Si no hay resultados específicos, usar el abstract
if not results and data.get("Abstract"):
results.append(data["Abstract"])
# Log de datos recibidos para debugging
logger.debug(f"Datos DuckDuckGo recibidos: Abstract={bool(data.get('Abstract'))}, Definition={bool(data.get('Definition'))}, RelatedTopics={len(data.get('RelatedTopics', []))}")
# Si aún no hay resultados, crear una respuesta informativa
if not results:
results.append(f"No se encontraron resultados específicos para '{query}'")
results.append("")
results.append("Esto puede deberse a:")
results.append("- Limitaciones de la API de DuckDuckGo")
results.append("- Consulta muy específica o reciente")
results.append("- Problemas de conectividad")
results.append("")
results.append("Recomendación: Intenta reformular tu consulta o verifica la configuración de TAVILY_API_KEY.")
logger.debug(f"Búsqueda web completada: {len(results)} resultados")
return "\n".join(results)
except Exception as e:
logger.error(f"Error en búsqueda web: {e}", exc_info=True)
# Fallback con información básica sobre Ollama
return f"""Información sobre '{query}':
Ollama es una herramienta de código abierto que permite ejecutar modelos de lenguaje de gran tamaño (LLMs) localmente en tu dispositivo.
Características principales:
- Ejecuta modelos como Llama, Mistral, CodeLlama localmente
- No requiere conexión a internet para funcionar
- Mantiene privacidad de tus datos
- Soporte para múltiples sistemas operativos
- API compatible con OpenAI
Para instalar: https://ollama.com/download
Para más información: https://ollama.com"""
# Herramientas de directorio eliminadas - ya no se necesitan
# Funciones de directorio eliminadas
async def web_search_tool(query: str, model: str = "gpt-oss:20b") -> str:
"""
Realiza una búsqueda web y la procesa usando Ollama localmente.
Args:
query: Consulta de búsqueda
model: Modelo de Ollama a usar (por defecto: gpt-oss:20b)
Returns:
Respuesta procesada por Ollama basada en los resultados de búsqueda web
"""
logger.info(f"Herramienta web_search llamada - Query: {query}, Model: {model}")
try:
# 1. Realizar búsqueda web (TAVILY primero, DuckDuckGo como fallback)
logger.debug("Realizando búsqueda web...")
web_results = search_web_tavily(query)
logger.debug(f"Resultados web obtenidos: {len(web_results)} caracteres")
# 2. Intentar usar Ollama si está disponible
logger.debug(f"Intentando conectar con Ollama usando modelo: {model}")
try:
# Preparar prompt para Ollama
prompt = f"""Basándote en los siguientes resultados de búsqueda web, responde de manera útil y precisa a la consulta del usuario.
Consulta del usuario: {query}
Resultados de búsqueda web:
{web_results}
Por favor, proporciona una respuesta clara y bien estructurada basada en esta información."""
# Usar Ollama para procesar la información
logger.debug("Creando cliente Ollama...")
client = ollama.Client(host='http://127.0.0.1:11434')
logger.debug("Enviando prompt a Ollama...")
response = client.chat(
model=model,
messages=[{
'role': 'user',
'content': prompt
}]
)
result = f"[Ollama] Búsqueda web procesada con {model}:\n\n{response['message']['content']}"
logger.info(f"web_search completada con Ollama: {len(result)} caracteres")
logger.debug(f"Respuesta completa de Ollama: {result[:500]}...") # Primeros 500 caracteres
return result
except Exception as ollama_error:
logger.warning(f"Ollama no disponible: {ollama_error}")
# Si Ollama no está disponible, devolver resultados web directamente
logger.info(f"Devolviendo resultados de Tavily directamente: {len(web_results)} caracteres")
return web_results
except Exception as e:
logger.error(f"Error en web_search: {e}", exc_info=True)
return f"Error en web_search: {str(e)}"
# Endpoints de autenticación OAuth2
@app.get("/auth/providers")
async def get_oauth_providers():
"""Obtener lista de proveedores OAuth2 disponibles"""
providers = auth_service.get_available_providers()
return {
"providers": [
{
"name": provider.value,
"display_name": provider.value.title(),
"auth_url": f"/auth/{provider.value}"
}
for provider in providers
]
}
@app.get("/auth/success", response_class=HTMLResponse)
async def auth_success(
request: Request,
access_token: Optional[str] = Query(None),
refresh_token: Optional[str] = Query(None)
):
"""Página de éxito de autenticación"""
# Si hay tokens en la URL, configurar cookies seguras
if access_token and refresh_token:
response = HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
<title>Autenticación Exitosa</title>
<style>
body { font-family: Arial, sans-serif; max-width: 400px; margin: 100px auto; padding: 20px; text-align: center; }
.warning { background-color: #fff3cd; border: 1px solid #ffeaa7; padding: 15px; margin: 20px 0; border-radius: 5px; }
.success { background-color: #d4edda; border: 1px solid #c3e6cb; padding: 15px; margin: 20px 0; border-radius: 5px; }
</style>
</head>
<body>
<h1>🔐 Autenticación Exitosa</h1>
<div class="success">
<p>¡Autenticación completada exitosamente!</p>
</div>
<p><a href="/docs">📖 Ver documentación de la API</a></p>
<p><a href="/test-page">🧪 Ir a página de pruebas</a></p>
<script>
// Limpiar la URL sin recargar la página
if (window.history.replaceState) {
window.history.replaceState({}, document.title, '/auth/success');
}
</script>
</body>
</html>
""")
# Configurar cookies seguras
# Detectar si estamos en producción
environment = os.getenv("ENVIRONMENT", "development")
railway_env = os.getenv("RAILWAY_ENVIRONMENT")
is_production = environment == "production" or railway_env is not None
cookie_secure = bool(is_production)
cookie_samesite = "lax" # Usar "lax" tanto en producción como desarrollo para mejor compatibilidad
# Debug logging de cookies
logger.info(f"Configurando cookies - secure: {cookie_secure}, samesite: {cookie_samesite}")
response.set_cookie(
key="access_token",
value=access_token,
httponly=True, # No accesible desde JavaScript
secure=cookie_secure, # True en producción (HTTPS), False en desarrollo
samesite=cookie_samesite, # lax para mejor compatibilidad
max_age=3600 # 1 hora
)
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=cookie_secure, # True en producción (HTTPS), False en desarrollo
samesite=cookie_samesite,
max_age=30*24*3600 # 30 días
)
logger.info("Cookies configuradas correctamente")
return response
# Si no hay tokens, mostrar página normal
return HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
<title>Autenticación Exitosa</title>
<style>
body { font-family: Arial, sans-serif; max-width: 400px; margin: 100px auto; padding: 20px; text-align: center; }
</style>
</head>
<body>
<h1>¡Autenticación Exitosa!</h1>
<p>Ya puedes usar el servidor MCP con tu token de acceso.</p>
<p><a href="/docs">Ver documentación de la API</a></p>
</body>
</html>
""")
@app.get("/auth/{provider}")
async def oauth_authorize(
provider: str,
redirect_uri: Optional[str] = Query(None, description="URI de redirección después de la autenticación")
):
"""Iniciar flujo de autorización OAuth2"""
try:
oauth_provider = OAuthProvider(provider)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Proveedor no soportado: {provider}"
)
if oauth_provider not in auth_service.get_available_providers():
raise HTTPException(
status_code=400,
detail=f"Proveedor no configurado: {provider}"
)
try:
# Usar redirect_uri por defecto si no se proporciona
# Detectar si estamos en producción (Railway siempre tiene PORT)
railway_port = os.getenv("PORT")
if railway_port:
# Estamos en Railway
base_url = "https://mcptavilidateoaut-production.up.railway.app"
else:
# Desarrollo local
base_url = "http://localhost:8001"
final_redirect_uri = redirect_uri or f"{base_url}/auth/success"
# Debug logging
logger.info(f"OAuth {provider} - base_url: {base_url}")
logger.info(f"OAuth {provider} - final_redirect_uri: {final_redirect_uri}")
auth_url, state = auth_service.create_oauth_authorization_url(
oauth_provider,
final_redirect_uri
)
logger.info(f"Redirigiendo a {provider} para autenticación")
logger.info(f"Auth URL generada: {auth_url}")
return RedirectResponse(url=auth_url)
except AuthError as e:
logger.error(f"Error creando URL de autorización: {e.message}")
raise HTTPException(status_code=400, detail=e.message)
@app.get("/auth/callback/{provider}")
async def oauth_callback(
provider: str,
code: str = Query(..., description="Código de autorización"),
state: str = Query(..., description="Estado para prevenir CSRF"),
redirect_uri: Optional[str] = Query(None, description="URI de redirección original (opcional)")
):
"""Manejar callback OAuth2"""
try:
oauth_provider = OAuthProvider(provider)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Proveedor no soportado: {provider}"
)
# Debug logging
logger.info(f"OAuth callback {provider} - code: {code[:10]}...")
logger.info(f"OAuth callback {provider} - state: {state}")
logger.info(f"OAuth callback {provider} - redirect_uri: {redirect_uri}")
try:
access_token, refresh_token = auth_service.handle_oauth_callback(
oauth_provider, code, state, redirect_uri
)
# Usar redirect_uri del estado OAuth si no se proporciona en la query
final_redirect_uri = redirect_uri
if not final_redirect_uri:
# Obtener redirect_uri del estado OAuth
if state in auth_service.oauth_states:
final_redirect_uri = auth_service.oauth_states[state].redirect_uri
else:
# Fallback con detección de entorno
railway_port = os.getenv("PORT")
if railway_port:
# Estamos en Railway
base_url = "https://mcptavilidateoaut-production.up.railway.app"
else:
# Desarrollo local
base_url = "http://localhost:8001"
final_redirect_uri = f"{base_url}/auth/success"
# Redirigir a la página de éxito con los tokens en la URL (para desarrollo)
# En producción, usar cookies seguras o almacenamiento en sesión
success_url = f"{final_redirect_uri}?access_token={access_token}&refresh_token={refresh_token}"
return RedirectResponse(url=success_url)
except AuthError as e:
logger.error(f"Error en callback OAuth2: {e.message}")
raise HTTPException(status_code=400, detail=e.message)
@app.post("/auth/token", response_model=TokenResponse)
async def refresh_token(token_request: TokenRequest):
"""Refrescar token de acceso"""
if token_request.grant_type != "refresh_token":
raise HTTPException(
status_code=400,
detail="grant_type debe ser 'refresh_token'"
)
if not token_request.refresh_token:
raise HTTPException(
status_code=400,
detail="refresh_token requerido"
)
try:
new_access_token = auth_service.refresh_access_token(token_request.refresh_token)
return TokenResponse(
access_token=new_access_token,
expires_in=auth_service.access_token_expire_minutes * 60
)
except AuthError as e:
logger.error(f"Error refrescando token: {e.message}")
raise HTTPException(status_code=401, detail=e.message)
@app.get("/auth/me", response_model=UserInfo)
async def get_current_user_info(current_user: User = Depends(get_current_active_user)):
"""Obtener información del usuario actual"""
return UserInfo(
id=current_user.id,
email=current_user.email,
name=current_user.name,
avatar_url=current_user.avatar_url,
role=current_user.role,
provider=current_user.provider,
created_at=current_user.created_at,
last_login=current_user.last_login
)
@app.post("/auth/logout")
async def logout(
token_request: TokenRequest,
current_user: User = Depends(get_current_active_user)
):
"""Cerrar sesión y revocar tokens"""
if token_request.refresh_token:
auth_service.revoke_refresh_token(token_request.refresh_token)
logger.info(f"Usuario desconectado: {current_user.email}")
return {"message": "Sesión cerrada exitosamente"}
@app.get("/auth/stats")
async def get_auth_stats(current_user: User = Depends(get_current_active_user)):
"""Obtener estadísticas de autenticación (solo para administradores)"""
if current_user.role != "admin":
raise HTTPException(
status_code=403,
detail="Acceso denegado: se requieren permisos de administrador"
)
return auth_service.get_stats()
# Página de login simple
@app.get("/login", response_class=HTMLResponse)
async def login_page():
"""Página de login simple"""
providers = auth_service.get_available_providers()
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>MCP Server - Login</title>
<style>
body { font-family: Arial, sans-serif; max-width: 400px; margin: 100px auto; padding: 20px; }
.provider { display: block; margin: 10px 0; padding: 10px; background: #f0f0f0; text-decoration: none; color: black; border-radius: 5px; }
.provider:hover { background: #e0e0e0; }
h1 { text-align: center; }
.no-providers { color: #666; text-align: center; margin: 20px 0; }
</style>
</head>
<body>
<h1>MCP Server Login</h1>
"""
if providers:
html_content += "<p>Selecciona un proveedor para autenticarte:</p>"
for provider in providers:
# Detectar entorno para URL base
railway_port = os.getenv("PORT")
if railway_port:
# Estamos en Railway
base_url = "https://mcptavilidateoaut-production.up.railway.app"
else:
# Desarrollo local
base_url = "http://localhost:8001"
html_content += f"""
<a href="/auth/{provider.value}?redirect_uri={base_url}/auth/success" class="provider">
Iniciar sesión con {provider.value.title()}
</a>
"""
else:
html_content += """
<div class="no-providers">
<p>No hay proveedores OAuth configurados.</p>
<p>Configura al menos un proveedor (Google, GitHub, Microsoft) en el archivo .env para habilitar la autenticación.</p>
<p><a href="/">Volver al inicio</a></p>
</div>
"""
html_content += """
</body>
</html>
"""
return html_content
# Página de pruebas mejorada con cookies
@app.get("/test-page", response_class=HTMLResponse)
async def test_page():
"""Página de pruebas para la herramienta web_search"""
return """
<!DOCTYPE html>
<html lang="es">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Búsqueda Web</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 20px auto;
padding: 20px;
background-color: #f5f5f5;
}
.container {
background: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1 {
color: #333;
text-align: center;
}
.section {
margin: 20px 0;
padding: 15px;
border: 1px solid #ddd;
border-radius: 5px;
}
.section h3 {
margin-top: 0;
color: #555;
}
input[type="text"] {
width: 100%;
padding: 10px;
margin: 5px 0;
border: 1px solid #ccc;
border-radius: 4px;
}
button {
background-color: #007bff;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
margin: 5px;
}
button:hover {
background-color: #0056b3;
}
.result {
background-color: #f8f9fa;
border: 1px solid #e9ecef;
padding: 15px;
margin: 10px 0;
border-radius: 4px;
white-space: pre-wrap;
font-family: monospace;
}
.error {
background-color: #f8d7da;
border-color: #f5c6cb;
color: #721c24;
}
.success {
background-color: #d4edda;
border-color: #c3e6cb;
color: #155724;
}
.info {
background-color: #d1ecf1;
border-color: #bee5eb;
color: #0c5460;
padding: 10px;
border-radius: 4px;
margin: 10px 0;
}
</style>
</head>
<body>
<div class="container">
<div class="section">
<h3>🔍 Búsqueda Web con TAVILY</h3>
<label for="searchQuery">Consulta de búsqueda:</label>
<input type="text" id="searchQuery" placeholder="ej: ¿en qué país está Londres?">
<button onclick="testWebSearch()">Buscar en Web</button>
<div id="webSearchResult" class="result" style="display: none;"></div>
</div>
</div>
<script>
// Detectar URL del servidor basado en el entorno
const isRailway = window.location.hostname.includes('railway.app') || window.location.hostname.includes('noemtoqueuelsdallonses.site');
const SERVER_URL = isRailway ? window.location.origin : 'http://localhost:8001';
// Debug logging
console.log('Entorno detectado:', isRailway ? 'Railway' : 'Local');
console.log('SERVER_URL:', SERVER_URL);
async function makeMcpRequest(toolName, arguments = {}) {
const requestBody = {
jsonrpc: "2.0",
id: "1",
method: "tools/call",
params: {
name: toolName,
arguments: arguments
}
};
try {
console.log('Enviando petición MCP a:', `${SERVER_URL}/mcp`);
console.log('Request body:', requestBody);
const response = await fetch(`${SERVER_URL}/mcp`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
credentials: 'include', // Incluir cookies
body: JSON.stringify(requestBody)
});
console.log('Response status:', response.status);
console.log('Response headers:', Object.fromEntries(response.headers.entries()));
const result = await response.json();
console.log('Response body:', result);
if (response.ok && result.result) {
return result.result.content[0].text;
} else {
throw new Error(result.error ? result.error.message : 'Error en la respuesta');
}
} catch (error) {
console.error('Error en makeMcpRequest:', error);
throw new Error(`Error: ${error.message}`);
}
}
function showResult(elementId, message, isError = false) {
const element = document.getElementById(elementId);
element.textContent = message;
element.className = `result ${isError ? 'error' : 'success'}`;
element.style.display = 'block';
}
async function testWebSearch() {
try {
const query = document.getElementById('searchQuery').value;
if (!query) {
showResult('webSearchResult', 'Por favor, introduce una consulta de búsqueda', true);
return;
}
const result = await makeMcpRequest('web_search', { query: query });
showResult('webSearchResult', result);
} catch (error) {
showResult('webSearchResult', error.message, true);
}
}
</script>
</body>
</html>
"""
# Endpoints MCP HTTP (ahora protegidos)
@app.get("/")
async def root():
"""Endpoint raíz con información del servidor"""
return {
"name": "MCP HTTP Server",
"version": "1.0.0",
"description": "Servidor MCP con transporte HTTP",
"protocol": "MCP over HTTP",
"tools": ["web_search"],
"status": "running"
}
@app.post("/mcp")
async def mcp_endpoint(
request: MCPRequest,
http_request: Request
):
"""
Endpoint principal MCP que maneja todas las peticiones del protocolo MCP
"""
logger.info(f"Petición MCP recibida: {request.method}")
try:
if request.method == "initialize":
return await handle_initialize(request)
elif request.method == "notifications/initialized":
return await handle_initialized(request)
elif request.method == "tools/list":
# tools/list no requiere autenticación específica
return await handle_tools_list(request)
elif request.method == "tools/call":
# tools/call requiere autenticación
# Debug: verificar cookies recibidas
cookies = http_request.cookies
logger.info(f"Cookies recibidas en MCP: {list(cookies.keys())}")
logger.info(f"Access token presente: {'access_token' in cookies}")
try:
current_user = await get_user_with_mcp_read_from_request(http_request)
logger.info(f"Usuario autenticado: {current_user.email}")
return await handle_tools_call(request, current_user)
except HTTPException as e:
logger.warning(f"Error de autenticación en MCP: {e.detail}")
return MCPResponse(
id=request.id,
error={
"code": -32603,
"message": f"Error de autenticación: {e.detail}"
}
)
else:
return MCPResponse(
id=request.id,
error={
"code": -32601,
"message": f"Método no encontrado: {request.method}"
}
)
except Exception as e:
logger.error(f"Error procesando petición MCP: {e}", exc_info=True)
return MCPResponse(
id=request.id,
error={
"code": -32603,
"message": f"Error interno: {str(e)}"
}
)
async def handle_initialize(request: MCPRequest) -> MCPResponse:
"""Maneja la inicialización del servidor MCP"""
logger.info("Inicializando servidor MCP")
params = request.params or {}
server_state["client_info"] = params.get("clientInfo", {})
server_state["capabilities"] = params.get("capabilities", {})
server_state["initialized"] = True
return MCPResponse(
id=request.id,
result={
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "mcp-http-starter",
"version": "1.0.0"
}
}
)
async def handle_initialized(request: MCPRequest) -> MCPResponse:
"""Maneja la notificación de inicialización completada"""
logger.info("Servidor MCP inicializado completamente")
# Las notificaciones no requieren respuesta
return MCPResponse(id=request.id, result=None)
async def handle_tools_list(request: MCPRequest) -> MCPResponse:
"""Lista las herramientas disponibles"""
logger.info("Listando herramientas disponibles")
tools = [
{
"name": "web_search",
"description": "Realiza una búsqueda web usando TAVILY API con fallback a DuckDuckGo. "
"Combina búsqueda web con procesamiento local usando Ollama cuando está disponible.",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Consulta de búsqueda"
},
"model": {
"type": "string",
"description": "Modelo de Ollama a usar (opcional, por defecto gpt-oss:20b)",
"default": "gpt-oss:20b"
}
},
"required": ["query"]
}
}
]
return MCPResponse(
id=request.id,
result={"tools": tools}
)
async def handle_tools_call(request: MCPRequest, current_user: User) -> MCPResponse:
"""Ejecuta una herramienta específica"""
params = request.params or {}
tool_name = params.get("name")
arguments = params.get("arguments", {})
logger.info(f"Ejecutando herramienta: {tool_name} para usuario: {current_user.email}")
logger.debug(f"Argumentos: {arguments}")
# Verificar permisos específicos por herramienta
if tool_name == "web_search" and current_user.role == "readonly":
return MCPResponse(
id=request.id,
error={
"code": -32603,
"message": "Permisos insuficientes para web_search"
}
)
try:
if tool_name == "web_search":
query = arguments.get("query", "")
model = arguments.get("model", "gpt-oss:20b")
result = await web_search_tool(query, model)
else:
raise ValueError(f"Herramienta desconocida: {tool_name}")
return MCPResponse(
id=request.id,
result={
"content": [
{
"type": "text",
"text": str(result)
}
]
}
)
except Exception as e:
logger.error(f"Error ejecutando herramienta {tool_name}: {e}", exc_info=True)
return MCPResponse(
id=request.id,
error={
"code": -32603,
"message": f"Error ejecutando herramienta: {str(e)}"
}
)
# Endpoints adicionales para facilitar el testing
@app.get("/health")
async def health_check():
"""Endpoint de salud del servidor"""
return {
"status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(),
"initialized": server_state["initialized"]
}
@app.get("/tools")
async def list_tools_endpoint():
"""Endpoint directo para listar herramientas (sin MCP)"""
return {
"tools": [
{
"name": "web_search",
"description": "Realiza una búsqueda web usando TAVILY API con fallback a DuckDuckGo. Combina búsqueda web con procesamiento local usando Ollama cuando está disponible."
}
]
}
if __name__ == "__main__":
import uvicorn
import os
# Configuración para producción (Railway)
port = int(os.getenv("PORT", 8001))
host = "0.0.0.0" if os.getenv("RAILWAY_ENVIRONMENT") else "127.0.0.1"
environment = os.getenv("ENVIRONMENT", "development")
if environment == "production":
logger.info("Iniciando servidor MCP HTTP en modo PRODUCCIÓN...")
logger.info("Herramientas registradas: web_search")
logger.info(f"Servidor disponible en: https://mcptavilidateoaut-production.up.railway.app")
logger.info("Documentación API: /docs")
else:
logger.info("Iniciando servidor MCP HTTP en modo DESARROLLO...")
logger.info("Herramientas registradas: web_search")
logger.info(f"Servidor disponible en: http://localhost:{port}")
logger.info("Documentación API: http://localhost:8001/docs")
uvicorn.run(
app,
host=host,
port=port,
log_level="info"
)