# conftest.py (6.2 kB)
"""
Pytest configuration for the Trino MCP server tests.
"""
import json
import os
import signal
import subprocess
import sys
import time
from typing import Dict, Any, Iterator, Tuple

import pytest
import requests
# Define constants
# Port the test MCP server is started on; also baked into TEST_SERVER_URL below.
TEST_SERVER_PORT = 7000  # Using port 7000 to avoid ALL conflicts with existing containers
# Base URL of the test server on localhost; default endpoint for MCPClient.
TEST_SERVER_URL = f"http://localhost:{TEST_SERVER_PORT}"
# Trino connection settings. Each one can be overridden with the matching
# TEST_TRINO_* environment variable; the second argument is the fallback.
TRINO_HOST = os.environ.get("TEST_TRINO_HOST", "localhost")
# int() raises ValueError at import time if TEST_TRINO_PORT is not numeric.
TRINO_PORT = int(os.environ.get("TEST_TRINO_PORT", "9095"))
TRINO_USER = os.environ.get("TEST_TRINO_USER", "trino")
class TrinoMCPTestServer:
    """Helper class to manage a test instance of the Trino MCP server.

    The server runs as a child process (``-m trino_mcp.server`` with the SSE
    transport) and is pointed at the Trino instance described by the
    module-level ``TRINO_HOST`` / ``TRINO_PORT`` / ``TRINO_USER`` settings.
    """

    def __init__(self, port: int = TEST_SERVER_PORT):
        """Store the port to serve on; the process is not started here.

        Args:
            port: Port passed to the server via ``--port`` and probed by
                ``_wait_for_server``.
        """
        self.port = port
        self.process = None

    def start(self) -> None:
        """Start the server process and block until it answers HTTP requests.

        If the server does not come up, the child process is stopped again
        before the error is re-raised so no orphan is left behind.

        Raises:
            RuntimeError: If the process exits before becoming available.
            TimeoutError: If the server does not respond in time.
        """
        cmd = [
            # Run under the interpreter that is running the tests instead of
            # whatever "python" happens to resolve to on PATH.
            sys.executable, "-m", "trino_mcp.server",
            "--transport", "sse",
            "--port", str(self.port),
            "--trino-host", TRINO_HOST,
            "--trino-port", str(TRINO_PORT),
            "--trino-user", TRINO_USER,
            "--trino-catalog", "memory",
            "--debug"
        ]

        env = os.environ.copy()
        # Parent of this file's directory; presumably the repository root that
        # contains the trino_mcp package -- confirm against the project layout.
        env["PYTHONPATH"] = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

        # stdout/stderr are inherited rather than piped: nothing in this class
        # ever reads from the pipes, so a --debug server could fill the OS pipe
        # buffer and block. Inherited output is still visible to pytest.
        self.process = subprocess.Popen(cmd, env=env)

        try:
            self._wait_for_server()
        except BaseException:
            self.stop()
            raise

    def stop(self, timeout: float = 10.0) -> None:
        """Stop the server process, if one is running.

        Sends SIGINT first so the server can shut down cleanly, then falls
        back to killing it if it has not exited within ``timeout`` seconds.

        Args:
            timeout: Seconds to wait after SIGINT before killing the process.
        """
        # Clear the attribute first so the instance is reusable even if the
        # shutdown below raises.
        process, self.process = self.process, None
        if process is None:
            return

        # Nothing to signal if the process has already exited.
        if process.poll() is not None:
            return

        process.send_signal(signal.SIGINT)
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    def _wait_for_server(
        self,
        max_retries: int = 10,
        retry_interval: float = 0.5,
        request_timeout: float = 2.0,
    ) -> None:
        """Wait for the server to become available.

        Polls ``http://localhost:<port>/mcp`` until it answers with HTTP 200.

        Args:
            max_retries: Maximum number of probe attempts.
            retry_interval: Seconds to sleep between attempts.
            request_timeout: Per-request timeout in seconds, so a single
                unresponsive probe cannot block forever.

        Raises:
            RuntimeError: If the server process exits while being waited on.
            TimeoutError: If no attempt got an HTTP 200 response.
        """
        # Probe the port this instance was configured with, not the default.
        probe_url = f"http://localhost:{self.port}/mcp"

        for _ in range(max_retries):
            # Fail fast when the child has already died instead of polling a
            # port nobody will ever listen on.
            if self.process is not None and self.process.poll() is not None:
                raise RuntimeError(
                    f"Server process exited with code {self.process.returncode} "
                    "before becoming available"
                )

            try:
                # stream=True: only the status line is needed, so do not try
                # to download a body that may never end on a streaming endpoint.
                response = requests.get(probe_url, timeout=request_timeout, stream=True)
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                pass
            else:
                try:
                    if response.status_code == 200:
                        return
                finally:
                    response.close()

            time.sleep(retry_interval)

        raise TimeoutError(f"Server did not start within {max_retries * retry_interval} seconds")
