Skip to main content
Glama

MCP Firebird

#!/usr/bin/env python3 # test-sse.py # Script simple para probar la conexión SSE con el servidor MCP Firebird import requests import json import time import threading # Configuración SERVER_URL = "http://localhost:3003" SESSION_ID = None # Función para enviar una solicitud al servidor def send_request(method, params=None, session_id=None): if params is None: params = {} if session_id is None: print("No hay ID de sesión disponible") return None request = { "jsonrpc": "2.0", "id": str(int(time.time())), "method": method, "params": params } print(f"Enviando solicitud: {json.dumps(request, indent=2)}") try: response = requests.post( f"{SERVER_URL}/message?sessionId={session_id}", headers={"Content-Type": "application/json"}, json=request ) print(f"Código de estado: {response.status_code}") if response.status_code != 200: print(f"Error: {response.text}") return None try: return response.json() except json.JSONDecodeError: print(f"Error al decodificar JSON: {response.text}") return None except Exception as e: print(f"Error al enviar la solicitud: {e}") return None # Función principal def main(): print(f"Probando conexión SSE con el servidor {SERVER_URL}") # Establecer la conexión SSE try: # Conectar sin sessionId, el servidor nos enviará un endpoint con el sessionId print("Conectando al servidor SSE...") sse_response = requests.get(f"{SERVER_URL}", stream=True) print(f"Conexión SSE establecida con código de estado: {sse_response.status_code}") # Iniciar un hilo para leer eventos SSE endpoint_event_received = threading.Event() def read_sse(): current_event = None event_data = "" for line in sse_response.iter_lines(): if line: decoded_line = line.decode('utf-8') print(f"Recibido: {decoded_line}") # Manejar líneas de eventos SSE if decoded_line.startswith('event:'): current_event = decoded_line[6:].strip() print(f"Evento: {current_event}") elif decoded_line.startswith('data:'): event_data = decoded_line[5:].strip() print(f"Datos: {event_data}") # Si tenemos un evento y datos, procesarlos if current_event == 'endpoint': print(f"Evento endpoint recibido: {event_data}") # Extraer el sessionId de la URL del endpoint if '?sessionId=' in event_data: global SESSION_ID SESSION_ID = event_data.split('?sessionId=')[1] print(f"Nuevo ID de sesión: {SESSION_ID}") endpoint_event_received.set() current_event = None sse_thread = threading.Thread(target=read_sse) sse_thread.daemon = True sse_thread.start() # Esperar a que se reciba el evento endpoint print("Esperando evento endpoint...") if not endpoint_event_received.wait(timeout=10): print("Tiempo de espera agotado para el evento endpoint") return # Probar get-methods print("\n--- Probando get-methods ---") methods_response = send_request("get-methods", session_id=SESSION_ID) if methods_response: print(f"Métodos disponibles: {json.dumps(methods_response, indent=2)}") # Probar list-tables print("\n--- Probando list-tables ---") tables_response = send_request("list-tables", session_id=SESSION_ID) if tables_response: print(f"Tablas disponibles: {json.dumps(tables_response, indent=2)}") # Mantener el script en ejecución por un tiempo print("\nManteniendo la conexión abierta por 5 segundos...") time.sleep(5) except Exception as e: print(f"Error: {e}") finally: print("Finalizando prueba") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PuroDelphi/mcpFirebird'

If you have feedback or need assistance with the MCP directory API, please join our Discord server