import asyncio
import os
import httpx
from dotenv import load_dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
# Run python-dotenv's loader before reading the settings below (presumably so
# that values from a local .env file are visible through os.environ).
load_dotenv()

# Endpoint of the MCP server under test. The MCP_URL environment variable
# takes precedence; the literal is only a fallback.
MCP_URL = os.environ.get(
    "MCP_URL",
    "https://vocal-blue-hawk.fastmcp.app/mcp",
)

# Bearer token sent in the Authorization header of every request. The
# MCP_TOKEN environment variable takes precedence over the fallback.
# NOTE(review): the fallback is a real-looking credential committed to source.
# It should be rotated and the default removed so the token only ever comes
# from the environment.
MCP_TOKEN = os.environ.get(
    "MCP_TOKEN",
    "NGgba3XYHcwzQpKv9wcC_yor6pfaYSDaBfvxvwxP51k",
)
def _mask_token(token):
    """Return a log-safe rendering of *token* (first and last four characters only)."""
    if not token:
        return "(no configurado)"
    if len(token) <= 8:
        # Too short to reveal any part without exposing most of the value.
        return "*" * len(token)
    return f"{token[:4]}...{token[-4:]}"


async def test_full() -> bool:
    """Run a two-step connectivity check against the configured MCP server.

    Step 1 sends a raw JSON-RPC ``initialize`` request with ``httpx`` and
    prints the HTTP status and the first 300 characters of the response. A
    failure here is reported but does not abort the check.

    Step 2 connects through ``MultiServerMCPClient``, lists the server's
    tools and fetches the ``email_context`` prompt.

    The bearer token is never printed in full; only a masked form is shown.

    Returns:
        True when step 2 succeeds, False when it raises (a troubleshooting
        checklist is printed in that case).
    """
    banner = "=" * 60
    masked_token = _mask_token(MCP_TOKEN)

    print(banner)
    print("PRUEBA COMPLETA DE CONEXION")
    print(banner)
    print(f"URL: {MCP_URL}")
    # Show only a masked token so the secret does not end up in logs.
    print(f"Token: {masked_token}")
    print()

    # Test 1: direct HTTP request.
    print("1. Prueba HTTP directa...")
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                MCP_URL,
                headers={
                    "Authorization": f"Bearer {MCP_TOKEN}",
                    "Content-Type": "application/json",
                    # Streamable HTTP requires the client to accept both
                    # plain JSON and SSE responses.
                    "Accept": "application/json, text/event-stream",
                },
                # A well-formed JSON-RPC 2.0 `initialize` request, so that a
                # healthy server with a valid token can answer successfully.
                json={
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "initialize",
                    "params": {
                        "protocolVersion": "2025-03-26",
                        "capabilities": {},
                        "clientInfo": {
                            "name": "mcp-connection-test",
                            "version": "0.1.0",
                        },
                    },
                },
            )
        print(f" Status: {response.status_code}")
        print(f" Response: {response.text[:300]}")
    except Exception as e:
        # Best-effort probe: report and continue with step 2. The exception
        # type is included because some network errors carry no message.
        print(f" Error: {type(e).__name__}: {e}")
    print()

    # Test 2: the same server through the LangChain MCP adapter.
    print("2. Prueba con MultiServerMCPClient...")
    try:
        client = MultiServerMCPClient(
            {
                "fgj-server": {
                    "transport": "streamable_http",
                    "url": MCP_URL,
                    "headers": {
                        "Authorization": f"Bearer {MCP_TOKEN}",
                    },
                }
            }
        )
        tools = await client.get_tools()
        print(f" SUCCESS! Herramientas encontradas: {len(tools)}")
        for tool in tools:
            print(f" - {tool.name}")

        # Also exercise the prompt endpoint.
        context = await client.get_prompt(
            server_name="fgj-server",
            prompt_name="email_context",
        )
        print(f" Prompt obtenido: {len(context)} mensajes")
        print()
        print(banner)
        print("CONEXION EXITOSA!")
        print(banner)
        return True
    except Exception as e:
        print(f" ERROR: {type(e).__name__}")
        print(f" Detalle: {str(e)[:500]}")
        print()
        print(banner)
        print("DIAGNOSTICO:")
        print(banner)
        print("1. Verifica en FastMCP Cloud que MCP_TOKEN este configurado")
        # Refer to the token actually in use (masked) instead of a second
        # hard-coded copy that can drift from the configured value.
        print(f"2. Verifica que el valor coincida con el MCP_TOKEN local: {masked_token}")
        print("3. Revisa los logs del servidor en FastMCP Cloud")
        print("4. Espera 2-3 minutos despues de agregar las variables")
        return False
if __name__ == "__main__":
    # Propagate the outcome as the process exit status (0 = success,
    # 1 = failure) so shells and CI jobs can detect a failed connection test
    # instead of always seeing exit code 0.
    raise SystemExit(0 if asyncio.run(test_full()) else 1)