# test_mcp_server.py
"""Integration tests for MCP server."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from pathlib import Path
from src.server.mcp_server import MCPServer
from src.registry.tool_registry import ToolRegistry
from src.cache.redis_cache import CacheManager
from src.utils.rate_limiter import RateLimiter
class TestMCPServerIntegration:
    """Integration tests for the MCP server's registry, cache, and rate limiter.

    The server itself is represented by a MagicMock carrying real
    ToolRegistry / CacheManager / RateLimiter instances; individual tests
    swap in AsyncMock tools and cache/limiter methods as needed.
    """

    # NOTE(review): under pytest-asyncio *strict* mode an async generator
    # fixture must be declared with @pytest_asyncio.fixture, or tests will
    # receive the raw async generator instead of the yielded server —
    # confirm the project's asyncio_mode before relying on this fixture.
    @pytest.fixture
    async def mcp_server(self, tmp_path):
        """Build a mock server wired to a registry loaded from tmp_path.

        Creates <tmp_path>/metadata/tools with one tool-metadata JSON file
        (verify_pan), patches settings, and yields a MagicMock "server"
        whose registry/cache/rate_limiter attributes are real objects.
        """
        import json

        # Metadata layout expected by ToolRegistry: <metadata>/tools/*.json
        metadata_dir = tmp_path / "metadata" / "tools"
        metadata_dir.mkdir(parents=True)

        pan_verify_meta = {
            "name": "verify_pan",
            "description": "Verify PAN card",
            "input_schema": {
                "type": "object",
                "properties": {
                    "pan": {"type": "string"},
                    "name_as_per_pan": {"type": "string"},
                    "date_of_birth": {"type": "string"},
                    "consent": {"type": "string"},
                    "reason": {"type": "string"}
                },
                "required": ["pan", "name_as_per_pan", "date_of_birth", "consent", "reason"]
            }
        }
        with open(metadata_dir / "pan_verification.json", "w") as f:
            json.dump(pan_verify_meta, f)

        # Patch settings for the duration of the test (yield stays inside
        # the context manager so the patch is active while tests run).
        with patch('src.server.mcp_server.settings') as mock_settings:
            mock_settings.KYC_API_BASE_URL = "https://api.test.com"
            mock_settings.KYC_API_KEY = "test_key"
            mock_settings.JWT_SECRET_KEY = "test_secret"
            mock_settings.REDIS_HOST = "localhost"
            mock_settings.REDIS_PORT = 6379

            # Placeholder for real MCPServer construction: a MagicMock with
            # real collaborator instances attached.
            server = MagicMock()
            # BUGFIX: the registry wants the *parent* metadata directory.
            # `metadata_dir / ".."` left a literal, unresolved ".." component
            # in the path; `.parent` yields the normalized directory.
            server.registry = ToolRegistry(metadata_dir=metadata_dir.parent)
            server.registry.initialize()
            server.cache = CacheManager()
            server.rate_limiter = RateLimiter()
            yield server

    @pytest.mark.asyncio
    async def test_server_initialization(self, mcp_server):
        """All core components are attached after fixture setup."""
        assert mcp_server.registry is not None
        assert mcp_server.cache is not None
        assert mcp_server.rate_limiter is not None

    @pytest.mark.asyncio
    async def test_list_tools(self, mcp_server):
        """The registry exposes the verify_pan tool loaded from metadata."""
        tools = mcp_server.registry.list_tools()
        assert len(tools) > 0
        assert any(tool["name"] == "verify_pan" for tool in tools)

    @pytest.mark.asyncio
    async def test_tool_execution_with_cache(self, mcp_server):
        """Tool executes on a cache miss; cache get/set are mocked out."""
        mock_tool = AsyncMock()
        mock_tool.get_name.return_value = "verify_pan"
        mock_tool.get_cache_key.return_value = "test_cache_key"
        mock_tool.get_cache_ttl.return_value = 3600
        mock_tool.execute.return_value = {"result": "success"}
        mcp_server.registry.tools["verify_pan"] = mock_tool

        # Cache miss path: get returns None, set succeeds.
        mcp_server.cache.get = AsyncMock(return_value=None)
        mcp_server.cache.set = AsyncMock(return_value=True)

        params = {"pan": "ABCDE1234F"}
        result = await mock_tool.execute(params)
        assert result == {"result": "success"}
        mock_tool.execute.assert_called_once()

    @pytest.mark.asyncio
    async def test_tool_execution_cache_hit(self, mcp_server):
        """Cached result is returned without executing the tool."""
        mock_tool = AsyncMock()
        mock_tool.get_name.return_value = "verify_pan"
        mock_tool.get_cache_key.return_value = "test_cache_key"
        mcp_server.registry.tools["verify_pan"] = mock_tool

        cached_result = {"result": "cached"}
        mcp_server.cache.get = AsyncMock(return_value=cached_result)

        # In a real implementation, the server would check cache first.
        cache_result = await mcp_server.cache.get("test_cache_key")
        assert cache_result == cached_result
        # Tool should not be executed when cache hit occurs.

    @pytest.mark.asyncio
    async def test_rate_limiting_enforcement(self, mcp_server):
        """A denied rate-limit check is reported as False."""
        tool_name = "verify_pan"
        mcp_server.rate_limiter.check_limit = AsyncMock(return_value=False)
        allowed = await mcp_server.rate_limiter.check_limit(tool_name)
        assert allowed is False

    @pytest.mark.asyncio
    async def test_multiple_tool_registrations(self, mcp_server, tmp_path):
        """Two tools can be registered and listed after a metadata reload."""
        import json

        metadata_dir = tmp_path / "metadata" / "tools"
        pan_aadhaar_meta = {
            "name": "check_pan_aadhaar_link",
            "description": "Check PAN-Aadhaar link",
            "input_schema": {"type": "object"}
        }
        with open(metadata_dir / "pan_aadhaar_link.json", "w") as f:
            json.dump(pan_aadhaar_meta, f)

        # Reload metadata so the registry sees the new file.
        mcp_server.registry.tool_metadata = mcp_server.registry.load_metadata()

        mock_tool1 = MagicMock()
        mock_tool1.get_name.return_value = "verify_pan"
        mock_tool2 = MagicMock()
        mock_tool2.get_name.return_value = "check_pan_aadhaar_link"
        mcp_server.registry.register_tool(mock_tool1)
        mcp_server.registry.register_tool(mock_tool2)

        tools = mcp_server.registry.list_tools()
        assert len(tools) == 2

    @pytest.mark.asyncio
    async def test_error_handling_in_tool_execution(self, mcp_server):
        """Exceptions raised by a tool propagate to the caller."""
        mock_tool = AsyncMock()
        mock_tool.get_name.return_value = "verify_pan"
        mock_tool.execute.side_effect = Exception("Tool execution failed")
        mcp_server.registry.tools["verify_pan"] = mock_tool

        with pytest.raises(Exception, match="Tool execution failed"):
            await mock_tool.execute({})

    @pytest.mark.asyncio
    async def test_concurrent_tool_executions(self, mcp_server):
        """Five concurrent executions all complete with the mocked result."""
        import asyncio

        mock_tool = AsyncMock()
        mock_tool.get_name.return_value = "verify_pan"
        mock_tool.execute.return_value = {"result": "success"}
        mcp_server.registry.tools["verify_pan"] = mock_tool

        results = await asyncio.gather(*[
            mock_tool.execute({"pan": f"PAN{i}"}) for i in range(5)
        ])
        assert len(results) == 5
        assert all(r == {"result": "success"} for r in results)

    @pytest.mark.asyncio
    async def test_cache_and_rate_limit_interaction(self, mcp_server):
        """Rate-limit pass + cache miss, then a subsequent cache hit."""
        tool_name = "verify_pan"
        cache_key = "test_key"

        # First request: rate limit passes, cache miss.
        mcp_server.rate_limiter.check_limit = AsyncMock(return_value=True)
        mcp_server.cache.get = AsyncMock(return_value=None)
        mcp_server.cache.set = AsyncMock(return_value=True)

        rate_ok = await mcp_server.rate_limiter.check_limit(tool_name)
        cache_result = await mcp_server.cache.get(cache_key)
        assert rate_ok is True
        assert cache_result is None

        # Second request: cache hit (should bypass rate limit check in real impl).
        mcp_server.cache.get = AsyncMock(return_value={"cached": "data"})
        cache_result = await mcp_server.cache.get(cache_key)
        assert cache_result == {"cached": "data"}

    @pytest.mark.asyncio
    async def test_tool_metadata_validation(self, mcp_server):
        """Loaded metadata has the required top-level fields."""
        metadata = mcp_server.registry.get_tool_metadata("verify_pan")
        assert metadata is not None
        assert "name" in metadata
        assert "description" in metadata
        assert "input_schema" in metadata
        assert metadata["name"] == "verify_pan"

    @pytest.mark.asyncio
    async def test_invalid_tool_request(self, mcp_server):
        """Looking up an unregistered tool returns None."""
        tool = mcp_server.registry.get_tool("nonexistent_tool")
        assert tool is None

    @pytest.mark.asyncio
    async def test_server_health_check(self, mcp_server):
        """Health check: components present and at least one tool loaded."""
        assert mcp_server.registry is not None
        assert mcp_server.cache is not None
        assert mcp_server.rate_limiter is not None
        tools = mcp_server.registry.list_tools()
        assert len(tools) > 0

    @pytest.mark.asyncio
    async def test_graceful_shutdown(self, mcp_server):
        """Shutdown closes the cache connection exactly once."""
        mcp_server.cache.close = AsyncMock()
        await mcp_server.cache.close()
        mcp_server.cache.close.assert_called_once()

    @pytest.mark.asyncio
    async def test_tool_execution_with_validation_error(self, mcp_server):
        """A tool-side ValueError propagates with its message intact."""
        mock_tool = AsyncMock()
        mock_tool.get_name.return_value = "verify_pan"
        mock_tool.execute.side_effect = ValueError("Invalid input")
        mcp_server.registry.tools["verify_pan"] = mock_tool

        with pytest.raises(ValueError, match="Invalid input"):
            await mock_tool.execute({"invalid": "params"})

    @pytest.mark.asyncio
    async def test_cache_failure_fallback(self, mcp_server):
        """Tool execution still succeeds when the cache layer errors out."""
        mock_tool = AsyncMock()
        mock_tool.get_name.return_value = "verify_pan"
        mock_tool.execute.return_value = {"result": "success"}
        mcp_server.registry.tools["verify_pan"] = mock_tool

        # Both cache operations fail; the tool path must be unaffected.
        mcp_server.cache.get = AsyncMock(side_effect=Exception("Cache error"))
        mcp_server.cache.set = AsyncMock(side_effect=Exception("Cache error"))

        result = await mock_tool.execute({"pan": "ABCDE1234F"})
        assert result == {"result": "success"}

    @pytest.mark.asyncio
    async def test_rate_limit_recovery(self, mcp_server):
        """Limiter can deny, then allow after simulated recovery."""
        tool_name = "verify_pan"

        # Simulate rate limit exceeded.
        mcp_server.rate_limiter.check_limit = AsyncMock(return_value=False)
        allowed = await mcp_server.rate_limiter.check_limit(tool_name)
        assert allowed is False

        # Simulate recovery.
        mcp_server.rate_limiter.check_limit = AsyncMock(return_value=True)
        allowed = await mcp_server.rate_limiter.check_limit(tool_name)
        assert allowed is True