import json
import os
from typing import Any, Dict, Optional

import httpx
from fastmcp import FastMCP
# Initialize the MCP server
mcp = FastMCP("MelviChat-Universal")
BASE_URL = "http://127.0.0.1:8000" # URL of your backend
# File where save_token()/load_token() persist the session; the path is
# relative, so it is resolved against the current working directory.
TOKEN_FILE = ".mcp_token"
# Global variable for the current tenant; assigned by login_tenant() after a
# successful login and None until then.
CURRENT_TENANT = None
def save_token(tenant_id: str, token: str) -> None:
    """Persist the authentication token to TOKEN_FILE as JSON.

    The file is overwritten with an object holding the keys "tenant_id" and
    "token", which is the shape load_token() reads back.

    Args:
        tenant_id: Identifier of the tenant the token belongs to.
        token: Access token returned by the backend login endpoint.

    Raises:
        OSError: If the file cannot be created or written.
    """
    # The file contains a bearer token, so create it readable/writable by the
    # owner only. The mode only applies when the file is created; an existing
    # file keeps its current permissions.
    fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump({"tenant_id": tenant_id, "token": token}, f)
def load_token() -> Optional[Dict[str, Any]]:
    """Load the session previously stored in TOKEN_FILE.

    Returns:
        The stored dict (with at least the keys "tenant_id" and "token"), or
        None when no usable session exists: the file is missing or unreadable,
        is not valid JSON, or lacks one of those keys.
    """
    # Open directly instead of checking os.path.exists() first: that avoids
    # the window in which the file can disappear between check and open.
    try:
        with open(TOKEN_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # json.JSONDecodeError and undecodable bytes. Callers invoke this
        # outside their own try blocks, so a damaged file must read as
        # "not logged in" rather than crash the tool.
        return None

    # Callers index data["token"] and data["tenant_id"] unconditionally.
    if not isinstance(data, dict) or "token" not in data or "tenant_id" not in data:
        return None
    return data
@mcp.tool()
async def login_tenant(email: str, password: str) -> Dict[str, Any]:
    """
    Log in as a company/tenant.

    Posts the credentials to the backend's /tenants/login endpoint. On HTTP 200
    the access token is stored with save_token() and the module-level
    CURRENT_TENANT is updated.

    Example prompt:
        "Logueame con admin@empresa.com y micontraseña"

    Args:
        email: E-mail of the tenant account.
        password: Password of the tenant account.

    Returns:
        On success, a dict with "status", "message" and "tenant". On any
        failure, a dict with a single "error" key describing what went wrong.
    """
    global CURRENT_TENANT

    # Step 1: the HTTP round trip. Only failures here are connection errors.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{BASE_URL}/tenants/login",
                json={"email": email, "password": password},
            )
    except Exception as e:
        return {"error": f"Error de conexión: {str(e)}"}

    if response.status_code != 200:
        return {"error": "Credenciales inválidas o cuenta inactiva"}

    # Step 2: decode the payload. A body that is not JSON or lacks the
    # expected keys is a backend contract problem, not a connection problem.
    try:
        data = response.json()
        tenant = {
            "id": data["tenant"]["id"],
            "name": data["tenant"]["business_name"],
            "email": email,
        }
        access_token = data["access_token"]
    except (KeyError, TypeError, ValueError) as e:
        return {"error": f"Respuesta inesperada del servidor: {e!r}"}

    # Step 3: persist the token before touching global state, so a failed
    # write cannot leave CURRENT_TENANT set without a saved session.
    try:
        save_token(tenant["id"], access_token)
    except (OSError, TypeError, ValueError) as e:
        return {"error": f"No se pudo guardar la sesión: {str(e)}"}

    CURRENT_TENANT = tenant
    return {
        "status": "success",
        "message": f"Bienvenido a {CURRENT_TENANT['name']}",
        "tenant": CURRENT_TENANT,
    }
@mcp.tool()
async def query_ai_agent(prompt: str) -> Dict[str, Any]:
    """
    Ask the system's AI agent (Gemini) a question about the main database.

    Example prompts:
        "¿Cuántos usuarios hay?"
        "Muéstrame las ventas de la última semana"

    Args:
        prompt: Question forwarded to the backend's /ai/query endpoint.

    Returns:
        The backend's JSON reply on HTTP 200; otherwise a dict with an
        "error" key (expired session on 401, backend text on other statuses,
        or the exception text if the request itself failed).
    """
    session = load_token()
    # Credentials are attached only when a saved session is available.
    auth_headers = (
        {
            "Authorization": f"Bearer {session['token']}",
            "X-Tenant-ID": str(session['tenant_id']),
        }
        if session
        else {}
    )

    try:
        async with httpx.AsyncClient() as http:
            # Forward the question to the backend
            reply = await http.post(
                f"{BASE_URL}/ai/query",
                headers=auth_headers,
                json={"prompt": prompt},
                timeout=60.0,
            )
            status = reply.status_code
            if status == 200:
                return reply.json()
            if status == 401:
                return {"error": "Sesión expirada. Por favor inicia sesión nuevamente"}
            return {"error": f"Error en la consulta IA: {reply.text}"}
    except Exception as e:
        return {"error": f"Error de conexión: {str(e)}"}