def check_trino_available() -> bool:
    """Check if Trino server is available for testing.

    Probes ``http://{TRINO_HOST}:{TRINO_PORT}/v1/info``.

    Returns:
        True if the endpoint answered with HTTP 200. False if it answered
        with any other status or the request itself failed (connection
        refused, timeout, or any other ``requests`` error).
    """
    info_url = f"http://{TRINO_HOST}:{TRINO_PORT}/v1/info"
    try:
        # The timeout keeps an unresponsive host from hanging test collection.
        response = requests.get(info_url, timeout=5)
    except requests.exceptions.RequestException:
        # Any request-level failure means Trino is not usable for the tests.
        return False
    return response.status_code == 200
class MCPClient:
    """Simple MCP client for testing.

    Sends JSON-RPC 2.0 requests to ``{base_url}/mcp/message`` and returns the
    decoded JSON response. Request ids start at 1 and grow by one for every
    request sent through the same instance.
    """

    def __init__(self, base_url: str = TEST_SERVER_URL, timeout: float = 60.0):
        """Create a client; no request is sent until a method is called.

        Args:
            base_url: Base URL of the MCP server.
            timeout: Seconds to wait for each HTTP request before giving up,
                so an unresponsive server fails the test instead of hanging it.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.next_id = 1
        self.initialized = False

    def initialize(self) -> Dict[str, Any]:
        """Initialize the MCP session.

        Returns:
            The server's response, or ``{"already_initialized": True}``
            without contacting the server if this client is already
            initialized.
        """
        if self.initialized:
            return {"already_initialized": True}

        response = self._send_request("initialize", {
            "capabilities": {}
        })

        self.initialized = True
        return response

    def list_tools(self) -> Dict[str, Any]:
        """List available tools."""
        return self._send_request("tools/list")

    def list_resources(self, source: str = None, path: str = None) -> Dict[str, Any]:
        """List resources.

        Args:
            source: Optional ``source`` filter; omitted from the request when
                falsy (``None`` or an empty string).
            path: Optional ``path`` filter; omitted from the request when falsy.
        """
        params = {}
        if source:
            params["source"] = source
        if path:
            params["path"] = path

        return self._send_request("resources/list", params)

    def get_resource(self, source: str, path: str) -> Dict[str, Any]:
        """Get a specific resource identified by ``source`` and ``path``."""
        return self._send_request("resources/get", {
            "source": source,
            "path": path
        })

    def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Call a tool with the given arguments."""
        return self._send_request("tools/call", {
            "name": name,
            "arguments": arguments
        })

    def shutdown(self) -> Dict[str, Any]:
        """Shutdown the MCP session and mark this client as uninitialized.

        The ``initialized`` flag is only cleared when the request succeeds.
        """
        response = self._send_request("shutdown")
        self.initialized = False
        return response

    def _send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Send a JSON-RPC request to the server.

        Args:
            method: JSON-RPC method name.
            params: Optional parameters; the ``params`` key is left out of
                the request entirely when this is ``None``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            RuntimeError: If the server answers with a status other than 200.
            requests.exceptions.RequestException: If the request fails or
                exceeds ``self.timeout``.
        """
        request = {
            "jsonrpc": "2.0",
            "id": self.next_id,
            "method": method
        }

        if params is not None:
            request["params"] = params

        # The id is consumed even if the request below fails, so ids are never
        # reused within one client.
        self.next_id += 1

        response = requests.post(
            f"{self.base_url}/mcp/message",
            json=request,
            timeout=self.timeout,
        )

        if response.status_code != 200:
            raise RuntimeError(f"Request failed with status {response.status_code}: {response.text}")

        return response.json()
@pytest.fixture(scope="session")
def trino_available() -> bool:
    """Gate for tests that need a live Trino server.

    Runs the availability check once per session; dependent tests are
    skipped when the check fails, otherwise the check's result is returned.
    """
    trino_is_up = check_trino_available()
    if trino_is_up:
        return trino_is_up
    # pytest.skip raises, so nothing is returned on this path.
    pytest.skip("Trino server is not available for testing")
@pytest.fixture(scope="session")
def mcp_server(trino_available) -> Iterator[None]:
    """
    Run one Trino MCP test server for the whole test session.

    The server is stopped when the session ends, and also when start-up
    itself fails, because ``start()`` runs inside the ``try`` block.

    Args:
        trino_available: Requested only for its side effect of skipping
            when Trino cannot be reached.

    Yields:
        None
    """
    test_server = TrinoMCPTestServer()
    try:
        test_server.start()
        yield
    finally:
        test_server.stop()
@pytest.fixture
def mcp_client(mcp_server) -> Iterator[MCPClient]:
    """
    Create a test MCP client connected to the test server.

    A fresh client is created and initialized for every test, and shut down
    on a best-effort basis afterwards.

    Args:
        mcp_server: The server fixture.

    Yields:
        MCPClient: An initialized MCP client.
    """
    client = MCPClient()
    client.initialize()
    try:
        yield client
    finally:
        try:
            client.shutdown()
        except Exception:
            # Best-effort teardown: a failed shutdown must not fail the test.
            # Only Exception is caught so that KeyboardInterrupt and
            # SystemExit still propagate and can abort the run.
            pass