Skip to main content
Glama
helpers.py11.1 kB
""" Common test patterns and utilities. """ import os import sys import time import tempfile import shutil from pathlib import Path from typing import Dict, Any, List, Optional, Tuple from contextlib import contextmanager import subprocess # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) class TestEnvironment: """Manages test environment setup and cleanup.""" def __init__(self): self.temp_dirs: List[Path] = [] self.created_files: List[Path] = [] self.env_vars: Dict[str, str] = {} @contextmanager def temp_directory(self): """Create a temporary directory and auto-cleanup.""" temp_dir = Path(tempfile.mkdtemp(prefix="spice_test_")) self.temp_dirs.append(temp_dir) try: yield temp_dir finally: self._cleanup_temp_dir(temp_dir) def _cleanup_temp_dir(self, temp_dir: Path): """Clean up a temporary directory.""" try: if temp_dir.exists(): shutil.rmtree(temp_dir) self.temp_dirs.remove(temp_dir) except Exception as e: print(f"Warning: Failed to cleanup {temp_dir}: {e}") def create_env_file(self, api_key: str, extra_vars: Dict[str, str] = None) -> Path: """Create a temporary .env file for testing.""" with self.temp_directory() as temp_dir: env_file = temp_dir / ".env" content = f"DUNE_API_KEY={api_key}\n" if extra_vars: for key, value in extra_vars.items(): content += f"{key}={value}\n" env_file.write_text(content) self.created_files.append(env_file) return env_file def set_env_var(self, key: str, value: str): """Set environment variable for test.""" original_value = os.environ.get(key) os.environ[key] = value self.env_vars[key] = original_value or "UNSET" def cleanup_env_vars(self): """Restore environment variables.""" for key, original_value in self.env_vars.items(): if original_value == "UNSET": os.environ.pop(key, None) else: os.environ[key] = original_value self.env_vars.clear() def cleanup_all(self): """Clean up all resources.""" # Clean up temp directories for temp_dir in self.temp_dirs[:]: self._cleanup_temp_dir(temp_dir) # Clean up created files for file_path in self.created_files: try: if file_path.exists(): file_path.unlink() except Exception as e: print(f"Warning: Failed to delete {file_path}: {e}") self.created_files.clear() # Restore environment self.cleanup_env_vars() class PerformanceTimer: """Performance measurement utility.""" def __init__(self): self.start_time = None self.end_time = None self.checkpoints = {} def start(self): """Start timing.""" self.start_time = time.time() self.checkpoints = {} def checkpoint(self, name: str): """Record a checkpoint time.""" if self.start_time is not None: self.checkpoints[name] = time.time() - self.start_time def stop(self): """Stop timing.""" self.end_time = time.time() @property def duration(self) -> float: """Get total duration.""" if self.start_time and self.end_time: return self.end_time - self.start_time return 0.0 def get_checkpoint(self, name: str) -> Optional[float]: """Get checkpoint time.""" return self.checkpoints.get(name) def get_report(self) -> Dict[str, Any]: """Get performance report.""" report = { 'total_duration': self.duration, 'checkpoints': self.checkpoints.copy() } return report class RetryMechanism: """Generic retry mechanism with backoff.""" @staticmethod def retry_with_backoff(func, max_retries: int = 3, backoff_factor: float = 1.0, exceptions: Tuple = (Exception,), default_return: Any = None): """Retry function with exponential backoff.""" last_exception = None for attempt in range(max_retries + 1): try: return func() except exceptions as e: last_exception = e if attempt < max_retries: wait_time = backoff_factor * (2 ** attempt) time.sleep(wait_time) continue break return default_return class TestResultCollector: """Collect and aggregate test results.""" def __init__(self): self.results = [] self.start_time = None self.end_time = None def start_collection(self): """Start result collection.""" self.start_time = time.time() self.results = [] def add_result(self, test_name: str, success: bool, details: Dict[str, Any] = None, error: str = None): """Add a test result.""" result = { 'test_name': test_name, 'success': success, 'timestamp': time.time(), 'details': details or {}, 'error': error } self.results.append(result) def finish_collection(self): """Finish result collection.""" self.end_time = time.time() def get_summary(self) -> Dict[str, Any]: """Get collection summary.""" total_tests = len(self.results) passed_tests = sum(1 for r in self.results if r['success']) failed_tests = total_tests - passed_tests duration = (self.end_time or time.time()) - (self.start_time or time.time()) return { 'total_tests': total_tests, 'passed': passed_tests, 'failed': failed_tests, 'success_rate': passed_tests / total_tests if total_tests > 0 else 0, 'duration': duration, 'failed_tests': [r for r in self.results if not r['success']], 'passed_tests': [r for r in self.results if r['success']] } def export_to_file(self, output_path: Path): """Export results to file.""" import json summary = self.get_summary() report = { 'summary': summary, 'all_results': self.results, 'metadata': { 'start_time': self.start_time, 'end_time': self.end_time, 'total_duration': summary['duration'] } } output_path.write_text(json.dumps(report, indent=2, default=str)) class MCPToolSimulator: """Simulate MCP tool interactions.""" @staticmethod def simulate_tool_call(tool_instance, parameters: Dict[str, Any]) -> Dict[str, Any]: """Simulate an MCP tool call and normalize the response.""" try: if hasattr(tool_instance, 'execute'): # Execute synchronously result = tool_instance.execute(**parameters) else: raise ValueError("Tool instance doesn't have execute method") # Normalize response to MCP format if isinstance(result, dict): return { 'success': True, 'data': result, 'type': 'dict' } elif hasattr(result, 'to_dict'): return { 'success': True, 'data': result.to_dict(), 'type': 'dataframe' } else: return { 'success': True, 'data': str(result), 'type': 'string' } except Exception as e: return { 'success': False, 'error': str(e), 'error_type': type(e).__name__ } @staticmethod def validate_tool_schema(tool_instance) -> Dict[str, Any]: """Validate tool schema and parameter requirements.""" validation = { 'valid': True, 'errors': [], 'warnings': [] } try: if not hasattr(tool_instance, 'get_parameter_schema'): validation['valid'] = False validation['errors'].append("Tool doesn't have get_parameter_schema method") return validation schema = tool_instance.get_parameter_schema() if not isinstance(schema, dict): validation['valid'] = False validation['errors'].append("Schema is not a dictionary") return validation # Check required schema components if 'type' not in schema: validation['warnings'].append("Schema missing 'type' field") if 'properties' not in schema: validation['warnings'].append("Schema missing 'properties' field") elif not isinstance(schema['properties'], dict): validation['valid'] = False validation['errors'].append("Schema 'properties' is not a dictionary") return validation except Exception as e: validation['valid'] = False validation['errors'].append(f"Schema validation exception: {e}") return validation def run_external_command(cmd: List[str], cwd: Optional[Path] = None, timeout: int = 120) -> Dict[str, Any]: """Run an external command and capture output.""" try: result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=timeout ) return { 'success': result.returncode == 0, 'returncode': result.returncode, 'stdout': result.stdout, 'stderr': result.stderr } except subprocess.TimeoutExpired: return { 'success': False, 'error': 'Command timed out', 'timeout': timeout } except Exception as e: return { 'success': False, 'error': str(e) } def merge_dict(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: """Deep merge two dictionaries.""" result = base.copy() for key, value in override.items(): if key in result and isinstance(result[key], dict) and isinstance(value, dict): result[key] = merge_dict(result[key], value) else: result[key] = value return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server