Physics MCP Server

by BlinkZer0
performance.py (16.1 kB)
""" Performance enhancements for Phys-MCP with caching, GPU fallback, and chunked processing """ import hashlib import json import pickle import time from functools import wraps from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union, Callable import numpy as np from datetime import datetime, timedelta import psutil import threading import queue # Cache configuration CACHE_DIR = Path(".cache") CACHE_MAX_SIZE_MB = 500 CACHE_TTL_HOURS = 24 ENABLE_CACHE = True # Performance monitoring class PerformanceMonitor: """Monitor and track performance metrics""" def __init__(self): self.metrics = {} self.lock = threading.Lock() def record_metric(self, name: str, value: float, unit: str = 'ms', tags: Optional[Dict] = None): """Record a performance metric""" with self.lock: if name not in self.metrics: self.metrics[name] = [] self.metrics[name].append({ 'value': value, 'unit': unit, 'timestamp': datetime.now().isoformat(), 'tags': tags or {} }) # Keep only last 1000 entries per metric if len(self.metrics[name]) > 1000: self.metrics[name] = self.metrics[name][-1000:] def get_stats(self, name: str) -> Dict[str, float]: """Get statistics for a metric""" with self.lock: if name not in self.metrics or not self.metrics[name]: return {} values = [m['value'] for m in self.metrics[name]] return { 'count': len(values), 'mean': np.mean(values), 'median': np.median(values), 'min': np.min(values), 'max': np.max(values), 'std': np.std(values) } def get_all_stats(self) -> Dict[str, Dict[str, float]]: """Get statistics for all metrics""" return {name: self.get_stats(name) for name in self.metrics.keys()} # Global performance monitor perf_monitor = PerformanceMonitor() def generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str: """Generate cache key from function name and parameters""" # Create deterministic representation cache_data = { 'function': func_name, 'args': args, 'kwargs': {k: v for k, v in kwargs.items() if k not in ['emit_plots', 'emit_csv', 'emit_frames']} } # Convert to JSON string and hash cache_str = json.dumps(cache_data, sort_keys=True, default=str) return hashlib.sha256(cache_str.encode()).hexdigest() def get_cache_path(cache_key: str) -> Path: """Get cache file path""" CACHE_DIR.mkdir(exist_ok=True) return CACHE_DIR / f"{cache_key}.pkl" def is_cache_valid(cache_path: Path, ttl_hours: int = CACHE_TTL_HOURS) -> bool: """Check if cache file is valid (exists and not expired)""" if not cache_path.exists(): return False # Check TTL file_age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime) return file_age < timedelta(hours=ttl_hours) def save_to_cache(cache_key: str, data: Any) -> bool: """Save data to cache""" if not ENABLE_CACHE: return False try: cache_path = get_cache_path(cache_key) # Check cache size limit cleanup_cache_if_needed() with open(cache_path, 'wb') as f: pickle.dump(data, f) return True except Exception as e: print(f"Cache save failed: {e}") return False def load_from_cache(cache_key: str) -> Optional[Any]: """Load data from cache""" if not ENABLE_CACHE: return None try: cache_path = get_cache_path(cache_key) if not is_cache_valid(cache_path): return None with open(cache_path, 'rb') as f: return pickle.load(f) except Exception as e: print(f"Cache load failed: {e}") return None def cleanup_cache_if_needed(): """Clean up cache if it exceeds size limit""" if not CACHE_DIR.exists(): return # Calculate total cache size total_size = sum(f.stat().st_size for f in CACHE_DIR.glob("*.pkl")) max_size_bytes = CACHE_MAX_SIZE_MB * 1024 * 
1024 if total_size <= max_size_bytes: return # Remove oldest files until under limit cache_files = [(f, f.stat().st_mtime) for f in CACHE_DIR.glob("*.pkl")] cache_files.sort(key=lambda x: x[1]) # Sort by modification time current_size = total_size for cache_file, _ in cache_files: if current_size <= max_size_bytes: break file_size = cache_file.stat().st_size cache_file.unlink() current_size -= file_size def with_cache(ttl_hours: int = CACHE_TTL_HOURS): """Decorator to add caching to functions""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): # Generate cache key cache_key = generate_cache_key(func.__name__, args, kwargs) # Try to load from cache cached_result = load_from_cache(cache_key) if cached_result is not None: perf_monitor.record_metric(f"{func.__name__}_cache_hit", 0, 'count') return cached_result # Execute function start_time = time.time() result = func(*args, **kwargs) duration_ms = (time.time() - start_time) * 1000 # Save to cache save_to_cache(cache_key, result) # Record metrics perf_monitor.record_metric(f"{func.__name__}_cache_miss", duration_ms, 'ms') return result return wrapper return decorator class GPUFallback: """GPU acceleration with automatic CPU fallback""" def __init__(self): self.gpu_available = self._check_gpu_availability() self.gpu_memory_threshold = 0.9 # Use up to 90% of GPU memory def _check_gpu_availability(self) -> bool: """Check if GPU is available""" try: import torch return torch.cuda.is_available() except ImportError: return False def get_device(self, prefer_gpu: bool = True) -> str: """Get optimal device for computation""" if not prefer_gpu or not self.gpu_available: return 'cpu' try: import torch # Check GPU memory if torch.cuda.is_available(): memory_free = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0) memory_total = torch.cuda.get_device_properties(0).total_memory memory_usage = 1 - (memory_free / memory_total) if memory_usage < self.gpu_memory_threshold: return 'cuda' else: print(f"GPU memory usage too high ({memory_usage:.1%}), falling back to CPU") return 'cpu' except Exception as e: print(f"GPU check failed: {e}, falling back to CPU") return 'cpu' def with_fallback(self, func: Callable, *args, prefer_gpu: bool = True, **kwargs): """Execute function with GPU fallback""" device = self.get_device(prefer_gpu) try: return func(*args, device=device, **kwargs) except Exception as e: if device == 'cuda': print(f"GPU execution failed: {e}, retrying on CPU") return func(*args, device='cpu', **kwargs) else: raise e # Global GPU fallback instance gpu_fallback = GPUFallback() class ChunkedProcessor: """Process large datasets in chunks to manage memory""" def __init__(self, max_chunk_size: int = 10000, max_memory_mb: int = 1000): self.max_chunk_size = max_chunk_size self.max_memory_mb = max_memory_mb def calculate_chunk_size(self, data_size: int, item_size_bytes: int = 8) -> int: """Calculate optimal chunk size based on memory constraints""" # Estimate memory usage estimated_memory_mb = (data_size * item_size_bytes) / (1024 * 1024) if estimated_memory_mb <= self.max_memory_mb: return data_size # Process all at once # Calculate chunk size to stay within memory limit chunk_size = int((self.max_memory_mb * 1024 * 1024) / item_size_bytes) return min(chunk_size, self.max_chunk_size) def process_chunks(self, data: Union[List, np.ndarray], processor_func: Callable, combine_func: Optional[Callable] = None, **kwargs) -> Any: """Process data in chunks""" if len(data) == 0: return [] # Calculate chunk size item_size = 
data[0].nbytes if hasattr(data[0], 'nbytes') else 8 chunk_size = self.calculate_chunk_size(len(data), item_size) if chunk_size >= len(data): # Process all at once return processor_func(data, **kwargs) # Process in chunks results = [] for i in range(0, len(data), chunk_size): chunk = data[i:i + chunk_size] chunk_result = processor_func(chunk, **kwargs) results.append(chunk_result) # Log progress progress = min(100, (i + chunk_size) * 100 // len(data)) if i % (chunk_size * 10) == 0: # Log every 10 chunks print(f"Processing progress: {progress}%") # Combine results if combine_func: return combine_func(results) elif isinstance(results[0], np.ndarray): return np.concatenate(results) elif isinstance(results[0], list): return [item for sublist in results for item in sublist] else: return results # Global chunked processor chunked_processor = ChunkedProcessor() def optimize_fft(signal_data: np.ndarray, sample_rate: float, **kwargs) -> Dict[str, Any]: """Optimized FFT with caching and chunked processing""" def _compute_fft_chunk(chunk: np.ndarray, sample_rate: float) -> Dict[str, Any]: """Compute FFT for a chunk of data""" # Use GPU if available device = gpu_fallback.get_device() if device == 'cuda': try: import torch chunk_tensor = torch.from_numpy(chunk).cuda() fft_result = torch.fft.fft(chunk_tensor) frequencies = torch.fft.fftfreq(len(chunk), 1/sample_rate) return { 'frequencies': frequencies.cpu().numpy(), 'amplitudes': torch.abs(fft_result).cpu().numpy(), 'phases': torch.angle(fft_result).cpu().numpy() } except Exception as e: print(f"GPU FFT failed: {e}, falling back to CPU") # CPU fallback fft_result = np.fft.fft(chunk) frequencies = np.fft.fftfreq(len(chunk), 1/sample_rate) return { 'frequencies': frequencies, 'amplitudes': np.abs(fft_result), 'phases': np.angle(fft_result) } def _combine_fft_results(results: List[Dict]) -> Dict[str, Any]: """Combine FFT results from multiple chunks""" if len(results) == 1: return results[0] # Average the results (simple approach) combined = { 'frequencies': results[0]['frequencies'], 'amplitudes': np.mean([r['amplitudes'] for r in results], axis=0), 'phases': np.mean([r['phases'] for r in results], axis=0) } return combined # Process with chunking if needed if len(signal_data) > 50000: # Use chunking for large signals result = chunked_processor.process_chunks( data=signal_data.reshape(-1, min(10000, len(signal_data))), processor_func=lambda chunk: _compute_fft_chunk(chunk.flatten(), sample_rate), combine_func=_combine_fft_results ) else: result = _compute_fft_chunk(signal_data, sample_rate) return result @with_cache(ttl_hours=1) # Cache FFT results for 1 hour def cached_fft(signal_data: List[float], sample_rate: float) -> Dict[str, Any]: """Cached FFT computation""" signal_array = np.array(signal_data) return optimize_fft(signal_array, sample_rate) def optimize_plot_generation(plot_func: Callable, *args, **kwargs) -> Any: """Optimize plot generation with caching and performance monitoring""" start_time = time.time() try: # Check if we should use high DPI dpi = kwargs.get('dpi', 100) if dpi > 200: # High DPI plots are expensive, check system resources memory_percent = psutil.virtual_memory().percent if memory_percent > 80: print(f"High memory usage ({memory_percent}%), reducing DPI from {dpi} to 150") kwargs['dpi'] = 150 # Execute plot function result = plot_func(*args, **kwargs) # Record performance duration_ms = (time.time() - start_time) * 1000 perf_monitor.record_metric('plot_generation', duration_ms, 'ms', { 'plot_type': kwargs.get('plot_type', 
'unknown'), 'dpi': kwargs.get('dpi', 100) }) return result except Exception as e: # Log performance even on failure duration_ms = (time.time() - start_time) * 1000 perf_monitor.record_metric('plot_generation_failed', duration_ms, 'ms') raise e def get_performance_report() -> Dict[str, Any]: """Get comprehensive performance report""" stats = perf_monitor.get_all_stats() # Add system info system_info = { 'cpu_percent': psutil.cpu_percent(interval=1), 'memory_percent': psutil.virtual_memory().percent, 'disk_usage_percent': psutil.disk_usage('.').percent } # Add GPU info if available if gpu_fallback.gpu_available: try: import torch if torch.cuda.is_available(): memory_allocated = torch.cuda.memory_allocated(0) memory_total = torch.cuda.get_device_properties(0).total_memory system_info['gpu_memory_percent'] = (memory_allocated / memory_total) * 100 except Exception: pass # Add cache info cache_info = { 'cache_enabled': ENABLE_CACHE, 'cache_size_mb': 0, 'cache_files': 0 } if CACHE_DIR.exists(): cache_files = list(CACHE_DIR.glob("*.pkl")) cache_info['cache_files'] = len(cache_files) cache_info['cache_size_mb'] = sum(f.stat().st_size for f in cache_files) / (1024 * 1024) return { 'performance_stats': stats, 'system_info': system_info, 'cache_info': cache_info, 'timestamp': datetime.now().isoformat() } def clear_cache(): """Clear all cached data""" if CACHE_DIR.exists(): for cache_file in CACHE_DIR.glob("*.pkl"): cache_file.unlink() print(f"Cleared cache directory: {CACHE_DIR}") def configure_performance( cache_enabled: bool = True, cache_size_mb: int = 500, cache_ttl_hours: int = 24, chunk_size: int = 10000 ): """Configure performance settings""" global ENABLE_CACHE, CACHE_MAX_SIZE_MB, CACHE_TTL_HOURS ENABLE_CACHE = cache_enabled CACHE_MAX_SIZE_MB = cache_size_mb CACHE_TTL_HOURS = cache_ttl_hours chunked_processor.max_chunk_size = chunk_size print(f"Performance configured: cache={cache_enabled}, size={cache_size_mb}MB, ttl={cache_ttl_hours}h")
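Example usage (a minimal sketch, not part of the module itself): the import path `performance`, the synthetic sine-wave signal, and the chosen configuration values are assumptions made for illustration; the called functions (`configure_performance`, `cached_fft`, `get_performance_report`, `clear_cache`) are those defined above.

# Hypothetical driver script exercising the module's public helpers.
import numpy as np
from performance import (  # assumes performance.py is importable from the working directory
    configure_performance, cached_fft, get_performance_report, clear_cache
)

# Tune cache and chunking behaviour before running any workloads
configure_performance(cache_enabled=True, cache_size_mb=200, cache_ttl_hours=6, chunk_size=5000)

# A synthetic 100 Hz sine sampled at 1 kHz; the second call should hit the .cache/ directory
t = np.linspace(0, 1, 1000, endpoint=False)
signal = np.sin(2 * np.pi * 100 * t).tolist()
spectrum = cached_fft(signal, sample_rate=1000.0)   # computed, then cached
spectrum = cached_fft(signal, sample_rate=1000.0)   # served from cache

print(spectrum['frequencies'][:5], spectrum['amplitudes'][:5])
print(get_performance_report()['cache_info'])
clear_cache()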
