Trino MCP Server

  • tests
""" Pytest configuration for the Trino MCP server tests. """ import os import time import json import pytest import requests import subprocess import signal from typing import Dict, Any, Iterator, Tuple # Define constants TEST_SERVER_PORT = 7000 # Using port 7000 to avoid ALL conflicts with existing containers TEST_SERVER_URL = f"http://localhost:{TEST_SERVER_PORT}" TRINO_HOST = os.environ.get("TEST_TRINO_HOST", "localhost") TRINO_PORT = int(os.environ.get("TEST_TRINO_PORT", "9095")) TRINO_USER = os.environ.get("TEST_TRINO_USER", "trino") class TrinoMCPTestServer: """Helper class to manage a test instance of the Trino MCP server.""" def __init__(self, port: int = TEST_SERVER_PORT): self.port = port self.process = None def start(self) -> None: """Start the server process.""" cmd = [ "python", "-m", "trino_mcp.server", "--transport", "sse", "--port", str(self.port), "--trino-host", TRINO_HOST, "--trino-port", str(TRINO_PORT), "--trino-user", TRINO_USER, "--trino-catalog", "memory", "--debug" ] env = os.environ.copy() env["PYTHONPATH"] = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) self.process = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for server to start self._wait_for_server() def stop(self) -> None: """Stop the server process.""" if self.process: self.process.send_signal(signal.SIGINT) self.process.wait() self.process = None def _wait_for_server(self, max_retries: int = 10, retry_interval: float = 0.5) -> None: """Wait for the server to become available.""" for _ in range(max_retries): try: response = requests.get(f"{TEST_SERVER_URL}/mcp") if response.status_code == 200: return except requests.exceptions.ConnectionError: pass time.sleep(retry_interval) raise TimeoutError(f"Server did not start within {max_retries * retry_interval} seconds") def check_trino_available() -> bool: """Check if Trino server is available for testing.""" try: response = requests.get(f"http://{TRINO_HOST}:{TRINO_PORT}/v1/info") return response.status_code == 200 except requests.exceptions.ConnectionError: return False class MCPClient: """Simple MCP client for testing.""" def __init__(self, base_url: str = TEST_SERVER_URL): self.base_url = base_url self.next_id = 1 self.initialized = False def initialize(self) -> Dict[str, Any]: """Initialize the MCP session.""" if self.initialized: return {"already_initialized": True} response = self._send_request("initialize", { "capabilities": {} }) self.initialized = True return response def list_tools(self) -> Dict[str, Any]: """List available tools.""" return self._send_request("tools/list") def list_resources(self, source: str = None, path: str = None) -> Dict[str, Any]: """List resources.""" params = {} if source: params["source"] = source if path: params["path"] = path return self._send_request("resources/list", params) def get_resource(self, source: str, path: str) -> Dict[str, Any]: """Get a specific resource.""" return self._send_request("resources/get", { "source": source, "path": path }) def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Call a tool with the given arguments.""" return self._send_request("tools/call", { "name": name, "arguments": arguments }) def shutdown(self) -> Dict[str, Any]: """Shutdown the MCP session.""" response = self._send_request("shutdown") self.initialized = False return response def _send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send a JSON-RPC request to the server.""" request = { "jsonrpc": "2.0", "id": self.next_id, "method": method } if params is not None: request["params"] = params self.next_id += 1 response = requests.post( f"{self.base_url}/mcp/message", json=request ) if response.status_code != 200: raise Exception(f"Request failed with status {response.status_code}: {response.text}") return response.json() @pytest.fixture(scope="session") def trino_available() -> bool: """Check if Trino is available.""" available = check_trino_available() if not available: pytest.skip("Trino server is not available for testing") return available @pytest.fixture(scope="session") def mcp_server(trino_available) -> Iterator[None]: """ Start a test instance of the Trino MCP server for the test session. Args: trino_available: Fixture to ensure Trino is available. Yields: None """ server = TrinoMCPTestServer() try: server.start() yield finally: server.stop() @pytest.fixture def mcp_client(mcp_server) -> Iterator[MCPClient]: """ Create a test MCP client connected to the test server. Args: mcp_server: The server fixture. Yields: MCPClient: An initialized MCP client. """ client = MCPClient() client.initialize() try: yield client finally: try: client.shutdown() except: pass # Ignore errors during shutdown