Skip to main content
Glama

qudi MCP Integration

by dirkenglund
rkhs_spline_projection.py15.6 kB
#!/usr/bin/env python3 """ RKHS Spline Kernel Projection for Spectrum Analysis Implements the mathematical framework: min_{f ∈ H_K} ||f||²_{H_K} + λ Σ(f(x_i) - y_i)² Yielding solution: f(x) = Σ α_i K(x, x_i) where α = (K + λI)⁻¹y """ import numpy as np import matplotlib matplotlib.use('Agg') # Use non-interactive backend import matplotlib.pyplot as plt from scipy.optimize import minimize from PIL import Image from scipy.signal import find_peaks import json # Handle cv2 import gracefully try: import cv2 CV2_AVAILABLE = True except ImportError: CV2_AVAILABLE = False print("⚠ OpenCV not available. Using synthetic data generation instead.") class RKHSSplineProjector: """ Reproducing Kernel Hilbert Space (RKHS) spline projection for spectrum smoothing. Parameters: ----------- epsilon : float, default=0.05 Kernel width parameter controlling local vs global fitting lambda_reg : float, default=0.001 Regularization parameter for noise suppression kernel_type : str, default='gaussian' Type of kernel ('gaussian', 'rbf', 'polynomial') """ def __init__(self, epsilon=0.05, lambda_reg=0.001, kernel_type='gaussian'): self.epsilon = epsilon self.lambda_reg = lambda_reg self.kernel_type = kernel_type self.x_train = None self.y_train = None self.alpha = None def _gaussian_kernel(self, x1, x2): """Gaussian RBF kernel: K(x1, x2) = exp(-||x1 - x2||² / (2ε²))""" return np.exp(-np.abs(x1 - x2)**2 / (2 * self.epsilon**2)) def _polynomial_kernel(self, x1, x2, degree=3): """Polynomial kernel: K(x1, x2) = (1 + x1·x2)^d""" return (1 + x1 * x2)**degree def _rbf_kernel(self, x1, x2): """Radial basis function kernel""" return np.exp(-np.abs(x1 - x2) / self.epsilon) def _compute_kernel_matrix(self, x1, x2=None): """Compute kernel matrix K_ij = K(x_i, x_j)""" if x2 is None: x2 = x1 K = np.zeros((len(x1), len(x2))) for i, xi in enumerate(x1): for j, xj in enumerate(x2): if self.kernel_type == 'gaussian': K[i, j] = self._gaussian_kernel(float(xi), float(xj)) elif self.kernel_type == 'rbf': K[i, j] = self._rbf_kernel(float(xi), float(xj)) elif self.kernel_type == 'polynomial': K[i, j] = self._polynomial_kernel(float(xi), float(xj)) else: raise ValueError(f"Unknown kernel type: {self.kernel_type}") return K def fit(self, x, y): """ Fit the RKHS spline to training data. Solves: α = (K + λI)⁻¹y Parameters: ----------- x : array-like Training input points (wavelengths) y : array-like Training output values (transmission) """ self.x_train = np.array(x) self.y_train = np.array(y) # Compute kernel matrix K = self._compute_kernel_matrix(self.x_train) # Add regularization: K + λI K_reg = K + self.lambda_reg * np.eye(len(self.x_train)) # Solve for coefficients: α = (K + λI)⁻¹y try: self.alpha = np.linalg.solve(K_reg, self.y_train) except np.linalg.LinAlgError: # Fallback to pseudoinverse if singular self.alpha = np.linalg.lstsq(K_reg, self.y_train, rcond=None)[0] return self def predict(self, x): """ Predict using fitted RKHS spline. f(x) = Σ α_i K(x, x_i) Parameters: ----------- x : array-like Points to predict at Returns: -------- y_pred : ndarray Predicted values """ if self.alpha is None: raise ValueError("Model must be fitted before prediction") x = np.array(x) K_pred = self._compute_kernel_matrix(self.x_train, x.reshape(-1, 1) if x.ndim == 1 else x) return K_pred.T @ self.alpha def get_rkhs_norm(self): """Compute RKHS norm ||f||²_{H_K} = α^T K α""" if self.alpha is None: raise ValueError("Model must be fitted before computing norm") K = self._compute_kernel_matrix(self.x_train) return self.alpha.T @ K @ self.alpha class SpectrumExtractor: """Extract data points from transmission spectrum images""" def __init__(self, image_path): self.image_path = image_path self.image = None self.x_data = None self.y_data = None def extract_spectrum_line(self, color_threshold=100): """ Extract spectrum line from image using color thresholding and contour detection. This is a simplified approach - for production use, consider using axiomatic-plots MCP server. """ if not CV2_AVAILABLE: raise ValueError("OpenCV not available for image processing") if self.image is None: self.load_image() # Convert to grayscale gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY) # Apply threshold to isolate the spectrum line _, binary = cv2.threshold(gray, color_threshold, 255, cv2.THRESH_BINARY_INV) # Find contours contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # Extract points from largest contour (assumed to be spectrum line) if contours: largest_contour = max(contours, key=cv2.contourArea) points = largest_contour.reshape(-1, 2) # Sort by x-coordinate (wavelength) points = points[points[:, 0].argsort()] # Convert pixel coordinates to wavelength/transmission values # This requires calibration - using approximate scaling for demonstration height, width = gray.shape # Approximate wavelength range (adjust based on actual spectrum) wavelength_min, wavelength_max = 400, 700 # nm transmission_min, transmission_max = 0, 100 # % # Scale pixel coordinates self.x_data = wavelength_min + (points[:, 0] / width) * (wavelength_max - wavelength_min) self.y_data = transmission_max - (points[:, 1] / height) * (transmission_max - transmission_min) return self.x_data, self.y_data else: raise ValueError("No spectrum line detected in image") def load_image(self): """Load and preprocess the spectrum image""" if not CV2_AVAILABLE: raise ValueError("OpenCV not available for image loading") self.image = cv2.imread(self.image_path) if self.image is None: raise ValueError(f"Could not load image: {self.image_path}") return self def create_synthetic_transmission_spectrum(): """ Create synthetic transmission spectrum with double peak structure similar to the described spectrum around 500-550nm """ # Wavelength range wavelengths = np.linspace(400, 700, 200) # Double peak structure around 500-550nm peak1 = 30 * np.exp(-((wavelengths - 510)**2) / (2 * 15**2)) peak2 = 25 * np.exp(-((wavelengths - 540)**2) / (2 * 12**2)) # Baseline transmission baseline = 5 + 2 * np.sin((wavelengths - 400) / 50) # Combine peaks and baseline clean_spectrum = baseline + peak1 + peak2 # Add realistic noise np.random.seed(42) noise = 2 * np.random.normal(0, 1, len(wavelengths)) noisy_spectrum = clean_spectrum + noise return wavelengths, noisy_spectrum, clean_spectrum def analyze_spectrum_quality(original, smoothed, x_data): """Analyze the quality of spectrum reconstruction""" # Find peaks in both spectra orig_peaks, _ = find_peaks(original, height=np.max(original) * 0.3) smooth_peaks, _ = find_peaks(smoothed, height=np.max(smoothed) * 0.3) # Compute metrics mse = np.mean((original - smoothed)**2) peak_preservation = len(smooth_peaks) / len(orig_peaks) if len(orig_peaks) > 0 else 0 # Signal-to-noise improvement orig_noise = np.std(np.diff(original)) smooth_noise = np.std(np.diff(smoothed)) snr_improvement = orig_noise / smooth_noise if smooth_noise > 0 else np.inf return { 'mse': mse, 'peak_preservation': peak_preservation, 'snr_improvement': snr_improvement, 'original_peaks': len(orig_peaks), 'smoothed_peaks': len(smooth_peaks), 'peak_wavelengths_orig': x_data[orig_peaks] if len(orig_peaks) > 0 else [], 'peak_wavelengths_smooth': x_data[smooth_peaks] if len(smooth_peaks) > 0 else [] } def main(): """Main execution function""" print("RKHS Spline Kernel Projection for Transmission Spectrum Analysis") print("=" * 70) # Try to extract real data from transmission.png try: extractor = SpectrumExtractor('/Users/englund/Projects/2025-ai-experiments-MCPs/plot_extraction/transmission.png') x_data, y_data = extractor.extract_spectrum_line() print(f"✓ Successfully extracted {len(x_data)} data points from transmission.png") data_source = "extracted" except Exception as e: print(f"⚠ Could not extract from image: {e}") print("Using synthetic data with double peak structure...") x_data, y_data, true_clean = create_synthetic_transmission_spectrum() data_source = "synthetic" print(f"Data range: λ = {x_data.min():.1f}-{x_data.max():.1f} nm") print(f"Transmission range: {y_data.min():.1f}-{y_data.max():.1f}%") # Initialize RKHS projector with specified parameters projector = RKHSSplineProjector( epsilon=0.05, # ε ≈ 0.05 for optical spectra lambda_reg=0.001, # λ ≈ 0.001 for regularization kernel_type='gaussian' ) print(f"\nRKHS Parameters:") print(f" Kernel width (ε): {projector.epsilon}") print(f" Regularization (λ): {projector.lambda_reg}") print(f" Kernel type: {projector.kernel_type}") # Fit the RKHS spline print("\n📊 Fitting RKHS spline projection...") projector.fit(x_data, y_data) # Generate smooth predictions x_smooth = np.linspace(x_data.min(), x_data.max(), 500) y_smooth = projector.predict(x_smooth) # Compute RKHS norm rkhs_norm = projector.get_rkhs_norm() print(f"✓ RKHS norm ||f||²_{{H_K}}: {rkhs_norm:.6f}") # Analyze reconstruction quality y_reconstructed = projector.predict(x_data) metrics = analyze_spectrum_quality(y_data, y_reconstructed, x_data) print(f"\n📈 Reconstruction Quality:") print(f" MSE: {metrics['mse']:.6f}") print(f" Peak preservation: {metrics['peak_preservation']:.2f}") print(f" SNR improvement: {metrics['snr_improvement']:.2f}x") print(f" Original peaks detected: {metrics['original_peaks']}") print(f" Smoothed peaks detected: {metrics['smoothed_peaks']}") if len(metrics['peak_wavelengths_orig']) > 0: print(f" Peak wavelengths (original): {metrics['peak_wavelengths_orig']}") if len(metrics['peak_wavelengths_smooth']) > 0: print(f" Peak wavelengths (smoothed): {metrics['peak_wavelengths_smooth']}") # Create comprehensive visualization plt.figure(figsize=(15, 10)) # Main spectrum comparison plt.subplot(2, 2, 1) plt.plot(x_data, y_data, 'o', alpha=0.6, markersize=3, color='red', label='Original Data') plt.plot(x_smooth, y_smooth, '-', linewidth=2, color='blue', label='RKHS Projection') plt.xlabel('Wavelength (nm)') plt.ylabel('Transmission (%)') plt.title('RKHS Spline Kernel Projection\n' + f'ε={projector.epsilon}, λ={projector.lambda_reg}') plt.legend() plt.grid(True, alpha=0.3) # Residuals plt.subplot(2, 2, 2) residuals = y_data - projector.predict(x_data) plt.plot(x_data, residuals, 'o', alpha=0.6, markersize=3, color='green') plt.axhline(y=0, color='black', linestyle='--', alpha=0.5) plt.xlabel('Wavelength (nm)') plt.ylabel('Residuals') plt.title(f'Fitting Residuals (MSE: {metrics["mse"]:.6f})') plt.grid(True, alpha=0.3) # Kernel matrix visualization plt.subplot(2, 2, 3) # Sample points for kernel matrix visualization sample_indices = np.linspace(0, len(x_data)-1, min(50, len(x_data)), dtype=int) x_sample = x_data[sample_indices] K_sample = projector._compute_kernel_matrix(x_sample) im = plt.imshow(K_sample, cmap='viridis', aspect='auto') plt.colorbar(im) plt.xlabel('Sample Index') plt.ylabel('Sample Index') plt.title(f'Kernel Matrix K(x_i, x_j)\n({projector.kernel_type} kernel)') # Parameter sensitivity (different epsilon values) plt.subplot(2, 2, 4) epsilons = [0.01, 0.05, 0.1, 0.2] for eps in epsilons: temp_projector = RKHSSplineProjector(epsilon=eps, lambda_reg=0.001) temp_projector.fit(x_data, y_data) y_temp = temp_projector.predict(x_smooth) plt.plot(x_smooth, y_temp, label=f'ε={eps}', linewidth=1.5) plt.plot(x_data, y_data, 'o', alpha=0.4, markersize=2, color='red', label='Data') plt.xlabel('Wavelength (nm)') plt.ylabel('Transmission (%)') plt.title('Parameter Sensitivity (ε)') plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig('/Users/englund/Projects/2025-ai-experiments-MCPs/plot_extraction/rkhs_analysis.png', dpi=300, bbox_inches='tight') print("✓ Visualization saved as rkhs_analysis.png") # Save results to JSON results = { 'parameters': { 'epsilon': projector.epsilon, 'lambda_reg': projector.lambda_reg, 'kernel_type': projector.kernel_type }, 'data_source': data_source, 'data_points': len(x_data), 'wavelength_range': [float(x_data.min()), float(x_data.max())], 'transmission_range': [float(y_data.min()), float(y_data.max())], 'rkhs_norm': float(rkhs_norm), 'metrics': {k: float(v) if np.isscalar(v) else [float(x) for x in v] for k, v in metrics.items()}, 'mathematical_framework': { 'optimization_problem': 'min_{f ∈ H_K} ||f||²_{H_K} + λ Σ(f(x_i) - y_i)²', 'solution_form': 'f(x) = Σ α_i K(x, x_i)', 'coefficient_computation': 'α = (K + λI)⁻¹y' } } with open('/Users/englund/Projects/2025-ai-experiments-MCPs/plot_extraction/rkhs_results.json', 'w') as f: json.dump(results, f, indent=2) print(f"\n💾 Results saved to:") print(f" - rkhs_analysis.png (visualization)") print(f" - rkhs_results.json (numerical results)") print(f"\n🎯 Mathematical Framework Successfully Implemented:") print(f" ✓ Optimization: min_{{f ∈ H_K}} ||f||²_{{H_K}} + λ Σ(f(x_i) - y_i)²") print(f" ✓ Solution: f(x) = Σ α_i K(x, x_i)") print(f" ✓ Coefficients: α = (K + λI)⁻¹y") print(f" ✓ Double peak preservation with noise filtering") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dirkenglund/qudi-mcp-integration'

If you have feedback or need assistance with the MCP directory API, please join our Discord server