Skip to main content
Glama
johannhartmann

MCP Code Analysis Server

test_docker_mcp.py12.7 kB
"""Integration tests for MCP server running in Docker.""" import asyncio import json import subprocess import tempfile from collections.abc import Iterator from pathlib import Path from typing import Any import httpx import os import pytest class TestDockerMCPIntegration: """Test MCP server running in Docker containers.""" @pytest.fixture def mcp_server_url(self) -> str: """Get MCP server URL.""" port = os.environ.get("MCP_EXTERNAL_PORT", "8080") return f"http://localhost:{port}/mcp/" @pytest.fixture def test_repo_path(self) -> Iterator[str]: """Create a test Git repository accessible inside the Docker container. We create it under the project directory (mounted ro as /host_project in the container) and return the container-visible path so the server can access it. """ project_root = Path(__file__).resolve().parents[2] with tempfile.TemporaryDirectory(dir=project_root / "tests") as tmpdir: host_repo_path = Path(tmpdir) / "test_repo" host_repo_path.mkdir() # Create a simple Python file (host_repo_path / "main.py").write_text( ''' """Test module.""" def hello_world(): """Print hello world.""" print("Hello, World!") class Calculator: """Simple calculator.""" def add(self, a, b): """Add two numbers.""" return a + b ''', ) # Initialize git repo (config necessary for github ci) subprocess.run(["git", "init"], cwd=host_repo_path, check=True) subprocess.run( ["git", "config", "user.email", "test@invalid"], cwd=host_repo_path, check=True, ) subprocess.run( ["git", "config", "user.name", "test"], cwd=host_repo_path, check=True ) subprocess.run(["git", "add", "."], cwd=host_repo_path, check=True) subprocess.run( ["git", "commit", "-m", "Initial commit"], cwd=host_repo_path, check=True, ) # Translate host path to container-visible path under /host_project rel = host_repo_path.relative_to(project_root) container_path = f"/host_project/{rel.as_posix()}" yield container_path async def send_mcp_request( self, url: str, method: str, params: dict[str, Any] | None = None ) -> list[dict[str, Any]]: """Send an MCP request to the server.""" initialization_request_data = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2025-03-26", "capabilities": {}, "clientInfo": {"name": "test", "version": "0.0.1"}, }, } initialized_request_data = { "jsonrpc": "2.0", "method": "notifications/initialized", } request_data = { "jsonrpc": "2.0", "method": method, "params": params or {}, "id": 2, } headers = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", } async with httpx.AsyncClient() as client: # Initial POST initialization_response = await client.post( url, json=initialization_request_data, headers=headers, timeout=30.0, follow_redirects=True, ) # Capture session id if provided sid = initialization_response.headers.get("mcp-session-id") if sid: headers["mcp-session-id"] = sid await client.post( url, json=initialized_request_data, headers=headers, timeout=30.0, follow_redirects=True, ) response = await client.post( url, json=request_data, headers=headers, timeout=30.0, follow_redirects=True, ) # If initial succeeded, parse and return if response.status_code == 200: result: list[dict[str, Any]] = [] async for line in response.aiter_lines(): raw = line.strip() if not raw: continue if raw.startswith("data:"): raw = raw[len("data:") :].strip() if raw.startswith(":"): continue try: data = json.loads(raw) result.append(data) except json.JSONDecodeError: continue return result # No session id supplied and initial not 200 -> raise raise httpx.ConnectError( f"Request failed with status {response.status_code}: {response.text}" ) @staticmethod def _content_text_to_obj(event: dict[str, Any]) -> dict[str, Any]: """Helper to normalize event content[0]["text"] into a dict. The server may return the text field as a JSON string; parse if needed. """ content = event["result"]["content"][0]["text"] if isinstance(content, str): try: content = json.loads(content) except json.JSONDecodeError: # Fallback to empty structure to avoid KeyErrors content = {} return content @pytest.mark.asyncio @pytest.mark.integration async def test_list_tools(self, mcp_server_url: str) -> None: """Test listing available tools.""" result = await self.send_mcp_request(mcp_server_url, "tools/list") assert len(result) > 0 response = result[-1] # Get the final response assert "result" in response assert "tools" in response["result"] # Check that we have the expected tools tool_names = [tool["name"] for tool in response["result"]["tools"]] expected_tools = [ "add_repository", "list_repositories", "scan_repository", "semantic_search", "keyword_search", "get_code", "analyze_file", ] for expected in expected_tools: assert expected in tool_names, f"Missing tool: {expected}" @pytest.mark.asyncio @pytest.mark.integration async def test_add_and_scan_repository( self, mcp_server_url: str, test_repo_path: Path, ) -> None: """Test adding and scanning a repository.""" # First, list repositories to check initial state list_result = await self.send_mcp_request( mcp_server_url, "tools/call", {"name": "list_repositories", "arguments": {}}, ) list_response = list_result[-1] list_content = self._content_text_to_obj(list_response) initial_count = len(list_content.get("repositories", [])) # Add the test repository add_result = await self.send_mcp_request( mcp_server_url, "tools/call", { "name": "add_repository", "arguments": { "url": f"file://{test_repo_path}", "scan_immediately": True, "generate_embeddings": False, }, }, ) add_response = add_result[-1] add_content = self._content_text_to_obj(add_response) assert add_content.get("success") is True repo_id = add_content["repository_id"] # List repositories again to verify it was added list_result2 = await self.send_mcp_request( mcp_server_url, "tools/call", {"name": "list_repositories", "arguments": {"include_stats": True}}, ) list_response2 = list_result2[-1] list_content2 = self._content_text_to_obj(list_response2) final_count = list_content2["count"] assert final_count == initial_count + 1 # Check that files were scanned repos = list_content2["repositories"] test_repo = next((r for r in repos if r["id"] == repo_id), None) assert test_repo is not None assert test_repo["stats"]["total_files"] > 0 @pytest.mark.asyncio @pytest.mark.integration async def test_search_functionality( self, mcp_server_url: str, test_repo_path: Path ) -> None: """Test search functionality after indexing.""" # Add and scan repository add_result = await self.send_mcp_request( mcp_server_url, "tools/call", { "name": "add_repository", "arguments": { "url": f"file://{test_repo_path}", "scan_immediately": True, "generate_embeddings": False, }, }, ) add_response = add_result[-1] add_content = self._content_text_to_obj(add_response) repo_id = add_content["repository_id"] # Wait a bit for indexing to complete await asyncio.sleep(2) # Search for the Calculator class search_result = await self.send_mcp_request( mcp_server_url, "tools/call", { "name": "keyword_search", "arguments": { "keywords": ["Calculator"], "scope": "all", "repository_id": repo_id, "limit": 10, }, }, ) search_response = search_result[-1] search_content = self._content_text_to_obj(search_response) assert search_content.get("success") is True results = search_content["results"] assert len(results) > 0 # Check that we found the Calculator class calculator_found = any( r["entity"]["name"] == "Calculator" and r["entity"]["type"] == "class" for r in results ) assert calculator_found, "Calculator class not found in search results" @pytest.mark.asyncio @pytest.mark.integration async def test_get_code(self, mcp_server_url: str, test_repo_path: Path) -> None: """Test getting code for a specific entity.""" # Add and scan repository add_result = await self.send_mcp_request( mcp_server_url, "tools/call", { "name": "add_repository", "arguments": { "url": f"file://{test_repo_path}", "scan_immediately": True, "generate_embeddings": False, }, }, ) repo_id = add_result[-1]["result"]["content"][0]["text"]["repository_id"] # Wait for indexing await asyncio.sleep(2) # Search for Calculator class to get its ID search_result = await self.send_mcp_request( mcp_server_url, "tools/call", { "name": "keyword_search", "arguments": { "keywords": ["Calculator"], "scope": "classes", "repository_id": repo_id, "limit": 10, }, }, ) search_content = self._content_text_to_obj(search_result[-1]) results = search_content["results"] calc_result = next( (r for r in results if r["entity"]["name"] == "Calculator"), None, ) assert calc_result is not None entity_id = calc_result["entity"]["id"] # Get the code for the Calculator class code_result = await self.send_mcp_request( mcp_server_url, "tools/call", { "name": "get_code", "arguments": { "entity_type": "class", "entity_id": entity_id, "include_context": True, }, }, ) code_response = code_result[-1] code_content = self._content_text_to_obj(code_response) assert code_content.get("success") is True code = code_content["code"] assert "class Calculator:" in code assert "def add(" in code if __name__ == "__main__": pytest.main([__file__, "-v"])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannhartmann/mcpcodeanalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server