Trino MCP Server

  • tests
#!/usr/bin/env python3 """ Simple test script for the Trino MCP client. This script connects to the Trino MCP server and performs some basic operations. """ import json import os import sys import time import threading from typing import Dict, Any, List, Optional, Callable import requests import sseclient import logging from rich.console import Console # Default port for MCP server, changed to match docker-compose.yml DEFAULT_MCP_HOST = "localhost" DEFAULT_MCP_PORT = 9096 class SSEListener: """ Server-Sent Events (SSE) listener for MCP. This runs in a separate thread to receive notifications from the server. """ def __init__(self, url: str, message_callback: Callable[[Dict[str, Any]], None]): """ Initialize the SSE listener. Args: url: The SSE endpoint URL. message_callback: Callback function to handle incoming messages. """ self.url = url self.message_callback = message_callback self.running = False self.thread = None def start(self) -> None: """Start the SSE listener in a separate thread.""" if self.running: return self.running = True self.thread = threading.Thread(target=self._listen) self.thread.daemon = True self.thread.start() def stop(self) -> None: """Stop the SSE listener.""" self.running = False if self.thread: self.thread.join(timeout=1.0) self.thread = None def _listen(self) -> None: """Listen for SSE events.""" try: headers = {"Accept": "text/event-stream"} response = requests.get(self.url, headers=headers, stream=True) client = sseclient.SSEClient(response) for event in client.events(): if not self.running: break try: if event.data: data = json.loads(event.data) self.message_callback(data) except json.JSONDecodeError: print(f"Failed to parse SSE message: {event.data}") except Exception as e: print(f"Error processing SSE message: {e}") except Exception as e: if self.running: print(f"SSE connection error: {e}") finally: self.running = False def test_sse_client(base_url=f"http://localhost:{DEFAULT_MCP_PORT}"): """ Test communication with the SSE transport. Args: base_url: The base URL of the SSE server. """ print(f"Testing SSE client with {base_url}...") # First, let's check what endpoints are available print("Checking available endpoints...") try: response = requests.get(base_url) print(f"Root path status: {response.status_code}") if response.status_code == 200: print(f"Content: {response.text[:500]}") # Print first 500 chars except Exception as e: print(f"Error checking root path: {e}") # Try common MCP endpoints endpoints_to_check = [ "/mcp", "/mcp/sse", "/mcp/2024-11-05", "/mcp/message", "/api/mcp", "/api/mcp/sse" ] for endpoint in endpoints_to_check: try: url = f"{base_url}{endpoint}" print(f"\nChecking endpoint: {url}") response = requests.get(url) print(f"Status: {response.status_code}") if response.status_code == 200: print(f"Content: {response.text[:100]}") # Print first 100 chars except Exception as e: print(f"Error: {e}") # Try the /sse endpoint with proper SSE headers print("\nChecking SSE endpoint with proper headers...") try: sse_url = f"{base_url}/sse" print(f"Connecting to SSE endpoint: {sse_url}") # Setup SSE message handler def handle_sse_message(message): print(f"Received SSE message: {message.data}") # Use the SSEClient to connect properly print("Starting SSE connection...") headers = {"Accept": "text/event-stream"} response = requests.get(sse_url, headers=headers, stream=True) client = sseclient.SSEClient(response) # Try to get the first few events print("Waiting for SSE events...") event_count = 0 for event in client.events(): print(f"Event received: {event.data}") event_count += 1 if event_count >= 3: # Get at most 3 events break except Exception as e: print(f"Error with SSE connection: {e}") if __name__ == "__main__": # Get the server URL from environment or command line server_url = os.environ.get("SERVER_URL", f"http://localhost:{DEFAULT_MCP_PORT}") if len(sys.argv) > 1: server_url = sys.argv[1] test_sse_client(server_url)