import asyncio
import os
import httpx
from dotenv import load_dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
load_dotenv()
MCP_URL = os.getenv("MCP_URL", "https://vocal-blue-hawk.fastmcp.app/mcp")
MCP_TOKEN = os.getenv("MCP_TOKEN", "NGgba3XYHcwzQpKv9wcC_yor6pfaYSDaBfvxvwxP51k")
async def test_full():
print("=" * 60)
print("PRUEBA COMPLETA DE CONEXION")
print("=" * 60)
print(f"URL: {MCP_URL}")
print()
is_fastmcp_cloud = "fastmcp.app" in MCP_URL
# Prueba 1: HTTP directo
print("1. Prueba HTTP directa...")
try:
headers = {"Content-Type": "application/json"}
# Solo agregar auth si es localhost
if not is_fastmcp_cloud:
headers["Authorization"] = f"Bearer {MCP_TOKEN}"
print(f" Usando autenticación Bearer (modo local)")
else:
print(f" Sin autenticación (FastMCP Cloud maneja esto)")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
MCP_URL,
headers=headers,
json={"method": "initialize", "params": {}}
)
print(f" Status: {response.status_code}")
print(f" Response: {response.text[:300]}")
except Exception as e:
print(f" Error: {e}")
print()
print("2. Prueba con MultiServerMCPClient...")
try:
# Configurar cliente según el tipo de conexión
config = {
"fgj-server": {
"transport": "streamable_http",
"url": MCP_URL,
}
}
# Solo agregar headers de auth para localhost
if not is_fastmcp_cloud:
config["fgj-server"]["headers"] = {
"Authorization": f"Bearer {MCP_TOKEN}"
}
client = MultiServerMCPClient(config)
tools = await client.get_tools()
print(f" SUCCESS! Herramientas encontradas: {len(tools)}")
for tool in tools:
print(f" - {tool.name}")
# Probar el prompt
context = await client.get_prompt(
server_name="fgj-server",
prompt_name="email_context"
)
print(f" Prompt obtenido: {len(context)} mensajes")
print()
print("=" * 60)
print("CONEXION EXITOSA!")
print("=" * 60)
return True
except Exception as e:
print(f" ERROR: {type(e).__name__}")
print(f" Detalle: {str(e)[:500]}")
print()
print("=" * 60)
print("DIAGNOSTICO:")
print("=" * 60)
if is_fastmcp_cloud:
print("Conectando a FastMCP Cloud:")
print("1. Verifica que el servidor este desplegado y running")
print("2. Verifica en FastMCP Cloud que EMAIL_USER y EMAIL_PASS esten configurados")
print("3. Revisa los logs del servidor en FastMCP Cloud dashboard")
print("4. Espera 2-3 minutos despues de desplegar cambios")
print("5. La URL debe ser: https://tu-proyecto.fastmcp.app/mcp")
else:
print("Conectando a servidor local:")
print("1. Verifica que el servidor este corriendo (python server.py)")
print("2. Verifica que MCP_TOKEN este configurado en .env")
print("3. La URL debe ser: http://localhost:8000/api")
return False
if __name__ == "__main__":
asyncio.run(test_full())