Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
test_comprehensive_integration.py30 kB
#!/usr/bin/env python3 """ Comprehensive Integration Test Suite for MockLoop MCP Enhancement Plan Tests the complete integration of Phase 1 and Phase 2 components. """ import asyncio from datetime import datetime import json from pathlib import Path import shutil import sqlite3 import sys import tempfile import time from typing import Any # Add the src directory to the path sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) from mockloop_mcp.database_migration import DatabaseMigrator from mockloop_mcp.log_analyzer import LogAnalyzer from mockloop_mcp.main import ( discover_mock_servers_tool, generate_mock_api_tool, manage_mock_data_tool, query_mock_logs_tool, ) class ComprehensiveIntegrationTester: """Comprehensive test suite for MockLoop MCP enhancement plan.""" def __init__(self): self.test_results = {} self.temp_dir = None self.mock_server_dir = None self.test_server_url = "http://localhost:8000" self.server_process = None def setup_test_environment(self): """Set up temporary test environment.""" self.temp_dir = Path(tempfile.mkdtemp(prefix="mockloop_integration_test_")) def cleanup_test_environment(self): """Clean up test environment.""" if self.server_process: try: self.server_process.terminate() self.server_process.wait(timeout=5) except: pass if self.temp_dir and self.temp_dir.exists(): shutil.rmtree(self.temp_dir) def create_test_api_spec(self) -> dict[str, Any]: """Create a comprehensive test API specification.""" return { "openapi": "3.0.0", "info": { "title": "Comprehensive Integration Test API", "version": "1.0.0", "description": "API for testing all MockLoop MCP enhancement features", }, "paths": { "/users": { "get": { "summary": "Get all users", "responses": { "200": { "description": "List of users", "content": { "application/json": { "schema": { "type": "array", "items": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string"}, }, }, } } }, } }, }, "post": { "summary": "Create a user", "requestBody": { "content": { "application/json": { "schema": { "type": "object", "properties": { "name": {"type": "string"}, "email": {"type": "string"}, }, } } } }, "responses": { "201": { "description": "User created", "content": { "application/json": { "schema": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string"}, }, } } }, } }, }, }, "/users/{user_id}": { "get": { "summary": "Get user by ID", "parameters": [ { "name": "user_id", "in": "path", "required": True, "schema": {"type": "integer"}, } ], "responses": { "200": { "description": "User details", "content": { "application/json": { "schema": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string"}, }, } } }, } }, } }, "/products": { "get": { "summary": "Get all products", "responses": { "200": { "description": "List of products", "content": { "application/json": { "schema": { "type": "array", "items": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"}, "price": {"type": "number"}, }, }, } } }, } }, } }, }, } async def test_complete_workflow(self) -> bool: """Test the complete workflow: generation → scenario creation → dynamic responses → monitoring.""" try: # Step 1: Generate mock server with all features test_spec = self.create_test_api_spec() result = await generate_mock_api_tool( spec_url_or_path=json.dumps(test_spec), output_dir_name="integration_test_server", auth_enabled=True, webhooks_enabled=True, admin_ui_enabled=True, storage_enabled=True, ) if result["status"] != "success": return False self.mock_server_dir = Path(result["generated_mock_path"]) # Step 2: Validate database schema migration db_path = self.mock_server_dir / "db" / "request_logs.db" db_path.parent.mkdir(exist_ok=True) migrator = DatabaseMigrator(str(db_path)) migration_status = migrator.get_migration_status() if migration_status["needs_migration"]: success = migrator.apply_migrations() if not success: return False # Step 3: Start the mock server (simulate) # Check for enhanced files required_files = [ "main.py", "logging_middleware.py", "templates/admin.html", "requirements_mock.txt", ] for file_path in required_files: if not (self.mock_server_dir / file_path).exists(): return False # Step 4: Test MCP tools integration (without running server) # Test discover_mock_servers discovery_result = await discover_mock_servers_tool( ports=[8000, 8001, 8002], check_health=False, include_generated=True ) if discovery_result["status"] != "success": return False # Step 5: Test database operations # Insert test log data conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() test_logs = [ ( "2024-01-01T10:00:00", "request", "GET", "/users", 200, 45, "127.0.0.1", "8080", '{"user-agent": "test-client"}', "{}", "{}", '[{"id": 1, "name": "Test User"}]', "test-session-1", "user-list-scenario", "req-001", "test-client", 1024, 0, ), ( "2024-01-01T10:01:00", "request", "POST", "/users", 201, 67, "127.0.0.1", "8080", '{"user-agent": "test-client"}', "{}", '{"name": "New User"}', '{"id": 2, "name": "New User"}', "test-session-1", "user-create-scenario", "req-002", "test-client", 512, 0, ), ( "2024-01-01T10:02:00", "request", "GET", "/products", 200, 32, "127.0.0.1", "8080", '{"user-agent": "test-client"}', "{}", "{}", '[{"id": 1, "name": "Product 1"}]', "test-session-2", "product-list-scenario", "req-003", "test-client", 2048, 0, ), ] for log_data in test_logs: cursor.execute( """ INSERT INTO request_logs ( timestamp, type, method, path, status_code, process_time_ms, client_host, client_port, headers, query_params, request_body, response_body, session_id, test_scenario, correlation_id, user_agent, response_size, is_admin ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, log_data, ) conn.commit() conn.close() # Step 6: Test log analysis analyzer = LogAnalyzer() # Read logs back from database conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM request_logs ORDER BY timestamp") logs = [dict(row) for row in cursor.fetchall()] conn.close() analysis = analyzer.analyze_logs(logs) if not analysis or "total_requests" not in analysis: return False # Step 7: Test scenario management (structure validation) # Check if mock_scenarios table exists conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='mock_scenarios' """) if not cursor.fetchone(): conn.close() return False # Insert test scenario test_scenario = { "name": "integration-test-scenario", "description": "Test scenario for integration testing", "endpoints": { "/users": { "GET": { "status": 200, "data": [{"id": 999, "name": "Integration Test User"}], } } }, } cursor.execute( """ INSERT INTO mock_scenarios (name, description, config, is_active) VALUES (?, ?, ?, ?) """, ( "integration-test", "Integration test scenario", json.dumps(test_scenario), 1, ), ) conn.commit() conn.close() return True except Exception: import traceback traceback.print_exc() return False async def test_mcp_tools_integration(self) -> bool: """Test all MCP tools working together.""" try: # Test 1: generate_mock_api_tool result = await generate_mock_api_tool( spec_url_or_path=json.dumps(self.create_test_api_spec()), output_dir_name="mcp_tools_test_server", ) if result["status"] != "success": return False # Test 2: discover_mock_servers_tool discovery_result = await discover_mock_servers_tool( check_health=False, include_generated=True ) if discovery_result["status"] != "success": return False # Test 3: manage_mock_data_tool (validation only) # Test invalid server result = await manage_mock_data_tool( server_url="http://invalid-server:9999", operation="list_scenarios" ) if result["status"] != "error": return False # Test invalid operation result = await manage_mock_data_tool( server_url="http://localhost:8000", operation="invalid_operation" ) if ( result["status"] != "error" or "Unknown operation" not in result["message"] ): return False # Test 4: query_mock_logs_tool (without server) # This will fail without a server, but we test the structure result = await query_mock_logs_tool( server_url="http://localhost:8000", limit=10 ) # Should have proper error handling return not ("status" not in result or "logs" not in result) except Exception: return False async def test_database_migration_system(self) -> bool: """Test the complete database migration system.""" try: # Create test database test_db_path = self.temp_dir / "migration_test.db" migrator = DatabaseMigrator(str(test_db_path)) # Test 1: Initial state initial_status = migrator.get_migration_status() if initial_status["current_version"] != 0: return False # Test 2: Apply all migrations success = migrator.apply_migrations() if not success: return False final_status = migrator.get_migration_status() if final_status["needs_migration"]: return False # Test 3: Verify all tables exist conn = sqlite3.connect(str(test_db_path)) cursor = conn.cursor() expected_tables = [ "request_logs", "test_sessions", "performance_metrics", "mock_scenarios", "schema_version", ] for table in expected_tables: cursor.execute( """ SELECT name FROM sqlite_master WHERE type='table' AND name=? """, (table,), ) if not cursor.fetchone(): conn.close() return False conn.close() # Test 4: Backup functionality backup_path = migrator.backup_database() if not Path(backup_path).exists(): return False # Test 5: Rollback functionality current_version = migrator.get_current_version() if current_version > 1: rollback_success = migrator.rollback_migration(current_version - 1) if not rollback_success: return False return True except Exception: return False async def test_performance_and_compatibility(self) -> bool: """Test performance overhead and backward compatibility.""" try: # Test 1: Template generation performance start_time = time.time() for i in range(5): # Generate multiple servers test_spec = self.create_test_api_spec() test_spec["info"]["title"] = f"Performance Test API {i}" result = await generate_mock_api_tool( spec_url_or_path=json.dumps(test_spec), output_dir_name=f"perf_test_server_{i}", ) if result["status"] != "success": return False generation_time = time.time() - start_time avg_time = generation_time / 5 if avg_time > 10: # Should be under 10 seconds per generation pass # Test 2: Database operation performance test_db_path = self.temp_dir / "performance_test.db" migrator = DatabaseMigrator(str(test_db_path)) start_time = time.time() migrator.apply_migrations() time.time() - start_time # Test 3: Log analysis performance # Generate large log dataset conn = sqlite3.connect(str(test_db_path)) cursor = conn.cursor() start_time = time.time() # Insert 1000 test log entries for i in range(1000): cursor.execute( """ INSERT INTO request_logs ( timestamp, type, method, path, status_code, process_time_ms, client_host, client_port, headers, query_params, request_body, response_body, session_id, test_scenario, correlation_id, user_agent, response_size, is_admin ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( f"2024-01-01T10:{i // 60:02d}:{i % 60:02d}", "request", "GET", f"/test/{i}", 200, 50 + (i % 100), "127.0.0.1", "8080", '{"user-agent": "test"}', "{}", "{}", "{}", f"session-{i // 100}", f"scenario-{i % 10}", f"req-{i:04d}", "test-client", 1024, 0, ), ) conn.commit() # Read and analyze conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM request_logs") logs = [dict(row) for row in cursor.fetchall()] conn.close() analyzer = LogAnalyzer() analyzer.analyze_logs(logs) analysis_time = time.time() - start_time if analysis_time > 5: # Should analyze 1000 logs in under 5 seconds pass # Test 4: Backward compatibility # Create a "legacy" database with just the base table legacy_db_path = self.temp_dir / "legacy_test.db" conn = sqlite3.connect(str(legacy_db_path)) cursor = conn.cursor() # Create base table only (migration 0) cursor.execute(""" CREATE TABLE request_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, type TEXT, method TEXT, path TEXT, status_code INTEGER, process_time_ms INTEGER, client_host TEXT, client_port TEXT, headers TEXT, query_params TEXT, request_body TEXT, response_body TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.commit() conn.close() # Test migration from legacy legacy_migrator = DatabaseMigrator(str(legacy_db_path)) legacy_status = legacy_migrator.get_migration_status() if legacy_status["current_version"] != 0: return False # Apply migrations to legacy database success = legacy_migrator.apply_migrations() return success except Exception: return False async def test_error_handling_and_resilience(self) -> bool: """Test error handling and system resilience.""" try: # Test 1: Invalid API specifications invalid_specs = [ "invalid json", {"invalid": "spec"}, {"openapi": "3.0.0"}, # Missing required fields ] for i, invalid_spec in enumerate(invalid_specs): try: result = await generate_mock_api_tool( spec_url_or_path=json.dumps(invalid_spec) if isinstance(invalid_spec, dict) else invalid_spec, output_dir_name=f"invalid_test_{i}", ) if result["status"] != "error": return False except Exception: pass # Expected to fail # Test 2: Database corruption handling # Create corrupted database corrupted_db_path = self.temp_dir / "corrupted.db" with open(corrupted_db_path, "w") as f: f.write("This is not a valid SQLite database") try: migrator = DatabaseMigrator(str(corrupted_db_path)) migrator.get_migration_status() # Should handle gracefully except Exception: pass # Test 3: Network error handling # Test with unreachable server result = await manage_mock_data_tool( server_url="http://192.0.2.1:9999", # RFC 5737 test address operation="list_scenarios", ) if result["status"] != "error": return False # Test 4: Concurrent access test_db_path = self.temp_dir / "concurrent_test.db" migrator = DatabaseMigrator(str(test_db_path)) migrator.apply_migrations() async def concurrent_operation(operation_id): """Simulate concurrent database operation.""" try: conn = sqlite3.connect(str(test_db_path)) cursor = conn.cursor() cursor.execute( """ INSERT INTO request_logs ( timestamp, type, method, path, status_code, process_time_ms, client_host, client_port, headers, query_params, request_body, response_body, session_id, test_scenario, correlation_id, user_agent, response_size, is_admin ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( f"2024-01-01T10:00:{operation_id:02d}", "request", "GET", f"/concurrent/{operation_id}", 200, 50, "127.0.0.1", "8080", "{}", "{}", "{}", "{}", f"session-{operation_id}", "concurrent-test", f"req-{operation_id}", "test-client", 1024, 0, ), ) conn.commit() conn.close() return True except Exception: return False # Run 10 concurrent operations tasks = [concurrent_operation(i) for i in range(10)] results = await asyncio.gather(*tasks, return_exceptions=True) successful_ops = sum(1 for r in results if r is True) if successful_ops < 8: # Allow some failures due to locking return False return True except Exception: return False def generate_test_report(self) -> dict[str, Any]: """Generate comprehensive test report.""" total_tests = len(self.test_results) passed_tests = sum(1 for result in self.test_results.values() if result) return { "test_summary": { "total_tests": total_tests, "passed_tests": passed_tests, "failed_tests": total_tests - passed_tests, "success_rate": (passed_tests / total_tests * 100) if total_tests > 0 else 0, }, "test_results": self.test_results, "timestamp": datetime.now().isoformat(), "environment": { "python_version": sys.version, "test_directory": str(self.temp_dir) if self.temp_dir else None, }, } async def run_all_tests(self) -> bool: """Run all integration tests.""" self.setup_test_environment() try: # Test 1: Complete workflow self.test_results["complete_workflow"] = await self.test_complete_workflow() # Test 2: MCP tools integration self.test_results[ "mcp_tools_integration" ] = await self.test_mcp_tools_integration() # Test 3: Database migration system self.test_results[ "database_migration" ] = await self.test_database_migration_system() # Test 4: Performance and compatibility self.test_results[ "performance_compatibility" ] = await self.test_performance_and_compatibility() # Test 5: Error handling and resilience self.test_results[ "error_handling" ] = await self.test_error_handling_and_resilience() # Generate final report report = self.generate_test_report() for _test_name, _result in self.test_results.items(): pass # Save detailed report report_path = ( self.temp_dir / "integration_test_report.json" if self.temp_dir else Path("integration_test_report.json") ) with open(report_path, "w") as f: json.dump(report, f, indent=2) return all(self.test_results.values()) except Exception: import traceback traceback.print_exc() return False finally: self.cleanup_test_environment() async def main(): """Main test execution function.""" tester = ComprehensiveIntegrationTester() success = await tester.run_all_tests() if success: return 0 else: return 1 if __name__ == "__main__": exit_code = asyncio.run(main()) sys.exit(exit_code)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server