Skip to main content
Glama

Physics MCP Server

by BlinkZer0
worker.py82.3 kB
#!/usr/bin/env python3 """ Physics MCP Python Worker Handles symbolic computation, numerical analysis, and plotting operations via JSON-RPC over stdin/stdout. """ from __future__ import annotations import sys import json import math import traceback import signal import threading from typing import Any, Dict, Optional, Union, List from io import BytesIO import base64 import time import os from pathlib import Path # Import error handling and performance modules from src.error_handling import ( PhysicsError, ValidationError, ComputationError, UnitsError, ResourceError, wrap_tool_execution, create_error_response, generate_request_id, logger ) from src.performance import ( with_cache, gpu_fallback, chunked_processor, optimize_fft, optimize_plot_generation, get_performance_report, perf_monitor ) # Core computation libraries import sympy as sp from sympy import * import numpy as np import scipy.constants as const import pint # Plotting import matplotlib matplotlib.use("Agg") # Non-interactive backend import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D import matplotlib.animation as animation from matplotlib.animation import FFMpegWriter, PillowWriter # Phase 5: Advanced visualization imports try: import trimesh import trimesh.exchange.gltf import trimesh.exchange.ply _TRIMESH_AVAILABLE = True except ImportError: _TRIMESH_AVAILABLE = False try: import cv2 _OPENCV_AVAILABLE = True except ImportError: _OPENCV_AVAILABLE = False # Optional device-aware acceleration try: from accel import ( accel_init, accel_caps, accel_eval_parametric_1d, accel_eval_scalar_2d, accel_eval_vector_2d, accel_eval_scalar_3d, ) _ACCEL_INFO = accel_init() except Exception: def accel_caps(): return {"active": False, "device": "cpu", "mode": "cpu", "has_torch": False} _ACCEL_INFO = accel_caps() # Optional quantum physics library try: _HAS_QUTIP = True except ImportError: qutip = None # type: ignore _HAS_QUTIP = False # Phase 4 imports try: import data_io import signal_processing import external_apis import export_utils except ImportError as e: print(f"Warning: Phase 4 modules not available: {e}", file=sys.stderr) # Phase 6 ML imports try: import ml_augmentation except ImportError as e: print(f"Warning: Phase 6 ML modules not available: {e}", file=sys.stderr) # Phase 7 & 8 imports try: import distributed_collaboration import experiment_orchestrator except ImportError as e: print(f"Warning: Phase 7 & 8 modules not available: {e}", file=sys.stderr) # Graphing Calculator imports try: from graphing_calculator import GraphingCalculator except ImportError as e: print(f"Warning: Graphing Calculator modules not available: {e}", file=sys.stderr) EXECUTION_TIMEOUT = 10.0 # seconds MAX_ARRAY_SIZE = 100000 def load_config() -> Dict[str, Any]: """Load server configuration from config/server.config.json""" config_path = Path(__file__).parent.parent.parent / "config" / "server.config.json" try: if config_path.exists(): with open(config_path, 'r') as f: return json.load(f) except Exception as e: print(f"Warning: Could not load config: {e}", file=sys.stderr) # Default config return { "accel": {"mode": "auto", "device_preference": ["cuda", "hip", "mps", "xpu", "cpu"]}, "ml": { "default_backend": "torch", "max_vram_mb": 4096, "train": {"epochs": 200, "early_stop_patience": 20, "batch_size": 64, "lr": 1e-3}, "video": {"fps": 24, "encoder_mp4": "libx264", "encoder_webm": "libvpx-vp9"} } } SAFE_SYMPY_NAMESPACE = { # Core sympy functions 'sin', 'cos', 'tan', 'asin', 'acos', 'atan', 'atan2', 'sinh', 'cosh', 'tanh', 'asinh', 'acosh', 'atanh', 'exp', 'log', 'ln', 'sqrt', 'cbrt', 'Abs', 'sign', 'pi', 'E', 'I', 'oo', 'zoo', 'nan', 'Symbol', 'symbols', 'Function', 'Derivative', 'Integral', 'diff', 'integrate', 'limit', 'series', 'solve', 'dsolve', 'Matrix', 'eye', 'zeros', 'ones', 'diag', 'simplify', 'expand', 'factor', 'collect', 'cancel', 'apart', 'together', 'trigsimp', 'powsimp', 'radsimp', 'Eq', 'Ne', 'Lt', 'Le', 'Gt', 'Ge', 'And', 'Or', 'Not', 'Implies', 'Equivalent', 'Sum', 'Product', 'factorial', 'binomial', 'gamma', 'Rational', 'Float', 'Integer', 'Poly', 'roots', 'latex', 'pretty', 'pprint' } # Initialize unit registry ureg = pint.UnitRegistry() # CODATA constants with units - Extended set CODATA = { "c": const.c * ureg("m/s"), # Speed of light "h": const.h * ureg("J*s"), # Planck constant "hbar": const.hbar * ureg("J*s"), # Reduced Planck constant "e": const.e * ureg("C"), # Elementary charge "m_e": const.m_e * ureg("kg"), # Electron mass "m_p": const.m_p * ureg("kg"), # Proton mass "k_B": const.k * ureg("J/K"), # Boltzmann constant "N_A": const.N_A * ureg("1/mol"), # Avogadro constant "epsilon_0": const.epsilon_0 * ureg("F/m"), # Vacuum permittivity "mu_0": const.mu_0 * ureg("H/m"), # Vacuum permeability "G": const.G * ureg("m^3/(kg*s^2)"), # Gravitational constant "R": const.R * ureg("J/(mol*K)"), # Gas constant "sigma": const.sigma * ureg("W/(m^2*K^4)"), # Stefan-Boltzmann constant "a_0": const.physical_constants["Bohr radius"][0] * ureg("m"), # Bohr radius "alpha": const.alpha, # Fine structure constant (dimensionless) # Astrophysical constants "M_sun": 1.98847e30 * ureg("kg"), # Solar mass "pc": const.parsec * ureg("m"), # Parsec "ly": const.c * 365.25 * 24 * 3600 * ureg("m"), # Light year "au": const.au * ureg("m"), # Astronomical unit } class TimeoutError(Exception): """Custom timeout exception.""" pass def timeout_handler(signum, frame): """Signal handler for timeouts.""" raise TimeoutError("Operation timed out") def safe_sympify(expr_str: str, timeout: float = EXECUTION_TIMEOUT) -> sp.Basic: """Safely parse a sympy expression with timeout and restricted namespace.""" def parse_with_timeout(): # Create a restricted local namespace safe_locals = {} for name in SAFE_SYMPY_NAMESPACE: if hasattr(sp, name): safe_locals[name] = getattr(sp, name) # Add common mathematical constants safe_locals.update({ 'pi': sp.pi, 'e': sp.E, 'I': sp.I, 'oo': sp.oo, 'sin': sp.sin, 'cos': sp.cos, 'exp': sp.exp, 'log': sp.log }) return sp.sympify(expr_str, locals=safe_locals, evaluate=False) # Set up timeout if sys.platform != 'win32': signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(int(timeout)) try: result = parse_with_timeout() return result except Exception as e: raise ValueError(f"Failed to parse expression '{expr_str}': {e}") finally: if sys.platform != 'win32': signal.alarm(0) # Cancel the alarm def safe_evaluate(expr: sp.Basic, timeout: float = EXECUTION_TIMEOUT) -> sp.Basic: """Safely evaluate a sympy expression with timeout.""" def evaluate_with_timeout(): return expr.simplify() if sys.platform != 'win32': signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(int(timeout)) try: result = evaluate_with_timeout() return result finally: if sys.platform != 'win32': signal.alarm(0) def parse_quantity(value: Union[float, Dict[str, Any]]) -> Union[float, pint.Quantity]: """Parse a value that might have units.""" if isinstance(value, dict) and "value" in value: val = value["value"] unit = value.get("unit", "") if unit: return val * ureg(unit) return val return value def quantity_to_dict(q: Union[float, pint.Quantity]) -> Dict[str, Any]: """Convert a quantity back to dict format.""" if isinstance(q, pint.Quantity): return {"value": float(q.magnitude), "unit": str(q.units)} return {"value": float(q), "unit": ""} @wrap_tool_execution @with_cache(ttl_hours=2) def handle_cas_evaluate(params: Dict[str, Any]) -> Dict[str, Any]: """Evaluate a symbolic/numeric expression.""" expr_str = params["expr"] variables = params.get("vars", {}) # Parse the expression safely expr = safe_sympify(expr_str) # Substitute variables subs = {} for var_name, var_value in variables.items(): parsed_val = parse_quantity(var_value) # For symbolic computation, use magnitude if it's a quantity if isinstance(parsed_val, pint.Quantity): subs[sp.Symbol(var_name)] = parsed_val.magnitude else: subs[sp.Symbol(var_name)] = parsed_val # Substitute and simplify if subs: result_expr = expr.subs(subs) else: result_expr = expr simplified = safe_evaluate(result_expr) # Try to evaluate numerically evalf_result = None try: if simplified.is_real or simplified.is_complex: evalf_result = float(simplified.evalf()) except (TypeError, ValueError): pass return { "latex": sp.latex(simplified), "str": str(simplified), "evalf": evalf_result, "original": str(expr) } def handle_cas_diff(params: Dict[str, Any]) -> Dict[str, Any]: """Differentiate an expression.""" expr_str = params["expr"] symbol_str = params["symbol"] order = params.get("order", 1) expr = safe_sympify(expr_str) symbol = sp.Symbol(symbol_str) result = sp.diff(expr, symbol, order) return { "latex": sp.latex(result), "str": str(result), "original": f"d^{order}/d{symbol_str}^{order} ({expr_str})" } def handle_cas_integrate(params: Dict[str, Any]) -> Dict[str, Any]: """Integrate an expression.""" expr_str = params["expr"] symbol_str = params["symbol"] bounds = params.get("bounds") variables = params.get("vars", {}) expr = safe_sympify(expr_str) symbol = sp.Symbol(symbol_str) # Substitute variables if provided if variables: subs = {} for var_name, var_value in variables.items(): if var_name != symbol_str: # Don't substitute the integration variable parsed_val = parse_quantity(var_value) if isinstance(parsed_val, pint.Quantity): subs[sp.Symbol(var_name)] = parsed_val.magnitude else: subs[sp.Symbol(var_name)] = parsed_val if subs: expr = expr.subs(subs) if bounds: # Definite integral lower, upper = bounds result = sp.integrate(expr, (symbol, lower, upper)) else: # Indefinite integral result = sp.integrate(expr, symbol) # Try to evaluate numerically if definite evalf_result = None if bounds: try: evalf_result = float(result.evalf()) except (TypeError, ValueError): pass return { "latex": sp.latex(result), "str": str(result), "evalf": evalf_result, "definite": bounds is not None } def handle_cas_solve_equation(params: Dict[str, Any]) -> Dict[str, Any]: """Solve an algebraic equation.""" equation_str = params["equation"] symbol_str = params["symbol"] assumptions = params.get("assumptions", {}) # Create symbol with assumptions symbol_kwargs = {} if assumptions: for assumption, value in assumptions.items(): if assumption in ['real', 'positive', 'negative', 'integer', 'rational']: symbol_kwargs[assumption] = value symbol = sp.Symbol(symbol_str, **symbol_kwargs) # Parse equation (assume it's in the form "expr = 0" or "lhs = rhs") if "=" in equation_str: lhs, rhs = equation_str.split("=", 1) equation = sp.Eq(safe_sympify(lhs.strip()), safe_sympify(rhs.strip())) else: # Assume it's already in the form "expr = 0" equation = sp.Eq(safe_sympify(equation_str), 0) solutions = sp.solve(equation, symbol) # Try to evaluate solutions numerically numeric_solutions = [] for sol in solutions: try: numeric_val = complex(sol.evalf()) if numeric_val.imag == 0: numeric_solutions.append(float(numeric_val.real)) else: numeric_solutions.append(numeric_val) except (TypeError, ValueError): numeric_solutions.append(None) return { "solutions": [str(sol) for sol in solutions], "latex_solutions": [sp.latex(sol) for sol in solutions], "numeric_solutions": numeric_solutions, "count": len(solutions), "assumptions": assumptions } def handle_cas_solve_ode(params: Dict[str, Any]) -> Dict[str, Any]: """Solve an ordinary differential equation.""" ode_str = params["ode"] symbol_str = params["symbol"] func_str = params["func"] initial_conditions = params.get("ics", {}) # Parse symbols and function x = sp.Symbol(symbol_str) y = sp.Function(func_str) # Enhanced ODE parsing - handle common forms ode_processed = ode_str # Replace derivative notation ode_processed = ode_processed.replace(f"{func_str}''", f"{func_str}(x).diff(x,2)") ode_processed = ode_processed.replace(f"{func_str}'", f"{func_str}(x).diff(x)") ode_processed = ode_processed.replace(f"{func_str}", f"{func_str}(x)") try: # Parse the ODE expression if "=" in ode_processed: lhs, rhs = ode_processed.split("=", 1) ode_expr = safe_sympify(lhs.strip()) - safe_sympify(rhs.strip()) else: ode_expr = safe_sympify(ode_processed) # Create the differential equation ode = sp.Eq(ode_expr, 0) # Solve the ODE general_solution = sp.dsolve(ode, y(x)) # Apply initial conditions if provided particular_solution = None if initial_conditions: try: # Extract constants from general solution constants = general_solution.free_symbols - {x} if constants and len(initial_conditions) >= len(constants): # Create system of equations from initial conditions ic_equations = [] solution_rhs = general_solution.rhs for condition, value in initial_conditions.items(): if condition.startswith(f"{func_str}("): # Extract x value from condition like "y(0)" x_val = float(condition.split("(")[1].split(")")[0]) ic_eq = sp.Eq(solution_rhs.subs(x, x_val), value) ic_equations.append(ic_eq) elif condition.startswith(f"{func_str}'("): # Derivative initial condition x_val = float(condition.split("(")[1].split(")")[0]) ic_eq = sp.Eq(solution_rhs.diff(x).subs(x, x_val), value) ic_equations.append(ic_eq) if ic_equations: const_values = sp.solve(ic_equations, list(constants)) if const_values: particular_solution = general_solution.subs(const_values) except Exception as e: # If IC application fails, just return general solution pass result = { "general_solution": str(general_solution), "latex": sp.latex(general_solution), "ode": str(ode), "ode_original": ode_str } if particular_solution: result.update({ "particular_solution": str(particular_solution), "particular_latex": sp.latex(particular_solution), "initial_conditions": initial_conditions }) return result except Exception as e: raise ValueError(f"Failed to solve ODE '{ode_str}': {e}") def handle_cas_propagate_uncertainty(params: Dict[str, Any]) -> Dict[str, Any]: """Propagate uncertainty through an expression using linear approximation.""" expr_str = params["expr"] variables = params["vars"] # Dict of {var_name: {value, sigma, unit?}} # Parse the expression expr = safe_sympify(expr_str) # Extract variable information var_values = {} var_uncertainties = {} var_units = {} for var_name, var_info in variables.items(): var_values[var_name] = var_info["value"] var_uncertainties[var_name] = var_info["sigma"] if "unit" in var_info: var_units[var_name] = var_info["unit"] # Calculate partial derivatives partials = {} for var_name in variables.keys(): var_symbol = sp.Symbol(var_name) partial = sp.diff(expr, var_symbol) partials[var_name] = partial # Evaluate expression at mean values subs_dict = {sp.Symbol(name): value for name, value in var_values.items()} mean_value = float(expr.subs(subs_dict).evalf()) # Calculate uncertainty using linear propagation # σ_f² = Σ (∂f/∂x_i)² σ_x_i² variance = 0 partial_contributions = {} for var_name, partial_expr in partials.items(): partial_value = float(partial_expr.subs(subs_dict).evalf()) var_uncertainty = var_uncertainties[var_name] contribution = (partial_value * var_uncertainty) ** 2 variance += contribution partial_contributions[var_name] = { "partial_derivative": str(partial_expr), "partial_value": partial_value, "contribution": contribution } total_uncertainty = math.sqrt(variance) # Calculate relative uncertainty relative_uncertainty = abs(total_uncertainty / mean_value) if mean_value != 0 else float('inf') return { "expression": expr_str, "mean_value": mean_value, "uncertainty": total_uncertainty, "relative_uncertainty": relative_uncertainty, "result_with_uncertainty": f"{mean_value:.6g} ± {total_uncertainty:.6g}", "partial_contributions": partial_contributions, "latex": sp.latex(expr) } # Phase 5: Advanced Visualization Functions def handle_plot_volume_3d(params: Dict[str, Any]) -> Dict[str, Any]: """GPU-accelerated numeric sampling of scalar volumes with slices/isosurfaces.""" f_str = params["f"] x_range = params["x"] y_range = params["y"] z_range = params["z"] mode = params.get("mode", "slices") iso_level = params.get("iso_level", 0.0) emit_animation = params.get("emit_animation", False) animate_axis = params.get("animate_axis", "z") fps = params.get("fps", 24) format_type = params.get("format", "mp4") samples_cap = params.get("samples_cap", 160) allow_large = params.get("allow_large", False) # Parse ranges [min, max] or [min, max, steps] def parse_range(r, default_steps=50): if len(r) == 2: return r[0], r[1], default_steps elif len(r) == 3: return r[0], r[1], int(r[2]) else: raise ValueError("Range must be [min, max] or [min, max, steps]") x_min, x_max, x_steps = parse_range(x_range) y_min, y_max, y_steps = parse_range(y_range) z_min, z_max, z_steps = parse_range(z_range) # Enforce caps if not allow_large: x_steps = min(x_steps, samples_cap) y_steps = min(y_steps, samples_cap) z_steps = min(z_steps, samples_cap) # Create coordinate grids x = np.linspace(x_min, x_max, x_steps) y = np.linspace(y_min, y_max, y_steps) z = np.linspace(z_min, z_max, z_steps) start_time = time.time() device_used = "cpu" fallback = False # Try GPU acceleration first try: if _ACCEL_INFO["active"]: # Use acceleration layer for 3D scalar evaluation from accel import accel_eval_scalar_3d X, Y, Z, F = accel_eval_scalar_3d(sp.sympify(f_str), x_min, x_max, y_min, y_max, z_min, z_max, min(x_steps, y_steps, z_steps)) device_used = _ACCEL_INFO["device"] else: raise RuntimeError("Acceleration not available") except Exception: # Fallback to CPU fallback = True X, Y, Z = np.meshgrid(x, y, z, indexing='ij') f_expr = sp.sympify(f_str) x_sym, y_sym, z_sym = sp.symbols('x y z') f_func = sp.lambdify((x_sym, y_sym, z_sym), f_expr, "numpy") F = f_func(X, Y, Z) device_used = "cpu" duration_ms = int((time.time() - start_time) * 1000) # Generate contact sheet (slices or isosurface previews) fig = plt.figure(figsize=(12, 9), dpi=100) if mode == "slices": # Show orthogonal slices mid_x, mid_y, mid_z = x_steps//2, y_steps//2, z_steps//2 # XY slice (constant Z) plt.subplot(2, 2, 1) plt.imshow(F[:, :, mid_z].T, extent=[x_min, x_max, y_min, y_max], origin='lower', cmap='viridis') plt.title(f'XY slice (z={z[mid_z]:.2f})') plt.xlabel('x') plt.ylabel('y') plt.colorbar() # XZ slice (constant Y) plt.subplot(2, 2, 2) plt.imshow(F[:, mid_y, :].T, extent=[x_min, x_max, z_min, z_max], origin='lower', cmap='viridis') plt.title(f'XZ slice (y={y[mid_y]:.2f})') plt.xlabel('x') plt.ylabel('z') plt.colorbar() # YZ slice (constant X) plt.subplot(2, 2, 3) plt.imshow(F[mid_x, :, :].T, extent=[y_min, y_max, z_min, z_max], origin='lower', cmap='viridis') plt.title(f'YZ slice (x={x[mid_x]:.2f})') plt.xlabel('y') plt.ylabel('z') plt.colorbar() # 3D isosurface preview ax = fig.add_subplot(2, 2, 4, projection='3d') # Sample a few isosurface levels levels = np.linspace(F.min(), F.max(), 3)[1:-1] # Skip min/max for level in levels: try: # Simple isosurface approximation using contour for k in range(0, z_steps, max(1, z_steps//5)): cs = plt.contour(X[:, :, k], Y[:, :, k], F[:, :, k], levels=[level]) if cs.collections: for collection in cs.collections: for path in collection.get_paths(): vertices = path.vertices ax.plot(vertices[:, 0], vertices[:, 1], z[k], alpha=0.6) except: pass ax.set_xlabel('x') ax.set_ylabel('y') ax.set_zlabel('z') ax.set_title('3D Isosurfaces') elif mode == "isosurface": # Show isosurface at specified level ax = fig.add_subplot(1, 1, 1, projection='3d') # Create isosurface using marching cubes approximation try: # Simple isosurface visualization for k in range(0, z_steps, max(1, z_steps//10)): cs = plt.contour(X[:, :, k], Y[:, :, k], F[:, :, k], levels=[iso_level]) if cs.collections: for collection in cs.collections: for path in collection.get_paths(): vertices = path.vertices ax.plot(vertices[:, 0], vertices[:, 1], z[k], alpha=0.8, color='blue') ax.set_xlabel('x') ax.set_ylabel('y') ax.set_zlabel('z') ax.set_title(f'Isosurface at level {iso_level}') except Exception as e: plt.text(0.5, 0.5, f'Isosurface generation failed: {str(e)}', transform=ax.transAxes, ha='center', va='center') plt.tight_layout() # Save contact sheet buffer = BytesIO() plt.savefig(buffer, format='png', dpi=100, bbox_inches='tight') buffer.seek(0) png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() # Generate CSV data csv_lines = ["x,y,z,f"] for i in range(0, x_steps, max(1, x_steps//100)): # Sample for CSV for j in range(0, y_steps, max(1, y_steps//100)): for k in range(0, z_steps, max(1, z_steps//100)): csv_lines.append(f"{X[i,j,k]},{Y[i,j,k]},{Z[i,j,k]},{F[i,j,k]}") csv_data = "\n".join(csv_lines) result = { "png_contact_sheet_b64": png_b64, "csv_data": csv_data, "meta": { "device": device_used, "fallback": fallback, "mesh": [x_steps, y_steps, z_steps], "cached": False, "duration_ms": duration_ms, "mode": mode } } # Generate animation if requested if emit_animation: try: animation_path = f"volume_animation_{int(time.time())}.{format_type}" # Simple animation sweeping through one axis fig, ax = plt.subplots(figsize=(8, 6)) if animate_axis == "z": frames = range(0, z_steps, max(1, z_steps//30)) def animate(frame): ax.clear() ax.imshow(F[:, :, frame].T, extent=[x_min, x_max, y_min, y_max], origin='lower', cmap='viridis') ax.set_title(f'XY slice at z={z[frame]:.2f}') ax.set_xlabel('x') ax.set_ylabel('y') elif animate_axis == "y": frames = range(0, y_steps, max(1, y_steps//30)) def animate(frame): ax.clear() ax.imshow(F[:, frame, :].T, extent=[x_min, x_max, z_min, z_max], origin='lower', cmap='viridis') ax.set_title(f'XZ slice at y={y[frame]:.2f}') ax.set_xlabel('x') ax.set_ylabel('z') else: # x frames = range(0, x_steps, max(1, x_steps//30)) def animate(frame): ax.clear() ax.imshow(F[frame, :, :].T, extent=[y_min, y_max, z_min, z_max], origin='lower', cmap='viridis') ax.set_title(f'YZ slice at x={x[frame]:.2f}') ax.set_xlabel('y') ax.set_ylabel('z') anim = animation.FuncAnimation(fig, animate, frames=frames, interval=1000//fps) # Save animation if format_type == "mp4": writer = FFMpegWriter(fps=fps, codec='libx264') anim.save(animation_path, writer=writer) elif format_type == "gif": writer = PillowWriter(fps=fps) anim.save(animation_path, writer=writer) else: writer = FFMpegWriter(fps=fps, codec='libvpx-vp9') anim.save(animation_path, writer=writer) plt.close() result["animation_path"] = animation_path except Exception as e: result["animation_error"] = str(e) return result def handle_plot_animation(params: Dict[str, Any]) -> Dict[str, Any]: """Time evolution renderer for 2D functions.""" frame_expr = params["frame_expr"] x_range = params.get("x_range", [-5, 5, 100]) t_range = params["t_range"] renderer = params.get("renderer", "imshow") fps = params.get("fps", 24) format_type = params.get("format", "mp4") dpi = params.get("dpi", 120) emit_frames = params.get("emit_frames", False) emit_csv = params.get("emit_csv", False) frames_cap = params.get("frames_cap", 300) allow_large = params.get("allow_large", False) # Parse ranges def parse_range(r, default_steps=100): if len(r) == 2: return r[0], r[1], default_steps elif len(r) == 3: return r[0], r[1], int(r[2]) else: raise ValueError("Range must be [min, max] or [min, max, steps]") if x_range: x_min, x_max, x_steps = parse_range(x_range) t_min, t_max, t_steps = parse_range(t_range) # Enforce frame cap if not allow_large: t_steps = min(t_steps, frames_cap) start_time = time.time() device_used = "cpu" fallback = False # Create coordinate arrays if x_range: x = np.linspace(x_min, x_max, x_steps) t = np.linspace(t_min, t_max, t_steps) # Parse expression expr = sp.sympify(frame_expr) # Prepare animation fig, ax = plt.subplots(figsize=(10, 6), dpi=dpi) frames_data = [] csv_lines = [] if renderer == "line": # 1D line plot animation x_sym, t_sym = sp.symbols('x t') func = sp.lambdify((x_sym, t_sym), expr, "numpy") if emit_csv: csv_lines.append("t,x,f") def animate(frame_idx): ax.clear() t_val = t[frame_idx] y_vals = func(x, t_val) ax.plot(x, y_vals, 'b-', linewidth=2) ax.set_xlim(x_min, x_max) ax.set_ylim(np.min(y_vals) - 0.1, np.max(y_vals) + 0.1) ax.set_xlabel('x') ax.set_ylabel('f(x,t)') ax.set_title(f'Frame at t = {t_val:.3f}') ax.grid(True, alpha=0.3) if emit_csv: for xi, yi in zip(x, y_vals): csv_lines.append(f"{t_val},{xi},{yi}") if emit_frames: frames_data.append({"t": t_val, "x": x.tolist(), "y": y_vals.tolist()}) elif renderer == "imshow": # 2D heatmap animation if not x_range: raise ValueError("x_range required for imshow renderer") x_sym, t_sym = sp.symbols('x t') func = sp.lambdify((x_sym, t_sym), expr, "numpy") # Pre-compute all frames for consistent color scale all_frames = [] for t_val in t: frame_data = func(x, t_val) all_frames.append(frame_data) vmin, vmax = np.min(all_frames), np.max(all_frames) if emit_csv: csv_lines.append("t,x,f") def animate(frame_idx): ax.clear() t_val = t[frame_idx] frame_data = all_frames[frame_idx] im = ax.imshow(frame_data.reshape(1, -1), extent=[x_min, x_max, -0.5, 0.5], aspect='auto', cmap='viridis', vmin=vmin, vmax=vmax) ax.set_xlabel('x') ax.set_title(f'Frame at t = {t_val:.3f}') if emit_csv: for xi, fi in zip(x, frame_data): csv_lines.append(f"{t_val},{xi},{fi}") else: raise ValueError(f"Unknown renderer: {renderer}") # Create animation anim = animation.FuncAnimation(fig, animate, frames=len(t), interval=1000//fps, repeat=False) # Save animation animation_path = f"animation_{int(time.time())}.{format_type}" try: if format_type == "mp4": writer = FFMpegWriter(fps=fps, codec='libx264', bitrate=1800) anim.save(animation_path, writer=writer) elif format_type == "gif": writer = PillowWriter(fps=fps) anim.save(animation_path, writer=writer) elif format_type == "webm": writer = FFMpegWriter(fps=fps, codec='libvpx-vp9') anim.save(animation_path, writer=writer) else: raise ValueError(f"Unsupported format: {format_type}") except Exception as e: plt.close() return {"error": f"Animation encoding failed: {str(e)}"} plt.close() duration_ms = int((time.time() - start_time) * 1000) result = { "animation_path": animation_path, "meta": { "device": device_used, "fallback": fallback, "frames": len(t), "duration_ms": duration_ms, "renderer": renderer, "format": format_type } } if emit_csv and csv_lines: result["csv_data"] = "\n".join(csv_lines) if emit_frames: result["frames_data"] = frames_data return result def handle_plot_interactive(params: Dict[str, Any]) -> Dict[str, Any]: """Generate parameter sweep with UI spec for interactive controls.""" expr_str = params["expr"] x_range = params.get("x_range", [-5, 5, 100]) controls = params["controls"] renderer = params.get("renderer", "line") grid_limit = params.get("grid_limit", 24) # Parse x range def parse_range(r, default_steps=100): if len(r) == 2: return r[0], r[1], default_steps elif len(r) == 3: return r[0], r[1], int(r[2]) else: raise ValueError("Range must be [min, max] or [min, max, steps]") x_min, x_max, x_steps = parse_range(x_range) x = np.linspace(x_min, x_max, x_steps) # Parse expression expr = sp.sympify(expr_str) x_sym = sp.symbols('x') # Extract parameter symbols param_symbols = {} for control in controls: param_symbols[control["name"]] = sp.symbols(control["name"]) # Create lambdified function all_symbols = [x_sym] + list(param_symbols.values()) func = sp.lambdify(all_symbols, expr, "numpy") # Generate sparse grid of thumbnails thumbnails = [] param_combinations = [] # Create parameter grid (limited by grid_limit) import itertools # For each control, create a small set of values param_grids = [] for control in controls: n_vals = min(3, grid_limit // len(controls)) # Distribute grid points vals = np.linspace(control["min"], control["max"], n_vals) param_grids.append([(control["name"], val) for val in vals]) # Generate combinations for combination in itertools.product(*param_grids): if len(param_combinations) >= grid_limit: break param_dict = dict(combination) param_combinations.append(param_dict) # Evaluate function with these parameters param_values = [param_dict[name] for name in param_symbols.keys()] y_vals = func(x, *param_values) # Create thumbnail plot fig, ax = plt.subplots(figsize=(3, 2), dpi=80) if renderer == "line": ax.plot(x, y_vals, 'b-', linewidth=1.5) ax.set_xlim(x_min, x_max) elif renderer == "contour": # For contour, we'd need 2D data - simplified to line for now ax.plot(x, y_vals, 'b-', linewidth=1.5) ax.set_xlim(x_min, x_max) ax.set_xlabel('x', fontsize=8) ax.set_ylabel('f', fontsize=8) ax.tick_params(labelsize=6) # Add parameter values as title title_parts = [f"{k}={v:.2f}" for k, v in param_dict.items()] ax.set_title(", ".join(title_parts), fontsize=8) plt.tight_layout() # Save thumbnail buffer = BytesIO() plt.savefig(buffer, format='png', dpi=80, bbox_inches='tight') buffer.seek(0) png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() thumbnails.append({ "control_values": param_dict, "png_b64": png_b64 }) # Create UI spec ui_spec = { "type": "sliders", "controls": controls } result = { "thumbnails": thumbnails, "ui_spec": ui_spec, "meta": { "device": "cpu", "cached": False, "grid_size": len(thumbnails), "renderer": renderer } } return result def handle_plot_vr_export(params: Dict[str, Any]) -> Dict[str, Any]: """Export 3D meshes/point fields to glTF 2.0 (GLB) and PLY formats.""" if not _TRIMESH_AVAILABLE: return {"error": "trimesh library not available. Install with: pip install trimesh"} geometry = params["geometry"] format_type = params.get("format", "glb") extras = params.get("extras", {}) vertices = np.array(geometry["vertices"]) faces = np.array(geometry["faces"]) normals = np.array(geometry.get("normals", [])) if geometry.get("normals") else None colors = np.array(geometry.get("colors", [])) if geometry.get("colors") else None try: # Create trimesh object mesh = trimesh.Trimesh(vertices=vertices, faces=faces) # Add normals if provided if normals is not None and len(normals) == len(vertices): mesh.vertex_normals = normals # Add colors if provided if colors is not None and len(colors) == len(vertices): mesh.visual.vertex_colors = colors # Generate filename timestamp = int(time.time()) if format_type == "glb": filename = f"mesh_{timestamp}.glb" # Export as GLB (binary glTF) mesh.export(filename) elif format_type == "ply": filename = f"mesh_{timestamp}.ply" # Export as PLY mesh.export(filename) else: return {"error": f"Unsupported format: {format_type}"} result = { "path": filename, "meta": { "vertices": len(vertices), "faces": len(faces), "format": format_type, "has_normals": normals is not None, "has_colors": colors is not None } } # Add extras to metadata if extras: result["meta"]["extras"] = extras return result except Exception as e: return {"error": f"3D export failed: {str(e)}"} def handle_units_convert(params: Dict[str, Any]) -> Dict[str, Any]: """Convert units using Pint.""" quantity = params["quantity"] # {value, unit} to_unit = params["to"] # Parse input quantity input_value = quantity["value"] input_unit = quantity["unit"] try: # Create Pint quantity input_quantity = input_value * ureg(input_unit) # Convert to target unit converted = input_quantity.to(to_unit) # Check dimensional compatibility dimensionality = str(converted.dimensionality) return { "input": quantity, "output": { "value": float(converted.magnitude), "unit": str(converted.units) }, "conversion_factor": float(converted.magnitude / input_value), "dimensionality": dimensionality } except Exception as e: raise ValueError(f"Unit conversion failed: {e}") def handle_constants_get(params: Dict[str, Any]) -> Dict[str, Any]: """Get a physical constant by name.""" name = params["name"] if name in CODATA: constant = CODATA[name] if isinstance(constant, pint.Quantity): return { "name": name, "value": float(constant.magnitude), "unit": str(constant.units), "dimensionality": str(constant.dimensionality), "description": f"CODATA physical constant: {name}" } else: # Dimensionless constant return { "name": name, "value": float(constant), "unit": "", "dimensionality": "dimensionless", "description": f"CODATA physical constant: {name}" } else: # Try to find similar constants similar = [k for k in CODATA.keys() if name.lower() in k.lower() or k.lower() in name.lower()] available = list(CODATA.keys()) raise ValueError(f"Unknown constant '{name}'. Similar: {similar}. Available: {available}") def handle_units_smart_eval(params: Dict[str, Any]) -> Dict[str, Any]: """Smart evaluation of expressions with units and constants.""" from src.units_smart import evaluate_with_units expr = params["expr"] constants = params.get("constants", {}) try: result = evaluate_with_units(expr, constants) return result except Exception as e: raise ValueError(f"Smart units evaluation failed: {e}") def handle_units_round_trip_test(params: Dict[str, Any]) -> Dict[str, Any]: """Test round-trip unit conversion accuracy.""" from src.units_smart import round_trip_test value = params["value"] from_unit = params["from_unit"] to_unit = params["to_unit"] tolerance = params.get("tolerance", 1e-9) try: result = round_trip_test(value, from_unit, to_unit, tolerance) return result except Exception as e: raise ValueError(f"Round-trip test failed: {e}") @wrap_tool_execution def handle_plot_function_2d(params: Dict[str, Any]) -> Dict[str, Any]: """Plot a 2D function.""" f_str = params["f"] x_min = params["x_min"] x_max = params["x_max"] samples = params.get("samples", 1000) # Optional styling parameters title = params.get("title", f"Plot of {f_str}") xlabel = params.get("xlabel", "x") ylabel = params.get("ylabel", "f(x)") dpi = params.get("dpi", 100) width = params.get("width", 8) height = params.get("height", 6) # Create x values x_vals = np.linspace(x_min, x_max, samples) # Parse and evaluate function x_sym = sp.Symbol('x') f_expr = sp.sympify(f_str) f_lambda = sp.lambdify(x_sym, f_expr, 'numpy') try: y_vals = f_lambda(x_vals) except Exception as e: raise ValueError(f"Error evaluating function: {e}") # Create plot plt.figure(figsize=(width, height), dpi=dpi) plt.plot(x_vals, y_vals, 'b-', linewidth=2) plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.grid(True, alpha=0.3) # Save to base64 PNG buffer = BytesIO() plt.savefig(buffer, format='png', dpi=dpi, bbox_inches='tight') buffer.seek(0) image_png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() # Generate CSV data csv_data = "x,y\n" + "\n".join(f"{x},{y}" for x, y in zip(x_vals, y_vals)) return { "image_png_b64": image_png_b64, "csv_data": csv_data, "x_range": [float(x_min), float(x_max)], "y_range": [float(np.min(y_vals)), float(np.max(y_vals))], "samples": samples } def handle_plot_parametric_2d(params: Dict[str, Any]) -> Dict[str, Any]: """Plot a 2D parametric curve.""" x_str = params["x_t"] y_str = params["y_t"] t_min = params["t_min"] t_max = params["t_max"] samples = params.get("samples", 1000) # Attempt accelerated sampling first, then fallback to CPU x_vals = y_vals = None try: x_expr = safe_sympify(x_str) y_expr = safe_sympify(y_str) # Try GPU/MPS/XPU path try: Xacc, Yacc = accel_eval_parametric_1d(x_expr, y_expr, t_min, t_max, samples) x_vals, y_vals = Xacc, Yacc except Exception as e: # Fallback to CPU t_vals = np.linspace(t_min, t_max, samples) t_sym = sp.Symbol('t') x_lambda = sp.lambdify(t_sym, x_expr, 'numpy') y_lambda = sp.lambdify(t_sym, y_expr, 'numpy') x_vals = x_lambda(t_vals) y_vals = y_lambda(t_vals) try: sys.stderr.write(f"[ACCEL] parametric_2d fallback to CPU: {e}\n") except Exception: pass except Exception as e: raise ValueError(f"Error evaluating parametric functions: {e}") # Create plot plt.figure(figsize=(8, 6), dpi=100) plt.plot(x_vals, y_vals, 'b-', linewidth=2) plt.title(f"Parametric Plot: x(t)={x_str}, y(t)={y_str}") plt.xlabel("x") plt.ylabel("y") plt.grid(True, alpha=0.3) plt.axis('equal') # Save to base64 PNG buffer = BytesIO() plt.savefig(buffer, format='png', dpi=100, bbox_inches='tight') buffer.seek(0) image_png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() return { "image_png_b64": image_png_b64, "t_range": [float(t_min), float(t_max)], "samples": samples } def handle_plot_field_2d(params: Dict[str, Any]) -> Dict[str, Any]: """Plot a 2D vector field.""" fx_str = params["fx"] fy_str = params["fy"] x_min = params["x_min"] x_max = params["x_max"] y_min = params["y_min"] y_max = params["y_max"] grid_points = params.get("grid_points", 20) plot_type = params.get("plot_type", "quiver") # Optional styling title = params.get("title", f"Vector Field: F = ({fx_str}, {fy_str})") xlabel = params.get("xlabel", "x") ylabel = params.get("ylabel", "y") dpi = params.get("dpi", 100) width = params.get("width", 8) height = params.get("height", 6) # Create coordinate grids via ACCEL if possible, else CPU x_sym, y_sym = sp.symbols('x y') fx_expr = safe_sympify(fx_str) fy_expr = safe_sympify(fy_str) try: try: X, Y, FX, FY = accel_eval_vector_2d(fx_expr, fy_expr, x_min, x_max, y_min, y_max, grid_points) except Exception as e: x_vals = np.linspace(x_min, x_max, grid_points) y_vals = np.linspace(y_min, y_max, grid_points) X, Y = np.meshgrid(x_vals, y_vals) fx_lambda = sp.lambdify((x_sym, y_sym), fx_expr, 'numpy') fy_lambda = sp.lambdify((x_sym, y_sym), fy_expr, 'numpy') FX = fx_lambda(X, Y) FY = fy_lambda(X, Y) try: sys.stderr.write(f"[ACCEL] field_2d fallback to CPU: {e}\n") except Exception: pass except Exception as e: raise ValueError(f"Error evaluating vector field: {e}") # Create plot plt.figure(figsize=(width, height), dpi=dpi) if plot_type == "quiver": # Quiver plot plt.quiver(X, Y, FX, FY, angles='xy', scale_units='xy', scale=1, alpha=0.8) elif plot_type == "stream": # Streamline plot plt.streamplot(X, Y, FX, FY, density=1.5, arrowsize=1.2, arrowstyle='->') else: raise ValueError(f"Unknown plot_type: {plot_type}") plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.grid(True, alpha=0.3) plt.axis('equal') # Save to base64 PNG buffer = BytesIO() plt.savefig(buffer, format='png', dpi=dpi, bbox_inches='tight') buffer.seek(0) image_png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() return { "image_png_b64": image_png_b64, "x_range": [float(x_min), float(x_max)], "y_range": [float(y_min), float(y_max)], "grid_points": grid_points, "plot_type": plot_type } def handle_plot_phase_portrait(params: Dict[str, Any]) -> Dict[str, Any]: """Plot a phase portrait for a 2D dynamical system.""" dx_str = params["dx"] # dx/dt dy_str = params["dy"] # dy/dt x_min = params["x_min"] x_max = params["x_max"] y_min = params["y_min"] y_max = params["y_max"] grid_points = params.get("grid_points", 20) # Optional styling title = params.get("title", f"Phase Portrait: dx/dt = {dx_str}, dy/dt = {dy_str}") xlabel = params.get("xlabel", "x") ylabel = params.get("ylabel", "y") dpi = params.get("dpi", 100) width = params.get("width", 8) height = params.get("height", 6) # Create coordinate grids and evaluate via ACCEL if possible, else CPU x_sym, y_sym = sp.symbols('x y') dx_expr = safe_sympify(dx_str) dy_expr = safe_sympify(dy_str) try: try: X, Y, DX, DY = accel_eval_vector_2d(dx_expr, dy_expr, x_min, x_max, y_min, y_max, grid_points) except Exception as e: x_vals = np.linspace(x_min, x_max, grid_points) y_vals = np.linspace(y_min, y_max, grid_points) X, Y = np.meshgrid(x_vals, y_vals) dx_lambda = sp.lambdify((x_sym, y_sym), dx_expr, 'numpy') dy_lambda = sp.lambdify((x_sym, y_sym), dy_expr, 'numpy') DX = dx_lambda(X, Y) DY = dy_lambda(X, Y) try: sys.stderr.write(f"[ACCEL] phase_portrait fallback to CPU: {e}\n") except Exception: pass except Exception as e: raise ValueError(f"Error evaluating dynamical system: {e}") # Normalize for better visualization M = np.sqrt(DX**2 + DY**2) M[M == 0] = 1 # Avoid division by zero DX_norm = DX / M DY_norm = DY / M # Create plot plt.figure(figsize=(width, height), dpi=dpi) # Plot direction field plt.quiver(X, Y, DX_norm, DY_norm, M, angles='xy', scale_units='xy', scale=1, alpha=0.7, cmap='viridis') # Add some sample trajectories try: from scipy.integrate import odeint def system(state, t): x, y = state return [float(dx_lambda(x, y)), float(dy_lambda(x, y))] # Sample initial conditions n_trajectories = 8 x_starts = np.linspace(x_min + 0.1*(x_max-x_min), x_max - 0.1*(x_max-x_min), n_trajectories//2) y_starts = np.linspace(y_min + 0.1*(y_max-y_min), y_max - 0.1*(y_max-y_min), n_trajectories//2) t = np.linspace(0, 2, 100) for x0 in x_starts: for y0 in y_starts: try: trajectory = odeint(system, [x0, y0], t) plt.plot(trajectory[:, 0], trajectory[:, 1], 'r-', alpha=0.6, linewidth=1) except: pass # Skip if integration fails except ImportError: pass # scipy not available plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.grid(True, alpha=0.3) plt.xlim(x_min, x_max) plt.ylim(y_min, y_max) # Save to base64 PNG buffer = BytesIO() plt.savefig(buffer, format='png', dpi=dpi, bbox_inches='tight') buffer.seek(0) image_png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() return { "image_png_b64": image_png_b64, "x_range": [float(x_min), float(x_max)], "y_range": [float(y_min), float(y_max)], "grid_points": grid_points, "system": {"dx_dt": dx_str, "dy_dt": dy_str} } def handle_plot_surface_3d(params: Dict[str, Any]) -> Dict[str, Any]: """Plot a 3D surface.""" f_str = params["f"] x_min = params["x_min"] x_max = params["x_max"] y_min = params["y_min"] y_max = params["y_max"] samples = params.get("samples", 50) # Optional styling title = params.get("title", f"3D Surface: z = {f_str}") xlabel = params.get("xlabel", "x") ylabel = params.get("ylabel", "y") zlabel = params.get("zlabel", "z") dpi = params.get("dpi", 100) width = params.get("width", 10) height = params.get("height", 8) # Limit samples for performance samples = min(samples, 100) # Create coordinate grids and evaluate via ACCEL if possible, else CPU x_sym, y_sym = sp.symbols('x y') f_expr = safe_sympify(f_str) try: try: X, Y, Z = accel_eval_scalar_2d(f_expr, x_min, x_max, y_min, y_max, samples) except Exception as e: x_vals = np.linspace(x_min, x_max, samples) y_vals = np.linspace(y_min, y_max, samples) X, Y = np.meshgrid(x_vals, y_vals) f_lambda = sp.lambdify((x_sym, y_sym), f_expr, 'numpy') Z = f_lambda(X, Y) try: sys.stderr.write(f"[ACCEL] surface_3d fallback to CPU: {e}\n") except Exception: pass except Exception as e: raise ValueError(f"Error evaluating function: {e}") # Create 3D plot fig = plt.figure(figsize=(width, height), dpi=dpi) ax = fig.add_subplot(111, projection='3d') # Surface plot surf = ax.plot_surface(X, Y, Z, cmap='viridis', alpha=0.8, linewidth=0, antialiased=True) # Add contour lines at the bottom ax.contour(X, Y, Z, zdir='z', offset=np.min(Z), cmap='viridis', alpha=0.5) ax.set_title(title) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_zlabel(zlabel) # Add colorbar fig.colorbar(surf, shrink=0.5, aspect=5) # Save to base64 PNG buffer = BytesIO() plt.savefig(buffer, format='png', dpi=dpi, bbox_inches='tight') buffer.seek(0) image_png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() # Generate CSV data csv_data = "x,y,z\n" for i in range(samples): for j in range(samples): csv_data += f"{X[i,j]},{Y[i,j]},{Z[i,j]}\n" return { "image_png_b64": image_png_b64, "csv_data": csv_data, "x_range": [float(x_min), float(x_max)], "y_range": [float(y_min), float(y_max)], "z_range": [float(np.min(Z)), float(np.max(Z))], "samples": samples } def handle_plot_contour_2d(params: Dict[str, Any]) -> Dict[str, Any]: """Plot 2D contour lines.""" f_str = params["f"] x_min = params["x_min"] x_max = params["x_max"] y_min = params["y_min"] y_max = params["y_max"] levels = params.get("levels", 15) samples = params.get("samples", 100) # Optional styling title = params.get("title", f"Contour Plot: {f_str}") xlabel = params.get("xlabel", "x") ylabel = params.get("ylabel", "y") dpi = params.get("dpi", 100) width = params.get("width", 8) height = params.get("height", 6) # Create coordinate grids x_vals = np.linspace(x_min, x_max, samples) y_vals = np.linspace(y_min, y_max, samples) X, Y = np.meshgrid(x_vals, y_vals) # Parse and evaluate function x_sym, y_sym = sp.symbols('x y') f_expr = safe_sympify(f_str) f_lambda = sp.lambdify((x_sym, y_sym), f_expr, 'numpy') try: Z = f_lambda(X, Y) except Exception as e: raise ValueError(f"Error evaluating function: {e}") # Create plot plt.figure(figsize=(width, height), dpi=dpi) # Contour plot with filled contours contour_filled = plt.contourf(X, Y, Z, levels=levels, cmap='viridis', alpha=0.8) contour_lines = plt.contour(X, Y, Z, levels=levels, colors='black', alpha=0.4, linewidths=0.5) # Add labels to contour lines plt.clabel(contour_lines, inline=True, fontsize=8) plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.colorbar(contour_filled) plt.grid(True, alpha=0.3) # Save to base64 PNG buffer = BytesIO() plt.savefig(buffer, format='png', dpi=dpi, bbox_inches='tight') buffer.seek(0) image_png_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() return { "image_png_b64": image_png_b64, "x_range": [float(x_min), float(x_max)], "y_range": [float(y_min), float(y_max)], "z_range": [float(np.min(Z)), float(np.max(Z))], "levels": levels, "samples": samples } @wrap_tool_execution @with_cache(ttl_hours=4) def handle_tensor_algebra(params: Dict[str, Any]) -> Dict[str, Any]: """Advanced tensor algebra operations.""" from src.tensor_algebra import ( tensor_algebra_compute, schwarzschild_metric, kerr_metric, TensorField, christoffel_symbols ) metric = params.get("metric", []) coords = params.get("coords", []) compute = params.get("compute", []) try: # Handle special metrics if isinstance(metric, str): if metric.lower() == "schwarzschild": schwarzschild_result = schwarzschild_metric() return { "metric_type": "schwarzschild", "metric": schwarzschild_result["metric"], "coordinates": schwarzschild_result["coordinates"], "physical_properties": { "schwarzschild_radius": schwarzschild_result["schwarzschild_radius"], "singularities": schwarzschild_result["singularities"], "applications": schwarzschild_result["applications"] } } elif metric.lower() == "kerr": kerr_result = kerr_metric() return { "metric_type": "kerr", "metric": kerr_result["metric"], "coordinates": kerr_result["coordinates"], "physical_properties": { "angular_momentum": kerr_result["angular_momentum"], "ergosphere": kerr_result["ergosphere"], "event_horizon": kerr_result["event_horizon"], "applications": kerr_result["applications"] } } # Validate inputs if not metric or not coords or not compute: raise ValueError("metric, coords, and compute parameters are required") if len(metric) != len(coords) or len(metric[0]) != len(coords): raise ValueError("Metric dimensions must match coordinate dimensions") # Perform tensor algebra computations result = tensor_algebra_compute(metric, coords, compute) # Convert sympy expressions to strings for JSON serialization def serialize_sympy(obj): if hasattr(obj, '__iter__') and not isinstance(obj, str): if hasattr(obj, 'shape'): # numpy/sympy array return [[str(obj[i, j]) for j in range(obj.shape[1])] for i in range(obj.shape[0])] else: return [serialize_sympy(item) for item in obj] else: return str(obj) # Serialize results serialized_result = {} for key, value in result.items(): if key in ['christoffel_symbols', 'riemann_tensor', 'ricci_tensor', 'ricci_scalar', 'geodesics']: serialized_result[key] = serialize_sympy(value) else: serialized_result[key] = value return { "status": "success", "computation": "advanced_tensor_algebra", "results": serialized_result, "capabilities": [ "christoffel_symbols", "riemann_curvature", "ricci_tensor", "ricci_scalar", "einstein_tensor", "geodesic_equations", "schwarzschild_metric", "kerr_metric" ] } except Exception as e: raise ValueError(f"Tensor algebra computation failed: {e}") def handle_quantum_ops(params: Dict[str, Any]) -> Dict[str, Any]: """Quantum operator utilities (commutators, matrix representations).""" operators = params["operators"] task = params["task"] try: if task == "commutator": if len(operators) != 2: raise ValueError("Commutator requires exactly 2 operators") # Parse operators as sympy expressions A = safe_sympify(operators[0]) B = safe_sympify(operators[1]) # For symbolic operators, compute [A,B] = AB - BA commutator = A * B - B * A return { "operators": operators, "commutator": str(sp.simplify(commutator)), "latex": sp.latex(sp.simplify(commutator)), "task": task } elif task == "matrix_rep": if not _HAS_QUTIP: return { "error": "qutip not available", "message": "Install qutip for matrix representations of quantum operators", "operators": operators, "task": task } # Basic matrix representations for common operators matrices = {} for op_name in operators: if op_name.lower() in ["sigma_x", "pauli_x", "sx"]: matrices[op_name] = qutip.sigmax().full().tolist() elif op_name.lower() in ["sigma_y", "pauli_y", "sy"]: matrices[op_name] = qutip.sigmay().full().tolist() elif op_name.lower() in ["sigma_z", "pauli_z", "sz"]: matrices[op_name] = qutip.sigmaz().full().tolist() elif op_name.lower() in ["a", "annihilation"]: matrices[op_name] = qutip.destroy(4).full().tolist() # 4-level truncation elif op_name.lower() in ["a_dag", "creation"]: matrices[op_name] = qutip.create(4).full().tolist() else: matrices[op_name] = f"Unknown operator: {op_name}" return { "operators": operators, "matrices": matrices, "task": task } else: raise ValueError(f"Unknown task: {task}") except Exception as e: raise ValueError(f"Quantum operator computation failed: {e}") def handle_quantum_solve(params: Dict[str, Any]) -> Dict[str, Any]: """Quantum solver for standard problems or custom Hamiltonians.""" problem = params["problem"] hamiltonian = params.get("hamiltonian") solve_params = params.get("params", {}) try: if problem == "sho": # Simple Harmonic Oscillator n_levels = solve_params.get("n_levels", 5) omega = solve_params.get("omega", 1.0) hbar = solve_params.get("hbar", 1.0) # Energy eigenvalues: E_n = ħω(n + 1/2) energies = [hbar * omega * (n + 0.5) for n in range(n_levels)] # Wavefunctions (symbolic form) x = sp.Symbol('x') wavefunctions = [] for n in range(min(n_levels, 4)): # Limit for computational efficiency # Hermite polynomial approach (simplified) if n == 0: psi_n = f"(mω/πħ)^(1/4) * exp(-mωx²/2ħ)" elif n == 1: psi_n = f"(mω/πħ)^(1/4) * sqrt(2mω/ħ) * x * exp(-mωx²/2ħ)" else: psi_n = f"ψ_{n}(x) - use Hermite polynomials H_{n}" wavefunctions.append(psi_n) return { "problem": "sho", "parameters": {"omega": omega, "hbar": hbar, "n_levels": n_levels}, "energies": energies, "energy_units": "ħω", "wavefunctions": wavefunctions, "ground_state_energy": energies[0] } elif problem == "particle_in_box": # Infinite square well L = solve_params.get("L", 1.0) n_levels = solve_params.get("n_levels", 5) m = solve_params.get("m", 1.0) hbar = solve_params.get("hbar", 1.0) # Energy eigenvalues: E_n = n²π²ħ²/2mL² energies = [n**2 * sp.pi**2 * hbar**2 / (2 * m * L**2) for n in range(1, n_levels + 1)] # Wavefunctions: ψ_n(x) = sqrt(2/L) * sin(nπx/L) wavefunctions = [f"sqrt(2/L) * sin({n}πx/L)" for n in range(1, n_levels + 1)] return { "problem": "particle_in_box", "parameters": {"L": L, "m": m, "hbar": hbar, "n_levels": n_levels}, "energies": [float(E.evalf()) for E in energies], "energy_units": "ħ²π²/2mL²", "wavefunctions": wavefunctions } elif problem == "custom": if not hamiltonian: raise ValueError("Custom problem requires hamiltonian parameter") return { "problem": "custom", "hamiltonian": hamiltonian, "message": "Custom Hamiltonian solving requires numerical diagonalization. Use qutip.Qobj(H).eigenstates() for matrix Hamiltonians.", "suggestion": "Provide matrix representation or use symbolic eigenvalue solving with sympy" } else: raise ValueError(f"Unknown problem type: {problem}") except Exception as e: raise ValueError(f"Quantum solver failed: {e}") def handle_quantum_visualize(params: Dict[str, Any]) -> Dict[str, Any]: """Visualize quantum states (Bloch sphere, probability density).""" state = params["state"] kind = params["kind"] try: if kind == "bloch": if not _HAS_QUTIP: return { "error": "qutip not available", "message": "Install qutip for Bloch sphere visualization", "state": state, "kind": kind } # Parse state (simplified - assume it's a qubit state) # For demo, create a simple Bloch sphere representation try: # Parse state as |0⟩ + |1⟩ coefficients if "+" in state: # Superposition state bloch_vector = [1/sp.sqrt(2), 0, 1/sp.sqrt(2)] # |+⟩ state else: bloch_vector = [0, 0, 1] # |0⟩ state return { "state": state, "kind": "bloch", "bloch_vector": [float(x.evalf()) if hasattr(x, 'evalf') else float(x) for x in bloch_vector], "message": "Use qutip.Bloch() for interactive visualization" } except Exception: return { "error": "state_parse_failed", "message": "Could not parse quantum state. Use qutip.Qobj format.", "state": state } elif kind == "prob_density": return { "state": state, "kind": "prob_density", "message": "Probability density visualization requires wavefunction ψ(x). Use |ψ(x)|² plotting.", "suggestion": "Provide wavefunction expression and use plot_function_2d with f='abs(psi)**2'" } else: raise ValueError(f"Unknown visualization kind: {kind}") except Exception as e: raise ValueError(f"Quantum visualization failed: {e}") def handle_quantum_ops(params: Dict[str, Any]) -> Dict[str, Any]: """Handle quantum operator operations.""" from src.quantum import quantum_ops operators = params.get("operators", []) task = params.get("task", "matrix_rep") try: result = quantum_ops(operators, task) return result except Exception as e: raise ValueError(f"Quantum ops failed: {e}") def handle_quantum_solve(params: Dict[str, Any]) -> Dict[str, Any]: """Handle quantum problem solving.""" from src.quantum import quantum_solve problem = params.get("problem", "sho") hamiltonian = params.get("hamiltonian") problem_params = params.get("params", {}) try: result = quantum_solve(problem, hamiltonian, problem_params) return result except Exception as e: raise ValueError(f"Quantum solve failed: {e}") def handle_quantum_visualize(params: Dict[str, Any]) -> Dict[str, Any]: """Handle quantum state visualization.""" from src.quantum import quantum_visualize state = params.get("state", "1,0") kind = params.get("kind", "bloch") try: result = quantum_visualize(state, kind) # Save image as artifact if present if 'image' in result: artifacts_dir = Path("artifacts") artifacts_dir.mkdir(exist_ok=True) timestamp = int(time.time() * 1000) filename = f"quantum_{kind}_{timestamp}.png" filepath = artifacts_dir / filename # Decode base64 and save import base64 image_data = base64.b64decode(result['image']) with open(filepath, 'wb') as f: f.write(image_data) # Add artifact info result['artifacts'] = [{ 'type': 'image', 'path': str(filepath), 'description': f'Quantum {kind} visualization' }] return result except Exception as e: raise ValueError(f"Quantum visualization failed: {e}") def handle_report_generate(params: Dict[str, Any]) -> Dict[str, Any]: """Generate session report with Markdown output.""" from src.reporting import generate_session_report session_id = params.get("session_id", "default") title = params.get("title") author = params.get("author") include_sections = params.get("include", ["summary", "tools", "artifacts", "reproduce"]) format_type = params.get("format", "markdown") try: result = generate_session_report( session_id=session_id, title=title, author=author, include_sections=include_sections, format_type=format_type ) return result except Exception as e: raise ValueError(f"Report generation failed: {e}") def handle_job_submit(params: Dict[str, Any]) -> Dict[str, Any]: """Submit job for distributed execution.""" from src.reporting import submit_job job_type = params.get("job_type", "computation") job_params = params.get("parameters", {}) executor = params.get("executor", "local") priority = params.get("priority", "normal") try: result = submit_job( job_type=job_type, parameters=job_params, executor=executor, priority=priority ) return result except Exception as e: raise ValueError(f"Job submission failed: {e}") def handle_job_status(params: Dict[str, Any]) -> Dict[str, Any]: """Get status of submitted job.""" from src.reporting import get_job_status job_id = params.get("job_id") if not job_id: raise ValueError("job_id parameter is required") try: result = get_job_status(job_id) return result except Exception as e: raise ValueError(f"Failed to get job status: {e}") def handle_performance_report(params: Dict[str, Any]) -> Dict[str, Any]: """Get comprehensive performance report.""" try: report = get_performance_report() return { 'success': True, 'report': report } except Exception as e: raise ValueError(f"Failed to generate performance report: {e}") def handle_statmech_partition(params: Dict[str, Any]) -> Dict[str, Any]: """Statistical mechanics partition function and thermodynamic quantities.""" energy_levels = params.get("energy_levels", []) temperature = params.get("temperature", 300.0) # Kelvin degeneracies = params.get("degeneracies", None) try: if not energy_levels: raise ValueError("energy_levels parameter required") # Boltzmann constant k_B = 1.380649e-23 # J/K (CODATA 2018) beta = 1.0 / (k_B * temperature) # Convert energy levels to numpy array E = np.array(energy_levels, dtype=float) # Degeneracies (default to 1 for all levels) if degeneracies is None: g = np.ones_like(E) else: g = np.array(degeneracies, dtype=float) if len(g) != len(E): raise ValueError("degeneracies length must match energy_levels length") # Shift energies to avoid numerical overflow (subtract ground state) E_shifted = E - E[0] # Partition function Z = Σ g_i * exp(-β E_i) Z = np.sum(g * np.exp(-beta * E_shifted)) # Thermodynamic quantities ln_Z = np.log(Z) # Internal energy U = -∂ln(Z)/∂β = Σ E_i * g_i * exp(-β E_i) / Z U = np.sum(E * g * np.exp(-beta * E_shifted)) / Z # Heat capacity C_V = ∂U/∂T = k_B * β² * (⟨E²⟩ - ⟨E⟩²) E_avg = U E2_avg = np.sum(E**2 * g * np.exp(-beta * E_shifted)) / Z C_V = k_B * beta**2 * (E2_avg - E_avg**2) # Helmholtz free energy F = -k_B * T * ln(Z) F = -k_B * temperature * ln_Z # Entropy S = k_B * (ln(Z) + β * U) S = k_B * (ln_Z + beta * U) # Population probabilities populations = g * np.exp(-beta * E_shifted) / Z return { "temperature": temperature, "temperature_unit": "K", "energy_levels": energy_levels, "degeneracies": g.tolist(), "partition_function": float(Z), "ln_partition_function": float(ln_Z), "internal_energy": float(U), "internal_energy_unit": "J", "heat_capacity": float(C_V), "heat_capacity_unit": "J/K", "helmholtz_free_energy": float(F), "helmholtz_free_energy_unit": "J", "entropy": float(S), "entropy_unit": "J/K", "populations": populations.tolist(), "most_populated_level": int(np.argmax(populations)) } except Exception as e: raise ValueError(f"Statistical mechanics computation failed: {e}") def handle_request(msg: Dict[str, Any]) -> Dict[str, Any]: """Handle a single JSON-RPC request.""" method = msg["method"] params = msg.get("params", {}) config = load_config() # CAS methods if method == "cas_evaluate": return handle_cas_evaluate(params) elif method == "cas_diff": return handle_cas_diff(params) elif method == "cas_integrate": return handle_cas_integrate(params) elif method == "cas_solve_equation": return handle_cas_solve_equation(params) elif method == "cas_solve_ode": return handle_cas_solve_ode(params) elif method == "cas_propagate_uncertainty": return handle_cas_propagate_uncertainty(params) # Units and Constants methods elif method == "units_convert": return handle_units_convert(params) elif method == "units_smart_eval": return handle_units_smart_eval(params) elif method == "units_round_trip_test": return handle_units_round_trip_test(params) elif method == "constants_get": return handle_constants_get(params) # Plot methods elif method == "plot_function_2d": return handle_plot_function_2d(params) elif method == "plot_parametric_2d": return handle_plot_parametric_2d(params) elif method == "plot_field_2d": return handle_plot_field_2d(params) elif method == "plot_phase_portrait": return handle_plot_phase_portrait(params) elif method == "plot_surface_3d": return handle_plot_surface_3d(params) elif method == "plot_contour_2d": return handle_plot_contour_2d(params) elif method == "accel_caps": return accel_caps() # Phase 5: Advanced Visualization Methods elif method == "plot_volume_3d": return handle_plot_volume_3d(params) elif method == "plot_animation": return handle_plot_animation(params) elif method == "plot_interactive": return handle_plot_interactive(params) elif method == "plot_vr_export": return handle_plot_vr_export(params) # Phase 3 methods (Quantum MVP) elif method == "quantum_ops": return handle_quantum_ops(params) elif method == "quantum_solve": return handle_quantum_solve(params) elif method == "quantum_visualize": return handle_quantum_visualize(params) # Phase 3 methods (other) elif method == "tensor_algebra": return handle_tensor_algebra(params) elif method == "statmech_partition": return handle_statmech_partition(params) # Phase 4 methods - Data I/O elif method == "data_import_hdf5": return data_io.data_import_hdf5(**params) elif method == "data_import_fits": return data_io.data_import_fits(**params) elif method == "data_import_root": return data_io.data_import_root(**params) elif method == "data_export_hdf5": return data_io.data_export_hdf5(**params) # Phase 4 methods - Signal Processing elif method == "data_fft": return signal_processing.data_fft(**params) elif method == "data_filter": return signal_processing.data_filter(**params) elif method == "data_spectrogram": return signal_processing.data_spectrogram(**params) elif method == "data_wavelet": return signal_processing.data_wavelet(**params) # Phase 4 methods - External APIs elif method == "api_arxiv": return external_apis.api_arxiv(**params) elif method == "api_cern": return external_apis.api_cern(**params) elif method == "api_nasa": return external_apis.api_nasa(**params) elif method == "api_nist": return external_apis.api_nist(**params) # Phase 4 methods - Export elif method == "export_overleaf": return export_utils.export_overleaf(**params) elif method == "export_github": return export_utils.export_github(**params) elif method == "export_zenodo": return export_utils.export_zenodo(**params) elif method == "export_jupyter": return export_utils.export_jupyter(**params) # Reporting and orchestration methods elif method == "report_generate": return handle_report_generate(params) elif method == "job_submit": return handle_job_submit(params) elif method == "job_status": return handle_job_status(params) elif method == "performance_report": return handle_performance_report(params) # Phase 6 methods - ML/AI Augmentation elif method == "ml_symbolic_regression": return ml_augmentation.ml_symbolic_regression(params, config) elif method == "ml_surrogate_pde": return ml_augmentation.ml_surrogate_pde(params, config) elif method == "ml_pattern_recognition": return ml_augmentation.ml_pattern_recognition(params, config) elif method == "ml_explain_derivation": return ml_augmentation.ml_explain_derivation(params, config) # Phase 7 methods - Distributed Collaboration elif method == "distributed_job_submit": return distributed_collaboration.distributed_job_submit(params, config) elif method == "distributed_session_share": return distributed_collaboration.distributed_session_share(params, config) elif method == "distributed_lab_notebook": return distributed_collaboration.distributed_lab_notebook(params, config) elif method == "distributed_artifact_versioning": return distributed_collaboration.distributed_artifact_versioning(params, config) # Phase 8 methods - Experiment Orchestrator elif method == "orchestrator_define_dag": return experiment_orchestrator.orchestrator_define_dag(params, config) elif method == "orchestrator_validate_dag": return experiment_orchestrator.orchestrator_validate_dag(params, config) elif method == "orchestrator_run_dag": return experiment_orchestrator.orchestrator_run_dag(params, config) elif method == "orchestrator_publish_report": return experiment_orchestrator.orchestrator_publish_report(params, config) elif method == "orchestrator_collaborate_share": return experiment_orchestrator.orchestrator_collaborate_share(params, config) # Graphing Calculator methods elif method == "graphing_calculator": calculator = GraphingCalculator() operation = params.get('operation', '') return calculator.handle_operation(operation, params) else: raise ValueError(f"Unknown method: {method}") def main(): """Main worker loop - read JSON-RPC requests from stdin, write responses to stdout.""" for line in sys.stdin: line = line.strip() if not line: continue try: request = json.loads(line) result = handle_request(request) response = { "id": request.get("id"), "result": result } except Exception as e: response = { "id": request.get("id") if 'request' in locals() else None, "error": { "code": -32603, "message": str(e), "data": traceback.format_exc() } } print(json.dumps(response)) sys.stdout.flush() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BlinkZer0/Phys-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server