rtideas_api_server.py•32.9 kB
#!/usr/bin/env python3
"""
RTIdeas API Server - Remote Model Context Protocol Implementation
REST API server for RTIdeas with MongoDB support
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from fastapi import FastAPI, HTTPException, Query, Path, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn
# Import MCP configuration and MongoDB connector
from config import (
MCP_SERVER_NAME, MCP_SERVER_VERSION, validate_config, get_config_summary,
DEFAULT_SEARCH_LIMIT, MAX_SEARCH_LIMIT, DEFAULT_CLUSTER_METHOD,
DEFAULT_CLUSTER_K, DEFAULT_SIMILARITY_THRESHOLD, API_HOST, API_PORT, API_RELOAD,
CORS_ORIGINS, CORS_CREDENTIALS
)
from mongodb_connector import get_mongodb_connector, MongoDBConnector
# Configure logging: root logger at INFO level, plus a logger named after
# this module that the rest of the file uses for all messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Pydantic models for API requests/responses
# Generic request envelope. It is not referenced by any route in the visible
# part of this file; the concrete per-tool models below are used instead.
class ToolRequest(BaseModel):
    """Base model for tool requests"""
    # Free-form tool arguments keyed by name. default_factory=dict gives every
    # instance its own empty dict when the field is omitted.
    arguments: Dict[str, Any] = Field(default_factory=dict)
# Body of POST /tools/list_sessions.
class ListSessionsRequest(BaseModel):
    """Request model for list_sessions tool"""
    # Maximum number of sessions to fetch; validated to 1..100, default 20.
    limit: int = Field(default=20, ge=1, le=100)
    # Optional filter: when set, the list_sessions tool keeps only sessions
    # whose 'adminId' field equals this value.
    adminId: Optional[str] = None
# Body of POST /tools/get_session.
class GetSessionRequest(BaseModel):
    """Request model for get_session tool"""
    # Required identifier of the session to look up.
    sessionId: str
# Body of POST /tools/list_ideas.
class ListIdeasRequest(BaseModel):
    """Request model for list_ideas tool"""
    # Required identifier of the session whose ideas are listed.
    sessionId: str
    # Maximum number of ideas to fetch; validated to 1..100, default 20.
    limit: int = Field(default=20, ge=1, le=100)
# Body of POST /tools/search_ideas.
class SearchIdeasRequest(BaseModel):
    """Request model for search_ideas tool"""
    # Required identifier of the session to search in.
    sessionId: str
    # Required search text handed to the connector's search_ideas().
    query: str
    # Maximum number of hits; validated to 1..50, default 10. The tool
    # implementation additionally caps the value at MAX_SEARCH_LIMIT.
    limit: int = Field(default=10, ge=1, le=50)
# Body of POST /tools/get_idea_connections.
class GetIdeaConnectionsRequest(BaseModel):
    """Request model for get_idea_connections tool"""
    # Required identifier of the session whose idea connections are fetched.
    sessionId: str
    # Maximum number of connections; validated to 1..100, default 50.
    limit: int = Field(default=50, ge=1, le=100)
# Body of POST /tools/get_idea_clusters.
class GetIdeaClustersRequest(BaseModel):
    """Request model for get_idea_clusters tool"""
    # Required identifier of the session whose clusters are fetched.
    sessionId: str
# Body of POST /tools/get_session_stats.
class GetSessionStatsRequest(BaseModel):
    """Request model for get_session_stats tool"""
    # Required identifier of the session whose statistics are requested.
    sessionId: str
# Body of POST /tools/analyze_session.
class AnalyzeSessionRequest(BaseModel):
    """Request model for analyze_session tool"""
    # Required identifier of the session to analyze.
    sessionId: str
    # When False the tool skips fetching clusters and reports
    # "clusters_analysis" as None.
    include_clusters: bool = True
    # When False the tool skips fetching connections and reports
    # "connections_analysis" as None.
    include_connections: bool = True
# Body of POST /tools/hello_rtideas (smoke-test tool).
class HelloRTIdeasRequest(BaseModel):
    """Request model for hello_rtideas tool"""
    # Name interpolated into the greeting message; defaults to "Usuario".
    name: str = "Usuario"
# Envelope returned by every endpoint in this file (used as response_model).
class APIResponse(BaseModel):
    """Standard API response model"""
    # Whether the operation succeeded.
    success: bool
    # Endpoint-specific payload; None when there is nothing to return.
    data: Any = None
    # Error message when the operation failed, otherwise None.
    error: Optional[str] = None
    # ISO-8601 creation time of the response, taken from datetime.now()
    # (naive local time, no timezone offset).
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
    # Name of the tool that produced the response; None for non-tool endpoints.
    tool: Optional[str] = None
class RTIdeasAPIServer:
    """REST facade over the RTIdeas MCP tools and resources, backed by MongoDB.

    Construction validates the configuration, obtains the MongoDB connector,
    installs the CORS middleware and registers every route on ``self.app``.

    The tool helpers (``_*_tool``) return plain dicts and the resource helpers
    (``_read_*``) return JSON strings. Both report failures by returning a
    payload that carries an ``"error"`` key instead of raising; the HTTP layer
    turns such payloads into ``success=False`` responses.

    NOTE(review): the connector methods are called synchronously from
    ``async`` handlers. If they perform blocking I/O they hold the event loop
    for the duration of the call -- confirm and consider a thread pool.
    """

    # (name, description) of every tool; the endpoint is always /tools/<name>.
    _TOOL_CATALOG = (
        ("list_sessions", "Listar todas las sesiones activas con metadatos básicos"),
        ("get_session", "Obtener información detallada de una sesión específica"),
        ("list_ideas", "Listar ideas de una sesión específica con paginación"),
        ("search_ideas", "Buscar ideas usando búsqueda de texto"),
        ("get_idea_connections", "Obtener conexiones entre ideas en una sesión"),
        ("get_idea_clusters", "Obtener clusters de ideas agrupadas por similitud"),
        ("get_session_stats", "Obtener estadísticas detalladas de una sesión"),
        ("analyze_session", "Realizar análisis completo de una sesión"),
        ("hello_rtideas", "Herramienta de prueba para verificar que el MCP está funcionando"),
    )

    # (uri, name, description) of every advertised resource.
    # NOTE(review): one entry uses the "rtideas://" scheme while the others
    # use "test://"; the strings are kept verbatim -- confirm which is intended.
    _RESOURCE_CATALOG = (
        ("test://sessions", "Sesiones RTIdeas", "Lista de todas las sesiones activas"),
        ("test://ideas/{sessionId}", "Ideas por Sesión", "Ideas de una sesión específica"),
        ("rtideas://connections/{sessionId}", "Conexiones de Ideas", "Conexiones entre ideas en una sesión"),
        ("test://clusters/{sessionId}", "Clusters de Ideas", "Clusters de ideas agrupadas por similitud"),
        ("test://stats/{sessionId}", "Estadísticas de Sesión", "Estadísticas detalladas de una sesión"),
    )

    def __init__(self):
        """Create the FastAPI app, check config, connect to MongoDB, add routes.

        Raises:
            ValueError: when ``validate_config()`` reports an invalid
                configuration (each reported error is logged first).
            Exception: anything raised while obtaining or probing the MongoDB
                connector is logged and re-raised unchanged.
        """
        self.app = FastAPI(
            title="RTIdeas MCP API Server",
            description="API REST para el servidor MCP de RTIdeas con soporte MongoDB",
            version=MCP_SERVER_VERSION,
            docs_url="/docs",
            redoc_url="/redoc"
        )

        # Validate configuration before touching the database.
        is_valid, errors = validate_config()
        if not is_valid:
            logger.error("❌ Configuration errors:")
            for error in errors:
                logger.error(f" - {error}")
            raise ValueError("Invalid MCP configuration")

        # Initialize the MongoDB connector and probe the connection once.
        try:
            self.db_connector = get_mongodb_connector()
            logger.info("🚀 RTIdeas API Server Inicializado con MongoDB")
            logger.info("🗄️ Conectado a base de datos MongoDB")
            if self.db_connector.test_connection():
                collections_info = self.db_connector.get_collections_info()
                logger.info(f"📊 Colecciones disponibles: {collections_info.get('total_collections', 0)}")
            else:
                # A failed probe is only a warning; the server still starts.
                logger.warning("⚠️ No se pudo verificar la conexión a MongoDB")
        except Exception as e:
            logger.error(f"❌ Error inicializando conexión MongoDB: {e}")
            raise

        # CORS settings come from the project configuration.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=CORS_ORIGINS,
            allow_credentials=CORS_CREDENTIALS,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        self._setup_routes()
        logger.info(f"⚙️ Configuración: {get_config_summary()}")

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _clamp_limit(value, default, maximum):
        """Coerce a caller-supplied limit into an int within ``[1, maximum]``.

        ``None`` or a value that cannot be converted to ``int`` falls back to
        ``default``. The lower bound of 1 mirrors the ``ge=1`` constraint of
        the request models, so a raw ``0`` or negative value never reaches
        the connector.
        """
        try:
            limit = int(value)
        except (TypeError, ValueError):
            limit = default
        return max(1, min(limit, maximum))

    @staticmethod
    def _payload_error(payload):
        """Return the error message carried by ``payload``, or ``None``.

        A payload signals failure by being a dict with a top-level
        ``"error"`` key, which is what the tool/resource helpers produce.
        """
        if isinstance(payload, dict) and "error" in payload:
            return str(payload["error"])
        return None

    def _respond(self, payload, tool=None):
        """Wrap ``payload`` in an ``APIResponse`` with an accurate ``success``.

        Error payloads yield ``success=False`` with the message in ``error``;
        the payload is still passed through in ``data`` so clients that read
        ``data["error"]`` keep working.
        """
        error = self._payload_error(payload)
        if error is None:
            return APIResponse(success=True, data=payload, tool=tool)
        return APIResponse(success=False, data=payload, error=error, tool=tool)

    async def _run_tool(self, name, handler, request):
        """Invoke a tool helper with the request body and build the response."""
        try:
            result = await handler(request.dict())
            return self._respond(result, tool=name)
        except Exception as e:
            logger.error(f"Error en {name}: {e}")
            return APIResponse(success=False, error=str(e), tool=name)

    async def _run_resource(self, label, reader, *args):
        """Invoke a resource reader, decode its JSON and build the response.

        ``label`` is the noun used in the log message ("sesiones", "ideas"...).
        """
        try:
            raw = await reader(*args)
            return self._respond(json.loads(raw))
        except Exception as e:
            logger.error(f"Error leyendo {label}: {e}")
            return APIResponse(success=False, error=str(e))

    # ------------------------------------------------------------------
    # Route registration
    # ------------------------------------------------------------------
    def _setup_routes(self):
        """Register every endpoint on ``self.app``.

        The handler docstrings below are intentionally left verbatim: FastAPI
        publishes them as the operation descriptions in the OpenAPI docs.
        """

        @self.app.get("/", response_model=APIResponse)
        async def root():
            """Endpoint raíz con información del servidor"""
            return APIResponse(
                success=True,
                data={
                    "server": MCP_SERVER_NAME,
                    "version": MCP_SERVER_VERSION,
                    "status": "running",
                    "mongodb_connected": self.db_connector.test_connection(),
                    "endpoints": {
                        "tools": "/tools",
                        "resources": "/resources",
                        "health": "/health",
                        "docs": "/docs"
                    }
                }
            )

        @self.app.get("/health", response_model=APIResponse)
        async def health_check():
            """Health check endpoint"""
            mongodb_status = self.db_connector.test_connection()
            return APIResponse(
                success=mongodb_status,
                data={
                    "status": "healthy" if mongodb_status else "unhealthy",
                    "mongodb": "connected" if mongodb_status else "disconnected",
                    "timestamp": datetime.now().isoformat()
                }
            )

        @self.app.get("/tools", response_model=APIResponse)
        async def list_tools():
            """Listar todas las herramientas disponibles"""
            tools = [
                {
                    "name": name,
                    "description": description,
                    "endpoint": f"/tools/{name}",
                    "method": "POST"
                }
                for name, description in self._TOOL_CATALOG
            ]
            return APIResponse(
                success=True,
                data={"total_tools": len(tools), "tools": tools}
            )

        @self.app.get("/resources", response_model=APIResponse)
        async def list_resources():
            """Listar todos los recursos disponibles"""
            resources = [
                {
                    "uri": uri,
                    "name": name,
                    "description": description,
                    "mimeType": "application/json"
                }
                for uri, name, description in self._RESOURCE_CATALOG
            ]
            return APIResponse(
                success=True,
                data={"total_resources": len(resources), "resources": resources}
            )

        # Tool endpoints
        @self.app.post("/tools/list_sessions", response_model=APIResponse)
        async def list_sessions(request: ListSessionsRequest):
            """Listar sesiones"""
            return await self._run_tool("list_sessions", self._list_sessions_tool, request)

        @self.app.post("/tools/get_session", response_model=APIResponse)
        async def get_session(request: GetSessionRequest):
            """Obtener sesión específica"""
            return await self._run_tool("get_session", self._get_session_tool, request)

        @self.app.post("/tools/list_ideas", response_model=APIResponse)
        async def list_ideas(request: ListIdeasRequest):
            """Listar ideas de una sesión"""
            return await self._run_tool("list_ideas", self._list_ideas_tool, request)

        @self.app.post("/tools/search_ideas", response_model=APIResponse)
        async def search_ideas(request: SearchIdeasRequest):
            """Buscar ideas"""
            return await self._run_tool("search_ideas", self._search_ideas_tool, request)

        @self.app.post("/tools/get_idea_connections", response_model=APIResponse)
        async def get_idea_connections(request: GetIdeaConnectionsRequest):
            """Obtener conexiones de ideas"""
            return await self._run_tool("get_idea_connections", self._get_idea_connections_tool, request)

        @self.app.post("/tools/get_idea_clusters", response_model=APIResponse)
        async def get_idea_clusters(request: GetIdeaClustersRequest):
            """Obtener clusters de ideas"""
            return await self._run_tool("get_idea_clusters", self._get_idea_clusters_tool, request)

        @self.app.post("/tools/get_session_stats", response_model=APIResponse)
        async def get_session_stats(request: GetSessionStatsRequest):
            """Obtener estadísticas de sesión"""
            return await self._run_tool("get_session_stats", self._get_session_stats_tool, request)

        @self.app.post("/tools/analyze_session", response_model=APIResponse)
        async def analyze_session(request: AnalyzeSessionRequest):
            """Analizar sesión completa"""
            return await self._run_tool("analyze_session", self._analyze_session_tool, request)

        @self.app.post("/tools/hello_rtideas", response_model=APIResponse)
        async def hello_rtideas(request: HelloRTIdeasRequest):
            """Herramienta de prueba"""
            return await self._run_tool("hello_rtideas", self._hello_rtideas_tool, request)

        # Resource endpoints
        @self.app.get("/resources/sessions", response_model=APIResponse)
        async def read_sessions():
            """Leer todas las sesiones"""
            return await self._run_resource("sesiones", self._read_sessions)

        @self.app.get("/resources/ideas/{session_id}", response_model=APIResponse)
        async def read_ideas(session_id: str = Path(..., description="ID de la sesión")):
            """Leer ideas de una sesión"""
            return await self._run_resource("ideas", self._read_ideas, session_id)

        @self.app.get("/resources/connections/{session_id}", response_model=APIResponse)
        async def read_connections(session_id: str = Path(..., description="ID de la sesión")):
            """Leer conexiones de una sesión"""
            return await self._run_resource("conexiones", self._read_connections, session_id)

        @self.app.get("/resources/clusters/{session_id}", response_model=APIResponse)
        async def read_clusters(session_id: str = Path(..., description="ID de la sesión")):
            """Leer clusters de una sesión"""
            return await self._run_resource("clusters", self._read_clusters, session_id)

        @self.app.get("/resources/stats/{session_id}", response_model=APIResponse)
        async def read_session_stats(session_id: str = Path(..., description="ID de la sesión")):
            """Leer estadísticas de una sesión"""
            return await self._run_resource("estadísticas", self._read_session_stats, session_id)

    # ------------------------------------------------------------------
    # Tool implementations (same as original MCP server)
    # ------------------------------------------------------------------
    async def _list_sessions_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: list sessions, optionally filtered by ``adminId``.

        Args:
            arguments: may contain ``limit`` (clamped to 1..100, default 20)
                and ``adminId``.

        Returns:
            A dict with ``tool``, ``timestamp``, ``total_sessions`` and
            ``sessions``; or ``{"error": message}`` if the connector raises.
        """
        limit = self._clamp_limit(arguments.get('limit', 20), 20, 100)
        admin_id = arguments.get('adminId')
        try:
            sessions = self.db_connector.fetch_all_sessions(limit=limit)
            # NOTE(review): the adminId filter runs after `limit` was handed
            # to the connector, so fewer matches than exist may be returned.
            if admin_id:
                sessions = [s for s in sessions if s.get('adminId') == admin_id]
            return {
                "tool": "list_sessions",
                "timestamp": datetime.now().isoformat(),
                "total_sessions": len(sessions),
                "sessions": sessions
            }
        except Exception as e:
            logger.error(f"Error en list_sessions: {e}")
            return {"error": str(e)}

    async def _get_session_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: fetch one session by ``sessionId``.

        Returns:
            A dict with ``tool``, ``timestamp`` and ``session``; or an
            ``{"error": ...}`` dict when ``sessionId`` is missing, the session
            is not found, or the connector raises.
        """
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}
        try:
            session = self.db_connector.fetch_session_by_id(session_id)
            if not session:
                return {"error": f"Session {session_id} not found"}
            return {
                "tool": "get_session",
                "timestamp": datetime.now().isoformat(),
                "session": session
            }
        except Exception as e:
            logger.error(f"Error en get_session: {e}")
            return {"error": str(e)}

    async def _list_ideas_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: list the ideas of a session.

        Args:
            arguments: requires ``sessionId``; optional ``limit`` is clamped
                to 1..100 (default 20).

        Returns:
            A dict with ``tool``, ``timestamp``, ``session_id``,
            ``total_ideas`` and ``ideas``; or ``{"error": ...}``.
        """
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}
        limit = self._clamp_limit(arguments.get('limit', 20), 20, 100)
        try:
            ideas = self.db_connector.fetch_ideas_by_session(session_id, limit=limit)
            return {
                "tool": "list_ideas",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_ideas": len(ideas),
                "ideas": ideas
            }
        except Exception as e:
            logger.error(f"Error en list_ideas: {e}")
            return {"error": str(e)}

    async def _search_ideas_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: text search over the ideas of a session.

        Args:
            arguments: requires ``sessionId`` and ``query``; optional
                ``limit`` defaults to ``DEFAULT_SEARCH_LIMIT`` and is clamped
                to 1..``MAX_SEARCH_LIMIT``.

        Returns:
            A dict with ``tool``, ``timestamp``, ``session_id``, ``query``,
            ``total_results`` and ``ideas``; or ``{"error": ...}``.
        """
        session_id = arguments.get('sessionId')
        query = arguments.get('query')
        if not session_id or not query:
            return {"error": "sessionId and query are required"}
        limit = self._clamp_limit(
            arguments.get('limit', DEFAULT_SEARCH_LIMIT),
            DEFAULT_SEARCH_LIMIT,
            MAX_SEARCH_LIMIT,
        )
        try:
            ideas = self.db_connector.search_ideas(session_id, query, limit=limit)
            return {
                "tool": "search_ideas",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "query": query,
                "total_results": len(ideas),
                "ideas": ideas
            }
        except Exception as e:
            logger.error(f"Error en search_ideas: {e}")
            return {"error": str(e)}

    async def _get_idea_connections_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: list the idea-to-idea connections of a session.

        Args:
            arguments: requires ``sessionId``; optional ``limit`` is clamped
                to 1..100 (default 50).

        Returns:
            A dict with ``tool``, ``timestamp``, ``session_id``,
            ``total_connections`` and ``connections``; or ``{"error": ...}``.
        """
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}
        limit = self._clamp_limit(arguments.get('limit', 50), 50, 100)
        try:
            connections = self.db_connector.fetch_idea_connections(session_id, limit=limit)
            return {
                "tool": "get_idea_connections",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_connections": len(connections),
                "connections": connections
            }
        except Exception as e:
            logger.error(f"Error en get_idea_connections: {e}")
            return {"error": str(e)}

    async def _get_idea_clusters_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: list the idea clusters of a session.

        Returns:
            A dict with ``tool``, ``timestamp``, ``session_id``,
            ``total_clusters`` and ``clusters``; or ``{"error": ...}``.
        """
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}
        try:
            clusters = self.db_connector.fetch_idea_clusters(session_id)
            return {
                "tool": "get_idea_clusters",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_clusters": len(clusters),
                "clusters": clusters
            }
        except Exception as e:
            logger.error(f"Error en get_idea_clusters: {e}")
            return {"error": str(e)}

    async def _get_session_stats_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: return the connector's statistics for a session.

        Returns:
            A dict with ``tool``, ``timestamp`` and ``stats``; or
            ``{"error": ...}``.
        """
        session_id = arguments.get('sessionId')
        if not session_id:
            return {"error": "sessionId is required"}
        try:
            stats = self.db_connector.get_session_stats(session_id)
            return {
                "tool": "get_session_stats",
                "timestamp": datetime.now().isoformat(),
                "stats": stats
            }
        except Exception as e:
            logger.error(f"Error en get_session_stats: {e}")
            return {"error": str(e)}

    async def _analyze_session_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: combine session info, ideas, connections, clusters and stats.

        Args:
            arguments: requires ``sessionId``; ``include_clusters`` and
                ``include_connections`` (both default True) control whether
                the respective data is fetched. A disabled section is
                reported as ``None``.

        Returns:
            A dict with ``tool``, ``timestamp``, ``session_id`` and
            ``analysis``; or ``{"error": ...}`` when ``sessionId`` is missing,
            the session does not exist, or the connector raises.
        """
        session_id = arguments.get('sessionId')
        include_clusters = arguments.get('include_clusters', True)
        include_connections = arguments.get('include_connections', True)
        if not session_id:
            return {"error": "sessionId is required"}
        try:
            session = self.db_connector.fetch_session_by_id(session_id)
            if not session:
                return {"error": f"Session {session_id} not found"}

            ideas = self.db_connector.fetch_ideas_by_session(session_id, limit=100)
            connections = (
                self.db_connector.fetch_idea_connections(session_id, limit=100)
                if include_connections else []
            )
            clusters = (
                self.db_connector.fetch_idea_clusters(session_id)
                if include_clusters else []
            )
            stats = self.db_connector.get_session_stats(session_id)

            connections_analysis = None
            if include_connections:
                # A missing or None similarity counts as 0 so that a single
                # incomplete record does not fail the whole analysis.
                total_similarity = sum((c.get('similarity') or 0) for c in connections)
                connections_analysis = {
                    "total_connections": len(connections),
                    "average_similarity": total_similarity / len(connections) if connections else 0
                }

            clusters_analysis = None
            if include_clusters:
                clusters_analysis = {
                    "total_clusters": len(clusters),
                    "clusters_with_ideas": sum(1 for c in clusters if c.get('ideaIds'))
                }

            analysis = {
                "session_info": session,
                "ideas_analysis": {
                    "total_ideas": len(ideas),
                    "ideas_with_embeddings": sum(1 for i in ideas if i.get('embedding')),
                    "ideas_with_tags": sum(1 for i in ideas if i.get('tags'))
                },
                "connections_analysis": connections_analysis,
                "clusters_analysis": clusters_analysis,
                "stats": stats
            }
            return {
                "tool": "analyze_session",
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "analysis": analysis
            }
        except Exception as e:
            logger.error(f"Error en analyze_session: {e}")
            return {"error": str(e)}

    async def _hello_rtideas_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: smoke test returning a greeting and the MongoDB probe result."""
        name = arguments.get('name', 'Usuario')
        return {
            "tool": "hello_rtideas",
            "message": f"¡Hola, {name}! 🎉",
            "timestamp": datetime.now().isoformat(),
            "server": "RTIdeas API Server",
            "version": MCP_SERVER_VERSION,
            "status": "working",
            "mongodb_connected": self.db_connector.test_connection()
        }

    # ------------------------------------------------------------------
    # Resource implementations (same as original MCP server)
    # ------------------------------------------------------------------
    async def _read_sessions(self) -> str:
        """Resource: up to 50 sessions as a JSON string.

        On failure the JSON carries ``error`` and an empty ``sessions`` list.
        """
        try:
            sessions = self.db_connector.fetch_all_sessions(limit=50)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "total_sessions": len(sessions),
                "sessions": sessions
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo sesiones: {e}")
            return json.dumps({"error": str(e), "sessions": []})

    async def _read_ideas(self, session_id: str) -> str:
        """Resource: up to 100 ideas of ``session_id`` as a JSON string.

        On failure the JSON carries ``error`` and an empty ``ideas`` list.
        """
        try:
            ideas = self.db_connector.fetch_ideas_by_session(session_id, limit=100)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_ideas": len(ideas),
                "ideas": ideas
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo ideas: {e}")
            return json.dumps({"error": str(e), "ideas": []})

    async def _read_connections(self, session_id: str) -> str:
        """Resource: up to 100 connections of ``session_id`` as a JSON string.

        On failure the JSON carries ``error`` and an empty ``connections`` list.
        """
        try:
            connections = self.db_connector.fetch_idea_connections(session_id, limit=100)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_connections": len(connections),
                "connections": connections
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo conexiones: {e}")
            return json.dumps({"error": str(e), "connections": []})

    async def _read_clusters(self, session_id: str) -> str:
        """Resource: the clusters of ``session_id`` as a JSON string.

        On failure the JSON carries ``error`` and an empty ``clusters`` list.
        """
        try:
            clusters = self.db_connector.fetch_idea_clusters(session_id)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "total_clusters": len(clusters),
                "clusters": clusters
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo clusters: {e}")
            return json.dumps({"error": str(e), "clusters": []})

    async def _read_session_stats(self, session_id: str) -> str:
        """Resource: the statistics of ``session_id`` as a JSON string.

        On failure the JSON carries ``error`` and an empty ``stats`` object.
        """
        try:
            stats = self.db_connector.get_session_stats(session_id)
            return json.dumps({
                "timestamp": datetime.now().isoformat(),
                "stats": stats
            }, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error leyendo estadísticas: {e}")
            return json.dumps({"error": str(e), "stats": {}})

    def run(self, host: Optional[str] = None, port: Optional[int] = None,
            reload: Optional[bool] = None):
        """Start uvicorn and serve the API (blocks until the server stops).

        Args:
            host: bind address; a falsy value falls back to ``API_HOST``.
            port: bind port; a falsy value falls back to ``API_PORT``.
            reload: enable auto-reload; ``None`` falls back to ``API_RELOAD``.
        """
        host = host or API_HOST
        port = port or API_PORT
        reload = reload if reload is not None else API_RELOAD

        logger.info("🎯 Iniciando RTIdeas API Server")
        logger.info(f"📡 Servidor disponible en http://{host}:{port}")
        logger.info(f"📚 Documentación disponible en http://{host}:{port}/docs")

        if reload:
            # Reload mode is given the import string rather than the app
            # object, so uvicorn re-imports the module-level `app`.
            uvicorn.run(
                "rtideas_api_server:app",
                host=host,
                port=port,
                reload=True,
                log_level="info"
            )
        else:
            # Without reload, serve this instance's app object directly.
            uvicorn.run(
                self.app,
                host=host,
                port=port,
                reload=False,
                log_level="info"
            )
def main():
    """Command-line entry point: parse options, build the server, run it.

    Options: ``--host`` and ``--port`` default to ``API_HOST`` / ``API_PORT``;
    ``--reload`` is a flag that turns on auto-reload.
    """
    import argparse

    cli = argparse.ArgumentParser(description="RTIdeas API Server")
    cli.add_argument("--host", default=API_HOST, help="Host to bind to")
    cli.add_argument("--port", type=int, default=API_PORT, help="Port to bind to")
    cli.add_argument("--reload", action="store_true", help="Enable auto-reload for development")
    options = cli.parse_args()

    # NOTE(review): without --reload, options.reload is False (not None), so
    # run() never falls back to API_RELOAD on this path -- confirm intended.
    RTIdeasAPIServer().run(
        host=options.host,
        port=options.port,
        reload=options.reload,
    )
# Application factory used to build the module-level app for reload mode.
def create_app():
    """Build a fresh RTIdeasAPIServer and return its FastAPI application."""
    return RTIdeasAPIServer().app
# Module-level application object. In reload mode run() hands uvicorn the
# import string "rtideas_api_server:app", which resolves to this name.
# NOTE: this builds an RTIdeasAPIServer at import time, so configuration
# validation and the MongoDB connector setup run as soon as the module loads.
app = create_app()