Skip to main content
Glama

Physics MCP Server

by BlinkZer0
utils.py10.5 kB
""" Utility functions for the Python worker Shared functions for artifact creation, plotting, and data handling """ import os import io import base64 import uuid import time from pathlib import Path import numpy as np import matplotlib.pyplot as plt from typing import Any, Optional, List try: import pandas as pd PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False def fig_to_base64(fig) -> str: """Convert matplotlib figure to base64 encoded PNG""" buffer = io.BytesIO() fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight') buffer.seek(0) image_png = buffer.getvalue() buffer.close() graphic = base64.b64encode(image_png) return graphic.decode('utf-8') def fig_to_svg(fig) -> str: """Convert matplotlib figure to SVG string""" buffer = io.StringIO() fig.savefig(buffer, format='svg', bbox_inches='tight') buffer.seek(0) svg_string = buffer.getvalue() buffer.close() return svg_string def create_csv_artifact(data: Any, filename_prefix: str, headers: Optional[List[str]] = None) -> str: """Create CSV artifact from data and return file path""" # Ensure artifacts directory exists os.makedirs("artifacts", exist_ok=True) # Convert data to DataFrame if isinstance(data, np.ndarray): if data.ndim == 1: df = pd.DataFrame(data, columns=['value']) elif data.ndim == 2: if headers and len(headers) == data.shape[1]: df = pd.DataFrame(data, columns=headers) else: df = pd.DataFrame(data) else: # Flatten higher dimensional arrays df = pd.DataFrame(data.flatten(), columns=['value']) elif isinstance(data, list): df = pd.DataFrame(data) elif isinstance(data, dict): df = pd.DataFrame(data) else: # Try to convert to DataFrame directly df = pd.DataFrame(data) # Generate filename timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S") filename = f"{filename_prefix}_{timestamp}.csv" filepath = os.path.join("artifacts", filename) # Save CSV df.to_csv(filepath, index=False) return filepath def safe_filename(name: str) -> str: """Convert string to safe filename""" # Remove or replace unsafe characters safe_chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_." return ''.join(c if c in safe_chars else '_' for c in name) def format_number(value: float, precision: int = 6) -> str: """Format number with appropriate precision""" if abs(value) < 1e-3 or abs(value) > 1e6: return f"{value:.{precision}e}" else: return f"{value:.{precision}f}".rstrip('0').rstrip('.') def create_artifact_metadata(tool_name: str, params: dict, result: dict) -> dict: """Create standardized metadata for artifacts""" return { "tool_name": tool_name, "parameters": params, "created_at": pd.Timestamp.now().isoformat(), "result_keys": list(result.keys()) if isinstance(result, dict) else [], "version": "1.0" } def ensure_numpy_array(data: Any) -> np.ndarray: """Ensure data is a numpy array""" if isinstance(data, np.ndarray): return data elif isinstance(data, list): return np.array(data) else: return np.array([data]) def validate_signal_data(signal_data: Any, min_length: int = 2) -> np.ndarray: """Validate and convert signal data to numpy array""" signal_array = ensure_numpy_array(signal_data) if signal_array.ndim != 1: raise ValueError("Signal data must be 1-dimensional") if len(signal_array) < min_length: raise ValueError(f"Signal data must have at least {min_length} samples") if not np.issubdtype(signal_array.dtype, np.number): raise ValueError("Signal data must be numeric") return signal_array.astype(np.float64) def setup_matplotlib_style(): """Set up consistent matplotlib styling""" plt.style.use('default') plt.rcParams.update({ 'figure.figsize': (10, 6), 'figure.dpi': 100, 'savefig.dpi': 150, 'font.size': 10, 'axes.titlesize': 12, 'axes.labelsize': 10, 'xtick.labelsize': 9, 'ytick.labelsize': 9, 'legend.fontsize': 9, 'grid.alpha': 0.3, 'lines.linewidth': 1.5, 'axes.grid': True }) def memory_usage_mb() -> float: """Get current memory usage in MB""" try: import psutil process = psutil.Process() return process.memory_info().rss / 1024 / 1024 except ImportError: return 0.0 def check_memory_limit(limit_mb: float = 1000.0) -> bool: """Check if memory usage is below limit""" current_usage = memory_usage_mb() return current_usage < limit_mb def truncate_large_arrays(data: np.ndarray, max_size: int = 100000) -> np.ndarray: """Truncate arrays that are too large for processing""" if data.size > max_size: # Take evenly spaced samples indices = np.linspace(0, len(data)-1, max_size, dtype=int) return data[indices] return data def validate_frequency_range(frequencies: np.ndarray, sample_rate: float) -> np.ndarray: """Validate frequency array is within Nyquist limit""" nyquist = sample_rate / 2 valid_freqs = frequencies[frequencies <= nyquist] if len(valid_freqs) < len(frequencies): print(f"Warning: {len(frequencies) - len(valid_freqs)} frequencies above Nyquist limit removed") return valid_freqs def create_time_axis(length: int, sample_rate: float) -> np.ndarray: """Create time axis for signal plotting""" return np.arange(length) / sample_rate def db_scale(magnitude: np.ndarray, reference: float = 1.0) -> np.ndarray: """Convert magnitude to dB scale""" return 20 * np.log10(np.abs(magnitude) / reference + 1e-12) def normalize_signal(signal: np.ndarray, method: str = "peak") -> np.ndarray: """Normalize signal using various methods""" if method == "peak": return signal / np.max(np.abs(signal)) elif method == "rms": rms = np.sqrt(np.mean(signal**2)) return signal / rms elif method == "energy": energy = np.sqrt(np.sum(signal**2)) return signal / energy else: return signal def find_peaks_simple(signal: np.ndarray, threshold: float = 0.5) -> np.ndarray: """Simple peak finding algorithm""" peaks = [] for i in range(1, len(signal) - 1): if (signal[i] > signal[i-1] and signal[i] > signal[i+1] and signal[i] > threshold * np.max(signal)): peaks.append(i) return np.array(peaks) def estimate_noise_floor(signal: np.ndarray, percentile: float = 10) -> float: """Estimate noise floor of signal""" return np.percentile(np.abs(signal), percentile) def apply_window_correction(signal: np.ndarray, window: np.ndarray) -> float: """Calculate window correction factor""" return np.sqrt(np.mean(window**2)) def create_frequency_axis(length: int, sample_rate: float) -> np.ndarray: """Create frequency axis for FFT results""" return np.fft.fftfreq(length, 1/sample_rate) def get_positive_frequencies(frequencies: np.ndarray, spectrum: np.ndarray): """Extract positive frequency components""" nyquist_idx = len(frequencies) // 2 return frequencies[:nyquist_idx], spectrum[:nyquist_idx] def calculate_spectral_centroid(frequencies: np.ndarray, magnitude: np.ndarray) -> float: """Calculate spectral centroid (center of mass of spectrum)""" return np.sum(frequencies * magnitude) / np.sum(magnitude) def calculate_spectral_bandwidth(frequencies: np.ndarray, magnitude: np.ndarray, centroid: Optional[float] = None) -> float: """Calculate spectral bandwidth""" if centroid is None: centroid = calculate_spectral_centroid(frequencies, magnitude) return np.sqrt(np.sum(((frequencies - centroid)**2) * magnitude) / np.sum(magnitude)) def zero_pad_signal(signal: np.ndarray, target_length: int) -> np.ndarray: """Zero-pad signal to target length""" if len(signal) >= target_length: return signal[:target_length] padding = target_length - len(signal) return np.pad(signal, (0, padding), mode='constant') def next_power_of_2(n: int) -> int: """Find next power of 2 greater than or equal to n""" return 2 ** int(np.ceil(np.log2(n))) def overlap_add(segments: List[np.ndarray], hop_size: int) -> np.ndarray: """Overlap-add reconstruction from signal segments""" if not segments: return np.array([]) segment_length = len(segments[0]) total_length = (len(segments) - 1) * hop_size + segment_length result = np.zeros(total_length) for i, segment in enumerate(segments): start_idx = i * hop_size end_idx = start_idx + len(segment) result[start_idx:end_idx] += segment return result def create_test_signal(duration: float, sample_rate: float, frequencies: List[float], amplitudes: List[float] = None, noise_level: float = 0.0) -> np.ndarray: """Create test signal with multiple frequency components""" if amplitudes is None: amplitudes = [1.0] * len(frequencies) t = np.arange(0, duration, 1/sample_rate) signal = np.zeros_like(t) for freq, amp in zip(frequencies, amplitudes): signal += amp * np.sin(2 * np.pi * freq * t) if noise_level > 0: noise = noise_level * np.random.randn(len(signal)) signal += noise return signal # Phase 6 ML utility functions def generate_session_id() -> str: """Generate unique session ID""" return f"session_{int(time.time())}_{str(uuid.uuid4())[:8]}" def ensure_artifacts_dir(session_id: str) -> Path: """Ensure artifacts directory exists and return path""" artifacts_dir = Path("artifacts") / session_id artifacts_dir.mkdir(parents=True, exist_ok=True) return artifacts_dir def encode_image_b64(image_path: Path) -> str: """Encode image file to base64 string""" with open(image_path, 'rb') as f: image_data = f.read() return base64.b64encode(image_data).decode('utf-8') # Initialize matplotlib style when module is imported setup_matplotlib_style()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BlinkZer0/Phys-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server