Skip to main content
Glama

Katamari MCP Server

by ciphernaut
testing.py — 18.4 kB
""" Capability Testing Framework with Isolated Environments Provides comprehensive testing infrastructure for capabilities with sandboxing. """ import asyncio import inspect import json import sys import tempfile import time from pathlib import Path from typing import Any, Dict, List, Optional, Callable, Union from dataclasses import dataclass, field from enum import Enum import subprocess import venv import shutil from pydantic import BaseModel class TestStatus(Enum): """Status of a test execution.""" PENDING = "pending" RUNNING = "running" PASSED = "passed" FAILED = "failed" SKIPPED = "skipped" ERROR = "error" class TestType(Enum): """Types of tests available.""" UNIT = "unit" INTEGRATION = "integration" SECURITY = "security" PERFORMANCE = "performance" COMPATIBILITY = "compatibility" @dataclass class TestResult: """Result of a single test execution.""" test_name: str test_type: TestType status: TestStatus execution_time: float message: str = "" details: Dict[str, Any] = field(default_factory=dict) error: Optional[str] = None traceback: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return { "test_name": self.test_name, "test_type": self.test_type.value, "status": self.status.value, "execution_time": self.execution_time, "message": self.message, "details": self.details, "error": self.error, "traceback": self.traceback } @dataclass class TestSuite: """A collection of tests for a capability.""" name: str capability_path: Path tests: List[Callable] = field(default_factory=list) setup_func: Optional[Callable] = None teardown_func: Optional[Callable] = None def add_test(self, test_func: Callable, test_type: TestType = TestType.UNIT): """Add a test function to the suite.""" test_func.test_type = test_type self.tests.append(test_func) def add_setup(self, setup_func: Callable): """Add a setup function for the test suite.""" self.setup_func = setup_func def add_teardown(self, teardown_func: Callable): """Add a 
teardown function for the test suite.""" self.teardown_func = teardown_func class IsolatedEnvironment: """Creates an isolated Python environment for testing.""" def __init__(self, env_dir: Optional[Path] = None): self.env_dir = env_dir or Path(tempfile.mkdtemp(prefix="katamari_test_")) self.python_path = None self.created = False def create(self, requirements: Optional[List[str]] = None) -> bool: """Create the isolated environment.""" try: # Create virtual environment venv.create(self.env_dir, with_pip=True) # Determine Python executable path if sys.platform == "win32": self.python_path = self.env_dir / "Scripts" / "python.exe" pip_path = self.env_dir / "Scripts" / "pip.exe" else: self.python_path = self.env_dir / "bin" / "python" pip_path = self.env_dir / "bin" / "pip" # Upgrade pip subprocess.run([str(self.python_path), "-m", "pip", "install", "--upgrade", "pip"], check=True, capture_output=True) # Install requirements if provided if requirements: cmd = [str(pip_path), "install"] + requirements subprocess.run(cmd, check=True, capture_output=True) self.created = True return True except Exception as e: print(f"Failed to create isolated environment: {e}") return False def run_script(self, script_path: Path, *args: str) -> subprocess.CompletedProcess: """Run a Python script in the isolated environment.""" if not self.created or not self.python_path: raise RuntimeError("Environment not created or Python path not set") cmd = [str(self.python_path), str(script_path)] + list(args) return subprocess.run(cmd, capture_output=True, text=True) def cleanup(self): """Clean up the isolated environment.""" if self.env_dir.exists(): shutil.rmtree(self.env_dir, ignore_errors=True) class CapabilityTester: """Main capability testing framework.""" def __init__(self, isolated: bool = True): self.isolated = isolated self.environments: Dict[str, IsolatedEnvironment] = {} self.test_suites: Dict[str, TestSuite] = {} def register_capability(self, capability_path: Path, suite_name: 
Optional[str] = None) -> TestSuite: """Register a capability for testing.""" name = suite_name or capability_path.name suite = TestSuite(name=name, capability_path=capability_path) self.test_suites[name] = suite # Create isolated environment if requested if self.isolated: env = IsolatedEnvironment() self.environments[name] = env return suite def discover_tests(self, capability_path: Path) -> List[Callable]: """Automatically discover test functions in a capability.""" tests = [] # Look for test_*.py files test_files = list(capability_path.rglob("test_*.py")) test_files.extend(capability_path.rglob("*_test.py")) for test_file in test_files: # Read and execute test file to extract test functions try: with open(test_file, 'r') as f: content = f.read() # Simple regex to find test functions import re pattern = r'def (test_[a-zA-Z_][a-zA-Z0-9_]*)\s*\(' matches = re.findall(pattern, content) for func_name in matches: # Create a wrapper function that will be executed later def make_wrapper(file_path, name): def wrapper(): return self._run_test_file(file_path, name) wrapper.__name__ = name return wrapper tests.append(make_wrapper(test_file, func_name)) except Exception as e: print(f"Error discovering tests in {test_file}: {e}") return tests def _run_test_file(self, test_file: Path, test_name: str) -> TestResult: """Run a specific test from a test file.""" start_time = time.time() try: # Create a temporary test script temp_script = test_file.parent / f"temp_{test_name}.py" # Write test runner script script_content = f""" import sys import traceback from pathlib import Path # Add the capability path to sys.path sys.path.insert(0, r"{test_file.parent}") try: # Import the test module import {test_file.stem} # Run the specific test function result = {test_file.stem}.{test_name}() if result is None or result is True: print("TEST_PASSED") else: print("TEST_FAILED") print(f"Result: {{result}}") except Exception as e: print("TEST_ERROR") print(f"Error: {{e}}") traceback.print_exc() 
""" with open(temp_script, 'w') as f: f.write(script_content) # Run the test result = subprocess.run([sys.executable, str(temp_script)], capture_output=True, text=True, timeout=30) execution_time = time.time() - start_time # Clean up temp_script.unlink(missing_ok=True) # Parse result if result.returncode == 0 and "TEST_PASSED" in result.stdout: return TestResult( test_name=test_name, test_type=TestType.UNIT, status=TestStatus.PASSED, execution_time=execution_time, message="Test passed successfully" ) elif "TEST_FAILED" in result.stdout: return TestResult( test_name=test_name, test_type=TestType.UNIT, status=TestStatus.FAILED, execution_time=execution_time, message="Test failed", details={"stdout": result.stdout, "stderr": result.stderr} ) else: return TestResult( test_name=test_name, test_type=TestType.UNIT, status=TestStatus.ERROR, execution_time=execution_time, message="Test execution error", error=result.stderr, traceback=result.stdout ) except subprocess.TimeoutExpired: return TestResult( test_name=test_name, test_type=TestType.UNIT, status=TestStatus.ERROR, execution_time=30.0, message="Test timed out after 30 seconds" ) except Exception as e: execution_time = time.time() - start_time return TestResult( test_name=test_name, test_type=TestType.UNIT, status=TestStatus.ERROR, execution_time=execution_time, message=f"Unexpected error: {e}", error=str(e) ) async def run_test_suite(self, suite_name: str) -> List[TestResult]: """Run all tests in a test suite.""" if suite_name not in self.test_suites: raise ValueError(f"Test suite '{suite_name}' not found") suite = self.test_suites[suite_name] results = [] # Setup environment if suite.setup_func: try: if inspect.iscoroutinefunction(suite.setup_func): await suite.setup_func() else: suite.setup_func() except Exception as e: # Create error result for all tests for test in suite.tests: results.append(TestResult( test_name=test.__name__, test_type=getattr(test, 'test_type', TestType.UNIT), status=TestStatus.ERROR, 
execution_time=0.0, message=f"Setup failed: {e}", error=str(e) )) return results # Run tests for test in suite.tests: try: start_time = time.time() if inspect.iscoroutinefunction(test): await test() status = TestStatus.PASSED message = "Test passed" else: test() status = TestStatus.PASSED message = "Test passed" execution_time = time.time() - start_time results.append(TestResult( test_name=test.__name__, test_type=getattr(test, 'test_type', TestType.UNIT), status=status, execution_time=execution_time, message=message )) except AssertionError as e: execution_time = time.time() - start_time results.append(TestResult( test_name=test.__name__, test_type=getattr(test, 'test_type', TestType.UNIT), status=TestStatus.FAILED, execution_time=execution_time, message=f"Assertion failed: {e}", error=str(e) )) except Exception as e: execution_time = time.time() - start_time results.append(TestResult( test_name=test.__name__, test_type=getattr(test, 'test_type', TestType.UNIT), status=TestStatus.ERROR, execution_time=execution_time, message=f"Test error: {e}", error=str(e) )) # Teardown if suite.teardown_func: try: if inspect.iscoroutinefunction(suite.teardown_func): await suite.teardown_func() else: suite.teardown_func() except Exception as e: print(f"Teardown error: {e}") return results async def run_all_tests(self) -> Dict[str, List[TestResult]]: """Run all test suites.""" all_results = {} for suite_name in self.test_suites: results = await self.run_test_suite(suite_name) all_results[suite_name] = results return all_results def generate_report(self, results: Dict[str, List[TestResult]]) -> str: """Generate a comprehensive test report.""" report = [] report.append("=" * 60) report.append("CAPABILITY TEST REPORT") report.append("=" * 60) total_tests = 0 total_passed = 0 total_failed = 0 total_errors = 0 total_skipped = 0 for suite_name, suite_results in results.items(): report.append(f"\nTest Suite: {suite_name}") report.append("-" * 40) suite_passed = sum(1 for r in 
suite_results if r.status == TestStatus.PASSED) suite_failed = sum(1 for r in suite_results if r.status == TestStatus.FAILED) suite_errors = sum(1 for r in suite_results if r.status == TestStatus.ERROR) suite_skipped = sum(1 for r in suite_results if r.status == TestStatus.SKIPPED) suite_total = len(suite_results) total_tests += suite_total total_passed += suite_passed total_failed += suite_failed total_errors += suite_errors total_skipped += suite_skipped report.append(f"Total: {suite_total}, Passed: {suite_passed}, Failed: {suite_failed}, Errors: {suite_errors}, Skipped: {suite_skipped}") # Show failed and error tests for result in suite_results: if result.status in [TestStatus.FAILED, TestStatus.ERROR]: report.append(f" ❌ {result.test_name} ({result.status.value}): {result.message}") if result.error: report.append(f" Error: {result.error}") # Summary report.append(f"\n" + "=" * 60) report.append("OVERALL SUMMARY") report.append("=" * 60) report.append(f"Total Tests: {total_tests}") report.append(f"Passed: {total_passed} ({total_passed/total_tests*100:.1f}%)" if total_tests > 0 else "Passed: 0") report.append(f"Failed: {total_failed} ({total_failed/total_tests*100:.1f}%)" if total_tests > 0 else "Failed: 0") report.append(f"Errors: {total_errors} ({total_errors/total_tests*100:.1f}%)" if total_tests > 0 else "Errors: 0") report.append(f"Skipped: {total_skipped} ({total_skipped/total_tests*100:.1f}%)" if total_tests > 0 else "Skipped: 0") success_rate = (total_passed / total_tests * 100) if total_tests > 0 else 0 report.append(f"Success Rate: {success_rate:.1f}%") return "\n".join(report) def cleanup(self): """Clean up all isolated environments.""" for env in self.environments.values(): env.cleanup() self.environments.clear() # Test decorators and utilities def capability_test(test_type: TestType = TestType.UNIT): """Decorator to mark a function as a capability test.""" def decorator(func): func.test_type = test_type return func return decorator def 
integration_test(func): """Decorator to mark a function as an integration test.""" return capability_test(TestType.INTEGRATION)(func) def security_test(func): """Decorator to mark a function as a security test.""" return capability_test(TestType.SECURITY)(func) def performance_test(func): """Decorator to mark a function as a performance test.""" return capability_test(TestType.PERFORMANCE)(func) # Predefined test templates class CapabilityTestTemplate: """Template class for common capability tests.""" @staticmethod def test_import(capability_module: str): """Test that a capability can be imported.""" def test(): import importlib importlib.import_module(capability_module) return test @staticmethod def test_basic_functionality(capability_instance, test_method: str = "test"): """Test basic functionality of a capability.""" def test(): if hasattr(capability_instance, test_method): method = getattr(capability_instance, test_method) result = method() assert result is not None, f"Method {test_method} returned None" return test @staticmethod def test_error_handling(capability_instance): """Test error handling of a capability.""" def test(): # Test with invalid input try: if hasattr(capability_instance, 'process'): capability_instance.process(None) assert False, "Should have raised an error for None input" except (ValueError, TypeError, AttributeError): pass # Expected return test

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.