Skip to main content
Glama
test_mcp_server.py11.8 kB
"""Integration tests for MCP server.""" import pytest from unittest.mock import AsyncMock, MagicMock, patch from pathlib import Path from src.server.mcp_server import MCPServer from src.registry.tool_registry import ToolRegistry from src.cache.redis_cache import CacheManager from src.utils.rate_limiter import RateLimiter class TestMCPServerIntegration: """Integration tests for MCP server.""" @pytest.fixture async def mcp_server(self, tmp_path): """Create MCP server instance for testing.""" # Create metadata directory metadata_dir = tmp_path / "metadata" / "tools" metadata_dir.mkdir(parents=True) # Create mock metadata files import json pan_verify_meta = { "name": "verify_pan", "description": "Verify PAN card", "input_schema": { "type": "object", "properties": { "pan": {"type": "string"}, "name_as_per_pan": {"type": "string"}, "date_of_birth": {"type": "string"}, "consent": {"type": "string"}, "reason": {"type": "string"} }, "required": ["pan", "name_as_per_pan", "date_of_birth", "consent", "reason"] } } with open(metadata_dir / "pan_verification.json", "w") as f: json.dump(pan_verify_meta, f) # Mock configuration with patch('src.server.mcp_server.settings') as mock_settings: mock_settings.KYC_API_BASE_URL = "https://api.test.com" mock_settings.KYC_API_KEY = "test_key" mock_settings.JWT_SECRET_KEY = "test_secret" mock_settings.REDIS_HOST = "localhost" mock_settings.REDIS_PORT = 6379 # Create server (implementation would need to be imported) # This is a placeholder for the actual server initialization server = MagicMock() server.registry = ToolRegistry(metadata_dir=metadata_dir / "..") server.registry.initialize() server.cache = CacheManager() server.rate_limiter = RateLimiter() yield server @pytest.mark.asyncio async def test_server_initialization(self, mcp_server): """Test that server initializes correctly.""" assert mcp_server.registry is not None assert mcp_server.cache is not None assert mcp_server.rate_limiter is not None @pytest.mark.asyncio async def test_list_tools(self, mcp_server): """Test listing available tools.""" tools = mcp_server.registry.list_tools() assert len(tools) > 0 assert any(tool["name"] == "verify_pan" for tool in tools) @pytest.mark.asyncio async def test_tool_execution_with_cache(self, mcp_server): """Test tool execution with caching.""" # Mock tool mock_tool = AsyncMock() mock_tool.get_name.return_value = "verify_pan" mock_tool.get_cache_key.return_value = "test_cache_key" mock_tool.get_cache_ttl.return_value = 3600 mock_tool.execute.return_value = {"result": "success"} mcp_server.registry.tools["verify_pan"] = mock_tool # Mock cache mcp_server.cache.get = AsyncMock(return_value=None) mcp_server.cache.set = AsyncMock(return_value=True) # Execute tool params = {"pan": "ABCDE1234F"} result = await mock_tool.execute(params) assert result == {"result": "success"} mock_tool.execute.assert_called_once() @pytest.mark.asyncio async def test_tool_execution_cache_hit(self, mcp_server): """Test tool execution with cache hit.""" # Mock tool mock_tool = AsyncMock() mock_tool.get_name.return_value = "verify_pan" mock_tool.get_cache_key.return_value = "test_cache_key" mcp_server.registry.tools["verify_pan"] = mock_tool # Mock cache with existing data cached_result = {"result": "cached"} mcp_server.cache.get = AsyncMock(return_value=cached_result) # In a real implementation, the server would check cache first cache_result = await mcp_server.cache.get("test_cache_key") assert cache_result == cached_result # Tool should not be executed when cache hit occurs @pytest.mark.asyncio async def test_rate_limiting_enforcement(self, mcp_server): """Test that rate limiting is enforced.""" tool_name = "verify_pan" # Mock rate limiter to deny request mcp_server.rate_limiter.check_limit = AsyncMock(return_value=False) # Attempt to execute tool allowed = await mcp_server.rate_limiter.check_limit(tool_name) assert allowed is False @pytest.mark.asyncio async def test_multiple_tool_registrations(self, mcp_server, tmp_path): """Test registering multiple tools.""" # Add another tool metadata import json metadata_dir = tmp_path / "metadata" / "tools" pan_aadhaar_meta = { "name": "check_pan_aadhaar_link", "description": "Check PAN-Aadhaar link", "input_schema": {"type": "object"} } with open(metadata_dir / "pan_aadhaar_link.json", "w") as f: json.dump(pan_aadhaar_meta, f) # Reload metadata mcp_server.registry.tool_metadata = mcp_server.registry.load_metadata() # Register mock tools mock_tool1 = MagicMock() mock_tool1.get_name.return_value = "verify_pan" mock_tool2 = MagicMock() mock_tool2.get_name.return_value = "check_pan_aadhaar_link" mcp_server.registry.register_tool(mock_tool1) mcp_server.registry.register_tool(mock_tool2) tools = mcp_server.registry.list_tools() assert len(tools) == 2 @pytest.mark.asyncio async def test_error_handling_in_tool_execution(self, mcp_server): """Test error handling during tool execution.""" # Mock tool that raises exception mock_tool = AsyncMock() mock_tool.get_name.return_value = "verify_pan" mock_tool.execute.side_effect = Exception("Tool execution failed") mcp_server.registry.tools["verify_pan"] = mock_tool # Execute tool and expect exception with pytest.raises(Exception, match="Tool execution failed"): await mock_tool.execute({}) @pytest.mark.asyncio async def test_concurrent_tool_executions(self, mcp_server): """Test concurrent execution of tools.""" import asyncio # Mock tool mock_tool = AsyncMock() mock_tool.get_name.return_value = "verify_pan" mock_tool.execute.return_value = {"result": "success"} mcp_server.registry.tools["verify_pan"] = mock_tool # Execute multiple times concurrently results = await asyncio.gather(*[ mock_tool.execute({"pan": f"PAN{i}"}) for i in range(5) ]) assert len(results) == 5 assert all(r == {"result": "success"} for r in results) @pytest.mark.asyncio async def test_cache_and_rate_limit_interaction(self, mcp_server): """Test interaction between cache and rate limiting.""" tool_name = "verify_pan" cache_key = "test_key" # First request: rate limit passes, cache miss mcp_server.rate_limiter.check_limit = AsyncMock(return_value=True) mcp_server.cache.get = AsyncMock(return_value=None) mcp_server.cache.set = AsyncMock(return_value=True) rate_ok = await mcp_server.rate_limiter.check_limit(tool_name) cache_result = await mcp_server.cache.get(cache_key) assert rate_ok is True assert cache_result is None # Second request: cache hit (should bypass rate limit check in real impl) mcp_server.cache.get = AsyncMock(return_value={"cached": "data"}) cache_result = await mcp_server.cache.get(cache_key) assert cache_result == {"cached": "data"} @pytest.mark.asyncio async def test_tool_metadata_validation(self, mcp_server): """Test that tool metadata is properly validated.""" metadata = mcp_server.registry.get_tool_metadata("verify_pan") assert metadata is not None assert "name" in metadata assert "description" in metadata assert "input_schema" in metadata assert metadata["name"] == "verify_pan" @pytest.mark.asyncio async def test_invalid_tool_request(self, mcp_server): """Test handling of invalid tool requests.""" # Request non-existent tool tool = mcp_server.registry.get_tool("nonexistent_tool") assert tool is None @pytest.mark.asyncio async def test_server_health_check(self, mcp_server): """Test server health check functionality.""" # Verify all components are initialized assert mcp_server.registry is not None assert mcp_server.cache is not None assert mcp_server.rate_limiter is not None # Verify tools are loaded tools = mcp_server.registry.list_tools() assert len(tools) > 0 @pytest.mark.asyncio async def test_graceful_shutdown(self, mcp_server): """Test graceful server shutdown.""" # Mock cleanup methods mcp_server.cache.close = AsyncMock() # Perform shutdown await mcp_server.cache.close() mcp_server.cache.close.assert_called_once() @pytest.mark.asyncio async def test_tool_execution_with_validation_error(self, mcp_server): """Test tool execution with validation error.""" # Mock tool that raises validation error mock_tool = AsyncMock() mock_tool.get_name.return_value = "verify_pan" mock_tool.execute.side_effect = ValueError("Invalid input") mcp_server.registry.tools["verify_pan"] = mock_tool with pytest.raises(ValueError, match="Invalid input"): await mock_tool.execute({"invalid": "params"}) @pytest.mark.asyncio async def test_cache_failure_fallback(self, mcp_server): """Test that tool execution continues when cache fails.""" # Mock tool mock_tool = AsyncMock() mock_tool.get_name.return_value = "verify_pan" mock_tool.execute.return_value = {"result": "success"} mcp_server.registry.tools["verify_pan"] = mock_tool # Mock cache failure mcp_server.cache.get = AsyncMock(side_effect=Exception("Cache error")) mcp_server.cache.set = AsyncMock(side_effect=Exception("Cache error")) # Tool should still execute successfully result = await mock_tool.execute({"pan": "ABCDE1234F"}) assert result == {"result": "success"} @pytest.mark.asyncio async def test_rate_limit_recovery(self, mcp_server): """Test rate limit recovery after waiting.""" tool_name = "verify_pan" # Simulate rate limit exceeded mcp_server.rate_limiter.check_limit = AsyncMock(return_value=False) allowed = await mcp_server.rate_limiter.check_limit(tool_name) assert allowed is False # Simulate recovery mcp_server.rate_limiter.check_limit = AsyncMock(return_value=True) allowed = await mcp_server.rate_limiter.check_limit(tool_name) assert allowed is True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CTD-Techs/CTD-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server