Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
testing.mdโ€ข32.8 kB
# Testing Guide This comprehensive guide covers testing practices, strategies, and tools for MockLoop MCP. Our testing approach ensures reliability, maintainability, and confidence in the codebase through multiple layers of testing. ## Overview MockLoop MCP uses a multi-layered testing strategy: - **Unit Tests**: Test individual functions and classes in isolation - **Integration Tests**: Test component interactions and workflows - **End-to-End Tests**: Test complete user scenarios - **Performance Tests**: Validate performance characteristics - **Security Tests**: Verify security controls and data protection - **Contract Tests**: Ensure API compatibility ## Testing Framework and Tools ### Core Testing Stack ```python # pytest - Primary testing framework import pytest import pytest_asyncio import pytest_mock # Testing utilities from unittest.mock import Mock, patch, AsyncMock from faker import Faker from factory_boy import Factory, Sequence # HTTP testing import httpx from fastapi.testclient import TestClient # Database testing import pytest_postgresql from sqlalchemy import create_engine from alembic import command from alembic.config import Config # Performance testing import pytest_benchmark import locust # Security testing import bandit import safety ``` ### Test Configuration ```python # conftest.py - Global test configuration import pytest import asyncio from pathlib import Path from typing import AsyncGenerator, Generator from mockloop_mcp.core import create_app from mockloop_mcp.database import Database, get_database from mockloop_mcp.config import Settings, get_settings # Test settings @pytest.fixture(scope="session") def test_settings() -> Settings: """Provide test-specific settings.""" return Settings( database_url="sqlite:///test.db", environment="test", log_level="DEBUG", auth_enabled=False, storage_enabled=True, webhook_enabled=False ) # Database fixtures @pytest.fixture(scope="session") async def database(test_settings: Settings) -> AsyncGenerator[Database, None]: """Provide test database connection.""" db = Database(test_settings.database_url) await db.connect() # Run migrations alembic_cfg = Config("alembic.ini") alembic_cfg.set_main_option("sqlalchemy.url", test_settings.database_url) command.upgrade(alembic_cfg, "head") yield db await db.disconnect() @pytest.fixture async def clean_database(database: Database) -> AsyncGenerator[Database, None]: """Provide clean database for each test.""" # Start transaction async with database.transaction(): yield database # Rollback transaction after test # Application fixtures @pytest.fixture def app(test_settings: Settings, clean_database: Database): """Provide FastAPI test application.""" app = create_app(test_settings) # Override dependencies app.dependency_overrides[get_settings] = lambda: test_settings app.dependency_overrides[get_database] = lambda: clean_database return app @pytest.fixture def client(app) -> TestClient: """Provide HTTP test client.""" return TestClient(app) @pytest.fixture async def async_client(app) -> AsyncGenerator[httpx.AsyncClient, None]: """Provide async HTTP test client.""" async with httpx.AsyncClient(app=app, base_url="http://test") as client: yield client ``` ## Unit Testing ### Testing Core Classes ```python # test_mock_generator.py import pytest from unittest.mock import Mock, patch from pathlib import Path from mockloop_mcp.generator import MockServerGenerator from mockloop_mcp.models import GenerationOptions, GenerationResult from mockloop_mcp.exceptions import SpecificationError, GenerationError class TestMockServerGenerator: """Test mock server generation functionality.""" @pytest.fixture def generator(self): """Provide generator instance.""" return MockServerGenerator() @pytest.fixture def valid_openapi_spec(self, tmp_path): """Provide valid OpenAPI specification.""" spec_content = { "openapi": "3.0.0", "info": {"title": "Test API", "version": "1.0.0"}, "paths": { "/users": { "get": { "responses": { "200": { "description": "Success", "content": { "application/json": { "schema": { "type": "array", "items": {"$ref": "#/components/schemas/User"} } } } } } } } }, "components": { "schemas": { "User": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string", "format": "email"} } } } } } spec_file = tmp_path / "openapi.json" spec_file.write_text(json.dumps(spec_content)) return str(spec_file) def test_generate_with_valid_spec(self, generator, valid_openapi_spec, tmp_path): """Test successful generation with valid specification.""" options = GenerationOptions( output_dir=str(tmp_path / "output"), auth_enabled=True, storage_enabled=True ) result = generator.generate(valid_openapi_spec, options) assert isinstance(result, GenerationResult) assert result.success is True assert result.server_info is not None assert result.server_info.name == "test-api" assert len(result.generated_files) > 0 # Verify generated files exist output_dir = Path(options.output_dir) assert (output_dir / "main.py").exists() assert (output_dir / "requirements.txt").exists() assert (output_dir / "Dockerfile").exists() def test_generate_with_invalid_spec_raises_error(self, generator, tmp_path): """Test that invalid specification raises SpecificationError.""" invalid_spec = tmp_path / "invalid.json" invalid_spec.write_text('{"invalid": "spec"}') options = GenerationOptions(output_dir=str(tmp_path / "output")) with pytest.raises(SpecificationError) as exc_info: generator.generate(str(invalid_spec), options) assert "Invalid OpenAPI specification" in str(exc_info.value) assert exc_info.value.spec_path == str(invalid_spec) def test_generate_with_nonexistent_spec_raises_error(self, generator, tmp_path): """Test that nonexistent specification file raises error.""" nonexistent_spec = str(tmp_path / "nonexistent.json") options = GenerationOptions(output_dir=str(tmp_path / "output")) with pytest.raises(SpecificationError): generator.generate(nonexistent_spec, options) @patch('mockloop_mcp.generator.template_engine') def test_generate_with_template_error_raises_generation_error( self, mock_template_engine, generator, valid_openapi_spec, tmp_path ): """Test that template errors raise GenerationError.""" mock_template_engine.render.side_effect = Exception("Template error") options = GenerationOptions(output_dir=str(tmp_path / "output")) with pytest.raises(GenerationError) as exc_info: generator.generate(valid_openapi_spec, options) assert exc_info.value.stage == "template_rendering" def test_validate_specification_with_valid_spec(self, generator, valid_openapi_spec): """Test specification validation with valid spec.""" result = generator.validate_specification(valid_openapi_spec) assert result.is_valid is True assert len(result.errors) == 0 assert result.spec_type == "openapi" assert result.version == "3.0.0" def test_validate_specification_with_invalid_spec(self, generator, tmp_path): """Test specification validation with invalid spec.""" invalid_spec = tmp_path / "invalid.json" invalid_spec.write_text('{"openapi": "3.0.0"}') # Missing required fields result = generator.validate_specification(str(invalid_spec)) assert result.is_valid is False assert len(result.errors) > 0 assert any("info" in error for error in result.errors) ``` ### Testing Database Operations ```python # test_database.py import pytest from datetime import datetime, timedelta from mockloop_mcp.database import Database from mockloop_mcp.models import RequestLog, ServerConfig class TestDatabase: """Test database operations.""" @pytest.mark.asyncio async def test_insert_request_log(self, clean_database: Database): """Test inserting request log.""" log_data = { "method": "GET", "path": "/users", "query_params": {"limit": "10"}, "headers": {"Accept": "application/json"}, "body": None, "status_code": 200, "response_body": {"users": []}, "response_time": 0.123, "timestamp": datetime.utcnow() } log_id = await clean_database.insert_request_log(log_data) assert log_id is not None # Verify log was inserted log = await clean_database.get_request_log(log_id) assert log is not None assert log.method == "GET" assert log.path == "/users" assert log.status_code == 200 @pytest.mark.asyncio async def test_query_request_logs_with_filters(self, clean_database: Database): """Test querying request logs with filters.""" # Insert test data base_time = datetime.utcnow() logs = [ { "method": "GET", "path": "/users", "status_code": 200, "timestamp": base_time }, { "method": "POST", "path": "/users", "status_code": 201, "timestamp": base_time + timedelta(minutes=1) }, { "method": "GET", "path": "/posts", "status_code": 200, "timestamp": base_time + timedelta(minutes=2) } ] for log_data in logs: await clean_database.insert_request_log(log_data) # Test method filter get_logs = await clean_database.query_request_logs(method="GET") assert len(get_logs) == 2 assert all(log.method == "GET" for log in get_logs) # Test path filter user_logs = await clean_database.query_request_logs(path_pattern="/users") assert len(user_logs) == 2 assert all("/users" in log.path for log in user_logs) # Test time range filter recent_logs = await clean_database.query_request_logs( time_from=base_time + timedelta(minutes=0.5), time_to=base_time + timedelta(minutes=1.5) ) assert len(recent_logs) == 1 assert recent_logs[0].method == "POST" @pytest.mark.asyncio async def test_transaction_rollback(self, clean_database: Database): """Test transaction rollback on error.""" initial_count = await clean_database.count_request_logs() try: async with clean_database.transaction(): # Insert valid log await clean_database.insert_request_log({ "method": "GET", "path": "/test", "status_code": 200, "timestamp": datetime.utcnow() }) # Raise error to trigger rollback raise ValueError("Test error") except ValueError: pass # Verify rollback occurred final_count = await clean_database.count_request_logs() assert final_count == initial_count ``` ### Testing API Endpoints ```python # test_api.py import pytest from fastapi.testclient import TestClient class TestMockServerAPI: """Test mock server API endpoints.""" def test_health_check(self, client: TestClient): """Test health check endpoint.""" response = client.get("/health") assert response.status_code == 200 assert response.json() == { "status": "healthy", "timestamp": pytest.approx(datetime.utcnow().timestamp(), abs=1) } def test_get_server_info(self, client: TestClient): """Test server info endpoint.""" response = client.get("/admin/info") assert response.status_code == 200 data = response.json() assert "name" in data assert "version" in data assert "uptime" in data assert "request_count" in data def test_list_request_logs(self, client: TestClient): """Test listing request logs.""" # Make some requests to generate logs client.get("/users") client.post("/users", json={"name": "Test User"}) response = client.get("/admin/logs") assert response.status_code == 200 data = response.json() assert "logs" in data assert "total" in data assert len(data["logs"]) >= 2 def test_list_request_logs_with_filters(self, client: TestClient): """Test listing request logs with filters.""" # Generate test requests client.get("/users") client.post("/users", json={"name": "Test"}) client.get("/posts") # Filter by method response = client.get("/admin/logs?method=GET") assert response.status_code == 200 data = response.json() assert all(log["method"] == "GET" for log in data["logs"]) # Filter by path response = client.get("/admin/logs?path_pattern=/users") assert response.status_code == 200 data = response.json() assert all("/users" in log["path"] for log in data["logs"]) def test_update_response_data(self, client: TestClient): """Test updating response data.""" new_data = {"users": [{"id": 1, "name": "Updated User"}]} response = client.put( "/admin/responses/users", json=new_data ) assert response.status_code == 200 # Verify updated data is returned response = client.get("/users") assert response.status_code == 200 assert response.json() == new_data def test_create_scenario(self, client: TestClient): """Test creating a new scenario.""" scenario_config = { "name": "test_scenario", "description": "Test scenario", "responses": { "/users": {"users": [{"id": 1, "name": "Test User"}]}, "/posts": {"posts": []} } } response = client.post("/admin/scenarios", json=scenario_config) assert response.status_code == 201 data = response.json() assert data["name"] == "test_scenario" assert "id" in data def test_switch_scenario(self, client: TestClient): """Test switching to a different scenario.""" # Create scenario first scenario_config = { "name": "test_scenario", "responses": { "/users": {"users": [{"id": 999, "name": "Scenario User"}]} } } create_response = client.post("/admin/scenarios", json=scenario_config) scenario_id = create_response.json()["id"] # Switch to scenario response = client.post(f"/admin/scenarios/{scenario_id}/activate") assert response.status_code == 200 # Verify scenario is active response = client.get("/users") assert response.status_code == 200 data = response.json() assert data["users"][0]["id"] == 999 ``` ## Integration Testing ### Testing Component Interactions ```python # test_integration.py import pytest import asyncio from pathlib import Path from mockloop_mcp.generator import MockServerGenerator from mockloop_mcp.server import MockServer from mockloop_mcp.database import Database class TestMockServerIntegration: """Test integration between components.""" @pytest.mark.asyncio async def test_full_generation_and_startup_workflow(self, tmp_path, test_settings): """Test complete workflow from generation to server startup.""" # 1. Generate mock server generator = MockServerGenerator() spec_path = self.create_test_spec(tmp_path) output_dir = tmp_path / "generated_server" options = GenerationOptions( output_dir=str(output_dir), auth_enabled=True, storage_enabled=True ) result = generator.generate(spec_path, options) assert result.success is True # 2. Start generated server server = MockServer.from_directory(output_dir) await server.start() try: # 3. Test server functionality async with httpx.AsyncClient(base_url=server.base_url) as client: # Test health endpoint response = await client.get("/health") assert response.status_code == 200 # Test generated API endpoint response = await client.get("/users") assert response.status_code == 200 # Test admin endpoints response = await client.get("/admin/info") assert response.status_code == 200 # Test request logging response = await client.get("/admin/logs") assert response.status_code == 200 logs = response.json()["logs"] assert len(logs) >= 3 # health, users, admin/info finally: await server.stop() @pytest.mark.asyncio async def test_database_integration_with_server(self, tmp_path, test_settings): """Test database integration with running server.""" # Setup server with database server = MockServer( config=ServerConfig( name="test_server", port=8080, storage_enabled=True, database_url="sqlite:///test_integration.db" ) ) await server.start() try: # Make requests to generate logs async with httpx.AsyncClient(base_url=server.base_url) as client: await client.get("/users") await client.post("/users", json={"name": "Test User"}) await client.get("/users/1") # Verify logs in database db = server.database logs = await db.query_request_logs(limit=10) assert len(logs) == 3 assert logs[0].method in ["GET", "POST"] assert all(log.path.startswith("/users") for log in logs) # Test log filtering get_logs = await db.query_request_logs(method="GET") assert len(get_logs) == 2 post_logs = await db.query_request_logs(method="POST") assert len(post_logs) == 1 finally: await server.stop() def create_test_spec(self, tmp_path: Path) -> str: """Create test OpenAPI specification.""" spec = { "openapi": "3.0.0", "info": {"title": "Test API", "version": "1.0.0"}, "paths": { "/users": { "get": { "responses": { "200": { "description": "List users", "content": { "application/json": { "schema": { "type": "object", "properties": { "users": { "type": "array", "items": {"$ref": "#/components/schemas/User"} } } } } } } } }, "post": { "requestBody": { "content": { "application/json": { "schema": {"$ref": "#/components/schemas/User"} } } }, "responses": { "201": { "description": "User created", "content": { "application/json": { "schema": {"$ref": "#/components/schemas/User"} } } } } } }, "/users/{id}": { "get": { "parameters": [ { "name": "id", "in": "path", "required": True, "schema": {"type": "integer"} } ], "responses": { "200": { "description": "Get user", "content": { "application/json": { "schema": {"$ref": "#/components/schemas/User"} } } } } } } }, "components": { "schemas": { "User": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string", "format": "email"} } } } } } spec_file = tmp_path / "test_spec.json" spec_file.write_text(json.dumps(spec)) return str(spec_file) ``` ## End-to-End Testing ### User Workflow Testing ```python # test_e2e.py import pytest import subprocess import time import requests from pathlib import Path class TestEndToEndWorkflows: """Test complete user workflows.""" @pytest.mark.e2e def test_cli_generate_and_run_workflow(self, tmp_path): """Test CLI workflow: generate server and run it.""" spec_file = self.create_openapi_spec(tmp_path) output_dir = tmp_path / "generated_server" # 1. Generate server using CLI result = subprocess.run([ "python", "-m", "mockloop_mcp.cli", "generate", "--spec", str(spec_file), "--output", str(output_dir), "--auth-enabled", "--storage-enabled" ], capture_output=True, text=True) assert result.returncode == 0 assert "Server generated successfully" in result.stdout # 2. Verify generated files assert (output_dir / "main.py").exists() assert (output_dir / "requirements.txt").exists() assert (output_dir / "Dockerfile").exists() assert (output_dir / "docker-compose.yml").exists() # 3. Start server using Docker Compose compose_process = subprocess.Popen([ "docker-compose", "up", "-d" ], cwd=output_dir) try: # Wait for server to start time.sleep(10) # 4. Test server endpoints base_url = "http://localhost:8080" # Health check response = requests.get(f"{base_url}/health") assert response.status_code == 200 # API endpoints response = requests.get(f"{base_url}/users") assert response.status_code == 200 # Admin endpoints response = requests.get(f"{base_url}/admin/info") assert response.status_code == 200 # Test request logging response = requests.get(f"{base_url}/admin/logs") assert response.status_code == 200 logs = response.json()["logs"] assert len(logs) >= 3 finally: # 5. Stop server subprocess.run([ "docker-compose", "down" ], cwd=output_dir) @pytest.mark.e2e def test_mcp_tool_integration(self, tmp_path): """Test MCP tool integration workflow.""" # This would test the actual MCP tool usage # in a real MCP client environment pass def create_openapi_spec(self, tmp_path: Path) -> Path: """Create test OpenAPI specification file.""" spec = { "openapi": "3.0.0", "info": { "title": "E2E Test API", "version": "1.0.0", "description": "API for end-to-end testing" }, "servers": [ {"url": "http://localhost:8080", "description": "Local server"} ], "paths": { "/users": { "get": { "summary": "List users", "responses": { "200": { "description": "Success", "content": { "application/json": { "schema": { "type": "object", "properties": { "users": { "type": "array", "items": {"$ref": "#/components/schemas/User"} } } } } } } } } } }, "components": { "schemas": { "User": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string", "format": "email"} } } } } } spec_file = tmp_path / "e2e_spec.json" spec_file.write_text(json.dumps(spec, indent=2)) return spec_file ``` ## Performance Testing ### Load Testing ```python # test_performance.py import pytest import asyncio import time from concurrent.futures import ThreadPoolExecutor import httpx class TestPerformance: """Test performance characteristics.""" @pytest.mark.performance @pytest.mark.asyncio async def test_concurrent_request_handling(self, running_server): """Test server performance under concurrent load.""" base_url = running_server.base_url num_requests = 100 concurrent_clients = 10 async def make_requests(client_id: int) -> list: """Make requests from a single client.""" results = [] async with httpx.AsyncClient(base_url=base_url) as client: for i in range(num_requests // concurrent_clients): start_time = time.time() response = await client.get("/users") end_time = time.time() results.append({ "client_id": client_id, "request_id": i, "status_code": response.status_code, "response_time": end_time - start_time }) return results # Run concurrent clients start_time = time.time() tasks = [ make_requests(client_id) for client_id in range(concurrent_clients) ] results = await asyncio.gather(*tasks) end_time = time.time() # Analyze results all_results = [result for client_results in results for result in client_results] # Verify all requests succeeded assert all(r["status_code"] == 200 for r in all_results) assert len(all_results) == num_requests # Performance assertions total_time = end_time - start_time requests_per_second = num_requests / total_time assert requests_per_second > 50 # Minimum throughput response_times = [r["response_time"] for r in all_results] avg_response_time = sum(response_times) / len(response_times) max_response_time = max(response_times) assert avg_response_time < 0.1 # Average response time under 100ms assert max_response_time < 1.0 # Max response time under 1s @pytest.mark.performance def test_memory_usage_under_load(self, running_server): """Test memory usage during sustained load.""" import psutil import gc process = psutil.Process() initial_memory = process.memory_info().rss # Generate sustained load with ThreadPoolExecutor(max_workers=5) as executor: futures = [] for _ in range(1000): future = executor.submit( requests.get, f"{running_server.base_url}/users" ) futures.append(future) # Wait for completion for future in futures: response = future.result() assert response.status_code == 200 # Force garbage collection gc.collect() final_memory = process.memory_info().rss memory_increase = final_memory - initial_memory # Memory increase should be reasonable (less than 100MB) assert memory_increase < 100 * 1024 * 1024 @pytest.mark.performance @pytest.mark.benchmark def test_request_processing_benchmark(self, benchmark, client): """Benchmark request processing performance.""" def process_request(): response = client.get("/users") assert response.status_code == 200 return response result = benchmark(process_request) # Benchmark assertions assert benchmark.stats.mean < 0.01 # Mean time under 10ms assert benchmark.stats.max < 0.1 # Max time under 100ms ```

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server