Skip to main content
Glama

RTIdeas MCP API Server

rtideas_api_server.py (32.9 kB)
#!/usr/bin/env python3
"""
RTIdeas API Server - Remote Model Context Protocol Implementation

REST API server for RTIdeas with MongoDB support.
"""

import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

# NOTE: fastapi's ``Path`` is imported after pathlib's ``Path`` and therefore
# shadows it. The resource route handlers below use ``Path(...)`` to declare
# path parameters, so this import order must be preserved.
from fastapi import FastAPI, HTTPException, Query, Path, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn

# Import MCP configuration and MongoDB connector
from config import (
    MCP_SERVER_NAME,
    MCP_SERVER_VERSION,
    validate_config,
    get_config_summary,
    DEFAULT_SEARCH_LIMIT,
    MAX_SEARCH_LIMIT,
    DEFAULT_CLUSTER_METHOD,
    DEFAULT_CLUSTER_K,
    DEFAULT_SIMILARITY_THRESHOLD,
    API_HOST,
    API_PORT,
    API_RELOAD,
    CORS_ORIGINS,
    CORS_CREDENTIALS
)
from mongodb_connector import get_mongodb_connector, MongoDBConnector

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Pydantic models for API requests/responses
class ToolRequest(BaseModel):
    """Base model for tool requests"""
    arguments: Dict[str, Any] = Field(default_factory=dict)


class ListSessionsRequest(BaseModel):
    """Request model for list_sessions tool"""
    limit: int = Field(default=20, ge=1, le=100)
    adminId: Optional[str] = None


class GetSessionRequest(BaseModel):
    """Request model for get_session tool"""
    sessionId: str


class ListIdeasRequest(BaseModel):
    """Request model for list_ideas tool"""
    sessionId: str
    limit: int = Field(default=20, ge=1, le=100)


class SearchIdeasRequest(BaseModel):
    """Request model for search_ideas tool"""
    sessionId: str
    query: str
    limit: int = Field(default=10, ge=1, le=50)


class GetIdeaConnectionsRequest(BaseModel):
    """Request model for get_idea_connections tool"""
    sessionId: str
    limit: int = Field(default=50, ge=1, le=100)


class GetIdeaClustersRequest(BaseModel):
    """Request model for get_idea_clusters tool"""
    sessionId: str


class GetSessionStatsRequest(BaseModel):
    """Request model for get_session_stats tool"""
    sessionId: str


class AnalyzeSessionRequest(BaseModel):
    """Request model for analyze_session tool"""
    sessionId: str
    include_clusters: bool = True
    include_connections: bool = True


class HelloRTIdeasRequest(BaseModel):
    """Request model for hello_rtideas tool"""
    name: str = "Usuario"


class APIResponse(BaseModel):
    """Standard API response model"""
    success: bool
    data: Any = None
    error: Optional[str] = None
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
    tool: Optional[str] = None


