Crawlab MCP Server
Official
by crawlab-team
- tests
- clients
import os
import sys
from unittest.mock import AsyncMock, MagicMock
import pytest
import time
# Add the parent directory to sys.path to import the client module
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crawlab_mcp.clients.client import MCPClient
@pytest.fixture
def mcp_client():
"""Create a real MCPClient for testing"""
client = MCPClient()
return client
# Test MCPClient core properties
def test_mcp_client_init(mcp_client):
"""Test that MCPClient initializes with the correct core properties"""
assert mcp_client.session is None
assert isinstance(mcp_client.tools, list)
assert isinstance(mcp_client.tool_tags, list)
assert mcp_client.connection_type == "sse"
assert hasattr(mcp_client, "api_key")
# Test connection method returns an exit stack
@pytest.mark.asyncio
async def test_connect_to_server_returns_exit_stack(monkeypatch, mcp_client):
"""Test that connect_to_server returns an exit stack for resource management"""
# Mock dependencies
mock_read_stream = AsyncMock()
mock_write_stream = AsyncMock()
mock_session = AsyncMock()
mock_exit_stack = AsyncMock()
mock_client_session = AsyncMock()
# Mock the AsyncExitStack
mock_exit_stack.enter_async_context.side_effect = [
(mock_read_stream, mock_write_stream),
mock_session
]
# Setup mock responses
mock_session.list_tools.return_value = MagicMock(tools=[])
mock_session.call_tool.return_value = MagicMock(
content='{"tags": []}'
)
# Apply patches
monkeypatch.setattr("crawlab_mcp.clients.client.AsyncExitStack", lambda: mock_exit_stack)
monkeypatch.setattr("crawlab_mcp.clients.client.ClientSession", lambda *args: mock_client_session)
monkeypatch.setattr("crawlab_mcp.clients.client.sse_client", AsyncMock(
return_value=(mock_read_stream, mock_write_stream)))
# Call the method
result = await mcp_client.connect_to_server("http://test-server.com")
# Verify result is the exit stack
assert result == mock_exit_stack
@pytest.mark.asyncio
async def test_real_connection_to_server(mcp_client):
"""Test a real connection to the MCP server.
This test starts a real MCP server using the CLI module and then
connects to it with a real client.
"""
import asyncio
import contextlib
import multiprocessing
import os
import tempfile
from crawlab_mcp.servers.server import create_mcp_server, run_with_sse
# Generate a random port number to avoid conflicts
import random
port = random.randint(20000, 30000)
host = "127.0.0.1"
# Use a temp file for the openapi spec
# If this fails, we can use a fixed path from the repo
try:
# Create a temporary file for a minimal OpenAPI spec
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
spec_file = f.name
f.write("""
openapi: 3.0.0
info:
title: Test API
version: 1.0.0
paths:
/test:
get:
operationId: test
summary: Test endpoint
responses:
'200':
description: Success
""")
except Exception as e:
# If temp file creation fails, use a default spec from the repo
print(f"Failed to create temp spec file: {e}")
# Look for spec file in common locations
repo_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
spec_file = os.path.join(repo_root, "openapi", "openapi.yaml")
if not os.path.exists(spec_file):
spec_file = os.path.join(repo_root, "..", "openapi", "openapi.yaml")
# Define server process function to run in another process
def run_server(host, port, spec_path):
try:
# Create and run the server
mcp_server = create_mcp_server(spec_path)
run_with_sse(mcp_server, host=host, port=port)
except KeyboardInterrupt:
print("Server stopped by Ctrl+C")
except Exception as e:
print(f"Server error: {e}")
# Start server in a separate process
server_process = multiprocessing.Process(
target=run_server,
args=(host, port, spec_file),
daemon=True # Automatically terminate when parent process exits
)
try:
# Start the server
server_process.start()
# Wait for server to start
print(f"Waiting for server to start on {host}:{port}...")
for _ in range(10):
await asyncio.sleep(0.5)
try:
# Try connecting to see if server is up
reader, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
print(f"Server started on {host}:{port}")
break
except (ConnectionRefusedError, OSError):
continue
else:
raise RuntimeError(f"Failed to connect to server on {host}:{port}")
# Connect to the real server
server_url = f"http://{host}:{port}/sse"
exit_stack = await mcp_client.connect_to_server(server_url)
try:
# Verify client is properly initialized
assert mcp_client.session is not None, "Session should be established"
assert hasattr(mcp_client.session, "initialize"), "Session should have initialize method"
# Verify tools are loaded
assert isinstance(mcp_client.tools, list), "Tools should be a list"
# Verify tags are loaded
assert isinstance(mcp_client.tool_tags, list), "Tool tags should be a list"
# Test list_tools operation to confirm the connection is working
tools_response = await mcp_client.session.list_tools()
assert hasattr(tools_response, "tools"), "Should get tools response"
print(f"Connection successful, found {len(mcp_client.tools)} tools")
finally:
# Clean up client resources
await exit_stack.aclose()
finally:
# Clean up the server process
if server_process.is_alive():
print("Stopping server...")
# Use both SIGTERM and then SIGKILL if needed
server_process.terminate()
# Give it a moment to terminate gracefully
for _ in range(5):
if not server_process.is_alive():
break
time.sleep(0.2)
# If still alive, force kill
if server_process.is_alive():
print("Server didn't terminate gracefully, killing...")
server_process.kill()
server_process.join(timeout=1)
# Don't wait indefinitely - just detach if still running
if server_process.is_alive():
print("WARNING: Server process couldn't be terminated properly")
# Setting daemon=True earlier should ensure it's killed on exit
# Remove temp file if created
if spec_file and os.path.exists(spec_file) and spec_file.endswith('.yaml'):
try:
os.unlink(spec_file)
except OSError:
pass