@mcp.tool()
async def list_registered_databases() -> Dict[str, Any]:
    """
    List every database connection configured in the application.

    Returns:
        On success, a dict with "count" and "databases", where each entry has
        "id", "name", "type", "host" ("server:port", or just the server when
        no port is set) and "database". On failure, a dict with an "error" key.
    """
    token_data = load_token()
    headers = {}
    if token_data:
        headers = {
            "Authorization": f"Bearer {token_data['token']}",
            "X-Tenant-ID": str(token_data['tenant_id']),
        }

    # Only failures of the request itself are reported as connection errors.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{BASE_URL}/connections/",
                headers=headers,
            )
    except Exception as e:
        return {"error": f"Error de conexión: {str(e)}"}

    # Same session-expired message that query_ai_agent returns on 401.
    if response.status_code == 401:
        return {"error": "Sesión expirada. Por favor inicia sesión nuevamente"}
    if response.status_code != 200:
        return {"error": f"Error al listar bases de datos: {response.status_code} {response.text}"}

    try:
        connections = response.json()
        databases = []
        for conn in connections:
            # "port" may be absent or empty; fall back to the bare server name.
            port = conn.get("port")
            databases.append(
                {
                    "id": conn["id"],
                    "name": conn["name"],
                    "type": conn["db_type"],
                    "host": f"{conn['server']}:{port}" if port else conn["server"],
                    "database": conn["database"],
                }
            )
    except (AttributeError, KeyError, TypeError, ValueError) as e:
        # The body was not JSON or did not have the expected structure.
        return {"error": f"Respuesta inesperada del servidor: {e!r}"}

    return {"count": len(databases), "databases": databases}
@mcp.tool()
async def query_registered_database(connection_id: int, sql_query: str) -> Dict[str, Any]:
    """
    Run a SQL query directly against a registered database.

    Args:
        connection_id: ID of the database (obtain it from list_registered_databases).
        sql_query: SQL statement to execute (e.g. "SELECT * FROM Products").

    Returns:
        When the backend answers with a JSON list, a dict with "status",
        "rows" (the list length) and "data"; any other JSON body is returned
        unchanged. On failure, a dict with an "error" key.
    """
    token_data = load_token()
    headers = {}
    if token_data:
        headers = {
            "Authorization": f"Bearer {token_data['token']}",
            "X-Tenant-ID": str(token_data['tenant_id']),
        }
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{BASE_URL}/connections/{connection_id}/query",
                headers=headers,
                json={"query": sql_query},
                timeout=30.0,
            )
            # Same session-expired message that query_ai_agent returns on 401,
            # so the caller knows to log in again instead of retrying the SQL.
            if response.status_code == 401:
                return {"error": "Sesión expirada. Por favor inicia sesión nuevamente"}
            if response.status_code != 200:
                return {"error": f"Error en la consulta: {response.text}"}

            data = response.json()
            if isinstance(data, list):
                # Wrap a bare row list so the result carries a row count.
                return {
                    "status": "success",
                    "rows": len(data),
                    "data": data,
                }
            return data
    except Exception as e:
        return {"error": f"Error de conexión: {str(e)}"}
@mcp.tool()
async def get_registered_database_schema(connection_id: int) -> Dict[str, Any]:
    """
    Fetch the full schema of a registered database.

    Args:
        connection_id: ID of the database whose schema is requested.

    Returns:
        The backend's JSON reply on HTTP 200; otherwise a dict with an
        "error" key.
    """
    token_data = load_token()
    headers = {}
    if token_data:
        headers = {
            "Authorization": f"Bearer {token_data['token']}",
            "X-Tenant-ID": str(token_data['tenant_id']),
        }
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{BASE_URL}/connections/{connection_id}/schema",
                headers=headers,
                timeout=60.0,
            )
            if response.status_code == 200:
                return response.json()
            # Same session-expired message that query_ai_agent returns on 401.
            if response.status_code == 401:
                return {"error": "Sesión expirada. Por favor inicia sesión nuevamente"}
            return {"error": f"Error al obtener esquema: {response.text}"}
    except Exception as e:
        return {"error": f"Error de conexión: {str(e)}"}
if __name__ == "__main__":
    # NOTE: startup print() calls were removed on purpose: per the original
    # author's note they cause "trailing data" errors over MCP stdio.
    mcp.run()