testing.py•18.4 kB
"""
Capability Testing Framework with Isolated Environments
Provides comprehensive testing infrastructure for capabilities with sandboxing.
"""
import asyncio
import inspect
import json
import sys
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
import subprocess
import venv
import shutil
from pydantic import BaseModel
class TestStatus(Enum):
"""Status of a test execution."""
PENDING = "pending"
RUNNING = "running"
PASSED = "passed"
FAILED = "failed"
SKIPPED = "skipped"
ERROR = "error"
class TestType(Enum):
"""Types of tests available."""
UNIT = "unit"
INTEGRATION = "integration"
SECURITY = "security"
PERFORMANCE = "performance"
COMPATIBILITY = "compatibility"
@dataclass
class TestResult:
"""Result of a single test execution."""
test_name: str
test_type: TestType
status: TestStatus
execution_time: float
message: str = ""
details: Dict[str, Any] = field(default_factory=dict)
error: Optional[str] = None
traceback: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"test_name": self.test_name,
"test_type": self.test_type.value,
"status": self.status.value,
"execution_time": self.execution_time,
"message": self.message,
"details": self.details,
"error": self.error,
"traceback": self.traceback
}
@dataclass
class TestSuite:
"""A collection of tests for a capability."""
name: str
capability_path: Path
tests: List[Callable] = field(default_factory=list)
setup_func: Optional[Callable] = None
teardown_func: Optional[Callable] = None
def add_test(self, test_func: Callable, test_type: TestType = TestType.UNIT):
"""Add a test function to the suite."""
test_func.test_type = test_type
self.tests.append(test_func)
def add_setup(self, setup_func: Callable):
"""Add a setup function for the test suite."""
self.setup_func = setup_func
def add_teardown(self, teardown_func: Callable):
"""Add a teardown function for the test suite."""
self.teardown_func = teardown_func
class IsolatedEnvironment:
"""Creates an isolated Python environment for testing."""
def __init__(self, env_dir: Optional[Path] = None):
self.env_dir = env_dir or Path(tempfile.mkdtemp(prefix="katamari_test_"))
self.python_path = None
self.created = False
def create(self, requirements: Optional[List[str]] = None) -> bool:
"""Create the isolated environment."""
try:
# Create virtual environment
venv.create(self.env_dir, with_pip=True)
# Determine Python executable path
if sys.platform == "win32":
self.python_path = self.env_dir / "Scripts" / "python.exe"
pip_path = self.env_dir / "Scripts" / "pip.exe"
else:
self.python_path = self.env_dir / "bin" / "python"
pip_path = self.env_dir / "bin" / "pip"
# Upgrade pip
subprocess.run([str(self.python_path), "-m", "pip", "install", "--upgrade", "pip"],
check=True, capture_output=True)
# Install requirements if provided
if requirements:
cmd = [str(pip_path), "install"] + requirements
subprocess.run(cmd, check=True, capture_output=True)
self.created = True
return True
except Exception as e:
print(f"Failed to create isolated environment: {e}")
return False
def run_script(self, script_path: Path, *args: str) -> subprocess.CompletedProcess:
"""Run a Python script in the isolated environment."""
if not self.created or not self.python_path:
raise RuntimeError("Environment not created or Python path not set")
cmd = [str(self.python_path), str(script_path)] + list(args)
return subprocess.run(cmd, capture_output=True, text=True)
def cleanup(self):
"""Clean up the isolated environment."""
if self.env_dir.exists():
shutil.rmtree(self.env_dir, ignore_errors=True)
class CapabilityTester:
"""Main capability testing framework."""
def __init__(self, isolated: bool = True):
self.isolated = isolated
self.environments: Dict[str, IsolatedEnvironment] = {}
self.test_suites: Dict[str, TestSuite] = {}
def register_capability(self, capability_path: Path, suite_name: Optional[str] = None) -> TestSuite:
"""Register a capability for testing."""
name = suite_name or capability_path.name
suite = TestSuite(name=name, capability_path=capability_path)
self.test_suites[name] = suite
# Create isolated environment if requested
if self.isolated:
env = IsolatedEnvironment()
self.environments[name] = env
return suite
def discover_tests(self, capability_path: Path) -> List[Callable]:
"""Automatically discover test functions in a capability."""
tests = []
# Look for test_*.py files
test_files = list(capability_path.rglob("test_*.py"))
test_files.extend(capability_path.rglob("*_test.py"))
for test_file in test_files:
# Read and execute test file to extract test functions
try:
with open(test_file, 'r') as f:
content = f.read()
# Simple regex to find test functions
import re
pattern = r'def (test_[a-zA-Z_][a-zA-Z0-9_]*)\s*\('
matches = re.findall(pattern, content)
for func_name in matches:
# Create a wrapper function that will be executed later
def make_wrapper(file_path, name):
def wrapper():
return self._run_test_file(file_path, name)
wrapper.__name__ = name
return wrapper
tests.append(make_wrapper(test_file, func_name))
except Exception as e:
print(f"Error discovering tests in {test_file}: {e}")
return tests
def _run_test_file(self, test_file: Path, test_name: str) -> TestResult:
"""Run a specific test from a test file."""
start_time = time.time()
try:
# Create a temporary test script
temp_script = test_file.parent / f"temp_{test_name}.py"
# Write test runner script
script_content = f"""
import sys
import traceback
from pathlib import Path
# Add the capability path to sys.path
sys.path.insert(0, r"{test_file.parent}")
try:
# Import the test module
import {test_file.stem}
# Run the specific test function
result = {test_file.stem}.{test_name}()
if result is None or result is True:
print("TEST_PASSED")
else:
print("TEST_FAILED")
print(f"Result: {{result}}")
except Exception as e:
print("TEST_ERROR")
print(f"Error: {{e}}")
traceback.print_exc()
"""
with open(temp_script, 'w') as f:
f.write(script_content)
# Run the test
result = subprocess.run([sys.executable, str(temp_script)],
capture_output=True, text=True, timeout=30)
execution_time = time.time() - start_time
# Clean up
temp_script.unlink(missing_ok=True)
# Parse result
if result.returncode == 0 and "TEST_PASSED" in result.stdout:
return TestResult(
test_name=test_name,
test_type=TestType.UNIT,
status=TestStatus.PASSED,
execution_time=execution_time,
message="Test passed successfully"
)
elif "TEST_FAILED" in result.stdout:
return TestResult(
test_name=test_name,
test_type=TestType.UNIT,
status=TestStatus.FAILED,
execution_time=execution_time,
message="Test failed",
details={"stdout": result.stdout, "stderr": result.stderr}
)
else:
return TestResult(
test_name=test_name,
test_type=TestType.UNIT,
status=TestStatus.ERROR,
execution_time=execution_time,
message="Test execution error",
error=result.stderr,
traceback=result.stdout
)
except subprocess.TimeoutExpired:
return TestResult(
test_name=test_name,
test_type=TestType.UNIT,
status=TestStatus.ERROR,
execution_time=30.0,
message="Test timed out after 30 seconds"
)
except Exception as e:
execution_time = time.time() - start_time
return TestResult(
test_name=test_name,
test_type=TestType.UNIT,
status=TestStatus.ERROR,
execution_time=execution_time,
message=f"Unexpected error: {e}",
error=str(e)
)
async def run_test_suite(self, suite_name: str) -> List[TestResult]:
"""Run all tests in a test suite."""
if suite_name not in self.test_suites:
raise ValueError(f"Test suite '{suite_name}' not found")
suite = self.test_suites[suite_name]
results = []
# Setup environment
if suite.setup_func:
try:
if inspect.iscoroutinefunction(suite.setup_func):
await suite.setup_func()
else:
suite.setup_func()
except Exception as e:
# Create error result for all tests
for test in suite.tests:
results.append(TestResult(
test_name=test.__name__,
test_type=getattr(test, 'test_type', TestType.UNIT),
status=TestStatus.ERROR,
execution_time=0.0,
message=f"Setup failed: {e}",
error=str(e)
))
return results
# Run tests
for test in suite.tests:
try:
start_time = time.time()
if inspect.iscoroutinefunction(test):
await test()
status = TestStatus.PASSED
message = "Test passed"
else:
test()
status = TestStatus.PASSED
message = "Test passed"
execution_time = time.time() - start_time
results.append(TestResult(
test_name=test.__name__,
test_type=getattr(test, 'test_type', TestType.UNIT),
status=status,
execution_time=execution_time,
message=message
))
except AssertionError as e:
execution_time = time.time() - start_time
results.append(TestResult(
test_name=test.__name__,
test_type=getattr(test, 'test_type', TestType.UNIT),
status=TestStatus.FAILED,
execution_time=execution_time,
message=f"Assertion failed: {e}",
error=str(e)
))
except Exception as e:
execution_time = time.time() - start_time
results.append(TestResult(
test_name=test.__name__,
test_type=getattr(test, 'test_type', TestType.UNIT),
status=TestStatus.ERROR,
execution_time=execution_time,
message=f"Test error: {e}",
error=str(e)
))
# Teardown
if suite.teardown_func:
try:
if inspect.iscoroutinefunction(suite.teardown_func):
await suite.teardown_func()
else:
suite.teardown_func()
except Exception as e:
print(f"Teardown error: {e}")
return results
async def run_all_tests(self) -> Dict[str, List[TestResult]]:
"""Run all test suites."""
all_results = {}
for suite_name in self.test_suites:
results = await self.run_test_suite(suite_name)
all_results[suite_name] = results
return all_results
def generate_report(self, results: Dict[str, List[TestResult]]) -> str:
"""Generate a comprehensive test report."""
report = []
report.append("=" * 60)
report.append("CAPABILITY TEST REPORT")
report.append("=" * 60)
total_tests = 0
total_passed = 0
total_failed = 0
total_errors = 0
total_skipped = 0
for suite_name, suite_results in results.items():
report.append(f"\nTest Suite: {suite_name}")
report.append("-" * 40)
suite_passed = sum(1 for r in suite_results if r.status == TestStatus.PASSED)
suite_failed = sum(1 for r in suite_results if r.status == TestStatus.FAILED)
suite_errors = sum(1 for r in suite_results if r.status == TestStatus.ERROR)
suite_skipped = sum(1 for r in suite_results if r.status == TestStatus.SKIPPED)
suite_total = len(suite_results)
total_tests += suite_total
total_passed += suite_passed
total_failed += suite_failed
total_errors += suite_errors
total_skipped += suite_skipped
report.append(f"Total: {suite_total}, Passed: {suite_passed}, Failed: {suite_failed}, Errors: {suite_errors}, Skipped: {suite_skipped}")
# Show failed and error tests
for result in suite_results:
if result.status in [TestStatus.FAILED, TestStatus.ERROR]:
report.append(f" ❌ {result.test_name} ({result.status.value}): {result.message}")
if result.error:
report.append(f" Error: {result.error}")
# Summary
report.append(f"\n" + "=" * 60)
report.append("OVERALL SUMMARY")
report.append("=" * 60)
report.append(f"Total Tests: {total_tests}")
report.append(f"Passed: {total_passed} ({total_passed/total_tests*100:.1f}%)" if total_tests > 0 else "Passed: 0")
report.append(f"Failed: {total_failed} ({total_failed/total_tests*100:.1f}%)" if total_tests > 0 else "Failed: 0")
report.append(f"Errors: {total_errors} ({total_errors/total_tests*100:.1f}%)" if total_tests > 0 else "Errors: 0")
report.append(f"Skipped: {total_skipped} ({total_skipped/total_tests*100:.1f}%)" if total_tests > 0 else "Skipped: 0")
success_rate = (total_passed / total_tests * 100) if total_tests > 0 else 0
report.append(f"Success Rate: {success_rate:.1f}%")
return "\n".join(report)
def cleanup(self):
"""Clean up all isolated environments."""
for env in self.environments.values():
env.cleanup()
self.environments.clear()
# Test decorators and utilities
def capability_test(test_type: TestType = TestType.UNIT):
"""Decorator to mark a function as a capability test."""
def decorator(func):
func.test_type = test_type
return func
return decorator
def integration_test(func):
"""Decorator to mark a function as an integration test."""
return capability_test(TestType.INTEGRATION)(func)
def security_test(func):
"""Decorator to mark a function as a security test."""
return capability_test(TestType.SECURITY)(func)
def performance_test(func):
"""Decorator to mark a function as a performance test."""
return capability_test(TestType.PERFORMANCE)(func)
# Predefined test templates
class CapabilityTestTemplate:
"""Template class for common capability tests."""
@staticmethod
def test_import(capability_module: str):
"""Test that a capability can be imported."""
def test():
import importlib
importlib.import_module(capability_module)
return test
@staticmethod
def test_basic_functionality(capability_instance, test_method: str = "test"):
"""Test basic functionality of a capability."""
def test():
if hasattr(capability_instance, test_method):
method = getattr(capability_instance, test_method)
result = method()
assert result is not None, f"Method {test_method} returned None"
return test
@staticmethod
def test_error_handling(capability_instance):
"""Test error handling of a capability."""
def test():
# Test with invalid input
try:
if hasattr(capability_instance, 'process'):
capability_instance.process(None)
assert False, "Should have raised an error for None input"
except (ValueError, TypeError, AttributeError):
pass # Expected
return test