# test_integration_m1.py
"""Integration tests for Agent MCP Gateway m1-core milestone. This module contains end-to-end integration tests that verify: - Full workflow: list_servers → get_server_tools → execute_tool - Policy enforcement across all gateway tools - Concurrent requests from multiple agents - Session isolation between agents - Timeout handling and error recovery - Component integration (ProxyManager, PolicyEngine, Middleware) - Performance benchmarks against targets Test Strategy: - Mock downstream MCP servers to avoid real connections - Use pytest-asyncio for async test execution - Verify policy enforcement at each layer - Measure and validate latency targets """ import asyncio import json import time from typing import Any from unittest.mock import AsyncMock, Mock, patch import pytest from src.gateway import ( gateway, initialize_gateway, list_servers as list_servers_tool, get_server_tools as get_server_tools_tool, execute_tool as execute_tool_tool ) from src.policy import PolicyEngine from src.proxy import ProxyManager from src.metrics import MetricsCollector from src.middleware import AgentAccessControl from fastmcp.exceptions import ToolError # Extract the actual functions from FastMCP's FunctionTool wrapper list_servers = list_servers_tool.fn get_server_tools = get_server_tools_tool.fn execute_tool = execute_tool_tool.fn # ============================================================================ # Fixtures # ============================================================================ @pytest.fixture def gateway_rules(): """Load test gateway rules configuration.""" return { "agents": { "researcher": { "allow": { "servers": ["brave-search"], "tools": { "brave-search": ["*"] } } }, "backend": { "allow": { "servers": ["postgres", "filesystem"], "tools": { "postgres": ["query", "list_*"], "filesystem": ["read_*", "list_*"] } }, "deny": { "tools": { "postgres": ["drop_*", "delete_*"], "filesystem": ["write_*", "delete_*"] } } }, "admin": { "allow": { "servers": ["*"], "tools": { "postgres": ["*"], "filesystem": ["*"], "brave-search": ["*"] } } } }, "defaults": { "deny_on_missing_agent": True } } @pytest.fixture def mcp_config(): """Load test MCP servers configuration.""" return { "mcpServers": { "brave-search": { "command": "npx", "args": ["-y", "@modelcontextprotocol/server-brave-search"], "description": "Brave search API server" }, "postgres": { "command": "uvx", "args": ["mcp-server-postgres"], "description": "PostgreSQL database server" }, "filesystem": { "command": "npx", "args": ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"], "description": "Filesystem access server" } } } # Mock tool class to simulate FastMCP Tool objects class MockTool: """Mock tool object for testing.""" def __init__(self, name: str, description: str = "", inputSchema: dict | None = None): self.name = name self.description = description self.inputSchema = inputSchema or {} @pytest.fixture def mock_tool_definitions(): """Create mock tool definitions for different servers.""" return { "brave-search": [ MockTool( name="brave_web_search", description="Search the web using Brave", inputSchema={ "type": "object", "properties": { "query": {"type": "string"} } } ), MockTool( name="brave_local_search", description="Search local businesses", inputSchema={ "type": "object", "properties": { "query": {"type": "string"}, "location": {"type": "string"} } } ) ], "postgres": [ MockTool( name="query", description="Execute SQL query", inputSchema={ "type": "object", "properties": { "sql": {"type": "string"} } } ), MockTool( 
name="list_tables", description="List database tables", inputSchema={"type": "object", "properties": {}} ), MockTool( name="drop_table", description="Drop a database table", inputSchema={ "type": "object", "properties": { "table": {"type": "string"} } } ) ], "filesystem": [ MockTool( name="read_file", description="Read file contents", inputSchema={ "type": "object", "properties": { "path": {"type": "string"} } } ), MockTool( name="list_directory", description="List directory contents", inputSchema={ "type": "object", "properties": { "path": {"type": "string"} } } ), MockTool( name="write_file", description="Write file contents", inputSchema={ "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"} } } ) ] } @pytest.fixture def mock_proxy_manager(mock_tool_definitions): """Create a mock ProxyManager with pre-configured tool responses.""" manager = Mock(spec=ProxyManager) # Mock list_tools to return tool definitions by server async def list_tools_impl(server_name: str): if server_name not in mock_tool_definitions: raise KeyError(f"Server '{server_name}' not found") # Add small delay to simulate network latency (5ms) await asyncio.sleep(0.005) return mock_tool_definitions[server_name] manager.list_tools = AsyncMock(side_effect=list_tools_impl) # Mock call_tool to return success responses async def call_tool_impl( server_name: str, tool_name: str, arguments: dict[str, Any], timeout_ms: int | None = None ): # Add small delay to simulate execution (10ms) await asyncio.sleep(0.010) # Simulate different responses based on tool if "search" in tool_name: return Mock( content=[{ "type": "text", "text": f"Search results for: {arguments.get('query', 'N/A')}" }], isError=False ) elif "query" in tool_name: return Mock( content=[{ "type": "text", "text": "Query executed successfully" }], isError=False ) elif "list" in tool_name: return Mock( content=[{ "type": "text", "text": json.dumps(["item1", "item2", "item3"]) }], isError=False ) elif "read" in tool_name: return Mock( content=[{ "type": "text", "text": "File contents here" }], isError=False ) else: return Mock( content=[{ "type": "text", "text": f"Tool {tool_name} executed" }], isError=False ) manager.call_tool = AsyncMock(side_effect=call_tool_impl) return manager @pytest.fixture def policy_engine(gateway_rules): """Create PolicyEngine instance with test rules.""" return PolicyEngine(gateway_rules) @pytest.fixture def metrics_collector(): """Create MetricsCollector instance.""" collector = MetricsCollector() # Reset metrics before each test collector.reset_sync() return collector @pytest.fixture def initialized_gateway(policy_engine, mcp_config, mock_proxy_manager): """Initialize gateway with all components for testing.""" initialize_gateway(policy_engine, mcp_config, mock_proxy_manager) return gateway # ============================================================================ # Full Workflow Tests # ============================================================================ class TestFullWorkflow: """Test complete workflow through all three gateway tools.""" @pytest.mark.asyncio async def test_researcher_workflow( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test researcher agent complete workflow: discover → get tools → execute. Workflow: 1. list_servers - should see brave-search only 2. get_server_tools - get brave-search tools 3. 
execute_tool - execute brave_web_search """ agent_id = "researcher" start_time = time.perf_counter() # Step 1: List servers servers = await list_servers(agent_id=agent_id, include_metadata=True) assert len(servers) == 1 assert servers[0]["name"] == "brave-search" assert servers[0]["transport"] == "stdio" # Step 2: Get server tools tools_response = await get_server_tools( agent_id=agent_id, server="brave-search" ) assert "error" not in tools_response or tools_response["error"] is None assert tools_response["server"] == "brave-search" assert tools_response["returned"] == 2 # brave_web_search, brave_local_search assert len(tools_response["tools"]) == 2 # Verify tool names tool_names = {t["name"] for t in tools_response["tools"]} assert "brave_web_search" in tool_names assert "brave_local_search" in tool_names # Step 3: Execute a tool result = await execute_tool( agent_id=agent_id, server="brave-search", tool="brave_web_search", args={"query": "MCP protocol"} ) assert result["isError"] is False assert len(result["content"]) > 0 assert "MCP protocol" in result["content"][0]["text"] # Verify total workflow latency end_time = time.perf_counter() total_latency_ms = (end_time - start_time) * 1000 assert total_latency_ms < 200, f"Workflow took {total_latency_ms:.2f}ms (target: <200ms)" @pytest.mark.asyncio async def test_backend_workflow( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test backend agent workflow with multiple servers. Backend can access: - postgres (tools: query, list_*, deny: drop_*, delete_*) - filesystem (tools: read_*, list_*, deny: write_*, delete_*) """ agent_id = "backend" # Step 1: List servers servers = await list_servers(agent_id=agent_id) assert len(servers) == 2 server_names = {s["name"] for s in servers} assert "postgres" in server_names assert "filesystem" in server_names # Step 2a: Get postgres tools pg_tools = await get_server_tools( agent_id=agent_id, server="postgres" ) assert "error" not in pg_tools or pg_tools["error"] is None pg_tool_names = {t["name"] for t in pg_tools["tools"]} # Should have query and list_tables, but NOT drop_table assert "query" in pg_tool_names assert "list_tables" in pg_tool_names assert "drop_table" not in pg_tool_names # Explicitly denied # Step 2b: Get filesystem tools fs_tools = await get_server_tools( agent_id=agent_id, server="filesystem" ) assert "error" not in fs_tools or fs_tools["error"] is None fs_tool_names = {t["name"] for t in fs_tools["tools"]} # Should have read_file and list_directory, but NOT write_file assert "read_file" in fs_tool_names assert "list_directory" in fs_tool_names assert "write_file" not in fs_tool_names # Explicitly denied # Step 3: Execute allowed tools # Execute postgres query result = await execute_tool( agent_id=agent_id, server="postgres", tool="query", args={"sql": "SELECT * FROM users"} ) assert result["isError"] is False # Execute filesystem read result = await execute_tool( agent_id=agent_id, server="filesystem", tool="read_file", args={"path": "/data/config.json"} ) assert result["isError"] is False @pytest.mark.asyncio async def test_admin_wildcard_workflow( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test admin agent with wildcard access to all servers and tools.""" agent_id = "admin" # Step 1: List servers - should see all servers = await list_servers(agent_id=agent_id) assert len(servers) == 3 server_names = {s["name"] for s in servers} assert "brave-search" in server_names assert "postgres" in server_names assert "filesystem" 
in server_names # Step 2: Get tools from each server - should see all pg_tools = await get_server_tools(agent_id=agent_id, server="postgres") assert len(pg_tools["tools"]) == 3 # All postgres tools including drop_table fs_tools = await get_server_tools(agent_id=agent_id, server="filesystem") assert len(fs_tools["tools"]) == 3 # All filesystem tools including write_file # Step 3: Execute any tool (including ones denied to others) # Admin can execute drop_table (denied to backend) result = await execute_tool( agent_id=agent_id, server="postgres", tool="drop_table", args={"table": "temp_data"} ) assert result["isError"] is False # ============================================================================ # Policy Enforcement Tests # ============================================================================ class TestPolicyEnforcement: """Test policy-based access control across all tools.""" @pytest.mark.asyncio async def test_server_access_denial( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test that agents cannot access servers they're not allowed to.""" agent_id = "researcher" # Only allowed brave-search # list_servers should not include postgres servers = await list_servers(agent_id=agent_id) server_names = {s["name"] for s in servers} assert "postgres" not in server_names # get_server_tools should return error response = await get_server_tools( agent_id=agent_id, server="postgres" ) assert "error" in response assert "Access denied" in response["error"] assert len(response["tools"]) == 0 # execute_tool should raise ToolError with pytest.raises(ToolError) as exc_info: await execute_tool( agent_id=agent_id, server="postgres", tool="query", args={} ) assert "cannot access server" in str(exc_info.value) @pytest.mark.asyncio async def test_tool_access_denial( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test that agents cannot access specific tools they're denied.""" agent_id = "backend" # Denied drop_* on postgres # get_server_tools should filter out denied tools response = await get_server_tools( agent_id=agent_id, server="postgres" ) tool_names = {t["name"] for t in response["tools"]} assert "drop_table" not in tool_names # Explicitly denied assert "query" in tool_names # Explicitly allowed # execute_tool should raise ToolError for denied tool with pytest.raises(ToolError) as exc_info: await execute_tool( agent_id=agent_id, server="postgres", tool="drop_table", args={"table": "users"} ) assert "not authorized to call tool" in str(exc_info.value) @pytest.mark.asyncio async def test_wildcard_access( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test wildcard patterns in tool access.""" agent_id = "backend" # Allowed list_* on postgres response = await get_server_tools( agent_id=agent_id, server="postgres" ) # Should include list_tables (matches list_*) tool_names = {t["name"] for t in response["tools"]} assert "list_tables" in tool_names @pytest.mark.asyncio async def test_unknown_agent_denial( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test that unknown agents are denied (default policy).""" agent_id = "unknown_agent" # list_servers should return empty list servers = await list_servers(agent_id=agent_id) assert len(servers) == 0 # get_server_tools should return error response = await get_server_tools( agent_id=agent_id, server="brave-search" ) assert "error" in response assert "Access denied" in response["error"] # 
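
# Note on the concurrency model below: asyncio.gather() interleaves the agent
# workflows on a single event loop rather than running them in parallel
# threads, so these tests are aimed at catching shared-state leaks between
# agents (e.g. one agent's result or policy decision bleeding into another's),
# not low-level thread-safety races.
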
# ============================================================================
# Concurrent Access Tests
# ============================================================================

class TestConcurrentAccess:
    """Test concurrent requests from multiple agents."""

    @pytest.mark.asyncio
    async def test_concurrent_requests_multiple_agents(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Test multiple agents making concurrent requests.

        Simulates realistic load:
        - 3 agents (researcher, backend, admin)
        - Each makes 10 requests
        - Requests execute concurrently
        """
        async def agent_workflow(agent_id: str, request_count: int):
            """Execute multiple requests for an agent."""
            results = []
            for i in range(request_count):
                # Vary the operation
                if i % 3 == 0:
                    result = await list_servers(agent_id=agent_id)
                    results.append(("list_servers", result))
                elif i % 3 == 1:
                    # Get tools based on agent permissions
                    if agent_id == "researcher":
                        server = "brave-search"
                    elif agent_id == "backend":
                        server = "postgres"
                    else:
                        server = "filesystem"
                    result = await get_server_tools(agent_id=agent_id, server=server)
                    results.append(("get_server_tools", result))
                else:
                    # Execute tool
                    try:
                        if agent_id == "researcher":
                            result = await execute_tool(
                                agent_id=agent_id,
                                server="brave-search",
                                tool="brave_web_search",
                                args={"query": f"test {i}"}
                            )
                        elif agent_id == "backend":
                            result = await execute_tool(
                                agent_id=agent_id,
                                server="postgres",
                                tool="query",
                                args={"sql": f"SELECT {i}"}
                            )
                        else:
                            result = await execute_tool(
                                agent_id=agent_id,
                                server="filesystem",
                                tool="read_file",
                                args={"path": f"/file{i}.txt"}
                            )
                        results.append(("execute_tool", result))
                    except Exception as e:
                        results.append(("execute_tool", {"error": str(e)}))
            return results

        # Run concurrent workflows
        start_time = time.perf_counter()
        tasks = [
            agent_workflow("researcher", 10),
            agent_workflow("backend", 10),
            agent_workflow("admin", 10)
        ]
        results = await asyncio.gather(*tasks)
        end_time = time.perf_counter()
        total_time_ms = (end_time - start_time) * 1000

        # Verify all requests completed
        assert len(results) == 3
        assert all(len(agent_results) == 10 for agent_results in results)

        # Verify no errors from allowed operations: every tool executed above
        # is allowed for its agent, so none of the results should be errors
        for agent_results in results:
            for op_type, result in agent_results:
                if op_type == "execute_tool" and isinstance(result, dict):
                    assert "error" not in result, f"Unexpected error: {result}"
                    assert not result.get("isError", False)

        # Performance check - 30 operations should complete in reasonable time
        # With mocked delays (5ms + 10ms per operation), expect ~450ms + overhead
        assert total_time_ms < 1000, f"Concurrent operations took {total_time_ms:.2f}ms"
""" # Track results by agent results = {} async def execute_for_agent(agent_id: str, server: str, tool: str, args: dict): """Execute tool and store result.""" result = await execute_tool( agent_id=agent_id, server=server, tool=tool, args=args ) results[agent_id] = result return result # Execute concurrently for different agents tasks = [ execute_for_agent( "researcher", "brave-search", "brave_web_search", {"query": "researcher query"} ), execute_for_agent( "backend", "postgres", "query", {"sql": "SELECT * FROM backend_data"} ), execute_for_agent( "admin", "filesystem", "read_file", {"path": "/admin/config.json"} ) ] await asyncio.gather(*tasks) # Verify each agent got correct result assert "researcher query" in results["researcher"]["content"][0]["text"] assert "backend" not in results["researcher"]["content"][0]["text"] # Each result should be independent assert len(results) == 3 assert all(not r["isError"] for r in results.values()) # ============================================================================ # Error Handling Tests # ============================================================================ class TestErrorHandling: """Test error handling and recovery scenarios.""" @pytest.mark.asyncio async def test_downstream_server_error( self, initialized_gateway, policy_engine, mcp_config ): """Test handling of errors from downstream servers.""" # Create mock that returns error mock_proxy = Mock(spec=ProxyManager) # Mock error from list_tools mock_proxy.list_tools = AsyncMock( side_effect=RuntimeError("Server connection failed") ) initialize_gateway(policy_engine, mcp_config, mock_proxy) response = await get_server_tools( agent_id="researcher", server="brave-search" ) assert "error" in response assert "Server unavailable" in response["error"] assert response["returned"] == 0 @pytest.mark.asyncio async def test_timeout_handling( self, initialized_gateway, policy_engine, mcp_config ): """Test timeout handling for long-running operations.""" # Create mock that simulates timeout mock_proxy = Mock(spec=ProxyManager) async def slow_call_tool(*args, **kwargs): """Simulate slow operation that times out.""" raise asyncio.TimeoutError("Operation timed out") mock_proxy.call_tool = AsyncMock(side_effect=slow_call_tool) initialize_gateway(policy_engine, mcp_config, mock_proxy) with pytest.raises(ToolError) as exc_info: await execute_tool( agent_id="researcher", server="brave-search", tool="brave_web_search", args={"query": "test"}, timeout_ms=100 ) assert "timed out" in str(exc_info.value).lower() @pytest.mark.asyncio async def test_server_not_found( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test handling of requests to non-existent servers.""" # Mock list_tools to raise KeyError mock_proxy_manager.list_tools = AsyncMock( side_effect=KeyError("Server 'nonexistent' not found") ) response = await get_server_tools( agent_id="admin", # Admin has access to all server="nonexistent" ) assert "error" in response assert "not found" in response["error"] assert response["returned"] == 0 @pytest.mark.asyncio async def test_tool_not_found( self, initialized_gateway, policy_engine, mcp_config ): """Test execution of non-existent tool.""" # Create mock that raises error for unknown tool mock_proxy = Mock(spec=ProxyManager) mock_proxy.call_tool = AsyncMock( side_effect=RuntimeError("Tool 'nonexistent_tool' not found") ) initialize_gateway(policy_engine, mcp_config, mock_proxy) with pytest.raises(ToolError) as exc_info: await execute_tool( agent_id="admin", 
server="brave-search", tool="nonexistent_tool", args={} ) # Error should mention the tool was not found assert "not found" in str(exc_info.value).lower() # ============================================================================ # Component Integration Tests # ============================================================================ class TestComponentIntegration: """Test integration between gateway components.""" @pytest.mark.asyncio async def test_middleware_agent_extraction(self, gateway_rules, mcp_config): """Test that middleware properly extracts and stores agent_id.""" policy_engine = PolicyEngine(gateway_rules) middleware = AgentAccessControl(policy_engine) # Create mock context and message from fastmcp.server.middleware import MiddlewareContext mock_message = Mock() mock_message.arguments = { "agent_id": "test_agent", "server": "brave-search", "other_arg": "value" } mock_fastmcp_context = Mock() mock_fastmcp_context.set_state = Mock() mock_context = Mock(spec=MiddlewareContext) mock_context.message = mock_message mock_context.fastmcp_context = mock_fastmcp_context # Mock call_next async def call_next(ctx): # Verify agent_id is kept in arguments for gateway tools assert "agent_id" in ctx.message.arguments assert ctx.message.arguments["agent_id"] == "test_agent" assert "server" in ctx.message.arguments assert "other_arg" in ctx.message.arguments return {"success": True} # Execute middleware result = await middleware.on_call_tool(mock_context, call_next) # Verify agent was stored in context mock_fastmcp_context.set_state.assert_called_once_with( "current_agent", "test_agent" ) assert result == {"success": True} @pytest.mark.asyncio async def test_proxy_manager_integration( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test that gateway correctly integrates with ProxyManager.""" # Execute a tool result = await execute_tool( agent_id="researcher", server="brave-search", tool="brave_web_search", args={"query": "integration test"} ) # Verify ProxyManager.call_tool was called correctly mock_proxy_manager.call_tool.assert_called_once() call_args = mock_proxy_manager.call_tool.call_args assert call_args[0][0] == "brave-search" # server assert call_args[0][1] == "brave_web_search" # tool assert call_args[0][2]["query"] == "integration test" # args @pytest.mark.asyncio async def test_policy_engine_integration( self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager ): """Test that gateway correctly uses PolicyEngine for access control.""" # Test different agents get different server lists researcher_servers = await list_servers(agent_id="researcher") backend_servers = await list_servers(agent_id="backend") admin_servers = await list_servers(agent_id="admin") # Verify policy engine applied correctly assert len(researcher_servers) == 1 # brave-search only assert len(backend_servers) == 2 # postgres, filesystem assert len(admin_servers) == 3 # all servers # Verify tool filtering backend_tools = await get_server_tools( agent_id="backend", server="postgres" ) tool_names = {t["name"] for t in backend_tools["tools"]} # Backend should not see drop_table (denied) assert "drop_table" not in tool_names # ============================================================================ # Performance Validation Tests # ============================================================================ class TestPerformance: """Test and validate performance targets.""" @pytest.mark.asyncio async def test_list_servers_latency( self, initialized_gateway, 
# ============================================================================
# Performance Validation Tests
# ============================================================================

class TestPerformance:
    """Test and validate performance targets."""

    @pytest.mark.asyncio
    async def test_list_servers_latency(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Validate list_servers meets <50ms P95 target."""
        latencies = []

        # Run 100 iterations to get P95 measurement
        for _ in range(100):
            start = time.perf_counter()
            await list_servers(agent_id="researcher")
            end = time.perf_counter()
            latencies.append((end - start) * 1000)

        # Calculate P95
        sorted_latencies = sorted(latencies)
        p95_index = int(95 * len(sorted_latencies) / 100)
        p95_latency = sorted_latencies[p95_index]

        assert p95_latency < 50, f"P95 latency {p95_latency:.2f}ms exceeds 50ms target"

    @pytest.mark.asyncio
    async def test_get_server_tools_latency(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Validate get_server_tools meets <300ms P95 target."""
        latencies = []

        # Run 100 iterations
        for _ in range(100):
            start = time.perf_counter()
            await get_server_tools(
                agent_id="researcher",
                server="brave-search"
            )
            end = time.perf_counter()
            latencies.append((end - start) * 1000)

        # Calculate P95
        sorted_latencies = sorted(latencies)
        p95_index = int(95 * len(sorted_latencies) / 100)
        p95_latency = sorted_latencies[p95_index]

        assert p95_latency < 300, f"P95 latency {p95_latency:.2f}ms exceeds 300ms target"

    @pytest.mark.asyncio
    async def test_execute_tool_overhead(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Validate execute_tool overhead is <30ms P95.

        Measures gateway overhead by comparing total time vs mock
        execution time.
        """
        latencies = []
        mock_execution_time = 10  # Mock adds 10ms delay

        # Run 100 iterations
        for _ in range(100):
            start = time.perf_counter()
            await execute_tool(
                agent_id="researcher",
                server="brave-search",
                tool="brave_web_search",
                args={"query": "test"}
            )
            end = time.perf_counter()
            total_time = (end - start) * 1000
            overhead = total_time - mock_execution_time
            latencies.append(overhead)

        # Calculate P95
        sorted_latencies = sorted(latencies)
        p95_index = int(95 * len(sorted_latencies) / 100)
        p95_overhead = sorted_latencies[p95_index]

        assert p95_overhead < 30, f"P95 overhead {p95_overhead:.2f}ms exceeds 30ms target"

    @pytest.mark.asyncio
    async def test_overall_added_latency(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Validate overall gateway latency is <100ms P95.

        Measures end-to-end latency for complete workflow.
        """
        latencies = []
        mock_time = 15  # Mock delays: 5ms (list_tools) + 10ms (call_tool)

        # Run 50 complete workflows
        for _ in range(50):
            start = time.perf_counter()

            # Complete workflow
            await list_servers(agent_id="researcher")
            await get_server_tools(agent_id="researcher", server="brave-search")
            await execute_tool(
                agent_id="researcher",
                server="brave-search",
                tool="brave_web_search",
                args={"query": "test"}
            )

            end = time.perf_counter()
            total_time = (end - start) * 1000
            overhead = total_time - mock_time
            latencies.append(overhead)

        # Calculate P95
        sorted_latencies = sorted(latencies)
        p95_index = int(95 * len(sorted_latencies) / 100)
        p95_latency = sorted_latencies[p95_index]

        assert p95_latency < 100, f"P95 overall latency {p95_latency:.2f}ms exceeds 100ms target"


# ============================================================================
# Additional Edge Cases
# ============================================================================

class TestEdgeCases:
    """Test edge cases and boundary conditions."""

    @pytest.mark.asyncio
    async def test_empty_tool_list(
        self, initialized_gateway, policy_engine, mcp_config
    ):
        """Test handling of server with no tools."""
        mock_proxy = Mock(spec=ProxyManager)
        mock_proxy.list_tools = AsyncMock(return_value=[])

        initialize_gateway(policy_engine, mcp_config, mock_proxy)

        response = await get_server_tools(
            agent_id="researcher",
            server="brave-search"
        )
        assert response["total_available"] == 0
        assert response["returned"] == 0
        assert len(response["tools"]) == 0

    @pytest.mark.asyncio
    async def test_tool_name_filtering(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Test filtering tools by explicit names."""
        response = await get_server_tools(
            agent_id="researcher",
            server="brave-search",
            names="brave_web_search"  # Only request this tool
        )
        assert response["returned"] == 1
        assert response["tools"][0]["name"] == "brave_web_search"

    @pytest.mark.asyncio
    async def test_tool_pattern_filtering(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Test filtering tools by wildcard pattern."""
        response = await get_server_tools(
            agent_id="researcher",
            server="brave-search",
            pattern="brave_*_search"  # Match both tools
        )
        assert response["returned"] == 2
        tool_names = {t["name"] for t in response["tools"]}
        assert "brave_web_search" in tool_names
        assert "brave_local_search" in tool_names

    @pytest.mark.asyncio
    async def test_max_schema_tokens_limit(
        self, initialized_gateway, policy_engine, mcp_config, mock_proxy_manager
    ):
        """Test that max_schema_tokens limits returned tools."""
        # Request with very low token limit
        response = await get_server_tools(
            agent_id="researcher",
            server="brave-search",
            max_schema_tokens=50  # Very low limit
        )
        # Should return fewer tools due to token budget
        assert response["returned"] <= 2
        assert response["tokens_used"] is not None
        assert response["tokens_used"] <= 50
