"""
Pytest configuration for the Trino MCP server tests.
"""
import os
import time
import json
import pytest
import requests
import subprocess
import signal
from typing import Dict, Any, Iterator, Tuple
# Define constants
TEST_SERVER_PORT = 7000 # Using port 7000 to avoid ALL conflicts with existing containers
TEST_SERVER_URL = f"http://localhost:{TEST_SERVER_PORT}"
TRINO_HOST = os.environ.get("TEST_TRINO_HOST", "localhost")
TRINO_PORT = int(os.environ.get("TEST_TRINO_PORT", "9095"))
TRINO_USER = os.environ.get("TEST_TRINO_USER", "trino")
class TrinoMCPTestServer:
"""Helper class to manage a test instance of the Trino MCP server."""
def __init__(self, port: int = TEST_SERVER_PORT):
self.port = port
self.process = None
def start(self) -> None:
"""Start the server process."""
cmd = [
"python", "-m", "trino_mcp.server",
"--transport", "sse",
"--port", str(self.port),
"--trino-host", TRINO_HOST,
"--trino-port", str(TRINO_PORT),
"--trino-user", TRINO_USER,
"--trino-catalog", "memory",
"--debug"
]
env = os.environ.copy()
env["PYTHONPATH"] = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Wait for server to start
self._wait_for_server()
def stop(self) -> None:
"""Stop the server process."""
if self.process:
self.process.send_signal(signal.SIGINT)
self.process.wait()
self.process = None
def _wait_for_server(self, max_retries: int = 10, retry_interval: float = 0.5) -> None:
"""Wait for the server to become available."""
for _ in range(max_retries):
try:
response = requests.get(f"{TEST_SERVER_URL}/mcp")
if response.status_code == 200:
return
except requests.exceptions.ConnectionError:
pass
time.sleep(retry_interval)
raise TimeoutError(f"Server did not start within {max_retries * retry_interval} seconds")
def check_trino_available() -> bool:
"""Check if Trino server is available for testing."""
try:
response = requests.get(f"http://{TRINO_HOST}:{TRINO_PORT}/v1/info")
return response.status_code == 200
except requests.exceptions.ConnectionError:
return False
class MCPClient:
"""Simple MCP client for testing."""
def __init__(self, base_url: str = TEST_SERVER_URL):
self.base_url = base_url
self.next_id = 1
self.initialized = False
def initialize(self) -> Dict[str, Any]:
"""Initialize the MCP session."""
if self.initialized:
return {"already_initialized": True}
response = self._send_request("initialize", {
"capabilities": {}
})
self.initialized = True
return response
def list_tools(self) -> Dict[str, Any]:
"""List available tools."""
return self._send_request("tools/list")
def list_resources(self, source: str = None, path: str = None) -> Dict[str, Any]:
"""List resources."""
params = {}
if source:
params["source"] = source
if path:
params["path"] = path
return self._send_request("resources/list", params)
def get_resource(self, source: str, path: str) -> Dict[str, Any]:
"""Get a specific resource."""
return self._send_request("resources/get", {
"source": source,
"path": path
})
def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Call a tool with the given arguments."""
return self._send_request("tools/call", {
"name": name,
"arguments": arguments
})
def shutdown(self) -> Dict[str, Any]:
"""Shutdown the MCP session."""
response = self._send_request("shutdown")
self.initialized = False
return response
def _send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Send a JSON-RPC request to the server."""
request = {
"jsonrpc": "2.0",
"id": self.next_id,
"method": method
}
if params is not None:
request["params"] = params
self.next_id += 1
response = requests.post(
f"{self.base_url}/mcp/message",
json=request
)
if response.status_code != 200:
raise Exception(f"Request failed with status {response.status_code}: {response.text}")
return response.json()
@pytest.fixture(scope="session")
def trino_available() -> bool:
"""Check if Trino is available."""
available = check_trino_available()
if not available:
pytest.skip("Trino server is not available for testing")
return available
@pytest.fixture(scope="session")
def mcp_server(trino_available) -> Iterator[None]:
"""
Start a test instance of the Trino MCP server for the test session.
Args:
trino_available: Fixture to ensure Trino is available.
Yields:
None
"""
server = TrinoMCPTestServer()
try:
server.start()
yield
finally:
server.stop()
@pytest.fixture
def mcp_client(mcp_server) -> Iterator[MCPClient]:
"""
Create a test MCP client connected to the test server.
Args:
mcp_server: The server fixture.
Yields:
MCPClient: An initialized MCP client.
"""
client = MCPClient()
client.initialize()
try:
yield client
finally:
try:
client.shutdown()
except:
pass # Ignore errors during shutdown