class RTIdeasAPIServer:
    """REST API server for RTIdeas with MongoDB support.

    Wraps a FastAPI application (``self.app``) whose routes delegate to the
    ``_*_tool`` and ``_read_*`` coroutines defined on this class, which in turn
    call the MongoDB connector obtained from ``get_mongodb_connector()``.
    """

    # (name, description) for every tool; each one is served by
    # ``POST /tools/<name>``. The descriptions are returned verbatim by
    # ``GET /tools``.
    _TOOL_CATALOG = (
        ("list_sessions", "Listar todas las sesiones activas con metadatos básicos"),
        ("get_session", "Obtener información detallada de una sesión específica"),
        ("list_ideas", "Listar ideas de una sesión específica con paginación"),
        ("search_ideas", "Buscar ideas usando búsqueda de texto"),
        ("get_idea_connections", "Obtener conexiones entre ideas en una sesión"),
        ("get_idea_clusters", "Obtener clusters de ideas agrupadas por similitud"),
        ("get_session_stats", "Obtener estadísticas detalladas de una sesión"),
        ("analyze_session", "Realizar análisis completo de una sesión"),
        ("hello_rtideas", "Herramienta de prueba para verificar que el MCP está funcionando"),
    )

    def __init__(self):
        """Build the FastAPI app, validate config, connect to MongoDB, add routes.

        Raises:
            ValueError: if ``validate_config()`` reports the configuration as
                invalid.
            Exception: any error raised while obtaining or probing the MongoDB
                connector is logged and re-raised unchanged.
        """
        self.app = FastAPI(
            title="RTIdeas MCP API Server",
            description="API REST para el servidor MCP de RTIdeas con soporte MongoDB",
            version=MCP_SERVER_VERSION,
            docs_url="/docs",
            redoc_url="/redoc"
        )

        # Validate configuration
        is_valid, errors = validate_config()
        if not is_valid:
            logger.error("❌ Configuration errors:")
            for error in errors:
                logger.error(f" - {error}")
            raise ValueError("Invalid MCP configuration")

        # Initialize MongoDB connector
        try:
            self.db_connector = get_mongodb_connector()
            logger.info("🚀 RTIdeas API Server Inicializado con MongoDB")
            logger.info("🗄️ Conectado a base de datos MongoDB")

            # Test connection; a failed probe is only a warning, not fatal.
            if self.db_connector.test_connection():
                collections_info = self.db_connector.get_collections_info()
                logger.info(f"📊 Colecciones disponibles: {collections_info.get('total_collections', 0)}")
            else:
                logger.warning("⚠️ No se pudo verificar la conexión a MongoDB")
        except Exception as e:
            logger.error(f"❌ Error inicializando conexión MongoDB: {e}")
            raise

        # Setup CORS
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=CORS_ORIGINS,
            allow_credentials=CORS_CREDENTIALS,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        # Register routes
        self._setup_routes()

        # Log configuration summary
        logger.info(f"⚙️ Configuración: {get_config_summary()}")

    # ------------------------------------------------------------------
    # Route helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _request_args(request: BaseModel) -> Dict[str, Any]:
        """Return *request* as a plain dict.

        Uses ``model_dump()`` when the model provides it and falls back to
        ``dict()`` otherwise, so the call works on models that only expose
        the older method without relying on the deprecated one when the
        newer one exists.
        """
        dump = getattr(request, "model_dump", None)
        return dump() if callable(dump) else request.dict()

    @staticmethod
    def _payload_error(payload: Any) -> Optional[str]:
        """Return the message of a top-level ``"error"`` entry in *payload*.

        The ``_*_tool`` and ``_read_*`` coroutines report failures by
        returning a dict that carries an ``"error"`` key instead of raising.
        Returns ``None`` when *payload* carries no such key.
        """
        if isinstance(payload, dict) and "error" in payload:
            return str(payload["error"])
        return None

    async def _run_tool(self, tool_name: str, handler, request: BaseModel) -> APIResponse:
        """Invoke a tool coroutine and wrap its outcome in an ``APIResponse``.

        Args:
            tool_name: name reported in the response's ``tool`` field and in
                the log message.
            handler: coroutine function taking the request arguments dict.
            request: validated request model for the tool.

        Returns:
            ``success=True`` with the tool result, or ``success=False`` when
            the handler raised or returned an ``{"error": ...}`` payload. In
            the latter case the payload is still returned in ``data`` so
            clients that read ``data["error"]`` keep working.
        """
        try:
            result = await handler(self._request_args(request))
        except Exception as e:
            logger.error(f"Error en {tool_name}: {e}")
            return APIResponse(success=False, error=str(e), tool=tool_name)

        error = self._payload_error(result)
        if error is not None:
            return APIResponse(success=False, data=result, error=error, tool=tool_name)
        return APIResponse(success=True, data=result, tool=tool_name)

    async def _run_resource(self, label: str, reader, *args: Any) -> APIResponse:
        """Invoke a resource reader and wrap its decoded JSON in an ``APIResponse``.

        Args:
            label: noun used in the log message (``"Error leyendo <label>"``).
            reader: coroutine function returning a JSON string.
            *args: positional arguments forwarded to *reader*.

        Returns:
            ``success=True`` with the decoded payload, or ``success=False``
            when the reader raised, returned invalid JSON, or returned a
            payload carrying an ``"error"`` key (payload kept in ``data``).
        """
        try:
            payload = json.loads(await reader(*args))
        except Exception as e:
            logger.error(f"Error leyendo {label}: {e}")
            return APIResponse(success=False, error=str(e))

        error = self._payload_error(payload)
        if error is not None:
            return APIResponse(success=False, data=payload, error=error)
        return APIResponse(success=True, data=payload)

    def _setup_routes(self):
        """Register every HTTP route on ``self.app``.

        The route handlers' docstrings are kept verbatim: FastAPI publishes
        them as the operation descriptions in the generated OpenAPI docs, so
        they are runtime text rather than plain comments.
        """

        @self.app.get("/", response_model=APIResponse)
        async def root():
            """Endpoint raíz con información del servidor"""
            return APIResponse(
                success=True,
                data={
                    "server": MCP_SERVER_NAME,
                    "version": MCP_SERVER_VERSION,
                    "status": "running",
                    "mongodb_connected": self.db_connector.test_connection(),
                    "endpoints": {
                        "tools": "/tools",
                        "resources": "/resources",
                        "health": "/health",
                        "docs": "/docs"
                    }
                }
            )

        @self.app.get("/health", response_model=APIResponse)
        async def health_check():
            """Health check endpoint"""
            mongodb_status = self.db_connector.test_connection()
            return APIResponse(
                success=mongodb_status,
                data={
                    "status": "healthy" if mongodb_status else "unhealthy",
                    "mongodb": "connected" if mongodb_status else "disconnected",
                    "timestamp": datetime.now().isoformat()
                }
            )

        @self.app.get("/tools", response_model=APIResponse)
        async def list_tools():
            """Listar todas las herramientas disponibles"""
            tools = [
                {
                    "name": name,
                    "description": description,
                    "endpoint": f"/tools/{name}",
                    "method": "POST"
                }
                for name, description in self._TOOL_CATALOG
            ]
            return APIResponse(
                success=True,
                data={
                    "total_tools": len(tools),
                    "tools": tools
                }
            )

        @self.app.get("/resources", response_model=APIResponse)
        async def list_resources():
            """Listar todos los recursos disponibles"""
            # NOTE(review): four URIs use the ``test://`` scheme and one uses
            # ``rtideas://``. Left untouched because the intended scheme cannot
            # be determined from this file - confirm against the MCP server.
            resources = [
                {
                    "uri": "test://sessions",
                    "name": "Sesiones RTIdeas",
                    "description": "Lista de todas las sesiones activas",
                    "mimeType": "application/json"
                },
                {
                    "uri": "test://ideas/{sessionId}",
                    "name": "Ideas por Sesión",
                    "description": "Ideas de una sesión específica",
                    "mimeType": "application/json"
                },
                {
                    "uri": "rtideas://connections/{sessionId}",
                    "name": "Conexiones de Ideas",
                    "description": "Conexiones entre ideas en una sesión",
                    "mimeType": "application/json"
                },
                {
                    "uri": "test://clusters/{sessionId}",
                    "name": "Clusters de Ideas",
                    "description": "Clusters de ideas agrupadas por similitud",
                    "mimeType": "application/json"
                },
                {
                    "uri": "test://stats/{sessionId}",
                    "name": "Estadísticas de Sesión",
                    "description": "Estadísticas detalladas de una sesión",
                    "mimeType": "application/json"
                }
            ]
            return APIResponse(
                success=True,
                data={
                    "total_resources": len(resources),
                    "resources": resources
                }
            )

        # Tool endpoints
        @self.app.post("/tools/list_sessions", response_model=APIResponse)
        async def list_sessions(request: ListSessionsRequest):
            """Listar sesiones"""
            return await self._run_tool("list_sessions", self._list_sessions_tool, request)

        @self.app.post("/tools/get_session", response_model=APIResponse)
        async def get_session(request: GetSessionRequest):
            """Obtener sesión específica"""
            return await self._run_tool("get_session", self._get_session_tool, request)

        @self.app.post("/tools/list_ideas", response_model=APIResponse)
        async def list_ideas(request: ListIdeasRequest):
            """Listar ideas de una sesión"""
            return await self._run_tool("list_ideas", self._list_ideas_tool, request)

        @self.app.post("/tools/search_ideas", response_model=APIResponse)
        async def search_ideas(request: SearchIdeasRequest):
            """Buscar ideas"""
            return await self._run_tool("search_ideas", self._search_ideas_tool, request)

        @self.app.post("/tools/get_idea_connections", response_model=APIResponse)
        async def get_idea_connections(request: GetIdeaConnectionsRequest):
            """Obtener conexiones de ideas"""
            return await self._run_tool(
                "get_idea_connections", self._get_idea_connections_tool, request
            )

        @self.app.post("/tools/get_idea_clusters", response_model=APIResponse)
        async def get_idea_clusters(request: GetIdeaClustersRequest):
            """Obtener clusters de ideas"""
            return await self._run_tool(
                "get_idea_clusters", self._get_idea_clusters_tool, request
            )

        @self.app.post("/tools/get_session_stats", response_model=APIResponse)
        async def get_session_stats(request: GetSessionStatsRequest):
            """Obtener estadísticas de sesión"""
            return await self._run_tool(
                "get_session_stats", self._get_session_stats_tool, request
            )

        @self.app.post("/tools/analyze_session", response_model=APIResponse)
        async def analyze_session(request: AnalyzeSessionRequest):
            """Analizar sesión completa"""
            return await self._run_tool("analyze_session", self._analyze_session_tool, request)

        @self.app.post("/tools/hello_rtideas", response_model=APIResponse)
        async def hello_rtideas(request: HelloRTIdeasRequest):
            """Herramienta de prueba"""
            return await self._run_tool("hello_rtideas", self._hello_rtideas_tool, request)

        # Resource endpoints
        @self.app.get("/resources/sessions", response_model=APIResponse)
        async def read_sessions():
            """Leer todas las sesiones"""
            return await self._run_resource("sesiones", self._read_sessions)

        @self.app.get("/resources/ideas/{session_id}", response_model=APIResponse)
        async def read_ideas(session_id: str = Path(..., description="ID de la sesión")):
            """Leer ideas de una sesión"""
            return await self._run_resource("ideas", self._read_ideas, session_id)

        @self.app.get("/resources/connections/{session_id}", response_model=APIResponse)
        async def read_connections(session_id: str = Path(..., description="ID de la sesión")):
            """Leer conexiones de una sesión"""
            return await self._run_resource("conexiones", self._read_connections, session_id)

        @self.app.get("/resources/clusters/{session_id}", response_model=APIResponse)
        async def read_clusters(session_id: str = Path(..., description="ID de la sesión")):
            """Leer clusters de una sesión"""
            return await self._run_resource("clusters", self._read_clusters, session_id)

        @self.app.get("/resources/stats/{session_id}", response_model=APIResponse)
        async def read_session_stats(session_id: str = Path(..., description="ID de la sesión")):
            """Leer estadísticas de una sesión"""
            return await self._run_resource("estadísticas", self._read_session_stats, session_id)

    # ------------------------------------------------------------------
    # Tool implementations (same as original MCP server)
    #
    # Each tool returns a result dict on success and ``{"error": <message>}``
    # on failure instead of raising; ``_run_tool`` turns the latter into a
    # ``success=False`` response.
    # ------------------------------------------------------------------
    async def _list_sessions_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: list sessions, optionally filtered by ``adminId``.

        Args:
            arguments: may contain ``limit`` (default 20, capped at 100) and
                ``adminId``.
        """
        limit = min(arguments.get('limit', 20), 100)
        admin_id = arguments.get('adminId')

        try:
            sessions = self.db_connector.fetch_all_sessions(limit=limit)

            # Filter by adminId if provided.
            # NOTE(review): the filter runs after ``limit`` was applied by the
            # connector, so fewer than ``limit`` matching sessions may be
            # returned even when more exist.
            if admin_id:
                sessions = [s for s in sessions if s.get('adminId') == admin_id]

            return {
                "tool": "list_sessions",
                "timestamp": datetime.now().isoformat(),
                "total_sessions": len(sessions),
                "sessions": sessions
            }
        except Exception as e:
            logger.error(f"Error en list_sessions: {e}")
            return {"error": str(e)}

    async def _get_session_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: fetch a single session by ``sessionId``."""
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}

        try:
            session = self.db_connector.fetch_session_by_id(session_id)
            if not session:
                return {"error": f"Session {session_id} not found"}

            return {
                "tool": "get_session",
                "timestamp": datetime.now().isoformat(),
                "session": session
            }
        except Exception as e:
            logger.error(f"Error en get_session: {e}")
            return {"error": str(e)}

    async def _list_ideas_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: list the ideas of a session.

        Args:
            arguments: requires ``sessionId``; ``limit`` defaults to 20 and is
                capped at 100.
        """
        session_id = arguments.get('sessionId')
        limit = min(arguments.get('limit', 20), 100)
        if not session_id:
            return {"error": "sessionId is required"}

        try:
            ideas = self.db_connector.fetch_ideas_by_session(session_id, limit=limit)
            return {
                "tool": "list_ideas",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_ideas": len(ideas),
                "ideas": ideas
            }
        except Exception as e:
            logger.error(f"Error en list_ideas: {e}")
            return {"error": str(e)}

    async def _search_ideas_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: search the ideas of a session.

        Args:
            arguments: requires ``sessionId`` and ``query``; ``limit``
                defaults to ``DEFAULT_SEARCH_LIMIT`` and is capped at
                ``MAX_SEARCH_LIMIT``.
        """
        session_id = arguments.get('sessionId')
        query = arguments.get('query')
        limit = min(arguments.get('limit', DEFAULT_SEARCH_LIMIT), MAX_SEARCH_LIMIT)
        if not session_id or not query:
            return {"error": "sessionId and query are required"}

        try:
            ideas = self.db_connector.search_ideas(session_id, query, limit=limit)
            return {
                "tool": "search_ideas",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "query": query,
                "total_results": len(ideas),
                "ideas": ideas
            }
        except Exception as e:
            logger.error(f"Error en search_ideas: {e}")
            return {"error": str(e)}

    async def _get_idea_connections_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: fetch the connections between ideas of a session.

        Args:
            arguments: requires ``sessionId``; ``limit`` defaults to 50 and is
                capped at 100.
        """
        session_id = arguments.get('sessionId')
        limit = min(arguments.get('limit', 50), 100)
        if not session_id:
            return {"error": "sessionId is required"}

        try:
            connections = self.db_connector.fetch_idea_connections(session_id, limit=limit)
            return {
                "tool": "get_idea_connections",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_connections": len(connections),
                "connections": connections
            }
        except Exception as e:
            logger.error(f"Error en get_idea_connections: {e}")
            return {"error": str(e)}

    async def _get_idea_clusters_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: fetch the idea clusters of a session (requires ``sessionId``)."""
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}

        try:
            clusters = self.db_connector.fetch_idea_clusters(session_id)
            return {
                "tool": "get_idea_clusters",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_clusters": len(clusters),
                "clusters": clusters
            }
        except Exception as e:
            logger.error(f"Error en get_idea_clusters: {e}")
            return {"error": str(e)}

    async def _get_session_stats_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: fetch the statistics of a session (requires ``sessionId``)."""
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}

        try:
            stats = self.db_connector.get_session_stats(session_id)
            return {
                "tool": "get_session_stats",
                "timestamp": datetime.now().isoformat(),
                "stats": stats
            }
        except Exception as e:
            logger.error(f"Error en get_session_stats: {e}")
            return {"error": str(e)}

    async def _analyze_session_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: build a combined analysis of a session.

        Args:
            arguments: requires ``sessionId``; ``include_clusters`` and
                ``include_connections`` (both default ``True``) control
                whether the corresponding sections are computed. A disabled
                section is reported as ``None``.
        """
        session_id = arguments.get('sessionId')
        include_clusters = arguments.get('include_clusters', True)
        include_connections = arguments.get('include_connections', True)
        if not session_id:
            return {"error": "sessionId is required"}

        try:
            # Get session info
            session = self.db_connector.fetch_session_by_id(session_id)
            if not session:
                return {"error": f"Session {session_id} not found"}

            # Get ideas
            ideas = self.db_connector.fetch_ideas_by_session(session_id, limit=100)

            # Get connections if requested
            connections = []
            if include_connections:
                connections = self.db_connector.fetch_idea_connections(session_id, limit=100)

            # Get clusters if requested
            clusters = []
            if include_clusters:
                clusters = self.db_connector.fetch_idea_clusters(session_id)

            # Get stats
            stats = self.db_connector.get_session_stats(session_id)

            connections_analysis = None
            if include_connections:
                # A missing or null similarity counts as 0 so that a single
                # incomplete document does not fail the whole analysis.
                similarities = [c.get('similarity') or 0 for c in connections]
                connections_analysis = {
                    "total_connections": len(connections),
                    "average_similarity": (
                        sum(similarities) / len(similarities) if similarities else 0
                    )
                }

            clusters_analysis = None
            if include_clusters:
                clusters_analysis = {
                    "total_clusters": len(clusters),
                    "clusters_with_ideas": sum(1 for c in clusters if c.get('ideaIds'))
                }

            # Generate analysis
            analysis = {
                "session_info": session,
                "ideas_analysis": {
                    "total_ideas": len(ideas),
                    "ideas_with_embeddings": sum(1 for i in ideas if i.get('embedding')),
                    "ideas_with_tags": sum(1 for i in ideas if i.get('tags'))
                },
                "connections_analysis": connections_analysis,
                "clusters_analysis": clusters_analysis,
                "stats": stats
            }

            return {
                "tool": "analyze_session",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "analysis": analysis
            }
        except Exception as e:
            logger.error(f"Error en analyze_session: {e}")
            return {"error": str(e)}

    async def _hello_rtideas_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: test greeting that also reports the MongoDB connection state."""
        name = arguments.get('name', 'Usuario')
        return {
            "tool": "hello_rtideas",
            "message": f"¡Hola, {name}! 🎉",
            "timestamp": datetime.now().isoformat(),
            "server": "RTIdeas API Server",
            "version": MCP_SERVER_VERSION,
            "status": "working",
            "mongodb_connected": self.db_connector.test_connection()
        }

    # ------------------------------------------------------------------
    # Resource implementations (same as original MCP server)
    #
    # Each reader returns a JSON string; on failure the JSON carries an
    # ``"error"`` key plus an empty placeholder for the requested data.
    # ------------------------------------------------------------------
    async def _read_sessions(self) -> str:
        """Return up to 50 sessions as a JSON string."""
        try:
            sessions = self.db_connector.fetch_all_sessions(limit=50)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "total_sessions": len(sessions),
                "sessions": sessions
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo sesiones: {e}")
            return json.dumps({
                "error": str(e),
                "sessions": []
            })

    async def _read_ideas(self, session_id: str) -> str:
        """Return up to 100 ideas of *session_id* as a JSON string."""
        try:
            ideas = self.db_connector.fetch_ideas_by_session(session_id, limit=100)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_ideas": len(ideas),
                "ideas": ideas
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo ideas: {e}")
            return json.dumps({
                "error": str(e),
                "ideas": []
            })

    async def _read_connections(self, session_id: str) -> str:
        """Return up to 100 idea connections of *session_id* as a JSON string."""
        try:
            connections = self.db_connector.fetch_idea_connections(session_id, limit=100)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_connections": len(connections),
                "connections": connections
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo conexiones: {e}")
            return json.dumps({
                "error": str(e),
                "connections": []
            })

    async def _read_clusters(self, session_id: str) -> str:
        """Return the idea clusters of *session_id* as a JSON string."""
        try:
            clusters = self.db_connector.fetch_idea_clusters(session_id)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_clusters": len(clusters),
                "clusters": clusters
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo clusters: {e}")
            return json.dumps({
                "error": str(e),
                "clusters": []
            })

    async def _read_session_stats(self, session_id: str) -> str:
        """Return the statistics of *session_id* as a JSON string."""
        try:
            stats = self.db_connector.get_session_stats(session_id)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "stats": stats
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo estadísticas: {e}")
            return json.dumps({
                "error": str(e),
                "stats": {}
            })

    def run(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        reload: Optional[bool] = None,
    ):
        """Run the API server with uvicorn.

        Args:
            host: interface to bind to; falls back to ``API_HOST`` when empty
                or ``None``.
            port: port to bind to; falls back to ``API_PORT`` only when
                ``None``, so an explicit ``0`` is passed through to uvicorn.
            reload: enable auto-reload; falls back to ``API_RELOAD`` when
                ``None``.
        """
        # Use config defaults if not provided
        host = host or API_HOST
        port = API_PORT if port is None else port
        reload = API_RELOAD if reload is None else reload

        logger.info("🎯 Iniciando RTIdeas API Server")
        logger.info(f"📡 Servidor disponible en http://{host}:{port}")
        logger.info(f"📚 Documentación disponible en http://{host}:{port}/docs")

        if reload:
            # In reload mode, hand uvicorn an import string instead of the
            # app object.
            uvicorn.run(
                "rtideas_api_server:app",
                host=host,
                port=port,
                reload=True,
                log_level="info"
            )
        else:
            # For production, use the app object directly.
            uvicorn.run(
                self.app,
                host=host,
                port=port,
                reload=False,
                log_level="info"
            )


def main():
    """Command-line entry point: parse arguments and run the server."""
    import argparse

    parser = argparse.ArgumentParser(description="RTIdeas API Server")
    parser.add_argument("--host", default=API_HOST, help="Host to bind to")
    parser.add_argument("--port", type=int, default=API_PORT, help="Port to bind to")
    # default=None (rather than store_true's implicit False) lets run() fall
    # back to the configured API_RELOAD when the flag is not given.
    parser.add_argument(
        "--reload",
        action="store_true",
        default=None,
        help="Enable auto-reload for development",
    )

    args = parser.parse_args()

    server = RTIdeasAPIServer()
    server.run(host=args.host, port=args.port, reload=args.reload)


# Create the app instance for reload mode
def create_app():
    """Create a server and return its FastAPI application."""
    server = RTIdeasAPIServer()
    return server.app


# ``app`` variable for uvicorn reload ("rtideas_api_server:app")
app = create_app()


if __name__ == "__main__":
    # NOTE: the module-level ``app`` above has already built one server by the
    # time main() builds its own; kept that way so that importing
    # ``rtideas_api_server:app`` continues to work unchanged.
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/automationagent10-max/MCP1'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.