Skip to main content
Glama
growley

MCP HTTP TAVILY DATE OAUTH

by growley
server.py38.8 kB
#server.py import asyncio import inspect import requests import json import sys import os import mimetypes from datetime import datetime, timezone from typing import Optional, Dict, Any, List from uuid import uuid4 from pathlib import Path from fastapi import FastAPI, HTTPException, Request, Depends, Query from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import ollama from tavily import TavilyClient from dotenv import load_dotenv # Cargar variables de entorno desde .env ANTES de importar módulos locales load_dotenv() # Importar módulos locales try: from .logger import get_logger from .auth_models import ( User, TokenRequest, TokenResponse, UserInfo, AuthorizeRequest, OAuthProvider, AuthError ) from .auth_service import auth_service from .auth_middleware import ( auth_middleware, get_current_active_user, get_user_with_mcp_read, get_user_with_mcp_write, get_user_with_mcp_read_from_request ) except ImportError: # Fallback para ejecución directa from logger import get_logger from auth_models import ( User, TokenRequest, TokenResponse, UserInfo, AuthorizeRequest, OAuthProvider, AuthError ) from auth_service import auth_service from auth_middleware import ( auth_middleware, get_current_active_user, get_user_with_mcp_read, get_user_with_mcp_write, get_user_with_mcp_read_from_request ) # Configurar logger logger = get_logger("mcp-server") # Debug: verificar que las variables se cargan logger.info("Verificando carga de variables de entorno...") logger.info(f"GOOGLE_CLIENT_ID: {'✓' if os.getenv('GOOGLE_CLIENT_ID') else '✗'}") logger.info(f"GOOGLE_CLIENT_SECRET: {'✓' if os.getenv('GOOGLE_CLIENT_SECRET') else '✗'}") logger.info(f"GITHUB_CLIENT_ID: {'✓' if os.getenv('GITHUB_CLIENT_ID') else '✗'}") logger.info(f"GITHUB_CLIENT_SECRET: {'✓' if os.getenv('GITHUB_CLIENT_SECRET') else '✗'}") logger.info(f"TAVILY_API_KEY: {'✓' if os.getenv('TAVILY_API_KEY') else '✗'}") # Instancia principal del servidor FastAPI app = FastAPI( title="MCP HTTP Server", description="Servidor MCP con transporte HTTP en lugar de STDIO", version="1.0.0" ) # Configurar CORS para permitir acceso desde cualquier origen app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Agregar middleware de autenticación app.middleware("http")(auth_middleware) # Modelos Pydantic para las peticiones MCP class MCPRequest(BaseModel): jsonrpc: str = "2.0" id: Optional[str] = None method: str params: Optional[Dict[str, Any]] = None class MCPResponse(BaseModel): jsonrpc: str = "2.0" id: Optional[str] = None result: Optional[Dict[str, Any]] = None error: Optional[Dict[str, Any]] = None class InitializeParams(BaseModel): protocolVersion: str capabilities: Dict[str, Any] = {} clientInfo: Dict[str, Any] = {} class ToolCallParams(BaseModel): name: str arguments: Dict[str, Any] = {} # Estado del servidor server_state = { "initialized": False, "client_info": None, "capabilities": {} } logger.info("Servidor MCP HTTP inicializado") def get_current_datetime_iso() -> str: """Devuelve la fecha y hora actuales en formato ISO-8601 (UTC).""" logger.debug("Obteniendo fecha/hora actual") current_timestamp_iso: str = datetime.now(timezone.utc).isoformat() logger.debug(f"Fecha/hora obtenida: {current_timestamp_iso}") return current_timestamp_iso # Función datetime_now eliminada def search_web_tavily(query: str, num_results: int = 3) -> str: """ Realiza una búsqueda web usando TAVILY API (optimizada para LLMs). Devuelve resultados estructurados y relevantes. """ logger.debug(f"Iniciando búsqueda TAVILY para: {query}") try: # Obtener API key de variable de entorno api_key = os.getenv("TAVILY_API_KEY") if not api_key: logger.warning("TAVILY_API_KEY no configurada, usando DuckDuckGo como fallback") return search_web_simple(query, num_results) logger.debug(f"TAVILY_API_KEY encontrada: {api_key[:10]}...{api_key[-4:] if len(api_key) > 14 else '***'}") # Crear cliente TAVILY client = TavilyClient(api_key=api_key) # Realizar búsqueda con TAVILY response = client.search( query=query, search_depth="basic", # Usar búsqueda básica para ahorrar créditos max_results=num_results, include_answer=True, # Incluir respuesta directa si está disponible include_raw_content=False # No incluir contenido raw para ahorrar tokens ) results = [] # Procesar respuesta directa si está disponible if response.get("answer"): results.append(f"Respuesta directa: {response['answer']}") # Procesar resultados de búsqueda if response.get("results"): for result in response["results"][:num_results]: title = result.get("title", "Sin título") content = result.get("content", "Sin contenido") url = result.get("url", "") results.append(f"- {title}") results.append(f" {content}") if url: results.append(f" Fuente: {url}") results.append("") # Línea en blanco para separar if results: logger.debug(f"TAVILY completada: {len(results)} elementos") return "\n".join(results) else: logger.warning("TAVILY no devolvió resultados, usando DuckDuckGo") return search_web_simple(query, num_results) except Exception as e: logger.error(f"Error en TAVILY: {e}", exc_info=True) if "Unauthorized" in str(e) or "invalid API key" in str(e).lower(): logger.error("TAVILY_API_KEY es inválida o ha expirado. Verifica la configuración en Railway.") logger.info("Fallback a DuckDuckGo") return search_web_simple(query, num_results) def search_web_simple(query: str, num_results: int = 3) -> str: """ Realiza una búsqueda web simple usando DuckDuckGo (sin API key requerida). Devuelve los primeros resultados como texto plano. """ logger.debug(f"Iniciando búsqueda web para: {query}") try: # Usar DuckDuckGo Instant Answer API (gratuita, sin API key) url = "https://api.duckduckgo.com/" params = { "q": query, "format": "json", "no_html": "1", "skip_disambig": "1", "t": "web" # Forzar búsqueda web en lugar de instant answer } response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() # Extraer información relevante results = [] # Abstract (resumen) if data.get("Abstract"): results.append(f"Resumen: {data['Abstract']}") # Definición if data.get("Definition"): results.append(f"Definición: {data['Definition']}") # Resultados relacionados if data.get("RelatedTopics"): for topic in data["RelatedTopics"][:num_results]: if isinstance(topic, dict) and topic.get("Text"): results.append(f"- {topic['Text']}") # Si no hay resultados específicos, usar el abstract if not results and data.get("Abstract"): results.append(data["Abstract"]) # Log de datos recibidos para debugging logger.debug(f"Datos DuckDuckGo recibidos: Abstract={bool(data.get('Abstract'))}, Definition={bool(data.get('Definition'))}, RelatedTopics={len(data.get('RelatedTopics', []))}") # Si aún no hay resultados, crear una respuesta informativa if not results: results.append(f"No se encontraron resultados específicos para '{query}'") results.append("") results.append("Esto puede deberse a:") results.append("- Limitaciones de la API de DuckDuckGo") results.append("- Consulta muy específica o reciente") results.append("- Problemas de conectividad") results.append("") results.append("Recomendación: Intenta reformular tu consulta o verifica la configuración de TAVILY_API_KEY.") logger.debug(f"Búsqueda web completada: {len(results)} resultados") return "\n".join(results) except Exception as e: logger.error(f"Error en búsqueda web: {e}", exc_info=True) # Fallback con información básica sobre Ollama return f"""Información sobre '{query}': Ollama es una herramienta de código abierto que permite ejecutar modelos de lenguaje de gran tamaño (LLMs) localmente en tu dispositivo. Características principales: - Ejecuta modelos como Llama, Mistral, CodeLlama localmente - No requiere conexión a internet para funcionar - Mantiene privacidad de tus datos - Soporte para múltiples sistemas operativos - API compatible con OpenAI Para instalar: https://ollama.com/download Para más información: https://ollama.com""" # Herramientas de directorio eliminadas - ya no se necesitan # Funciones de directorio eliminadas async def web_search_tool(query: str, model: str = "gpt-oss:20b") -> str: """ Realiza una búsqueda web y la procesa usando Ollama localmente. Args: query: Consulta de búsqueda model: Modelo de Ollama a usar (por defecto: gpt-oss:20b) Returns: Respuesta procesada por Ollama basada en los resultados de búsqueda web """ logger.info(f"Herramienta web_search llamada - Query: {query}, Model: {model}") try: # 1. Realizar búsqueda web (TAVILY primero, DuckDuckGo como fallback) logger.debug("Realizando búsqueda web...") web_results = search_web_tavily(query) logger.debug(f"Resultados web obtenidos: {len(web_results)} caracteres") # 2. Intentar usar Ollama si está disponible logger.debug(f"Intentando conectar con Ollama usando modelo: {model}") try: # Preparar prompt para Ollama prompt = f"""Basándote en los siguientes resultados de búsqueda web, responde de manera útil y precisa a la consulta del usuario. Consulta del usuario: {query} Resultados de búsqueda web: {web_results} Por favor, proporciona una respuesta clara y bien estructurada basada en esta información.""" # Usar Ollama para procesar la información logger.debug("Creando cliente Ollama...") client = ollama.Client(host='http://127.0.0.1:11434') logger.debug("Enviando prompt a Ollama...") response = client.chat( model=model, messages=[{ 'role': 'user', 'content': prompt }] ) result = f"[Ollama] Búsqueda web procesada con {model}:\n\n{response['message']['content']}" logger.info(f"web_search completada con Ollama: {len(result)} caracteres") logger.debug(f"Respuesta completa de Ollama: {result[:500]}...") # Primeros 500 caracteres return result except Exception as ollama_error: logger.warning(f"Ollama no disponible: {ollama_error}") # Si Ollama no está disponible, devolver resultados web directamente logger.info(f"Devolviendo resultados de Tavily directamente: {len(web_results)} caracteres") return web_results except Exception as e: logger.error(f"Error en web_search: {e}", exc_info=True) return f"Error en web_search: {str(e)}" # Endpoints de autenticación OAuth2 @app.get("/auth/providers") async def get_oauth_providers(): """Obtener lista de proveedores OAuth2 disponibles""" providers = auth_service.get_available_providers() return { "providers": [ { "name": provider.value, "display_name": provider.value.title(), "auth_url": f"/auth/{provider.value}" } for provider in providers ] } @app.get("/auth/success", response_class=HTMLResponse) async def auth_success( request: Request, access_token: Optional[str] = Query(None), refresh_token: Optional[str] = Query(None) ): """Página de éxito de autenticación""" # Si hay tokens en la URL, configurar cookies seguras if access_token and refresh_token: response = HTMLResponse(""" <!DOCTYPE html> <html> <head> <title>Autenticación Exitosa</title> <style> body { font-family: Arial, sans-serif; max-width: 400px; margin: 100px auto; padding: 20px; text-align: center; } .warning { background-color: #fff3cd; border: 1px solid #ffeaa7; padding: 15px; margin: 20px 0; border-radius: 5px; } .success { background-color: #d4edda; border: 1px solid #c3e6cb; padding: 15px; margin: 20px 0; border-radius: 5px; } </style> </head> <body> <h1>🔐 Autenticación Exitosa</h1> <div class="success"> <p>¡Autenticación completada exitosamente!</p> </div> <p><a href="/docs">📖 Ver documentación de la API</a></p> <p><a href="/test-page">🧪 Ir a página de pruebas</a></p> <script> // Limpiar la URL sin recargar la página if (window.history.replaceState) { window.history.replaceState({}, document.title, '/auth/success'); } </script> </body> </html> """) # Configurar cookies seguras # Detectar si estamos en producción environment = os.getenv("ENVIRONMENT", "development") railway_env = os.getenv("RAILWAY_ENVIRONMENT") is_production = environment == "production" or railway_env is not None cookie_secure = bool(is_production) cookie_samesite = "lax" # Usar "lax" tanto en producción como desarrollo para mejor compatibilidad # Debug logging de cookies logger.info(f"Configurando cookies - secure: {cookie_secure}, samesite: {cookie_samesite}") response.set_cookie( key="access_token", value=access_token, httponly=True, # No accesible desde JavaScript secure=cookie_secure, # True en producción (HTTPS), False en desarrollo samesite=cookie_samesite, # lax para mejor compatibilidad max_age=3600 # 1 hora ) response.set_cookie( key="refresh_token", value=refresh_token, httponly=True, secure=cookie_secure, # True en producción (HTTPS), False en desarrollo samesite=cookie_samesite, max_age=30*24*3600 # 30 días ) logger.info("Cookies configuradas correctamente") return response # Si no hay tokens, mostrar página normal return HTMLResponse(""" <!DOCTYPE html> <html> <head> <title>Autenticación Exitosa</title> <style> body { font-family: Arial, sans-serif; max-width: 400px; margin: 100px auto; padding: 20px; text-align: center; } </style> </head> <body> <h1>¡Autenticación Exitosa!</h1> <p>Ya puedes usar el servidor MCP con tu token de acceso.</p> <p><a href="/docs">Ver documentación de la API</a></p> </body> </html> """) @app.get("/auth/{provider}") async def oauth_authorize( provider: str, redirect_uri: Optional[str] = Query(None, description="URI de redirección después de la autenticación") ): """Iniciar flujo de autorización OAuth2""" try: oauth_provider = OAuthProvider(provider) except ValueError: raise HTTPException( status_code=400, detail=f"Proveedor no soportado: {provider}" ) if oauth_provider not in auth_service.get_available_providers(): raise HTTPException( status_code=400, detail=f"Proveedor no configurado: {provider}" ) try: # Usar redirect_uri por defecto si no se proporciona # Detectar si estamos en producción (Railway siempre tiene PORT) railway_port = os.getenv("PORT") if railway_port: # Estamos en Railway base_url = "https://mcptavilidateoaut-production.up.railway.app" else: # Desarrollo local base_url = "http://localhost:8001" final_redirect_uri = redirect_uri or f"{base_url}/auth/success" # Debug logging logger.info(f"OAuth {provider} - base_url: {base_url}") logger.info(f"OAuth {provider} - final_redirect_uri: {final_redirect_uri}") auth_url, state = auth_service.create_oauth_authorization_url( oauth_provider, final_redirect_uri ) logger.info(f"Redirigiendo a {provider} para autenticación") logger.info(f"Auth URL generada: {auth_url}") return RedirectResponse(url=auth_url) except AuthError as e: logger.error(f"Error creando URL de autorización: {e.message}") raise HTTPException(status_code=400, detail=e.message) @app.get("/auth/callback/{provider}") async def oauth_callback( provider: str, code: str = Query(..., description="Código de autorización"), state: str = Query(..., description="Estado para prevenir CSRF"), redirect_uri: Optional[str] = Query(None, description="URI de redirección original (opcional)") ): """Manejar callback OAuth2""" try: oauth_provider = OAuthProvider(provider) except ValueError: raise HTTPException( status_code=400, detail=f"Proveedor no soportado: {provider}" ) # Debug logging logger.info(f"OAuth callback {provider} - code: {code[:10]}...") logger.info(f"OAuth callback {provider} - state: {state}") logger.info(f"OAuth callback {provider} - redirect_uri: {redirect_uri}") try: access_token, refresh_token = auth_service.handle_oauth_callback( oauth_provider, code, state, redirect_uri ) # Usar redirect_uri del estado OAuth si no se proporciona en la query final_redirect_uri = redirect_uri if not final_redirect_uri: # Obtener redirect_uri del estado OAuth if state in auth_service.oauth_states: final_redirect_uri = auth_service.oauth_states[state].redirect_uri else: # Fallback con detección de entorno railway_port = os.getenv("PORT") if railway_port: # Estamos en Railway base_url = "https://mcptavilidateoaut-production.up.railway.app" else: # Desarrollo local base_url = "http://localhost:8001" final_redirect_uri = f"{base_url}/auth/success" # Redirigir a la página de éxito con los tokens en la URL (para desarrollo) # En producción, usar cookies seguras o almacenamiento en sesión success_url = f"{final_redirect_uri}?access_token={access_token}&refresh_token={refresh_token}" return RedirectResponse(url=success_url) except AuthError as e: logger.error(f"Error en callback OAuth2: {e.message}") raise HTTPException(status_code=400, detail=e.message) @app.post("/auth/token", response_model=TokenResponse) async def refresh_token(token_request: TokenRequest): """Refrescar token de acceso""" if token_request.grant_type != "refresh_token": raise HTTPException( status_code=400, detail="grant_type debe ser 'refresh_token'" ) if not token_request.refresh_token: raise HTTPException( status_code=400, detail="refresh_token requerido" ) try: new_access_token = auth_service.refresh_access_token(token_request.refresh_token) return TokenResponse( access_token=new_access_token, expires_in=auth_service.access_token_expire_minutes * 60 ) except AuthError as e: logger.error(f"Error refrescando token: {e.message}") raise HTTPException(status_code=401, detail=e.message) @app.get("/auth/me", response_model=UserInfo) async def get_current_user_info(current_user: User = Depends(get_current_active_user)): """Obtener información del usuario actual""" return UserInfo( id=current_user.id, email=current_user.email, name=current_user.name, avatar_url=current_user.avatar_url, role=current_user.role, provider=current_user.provider, created_at=current_user.created_at, last_login=current_user.last_login ) @app.post("/auth/logout") async def logout( token_request: TokenRequest, current_user: User = Depends(get_current_active_user) ): """Cerrar sesión y revocar tokens""" if token_request.refresh_token: auth_service.revoke_refresh_token(token_request.refresh_token) logger.info(f"Usuario desconectado: {current_user.email}") return {"message": "Sesión cerrada exitosamente"} @app.get("/auth/stats") async def get_auth_stats(current_user: User = Depends(get_current_active_user)): """Obtener estadísticas de autenticación (solo para administradores)""" if current_user.role != "admin": raise HTTPException( status_code=403, detail="Acceso denegado: se requieren permisos de administrador" ) return auth_service.get_stats() # Página de login simple @app.get("/login", response_class=HTMLResponse) async def login_page(): """Página de login simple""" providers = auth_service.get_available_providers() html_content = """ <!DOCTYPE html> <html> <head> <title>MCP Server - Login</title> <style> body { font-family: Arial, sans-serif; max-width: 400px; margin: 100px auto; padding: 20px; } .provider { display: block; margin: 10px 0; padding: 10px; background: #f0f0f0; text-decoration: none; color: black; border-radius: 5px; } .provider:hover { background: #e0e0e0; } h1 { text-align: center; } .no-providers { color: #666; text-align: center; margin: 20px 0; } </style> </head> <body> <h1>MCP Server Login</h1> """ if providers: html_content += "<p>Selecciona un proveedor para autenticarte:</p>" for provider in providers: # Detectar entorno para URL base railway_port = os.getenv("PORT") if railway_port: # Estamos en Railway base_url = "https://mcptavilidateoaut-production.up.railway.app" else: # Desarrollo local base_url = "http://localhost:8001" html_content += f""" <a href="/auth/{provider.value}?redirect_uri={base_url}/auth/success" class="provider"> Iniciar sesión con {provider.value.title()} </a> """ else: html_content += """ <div class="no-providers"> <p>No hay proveedores OAuth configurados.</p> <p>Configura al menos un proveedor (Google, GitHub, Microsoft) en el archivo .env para habilitar la autenticación.</p> <p><a href="/">Volver al inicio</a></p> </div> """ html_content += """ </body> </html> """ return html_content # Página de pruebas mejorada con cookies @app.get("/test-page", response_class=HTMLResponse) async def test_page(): """Página de pruebas para la herramienta web_search""" return """ <!DOCTYPE html> <html lang="es"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Búsqueda Web</title> <style> body { font-family: Arial, sans-serif; max-width: 800px; margin: 20px auto; padding: 20px; background-color: #f5f5f5; } .container { background: white; padding: 30px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); } h1 { color: #333; text-align: center; } .section { margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; } .section h3 { margin-top: 0; color: #555; } input[type="text"] { width: 100%; padding: 10px; margin: 5px 0; border: 1px solid #ccc; border-radius: 4px; } button { background-color: #007bff; color: white; padding: 10px 20px; border: none; border-radius: 4px; cursor: pointer; margin: 5px; } button:hover { background-color: #0056b3; } .result { background-color: #f8f9fa; border: 1px solid #e9ecef; padding: 15px; margin: 10px 0; border-radius: 4px; white-space: pre-wrap; font-family: monospace; } .error { background-color: #f8d7da; border-color: #f5c6cb; color: #721c24; } .success { background-color: #d4edda; border-color: #c3e6cb; color: #155724; } .info { background-color: #d1ecf1; border-color: #bee5eb; color: #0c5460; padding: 10px; border-radius: 4px; margin: 10px 0; } </style> </head> <body> <div class="container"> <div class="section"> <h3>🔍 Búsqueda Web con TAVILY</h3> <label for="searchQuery">Consulta de búsqueda:</label> <input type="text" id="searchQuery" placeholder="ej: ¿en qué país está Londres?"> <button onclick="testWebSearch()">Buscar en Web</button> <div id="webSearchResult" class="result" style="display: none;"></div> </div> </div> <script> // Detectar URL del servidor basado en el entorno const isRailway = window.location.hostname.includes('railway.app') || window.location.hostname.includes('noemtoqueuelsdallonses.site'); const SERVER_URL = isRailway ? window.location.origin : 'http://localhost:8001'; // Debug logging console.log('Entorno detectado:', isRailway ? 'Railway' : 'Local'); console.log('SERVER_URL:', SERVER_URL); async function makeMcpRequest(toolName, arguments = {}) { const requestBody = { jsonrpc: "2.0", id: "1", method: "tools/call", params: { name: toolName, arguments: arguments } }; try { console.log('Enviando petición MCP a:', `${SERVER_URL}/mcp`); console.log('Request body:', requestBody); const response = await fetch(`${SERVER_URL}/mcp`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, credentials: 'include', // Incluir cookies body: JSON.stringify(requestBody) }); console.log('Response status:', response.status); console.log('Response headers:', Object.fromEntries(response.headers.entries())); const result = await response.json(); console.log('Response body:', result); if (response.ok && result.result) { return result.result.content[0].text; } else { throw new Error(result.error ? result.error.message : 'Error en la respuesta'); } } catch (error) { console.error('Error en makeMcpRequest:', error); throw new Error(`Error: ${error.message}`); } } function showResult(elementId, message, isError = false) { const element = document.getElementById(elementId); element.textContent = message; element.className = `result ${isError ? 'error' : 'success'}`; element.style.display = 'block'; } async function testWebSearch() { try { const query = document.getElementById('searchQuery').value; if (!query) { showResult('webSearchResult', 'Por favor, introduce una consulta de búsqueda', true); return; } const result = await makeMcpRequest('web_search', { query: query }); showResult('webSearchResult', result); } catch (error) { showResult('webSearchResult', error.message, true); } } </script> </body> </html> """ # Endpoints MCP HTTP (ahora protegidos) @app.get("/") async def root(): """Endpoint raíz con información del servidor""" return { "name": "MCP HTTP Server", "version": "1.0.0", "description": "Servidor MCP con transporte HTTP", "protocol": "MCP over HTTP", "tools": ["web_search"], "status": "running" } @app.post("/mcp") async def mcp_endpoint( request: MCPRequest, http_request: Request ): """ Endpoint principal MCP que maneja todas las peticiones del protocolo MCP """ logger.info(f"Petición MCP recibida: {request.method}") try: if request.method == "initialize": return await handle_initialize(request) elif request.method == "notifications/initialized": return await handle_initialized(request) elif request.method == "tools/list": # tools/list no requiere autenticación específica return await handle_tools_list(request) elif request.method == "tools/call": # tools/call requiere autenticación # Debug: verificar cookies recibidas cookies = http_request.cookies logger.info(f"Cookies recibidas en MCP: {list(cookies.keys())}") logger.info(f"Access token presente: {'access_token' in cookies}") try: current_user = await get_user_with_mcp_read_from_request(http_request) logger.info(f"Usuario autenticado: {current_user.email}") return await handle_tools_call(request, current_user) except HTTPException as e: logger.warning(f"Error de autenticación en MCP: {e.detail}") return MCPResponse( id=request.id, error={ "code": -32603, "message": f"Error de autenticación: {e.detail}" } ) else: return MCPResponse( id=request.id, error={ "code": -32601, "message": f"Método no encontrado: {request.method}" } ) except Exception as e: logger.error(f"Error procesando petición MCP: {e}", exc_info=True) return MCPResponse( id=request.id, error={ "code": -32603, "message": f"Error interno: {str(e)}" } ) async def handle_initialize(request: MCPRequest) -> MCPResponse: """Maneja la inicialización del servidor MCP""" logger.info("Inicializando servidor MCP") params = request.params or {} server_state["client_info"] = params.get("clientInfo", {}) server_state["capabilities"] = params.get("capabilities", {}) server_state["initialized"] = True return MCPResponse( id=request.id, result={ "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "mcp-http-starter", "version": "1.0.0" } } ) async def handle_initialized(request: MCPRequest) -> MCPResponse: """Maneja la notificación de inicialización completada""" logger.info("Servidor MCP inicializado completamente") # Las notificaciones no requieren respuesta return MCPResponse(id=request.id, result=None) async def handle_tools_list(request: MCPRequest) -> MCPResponse: """Lista las herramientas disponibles""" logger.info("Listando herramientas disponibles") tools = [ { "name": "web_search", "description": "Realiza una búsqueda web usando TAVILY API con fallback a DuckDuckGo. " "Combina búsqueda web con procesamiento local usando Ollama cuando está disponible.", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "Consulta de búsqueda" }, "model": { "type": "string", "description": "Modelo de Ollama a usar (opcional, por defecto gpt-oss:20b)", "default": "gpt-oss:20b" } }, "required": ["query"] } } ] return MCPResponse( id=request.id, result={"tools": tools} ) async def handle_tools_call(request: MCPRequest, current_user: User) -> MCPResponse: """Ejecuta una herramienta específica""" params = request.params or {} tool_name = params.get("name") arguments = params.get("arguments", {}) logger.info(f"Ejecutando herramienta: {tool_name} para usuario: {current_user.email}") logger.debug(f"Argumentos: {arguments}") # Verificar permisos específicos por herramienta if tool_name == "web_search" and current_user.role == "readonly": return MCPResponse( id=request.id, error={ "code": -32603, "message": "Permisos insuficientes para web_search" } ) try: if tool_name == "web_search": query = arguments.get("query", "") model = arguments.get("model", "gpt-oss:20b") result = await web_search_tool(query, model) else: raise ValueError(f"Herramienta desconocida: {tool_name}") return MCPResponse( id=request.id, result={ "content": [ { "type": "text", "text": str(result) } ] } ) except Exception as e: logger.error(f"Error ejecutando herramienta {tool_name}: {e}", exc_info=True) return MCPResponse( id=request.id, error={ "code": -32603, "message": f"Error ejecutando herramienta: {str(e)}" } ) # Endpoints adicionales para facilitar el testing @app.get("/health") async def health_check(): """Endpoint de salud del servidor""" return { "status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat(), "initialized": server_state["initialized"] } @app.get("/tools") async def list_tools_endpoint(): """Endpoint directo para listar herramientas (sin MCP)""" return { "tools": [ { "name": "web_search", "description": "Realiza una búsqueda web usando TAVILY API con fallback a DuckDuckGo. Combina búsqueda web con procesamiento local usando Ollama cuando está disponible." } ] } if __name__ == "__main__": import uvicorn import os # Configuración para producción (Railway) port = int(os.getenv("PORT", 8001)) host = "0.0.0.0" if os.getenv("RAILWAY_ENVIRONMENT") else "127.0.0.1" environment = os.getenv("ENVIRONMENT", "development") if environment == "production": logger.info("Iniciando servidor MCP HTTP en modo PRODUCCIÓN...") logger.info("Herramientas registradas: web_search") logger.info(f"Servidor disponible en: https://mcptavilidateoaut-production.up.railway.app") logger.info("Documentación API: /docs") else: logger.info("Iniciando servidor MCP HTTP en modo DESARROLLO...") logger.info("Herramientas registradas: web_search") logger.info(f"Servidor disponible en: http://localhost:{port}") logger.info("Documentación API: http://localhost:8001/docs") uvicorn.run( app, host=host, port=port, log_level="info" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/growley/MCP_TAVILI_DATE_OAUT'

If you have feedback or need assistance with the MCP directory API, please join our Discord server