Skip to main content
Glama

Sumanshu Arora

testing.md50.5 kB
# Testing Templates **Comprehensive testing strategies for MCP server templates including unit testing, integration testing, and deployment validation with professional CI/CD practices.** ## Testing Philosophy Professional MCP template testing follows a multi-layered approach: 1. **Unit Testing**: Test individual functions and methods in isolation 2. **Integration Testing**: Test component interactions and external dependencies 3. **End-to-End Testing**: Test complete workflows from user perspective 4. **Deployment Testing**: Validate templates in real deployment environments 5. **Performance Testing**: Verify scalability and resource usage 6. **Security Testing**: Validate input handling and access controls ## Test Environment Setup ### Development Environment ```bash # Install test dependencies pip install -r requirements-dev.txt # Dependencies include: # - pytest (testing framework) # - pytest-asyncio (async test support) # - pytest-cov (coverage reporting) # - pytest-mock (mocking utilities) # - responses (HTTP request mocking) # - faker (test data generation) # - testcontainers (containerized testing) ``` ### Test Configuration *pytest.ini* ```ini [tool:pytest] asyncio_mode = auto testpaths = tests python_files = test_*.py python_classes = Test* python_functions = test_* markers = unit: Unit tests (fast, isolated) integration: Integration tests (slower, external deps) e2e: End-to-end tests (complete workflows) slow: Slow tests (performance, load testing) security: Security-focused tests addopts = --strict-markers --disable-warnings --tb=short -v filterwarnings = ignore::DeprecationWarning ignore::PendingDeprecationWarning ``` *conftest.py* ```python """ Shared test fixtures and configuration """ import pytest import asyncio from unittest.mock import Mock, AsyncMock import tempfile import json from typing import Dict, Any from src.config import Config from src.server import MyTemplateMCPServer from src.tools import MyTemplateTools @pytest.fixture(scope="session") def event_loop(): """Create event loop for async tests""" loop = asyncio.new_event_loop() yield loop loop.close() @pytest.fixture def mock_config(): """Mock configuration for testing""" return Config( api_key="test-api-key", api_base_url="https://api.test.com", timeout=30, rate_limit=100, connection_limit=10, supported_formats=["json", "csv", "parquet"], features=["feature1", "feature2"] ) @pytest.fixture def sample_data(): """Sample data for testing""" return [ {"id": 1, "name": "Alice", "category": "A", "value": 100}, {"id": 2, "name": "Bob", "category": "B", "value": 200}, {"id": 3, "name": "Charlie", "category": "A", "value": 150} ] @pytest.fixture def mock_http_session(): """Mock HTTP session for API testing""" session = AsyncMock() session.get = AsyncMock() session.post = AsyncMock() session.close = AsyncMock() return session @pytest.fixture async def tools_instance(mock_config): """Create tools instance for testing""" tools = MyTemplateTools(mock_config) yield tools await tools.close() @pytest.fixture async def server_instance(mock_config): """Create server instance for testing""" server = MyTemplateMCPServer(mock_config) yield server # Cleanup if needed @pytest.fixture def temp_config_file(): """Create temporary configuration file""" config_data = { "api_key": "test-key", "base_url": "https://api.test.com", "features": ["test_feature"] } with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(config_data, f) yield f.name # Cleanup handled by tempfile @pytest.fixture def mock_external_api(): """Mock external API responses""" responses = { "GET /data": {"items": [{"id": 1, "name": "test"}]}, "GET /health": {"status": "healthy"}, "POST /process": {"result": "processed"} } return responses ``` ## Unit Testing ### Testing Tools *tests/test_tools.py* ```python """ Unit tests for template tools """ import pytest from unittest.mock import Mock, AsyncMock, patch import aiohttp import json from src.tools import MyTemplateTools class TestDataFetching: """Test data fetching functionality""" @pytest.mark.unit @pytest.mark.asyncio async def test_fetch_data_success(self, tools_instance, mock_external_api): """Test successful data fetching""" # Setup mock response mock_response = AsyncMock() mock_response.status = 200 mock_response.json.return_value = mock_external_api["GET /data"] mock_response.headers = {"content-type": "application/json"} mock_response.raise_for_status.return_value = None with patch.object(tools_instance, 'session') as mock_session: mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools_instance.fetch_data("data") # Assertions assert result["success"] is True assert result["data"] == mock_external_api["GET /data"] assert result["metadata"]["endpoint"] == "data" assert result["metadata"]["status_code"] == 200 # Verify API call mock_session.get.assert_called_once() call_args = mock_session.get.call_args assert "data" in call_args[0][0] @pytest.mark.unit @pytest.mark.asyncio async def test_fetch_data_with_filters(self, tools_instance): """Test data fetching with query filters""" filters = {"category": "test", "limit": 10} mock_response = AsyncMock() mock_response.status = 200 mock_response.json.return_value = {"filtered": "data"} mock_response.headers = {"content-type": "application/json"} mock_response.raise_for_status.return_value = None with patch.object(tools_instance, 'session') as mock_session: mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools_instance.fetch_data("data", filters=filters) # Verify filters were passed as parameters call_args = mock_session.get.call_args params = call_args[1]["params"] assert params["category"] == "test" assert params["limit"] == 10 @pytest.mark.unit @pytest.mark.asyncio async def test_fetch_data_http_error(self, tools_instance): """Test handling of HTTP errors""" mock_response = AsyncMock() mock_response.raise_for_status.side_effect = aiohttp.ClientResponseError( request_info=Mock(), history=(), status=404, message="Not Found" ) with patch.object(tools_instance, 'session') as mock_session: mock_session.get.return_value.__aenter__.return_value = mock_response with pytest.raises(Exception, match="API request failed"): await tools_instance.fetch_data("nonexistent") @pytest.mark.unit @pytest.mark.asyncio async def test_fetch_data_timeout(self, tools_instance): """Test handling of request timeouts""" with patch.object(tools_instance, 'session') as mock_session: mock_session.get.side_effect = asyncio.TimeoutError() with pytest.raises(Exception, match="Request timeout"): await tools_instance.fetch_data("slow-endpoint") @pytest.mark.unit @pytest.mark.asyncio async def test_fetch_data_csv_format(self, tools_instance): """Test CSV format data fetching""" csv_data = "id,name\n1,test\n2,test2" mock_response = AsyncMock() mock_response.status = 200 mock_response.text.return_value = csv_data mock_response.headers = {"content-type": "text/csv"} mock_response.raise_for_status.return_value = None with patch.object(tools_instance, 'session') as mock_session: mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools_instance.fetch_data("data", format="csv") assert result["success"] is True assert len(result["data"]) == 2 assert result["data"][0]["name"] == "test" class TestDataTransformation: """Test data transformation functionality""" @pytest.mark.unit @pytest.mark.asyncio async def test_transform_rename_field(self, tools_instance, sample_data): """Test field renaming transformation""" transformations = [{ "type": "rename_field", "from": "name", "to": "full_name" }] result = await tools_instance.transform_data(sample_data, transformations) assert result["success"] is True assert len(result["data"]) == 3 # Verify field was renamed for record in result["data"]: assert "full_name" in record assert "name" not in record assert result["data"][0]["full_name"] == "Alice" assert result["statistics"]["records_processed"] == 3 @pytest.mark.unit @pytest.mark.asyncio async def test_transform_add_field(self, tools_instance, sample_data): """Test field addition transformation""" transformations = [{ "type": "add_field", "name": "status", "value": "active" }] result = await tools_instance.transform_data(sample_data, transformations) assert result["success"] is True # Verify field was added for record in result["data"]: assert record["status"] == "active" @pytest.mark.unit @pytest.mark.asyncio async def test_transform_filter_field(self, tools_instance, sample_data): """Test field filtering transformation""" transformations = [{ "type": "filter_field", "field": "category", "condition": {"operator": "eq", "value": "A"} }] result = await tools_instance.transform_data(sample_data, transformations) assert result["success"] is True # Should only keep records with category "A" assert result["statistics"]["records_processed"] == 2 assert result["statistics"]["records_skipped"] == 1 # Verify filtered data for record in result["data"]: assert record["category"] == "A" @pytest.mark.unit @pytest.mark.asyncio async def test_transform_format_field(self, tools_instance, sample_data): """Test field formatting transformation""" transformations = [{ "type": "format_field", "field": "name", "format": "uppercase" }] result = await tools_instance.transform_data(sample_data, transformations) assert result["success"] is True # Verify field was formatted assert result["data"][0]["name"] == "ALICE" assert result["data"][1]["name"] == "BOB" @pytest.mark.unit @pytest.mark.asyncio async def test_transform_multiple_transformations(self, tools_instance, sample_data): """Test multiple transformations in sequence""" transformations = [ {"type": "format_field", "field": "name", "format": "uppercase"}, {"type": "add_field", "name": "processed", "value": True}, {"type": "rename_field", "from": "value", "to": "amount"} ] result = await tools_instance.transform_data(sample_data, transformations) assert result["success"] is True assert result["statistics"]["transformations_applied"] == 9 # 3 records * 3 transformations # Verify all transformations applied record = result["data"][0] assert record["name"] == "ALICE" # Formatted assert record["processed"] is True # Added assert "amount" in record # Renamed assert "value" not in record # Original field removed class TestDataExport: """Test data export functionality""" @pytest.mark.unit @pytest.mark.asyncio async def test_export_json(self, tools_instance, sample_data): """Test JSON export functionality""" result = await tools_instance.export_data(sample_data, format="json") assert result["success"] is True assert result["format"] == "json" assert result["records_exported"] == 3 assert result["bytes_written"] > 0 # Verify JSON is valid exported_data = result["data"] parsed_data = json.loads(exported_data) assert len(parsed_data) == 3 assert parsed_data[0]["name"] == "Alice" @pytest.mark.unit @pytest.mark.asyncio async def test_export_csv(self, tools_instance, sample_data): """Test CSV export functionality""" result = await tools_instance.export_data(sample_data, format="csv") assert result["success"] is True assert result["format"] == "csv" assert result["records_exported"] == 3 # Verify CSV format csv_data = result["data"] lines = csv_data.strip().split('\n') assert len(lines) == 4 # Header + 3 data rows assert "id,name,category,value" in lines[0] assert "1,Alice,A,100" in lines[1] @pytest.mark.unit @pytest.mark.asyncio async def test_export_with_destination(self, tools_instance, sample_data): """Test export with file destination""" with patch.object(tools_instance, '_write_to_destination') as mock_write: mock_write.return_value = None result = await tools_instance.export_data( sample_data, format="json", destination="/tmp/test.json" ) assert result["success"] is True assert result["destination"] == "/tmp/test.json" mock_write.assert_called_once() @pytest.mark.unit @pytest.mark.asyncio async def test_export_unsupported_format(self, tools_instance, sample_data): """Test handling of unsupported export formats""" with pytest.raises(Exception, match="Unsupported export format"): await tools_instance.export_data(sample_data, format="xml") class TestHealthChecks: """Test health check functionality""" @pytest.mark.unit @pytest.mark.asyncio async def test_api_health_check_success(self, tools_instance): """Test successful API health check""" mock_response = AsyncMock() mock_response.status = 200 with patch.object(tools_instance, 'session') as mock_session: mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools_instance.check_api_health() assert result is True # Verify health endpoint was called call_args = mock_session.get.call_args assert "health" in call_args[0][0] @pytest.mark.unit @pytest.mark.asyncio async def test_api_health_check_failure(self, tools_instance): """Test failed API health check""" mock_response = AsyncMock() mock_response.status = 503 # Service unavailable with patch.object(tools_instance, 'session') as mock_session: mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools_instance.check_api_health() assert result is False @pytest.mark.unit @pytest.mark.asyncio async def test_api_health_check_exception(self, tools_instance): """Test health check with network exception""" with patch.object(tools_instance, 'session') as mock_session: mock_session.get.side_effect = Exception("Network error") result = await tools_instance.check_api_health() assert result is False ``` ### Testing Configuration *tests/test_config.py* ```python """ Unit tests for configuration management """ import pytest import os import json import tempfile from unittest.mock import patch from src.config import Config, load_config class TestConfiguration: """Test configuration loading and validation""" @pytest.mark.unit def test_config_creation(self): """Test basic configuration creation""" config = Config( api_key="test-key", api_base_url="https://api.test.com" ) assert config.api_key == "test-key" assert config.api_base_url == "https://api.test.com" assert config.timeout == 30 # Default value @pytest.mark.unit def test_config_validation_success(self): """Test successful configuration validation""" config = Config( api_key="valid-key", api_base_url="https://api.test.com" ) assert config.validate() is True @pytest.mark.unit def test_config_validation_missing_api_key(self): """Test validation failure with missing API key""" config = Config( api_base_url="https://api.test.com" ) assert config.validate() is False @pytest.mark.unit def test_config_validation_invalid_url(self): """Test validation failure with invalid URL""" config = Config( api_key="test-key", api_base_url="not-a-url" ) assert config.validate() is False @pytest.mark.unit def test_load_config_from_env(self): """Test loading configuration from environment variables""" env_vars = { "CUSTOM_API_KEY": "env-api-key", "CUSTOM_BASE_URL": "https://env.api.com", "CUSTOM_FEATURES": "feature1,feature2,feature3" } with patch.dict(os.environ, env_vars): config = load_config() assert config.api_key == "env-api-key" assert config.api_base_url == "https://env.api.com" assert config.features == ["feature1", "feature2", "feature3"] @pytest.mark.unit def test_load_config_from_file(self, temp_config_file): """Test loading configuration from file""" with patch.dict(os.environ, {"MCP_CONFIG_FILE": temp_config_file}): config = load_config() assert config.api_key == "test-key" assert config.api_base_url == "https://api.test.com" @pytest.mark.unit def test_config_precedence(self, temp_config_file): """Test configuration precedence (env > file > defaults)""" env_vars = { "CUSTOM_API_KEY": "env-override-key", "MCP_CONFIG_FILE": temp_config_file } with patch.dict(os.environ, env_vars): config = load_config() # API key should come from environment (highest precedence) assert config.api_key == "env-override-key" # Base URL should come from file assert config.api_base_url == "https://api.test.com" @pytest.mark.unit def test_nested_config_from_env(self): """Test loading nested configuration from environment""" env_vars = { "CUSTOM_ADVANCED__TIMEOUT": "60", "CUSTOM_ADVANCED__RETRY_ATTEMPTS": "5" } with patch.dict(os.environ, env_vars): config = load_config() assert config.advanced["timeout"] == 60 assert config.advanced["retry_attempts"] == 5 ``` ## Integration Testing ### Testing External Dependencies *tests/test_integration.py* ```python """ Integration tests for external dependencies and workflows """ import pytest import asyncio from unittest.mock import patch import responses import aioresponses from src.server import MyTemplateMCPServer from src.tools import MyTemplateTools from src.config import Config class TestAPIIntegration: """Test integration with external APIs""" @pytest.mark.integration @pytest.mark.asyncio async def test_real_api_health_check(self): """Test health check against real API endpoint""" # Use httpbin.org for reliable testing config = Config( api_key="test-key", api_base_url="https://httpbin.org" ) tools = MyTemplateTools(config) try: # httpbin.org/status/200 returns 200 OK with patch.object(tools, 'config') as mock_config: mock_config.api_base_url = "https://httpbin.org" # Mock the health endpoint to use httpbin status with patch.object(tools, 'check_api_health') as mock_health: async def mock_health_check(): try: async with tools.session.get("https://httpbin.org/status/200") as response: return response.status == 200 except: return False mock_health.side_effect = mock_health_check result = await tools.check_api_health() assert result is True finally: await tools.close() @pytest.mark.integration @pytest.mark.asyncio async def test_fetch_data_with_mock_api(self): """Test data fetching with mocked API responses""" config = Config( api_key="test-key", api_base_url="https://api.test.com" ) tools = MyTemplateTools(config) # Mock API responses with aioresponses.aioresponses() as m: m.get( "https://api.test.com/data", payload={"items": [{"id": 1, "name": "test"}]}, status=200 ) result = await tools.fetch_data("data") assert result["success"] is True assert len(result["data"]["items"]) == 1 assert result["data"]["items"][0]["name"] == "test" await tools.close() @pytest.mark.integration @pytest.mark.asyncio async def test_fetch_data_api_error_handling(self): """Test handling of API errors in integration""" config = Config( api_key="test-key", api_base_url="https://api.test.com" ) tools = MyTemplateTools(config) # Mock API error response with aioresponses.aioresponses() as m: m.get( "https://api.test.com/error", status=500, payload={"error": "Internal server error"} ) with pytest.raises(Exception, match="API request failed"): await tools.fetch_data("error") await tools.close() class TestWorkflowIntegration: """Test complete workflow integration""" @pytest.mark.integration @pytest.mark.asyncio async def test_complete_data_pipeline(self): """Test complete data processing pipeline""" config = Config( api_key="test-key", api_base_url="https://api.test.com" ) server = MyTemplateMCPServer(config) # Mock the complete workflow sample_api_data = [ {"id": 1, "name": "alice", "category": "user"}, {"id": 2, "name": "bob", "category": "admin"} ] # Step 1: Fetch data (mocked) with patch.object(server.tools, 'fetch_data') as mock_fetch: mock_fetch.return_value = { "success": True, "data": sample_api_data } fetch_result = await server.tools.fetch_data("users") assert fetch_result["success"] is True # Step 2: Transform data (real implementation) transformations = [ {"type": "format_field", "field": "name", "format": "uppercase"}, {"type": "add_field", "name": "processed_at", "value": "2025-01-27T16:47:30Z"} ] transform_result = await server.tools.transform_data( fetch_result["data"], transformations ) assert transform_result["success"] is True assert transform_result["data"][0]["name"] == "ALICE" assert "processed_at" in transform_result["data"][0] # Step 3: Export data (real implementation) export_result = await server.tools.export_data( transform_result["data"], format="json" ) assert export_result["success"] is True assert export_result["records_exported"] == 2 @pytest.mark.integration @pytest.mark.asyncio async def test_error_recovery_workflow(self): """Test workflow error recovery""" config = Config( api_key="test-key", api_base_url="https://api.test.com" ) server = MyTemplateMCPServer(config) # Test fetch failure recovery with patch.object(server.tools, 'fetch_data') as mock_fetch: mock_fetch.side_effect = Exception("API unavailable") try: await server.tools.fetch_data("users") assert False, "Should have raised exception" except Exception as e: assert "API unavailable" in str(e) # Test transformation with invalid data invalid_data = [{"incomplete": "data"}] transformations = [{"type": "rename_field", "from": "nonexistent", "to": "new"}] result = await server.tools.transform_data(invalid_data, transformations) # Should handle gracefully without complete failure assert result["success"] is True class TestConfigurationIntegration: """Test configuration loading in integration scenarios""" @pytest.mark.integration def test_config_file_loading(self, temp_config_file): """Test configuration loading from file""" with patch.dict(os.environ, {"MCP_CONFIG_FILE": temp_config_file}): config = load_config() assert config.api_key == "test-key" assert config.validate() is True @pytest.mark.integration def test_environment_override(self, temp_config_file): """Test environment variable override of config file""" env_vars = { "MCP_CONFIG_FILE": temp_config_file, "CUSTOM_API_KEY": "override-key" } with patch.dict(os.environ, env_vars): config = load_config() # Environment should override file assert config.api_key == "override-key" # File values should still be loaded assert config.api_base_url == "https://api.test.com" ``` ## End-to-End Testing ### Full System Testing *tests/test_e2e.py* ```python """ End-to-end tests for complete system functionality """ import pytest import asyncio import json import tempfile from unittest.mock import patch import docker import requests from src.server import MyTemplateMCPServer class TestContainerDeployment: """Test containerized deployment scenarios""" @pytest.mark.e2e @pytest.mark.slow def test_docker_container_health(self): """Test template running in Docker container""" # This test requires Docker to be available try: client = docker.from_env() # Build test image image = client.images.build( path=".", tag="my-template:test", quiet=False )[0] # Run container with test configuration container = client.containers.run( "my-template:test", environment={ "CUSTOM_API_KEY": "test-key", "CUSTOM_BASE_URL": "https://httpbin.org" }, ports={"8080/tcp": 8080}, detach=True, remove=True ) try: # Wait for container to start asyncio.sleep(5) # Test health endpoint response = requests.get("http://localhost:8080/health", timeout=10) assert response.status_code == 200 health_data = response.json() assert health_data["status"] == "healthy" finally: container.stop() except docker.errors.DockerException: pytest.skip("Docker not available for testing") @pytest.mark.e2e @pytest.mark.asyncio async def test_mcp_protocol_compliance(self): """Test MCP protocol compliance""" config = load_config() server = MyTemplateMCPServer(config) # Test tool discovery tools = await server._discover_tools() assert len(tools) > 0 # Test each tool has required MCP fields for tool in tools: assert "name" in tool assert "description" in tool assert "inputSchema" in tool # Test tool execution for tool_name in ["fetch_data", "transform_data", "export_data"]: tool_info = next((t for t in tools if t["name"] == tool_name), None) assert tool_info is not None, f"Tool {tool_name} not found" class TestUserWorkflows: """Test realistic user workflows""" @pytest.mark.e2e @pytest.mark.asyncio async def test_data_analyst_workflow(self): """Test typical data analyst workflow""" # Scenario: Data analyst needs to fetch, clean, and export data server = MyTemplateMCPServer() # Step 1: Fetch customer data with patch.object(server.tools, 'fetch_data') as mock_fetch: mock_fetch.return_value = { "success": True, "data": [ {"id": 1, "name": " Alice Johnson ", "email": "alice@example.com", "status": "active"}, {"id": 2, "name": "Bob Smith", "email": "BOB@EXAMPLE.COM", "status": "inactive"}, {"id": 3, "name": "Charlie Brown", "email": "charlie@example.com", "status": "active"} ] } # Fetch data raw_data = await server.tools.fetch_data("customers") assert raw_data["success"] is True # Step 2: Clean and transform data cleaning_transformations = [ {"type": "format_field", "field": "name", "format": "strip"}, {"type": "format_field", "field": "email", "format": "lowercase"}, {"type": "filter_field", "field": "status", "condition": {"operator": "eq", "value": "active"}}, {"type": "add_field", "name": "processed_date", "value": "2025-01-27"} ] cleaned_data = await server.tools.transform_data( raw_data["data"], cleaning_transformations ) assert cleaned_data["success"] is True assert cleaned_data["statistics"]["records_processed"] == 2 # Only active users assert cleaned_data["data"][0]["name"] == "Alice Johnson" # Trimmed assert cleaned_data["data"][0]["email"] == "alice@example.com" # Lowercase # Step 3: Export for analysis export_result = await server.tools.export_data( cleaned_data["data"], format="csv" ) assert export_result["success"] is True assert export_result["records_exported"] == 2 # Verify CSV format is correct csv_content = export_result["data"] lines = csv_content.strip().split('\n') assert len(lines) == 3 # Header + 2 data rows assert "processed_date" in lines[0] # Added field in header @pytest.mark.e2e @pytest.mark.asyncio async def test_developer_integration_workflow(self): """Test developer integration workflow""" # Scenario: Developer integrating MCP server into application server = MyTemplateMCPServer() # Test 1: Verify server health health_result = await server._health_check() assert health_result["status"] in ["healthy", "unhealthy"] assert "checks" in health_result # Test 2: Discover available tools tools = await server._discover_tools() expected_tools = ["fetch_data", "transform_data", "export_data", "health_check"] for expected_tool in expected_tools: tool_found = any(tool["name"] == expected_tool for tool in tools) assert tool_found, f"Tool {expected_tool} not found" # Test 3: Validate tool schemas for tool in tools: assert "inputSchema" in tool schema = tool["inputSchema"] assert "type" in schema assert schema["type"] == "object" if "properties" in schema: assert isinstance(schema["properties"], dict) # Test 4: Test error handling with patch.object(server.tools, 'fetch_data') as mock_fetch: mock_fetch.side_effect = Exception("Simulated failure") try: await server.tools.fetch_data("test") assert False, "Should have raised exception" except Exception as e: assert "Simulated failure" in str(e) class TestPerformanceScenarios: """Test performance under various scenarios""" @pytest.mark.e2e @pytest.mark.slow @pytest.mark.asyncio async def test_large_dataset_processing(self): """Test processing of large datasets""" # Generate large dataset large_dataset = [ {"id": i, "name": f"User_{i}", "value": i * 10} for i in range(1000) # 1000 records ] server = MyTemplateMCPServer() # Test transformation performance transformations = [ {"type": "add_field", "name": "processed", "value": True}, {"type": "format_field", "field": "name", "format": "uppercase"} ] import time start_time = time.time() result = await server.tools.transform_data(large_dataset, transformations) end_time = time.time() processing_time = end_time - start_time # Performance assertions assert result["success"] is True assert result["statistics"]["records_processed"] == 1000 assert processing_time < 5.0 # Should process within 5 seconds # Verify transformations applied correctly assert result["data"][0]["processed"] is True assert result["data"][0]["name"] == "USER_0" @pytest.mark.e2e @pytest.mark.slow @pytest.mark.asyncio async def test_concurrent_requests(self): """Test handling of concurrent requests""" server = MyTemplateMCPServer() # Create multiple concurrent tasks async def fetch_task(endpoint): with patch.object(server.tools, 'fetch_data') as mock_fetch: mock_fetch.return_value = { "success": True, "data": [{"endpoint": endpoint, "timestamp": "2025-01-27T16:47:30Z"}] } return await server.tools.fetch_data(endpoint) # Run concurrent tasks tasks = [fetch_task(f"endpoint_{i}") for i in range(10)] results = await asyncio.gather(*tasks) # Verify all tasks completed successfully assert len(results) == 10 for result in results: assert result["success"] is True ``` ## Deployment Testing ### Template Validation ```bash # Template structure validation mcpt validate my-template --comprehensive # Configuration schema validation mcpt config my-template --validate-schema # Docker build testing mcpt build my-template --test # Deployment smoke test mcpt deploy my-template --test-mode --timeout 60 ``` ### Integration Testing with MCP Platform *tests/test_platform_integration.py* ```python """ Test integration with MCP platform """ import pytest import requests import time from unittest.mock import patch class TestPlatformIntegration: """Test integration with MCP template platform""" @pytest.mark.integration def test_template_deployment(self): """Test template deployment through platform""" # This would typically use the platform API deployment_config = { "template_id": "my-template", "environment": "test", "config": { "api_key": "test-key", "base_url": "https://api.test.com" } } # Mock platform deployment with patch('requests.post') as mock_post: mock_post.return_value.status_code = 200 mock_post.return_value.json.return_value = { "deployment_id": "test-deployment-123", "status": "running", "endpoint": "http://localhost:8080" } # Simulate deployment request response = requests.post( "http://platform.api/deploy", json=deployment_config ) assert response.status_code == 200 deployment_info = response.json() assert deployment_info["status"] == "running" @pytest.mark.integration def test_template_discovery(self): """Test template discovery by platform""" # Mock template registry API with patch('requests.get') as mock_get: mock_get.return_value.status_code = 200 mock_get.return_value.json.return_value = { "tools": [ { "name": "fetch_data", "description": "Fetch data from API", "parameters": {"endpoint": {"type": "string"}} } ] } response = requests.get("http://template.endpoint/tools") assert response.status_code == 200 tools = response.json()["tools"] assert len(tools) > 0 assert tools[0]["name"] == "fetch_data" ``` ## Performance Testing ### Load Testing *tests/test_performance.py* ```python """ Performance tests for template functionality """ import pytest import asyncio import time import statistics from concurrent.futures import ThreadPoolExecutor from src.server import MyTemplateMCPServer class TestPerformance: """Performance testing suite""" @pytest.mark.slow @pytest.mark.asyncio async def test_transformation_performance(self): """Test data transformation performance""" server = MyTemplateMCPServer() # Generate test data of various sizes test_sizes = [100, 500, 1000, 5000] performance_results = {} for size in test_sizes: test_data = [ {"id": i, "name": f"item_{i}", "value": i * 2} for i in range(size) ] transformations = [ {"type": "add_field", "name": "processed", "value": True}, {"type": "format_field", "field": "name", "format": "uppercase"} ] # Measure performance start_time = time.time() result = await server.tools.transform_data(test_data, transformations) end_time = time.time() duration = end_time - start_time performance_results[size] = duration # Verify correctness assert result["success"] is True assert result["statistics"]["records_processed"] == size print(f"Processed {size} records in {duration:.3f}s ({size/duration:.1f} records/sec)") # Performance regression test # Processing should be roughly linear with data size assert performance_results[5000] < performance_results[1000] * 6 # Allow some overhead @pytest.mark.slow @pytest.mark.asyncio async def test_concurrent_tool_execution(self): """Test concurrent tool execution performance""" server = MyTemplateMCPServer() async def test_tool_call(): """Single tool call for concurrency testing""" test_data = [{"id": 1, "name": "test"}] transformations = [{"type": "add_field", "name": "test", "value": True}] start = time.time() result = await server.tools.transform_data(test_data, transformations) duration = time.time() - start return {"success": result["success"], "duration": duration} # Test different concurrency levels concurrency_levels = [1, 5, 10, 20] for concurrency in concurrency_levels: tasks = [test_tool_call() for _ in range(concurrency)] start_time = time.time() results = await asyncio.gather(*tasks) total_time = time.time() - start_time # Verify all tasks succeeded success_count = sum(1 for r in results if r["success"]) assert success_count == concurrency # Calculate performance metrics durations = [r["duration"] for r in results] avg_duration = statistics.mean(durations) print(f"Concurrency {concurrency}: {total_time:.3f}s total, {avg_duration:.3f}s avg per task") # Performance assertions assert total_time < concurrency * 2.0 # Should benefit from concurrency assert avg_duration < 1.0 # Each task should be reasonably fast ``` ## Security Testing ### Security Validation *tests/test_security.py* ```python """ Security tests for template functionality """ import pytest from unittest.mock import patch from src.tools import MyTemplateTools from src.config import Config class TestSecurity: """Security testing suite""" @pytest.mark.security @pytest.mark.asyncio async def test_input_validation(self): """Test input validation and sanitization""" config = Config(api_key="test-key", api_base_url="https://api.test.com") tools = MyTemplateTools(config) # Test SQL injection attempts malicious_inputs = [ "'; DROP TABLE users; --", "<script>alert('xss')</script>", "../../etc/passwd", "\x00\x01\x02", # Binary data "A" * 10000, # Large input ] for malicious_input in malicious_inputs: try: # Should handle malicious input gracefully with patch.object(tools, 'session'): result = await tools.fetch_data(malicious_input) # If no exception, verify result is safe if isinstance(result, dict) and result.get("success"): # Verify no malicious content in response response_str = str(result) assert "<script>" not in response_str assert "DROP TABLE" not in response_str except Exception as e: # Exception is acceptable for malicious input assert "validation" in str(e).lower() or "invalid" in str(e).lower() await tools.close() @pytest.mark.security @pytest.mark.asyncio async def test_api_key_protection(self): """Test API key is not exposed in logs or responses""" config = Config(api_key="secret-api-key-123", api_base_url="https://api.test.com") tools = MyTemplateTools(config) # Mock API call that might log request details with patch.object(tools, 'session') as mock_session: mock_response = AsyncMock() mock_response.status = 200 mock_response.json.return_value = {"data": "test"} mock_response.headers = {"content-type": "application/json"} mock_response.raise_for_status.return_value = None mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools.fetch_data("test") # Verify API key is not in response response_str = str(result) assert "secret-api-key-123" not in response_str # Verify API key is not in logged request details # (This would require checking actual log output in a real scenario) await tools.close() @pytest.mark.security def test_configuration_validation(self): """Test configuration security validation""" # Test invalid/insecure configurations insecure_configs = [ {"api_key": "", "api_base_url": "https://api.test.com"}, # Empty API key {"api_key": "test", "api_base_url": "http://api.test.com"}, # HTTP instead of HTTPS {"api_key": "test", "api_base_url": "javascript:alert('xss')"}, # Invalid URL scheme ] for config_data in insecure_configs: config = Config(**config_data) assert config.validate() is False, f"Should reject insecure config: {config_data}" ``` ## Continuous Integration ### CI Pipeline Configuration *.github/workflows/test.yml* ```yaml name: Template Testing on: push: branches: [ main, develop ] pull_request: branches: [ main ] jobs: unit-tests: runs-on: ubuntu-latest strategy: matrix: python-version: [3.8, 3.9, "3.10", "3.11"] steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements-dev.txt pip install -e . - name: Run unit tests run: | pytest tests/ -m unit --cov=src --cov-report=xml - name: Upload coverage uses: codecov/codecov-action@v3 with: file: ./coverage.xml fail_ci_if_error: true integration-tests: runs-on: ubuntu-latest services: redis: image: redis:alpine options: >- --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5 steps: - uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements-dev.txt pip install -e . - name: Run integration tests run: | pytest tests/ -m integration env: TEST_REDIS_URL: redis://localhost:6379 security-tests: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements-dev.txt pip install bandit safety - name: Run security tests run: | pytest tests/ -m security bandit -r src/ safety check - name: Run SAST scan uses: github/super-linter@v4 env: DEFAULT_BRANCH: main GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} e2e-tests: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: "3.10" - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - name: Build Docker image run: | docker build -t my-template:test . - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements-dev.txt pip install docker - name: Run E2E tests run: | pytest tests/ -m e2e env: DOCKER_IMAGE: my-template:test ``` ## Testing Best Practices ### Test Organization 1. **Clear Test Structure** - Group tests by functionality - Use descriptive test names - Follow arrange-act-assert pattern 2. **Comprehensive Coverage** - Unit tests for all functions - Integration tests for external dependencies - End-to-end tests for user workflows 3. **Mock Strategy** - Mock external dependencies - Use real implementations for internal logic - Provide realistic test data 4. **Performance Awareness** - Mark slow tests appropriately - Use parallel execution where possible - Monitor test execution time ### Quality Gates ```bash # Complete quality gate script make quality-gate # Individual checks make test # Run all tests make test-unit # Unit tests only make test-integration # Integration tests make test-e2e # End-to-end tests make coverage # Coverage report make lint # Code linting make security # Security checks ``` ## Troubleshooting Tests ### Common Issues 1. **Async Test Failures** ```python # Ensure proper async test setup @pytest.mark.asyncio async def test_async_function(): result = await async_function() assert result["success"] is True ``` 2. **Mock Configuration** ```python # Proper mock setup for HTTP clients with patch.object(tools, 'session') as mock_session: mock_response = AsyncMock() mock_session.get.return_value.__aenter__.return_value = mock_response ``` 3. **Resource Cleanup** ```python # Always clean up resources @pytest.fixture async def tools_instance(): tools = MyTemplateTools(config) yield tools await tools.close() # Cleanup ``` ## Getting Help ### Community Resources - **Documentation**: [Testing guides](../guides/testing.md) - **GitHub Issues**: [Report test issues](https://github.com/Data-Everything/mcp-server-templates/issues) - **Discord Community**: [Join testing discussions](https://discord.gg/55Cfxe9gnr) ### Professional Support - **Custom Testing**: Professional test suite development - **CI/CD Setup**: Automated testing pipeline configuration - **Performance Optimization**: Test performance optimization - **Contact**: [support@dataeverything.ai](mailto:support@dataeverything.ai) --- **Next Steps:** - [Learn about deployment testing](../cli/deploy.md) - [Explore development workflows](development.md) - [View testing examples](../examples/testing.md) - [Contribute testing improvements](../guides/contributing.md)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Data-Everything/mcp-server-templates'

If you have feedback or need assistance with the MCP directory API, please join our Discord server