Skip to main content
Glama
manasp21

Psi-MCP: Advanced Quantum Systems MCP Server

by manasp21
ml.py22.1 kB
"""
Quantum Machine Learning Module

This module provides quantum machine learning functionality including
quantum neural networks, variational classifiers, and quantum ML algorithms.

All entry points are async and return a dict with a 'success' flag; on
failure the dict carries an 'error' message instead of raising.
"""

import logging
from typing import Dict, Any, List, Optional, Union, Tuple
import asyncio

import numpy as np

logger = logging.getLogger(__name__)


async def quantum_neural_network(
    input_data: List[List[float]],
    labels: List[int],
    n_qubits: int = 4,
    n_layers: int = 2,
    learning_rate: float = 0.1,
    epochs: int = 50
) -> Dict[str, Any]:
    """
    Train a quantum neural network.

    Args:
        input_data: Training input data (one feature vector per sample)
        labels: Training labels (binary, 0/1)
        n_qubits: Number of qubits
        n_layers: Number of variational layers
        learning_rate: Learning rate
        epochs: Number of training epochs

    Returns:
        QNN training results ('success', accuracies, cost history,
        optimal parameters); {'success': False, 'error': ...} on failure.
    """
    logger.info(f"Training QNN with {n_qubits} qubits and {n_layers} layers")

    try:
        # Validate inputs BEFORE importing the optional pennylane dependency,
        # so bad arguments are reported even when pennylane is unavailable.
        if not input_data or not labels:
            raise ValueError("Input data and labels cannot be empty")
        if len(input_data) != len(labels):
            raise ValueError("Input data and labels must have the same length")

        import pennylane as qml
        from pennylane import numpy as pnp

        input_data = np.array(input_data)
        labels = np.array(labels)

        # Ensure data dimensionality matches n_qubits: truncate extra
        # features, zero-pad missing ones.
        if input_data.shape[1] > n_qubits:
            logger.warning(f"Input dimension {input_data.shape[1]} > n_qubits {n_qubits}, truncating data")
            input_data = input_data[:, :n_qubits]
        elif input_data.shape[1] < n_qubits:
            padding = np.zeros((input_data.shape[0], n_qubits - input_data.shape[1]))
            input_data = np.concatenate([input_data, padding], axis=1)

        dev = qml.device('default.qubit', wires=n_qubits)

        def data_encoding(x):
            """Encode classical data into quantum states (RY angle encoding)."""
            for i in range(n_qubits):
                qml.RY(x[i], wires=i)

        def variational_layer(layer_params):
            """One variational layer: per-qubit RY rotations + CNOT chain."""
            for i in range(n_qubits):
                qml.RY(layer_params[i], wires=i)
            for i in range(n_qubits - 1):
                qml.CNOT(wires=[i, i + 1])

        @qml.qnode(dev)
        def qnn_circuit(inputs, params):
            """Quantum neural network circuit; readout is <Z> on qubit 0 in [-1, 1]."""
            data_encoding(inputs)
            for layer in range(n_layers):
                layer_params = params[layer * n_qubits:(layer + 1) * n_qubits]
                variational_layer(layer_params)
            return qml.expval(qml.PauliZ(0))

        # One RY angle per qubit per layer.
        n_params = n_layers * n_qubits
        params = pnp.random.uniform(0, 2 * pnp.pi, n_params, requires_grad=True)

        def cost_function(params, X, y):
            """Binary cross-entropy loss adapted for quantum outputs."""
            predictions = pnp.array([qnn_circuit(x, params) for x in X])
            # Map Pauli-Z expectation values (-1..1) to probabilities (0..1).
            probs = (predictions + 1) / 2
            # Clip to avoid log(0).
            probs = pnp.clip(probs, 1e-7, 1 - 1e-7)
            return -pnp.mean(y * pnp.log(probs) + (1 - y) * pnp.log(1 - probs))

        optimizer = qml.AdamOptimizer(stepsize=learning_rate)
        cost_history = []
        accuracy_history = []

        for epoch in range(epochs):
            params, cost = optimizer.step_and_cost(
                lambda p: cost_function(p, input_data, labels), params
            )
            cost_history.append(float(cost))

            # Class decision: sign of the expectation value.
            predictions = [1 if qnn_circuit(x, params) > 0 else 0 for x in input_data]
            accuracy = np.mean(np.array(predictions) == labels)
            accuracy_history.append(float(accuracy))

            if epoch % 10 == 0:
                logger.info(f"Epoch {epoch}: Cost = {cost:.4f}, Accuracy = {accuracy:.4f}")

        # Final pass over the training set with the optimised parameters.
        final_predictions = []
        prediction_probabilities = []
        for x in input_data:
            pred_val = qnn_circuit(x, params)
            final_predictions.append(1 if pred_val > 0 else 0)
            prediction_probabilities.append(float((pred_val + 1) / 2))

        final_accuracy = np.mean(np.array(final_predictions) == labels)

        return {
            'success': True,
            'n_qubits': n_qubits,
            'n_layers': n_layers,
            'epochs': epochs,
            'final_accuracy': float(final_accuracy),
            'final_cost': float(cost_history[-1]),
            'cost_history': cost_history,
            'accuracy_history': accuracy_history,
            'optimal_parameters': params.tolist(),
            'predictions': final_predictions,
            'prediction_probabilities': prediction_probabilities,
            'training_samples': len(input_data)
        }

    except Exception as e:
        logger.error(f"Error in QNN training: {e}")
        return {'success': False, 'error': str(e)}


async def variational_classifier(
    training_data: List[List[float]],
    training_labels: List[int],
    test_data: List[List[float]],
    ansatz_type: str = "basic",
    optimizer: str = "adam",
    max_iterations: int = 100
) -> Dict[str, Any]:
    """
    Train a variational quantum classifier.

    Args:
        training_data: Training feature vectors
        training_labels: Training labels (binary, 0/1)
        test_data: Test feature vectors (may be empty)
        ansatz_type: Type of ansatz circuit ("basic" or "hardware_efficient")
        optimizer: Classical optimizer ("adam" or gradient descent otherwise)
        max_iterations: Maximum training iterations

    Returns:
        Classification results; {'success': False, 'error': ...} on failure.
    """
    logger.info(f"Training variational classifier with {ansatz_type} ansatz")

    try:
        # Validate before touching the optional pennylane dependency.
        if not training_data or not training_labels:
            raise ValueError("Training data and labels cannot be empty")
        if len(training_data) != len(training_labels):
            raise ValueError("Training data and labels must have the same length")

        import pennylane as qml
        from pennylane import numpy as pnp

        X_train = np.array(training_data)
        y_train = np.array(training_labels)
        # An empty test set would produce a 1-D array and break the column
        # slicing below; give it an explicit (0, n_features) shape instead.
        X_test = np.array(test_data) if test_data else np.zeros((0, X_train.shape[1]))

        # Number of qubits follows the feature dimension, capped for simulation.
        n_features = X_train.shape[1]
        n_qubits = max(2, min(n_features, 6))

        # Truncate or zero-pad features to match the qubit count.
        if n_features > n_qubits:
            X_train = X_train[:, :n_qubits]
            X_test = X_test[:, :n_qubits]
        elif n_features < n_qubits:
            pad_train = np.zeros((X_train.shape[0], n_qubits - n_features))
            pad_test = np.zeros((X_test.shape[0], n_qubits - n_features))
            X_train = np.concatenate([X_train, pad_train], axis=1)
            X_test = np.concatenate([X_test, pad_test], axis=1)

        # L2-normalize each sample; epsilon guards all-zero rows.
        X_train = X_train / (np.linalg.norm(X_train, axis=1, keepdims=True) + 1e-8)
        X_test = X_test / (np.linalg.norm(X_test, axis=1, keepdims=True) + 1e-8)

        dev = qml.device('default.qubit', wires=n_qubits)

        def basic_ansatz(x, params):
            """Angle encoding + one RY layer + CNOT chain."""
            for i in range(n_qubits):
                qml.RY(x[i] * np.pi, wires=i)
            for i in range(n_qubits):
                qml.RY(params[i], wires=i)
            for i in range(n_qubits - 1):
                qml.CNOT(wires=[i, i + 1])

        def hardware_efficient_ansatz(x, params):
            """Angle encoding + layered RY/RZ rotations + CNOT chains."""
            for i in range(n_qubits):
                qml.RY(x[i] * np.pi, wires=i)
            layer_size = n_qubits * 2  # RY and RZ angle for each qubit
            n_layers = len(params) // layer_size
            for layer in range(n_layers):
                for i in range(n_qubits):
                    qml.RY(params[layer * layer_size + i], wires=i)
                    qml.RZ(params[layer * layer_size + n_qubits + i], wires=i)
                for i in range(n_qubits - 1):
                    qml.CNOT(wires=[i, i + 1])

        if ansatz_type == "hardware_efficient":
            ansatz = hardware_efficient_ansatz
            n_params = n_qubits * 4  # 2 layers, 2 params per qubit per layer
        else:
            ansatz = basic_ansatz
            n_params = n_qubits

        @qml.qnode(dev)
        def classifier_circuit(x, params):
            """Classifier circuit; score is <Z> on qubit 0 in [-1, 1]."""
            ansatz(x, params)
            return qml.expval(qml.PauliZ(0))

        params = pnp.random.uniform(0, 2 * pnp.pi, n_params, requires_grad=True)

        def cost_fn(params):
            """Squared loss against labels mapped to {-1, +1}."""
            predictions = pnp.array([classifier_circuit(x, params) for x in X_train])
            target = 2 * y_train - 1
            return pnp.mean((predictions - target) ** 2)

        if optimizer == "adam":
            opt = qml.AdamOptimizer(stepsize=0.1)
        else:
            opt = qml.GradientDescentOptimizer(stepsize=0.1)

        costs = []
        for iteration in range(max_iterations):
            params, cost = opt.step_and_cost(cost_fn, params)
            costs.append(float(cost))

            if iteration % 20 == 0:
                logger.info(f"Iteration {iteration}: Cost = {cost:.4f}")

            # Early stopping when the cost has plateaued over 10 iterations.
            if iteration > 10 and abs(costs[-1] - costs[-10]) < 1e-6:
                break

        # Test-set predictions: sign of the score decides the class.
        test_predictions = []
        test_scores = []
        for x in X_test:
            score = classifier_circuit(x, params)
            test_predictions.append(1 if score > 0 else 0)
            test_scores.append(float(score))

        # Training-set accuracy with the optimised parameters.
        train_predictions = [
            1 if classifier_circuit(x, params) > 0 else 0 for x in X_train
        ]
        train_accuracy = np.mean(np.array(train_predictions) == y_train)

        return {
            'success': True,
            'ansatz_type': ansatz_type,
            'n_qubits': n_qubits,
            'n_parameters': n_params,
            'training_samples': len(X_train),
            'test_samples': len(X_test),
            'train_accuracy': float(train_accuracy),
            'final_cost': float(costs[-1]),
            'cost_history': costs,
            'test_predictions': test_predictions,
            'test_scores': test_scores,
            'optimal_parameters': params.tolist(),
            'iterations': len(costs)
        }

    except Exception as e:
        logger.error(f"Error in variational classifier: {e}")
        return {'success': False, 'error': str(e)}


async def quantum_kernel_method(
    training_data: List[List[float]],
    training_labels: List[int],
    test_data: List[List[float]],
    kernel_type: str = "basic",
    gamma: float = 1.0
) -> Dict[str, Any]:
    """
    Implement quantum kernel methods for classification.

    Args:
        training_data: Training feature vectors
        training_labels: Training labels (binary, 0/1)
        test_data: Test feature vectors (may be empty)
        kernel_type: Type of quantum kernel ("basic" or "pauli")
        gamma: Kernel parameter (entangling angle scale for "pauli")

    Returns:
        Quantum kernel classification results;
        {'success': False, 'error': ...} on failure.
    """
    logger.info(f"Running quantum kernel method with {kernel_type} kernel")

    try:
        # Validate before touching the optional pennylane dependency.
        if not training_data or not training_labels:
            raise ValueError("Training data and labels cannot be empty")
        if len(training_data) != len(training_labels):
            raise ValueError("Training data and labels must have the same length")

        import pennylane as qml
        from pennylane import numpy as pnp

        X_train = np.array(training_data)
        y_train = np.array(training_labels)
        # Keep an explicit 2-D shape for an empty test set.
        X_test = np.array(test_data) if test_data else np.zeros((0, X_train.shape[1]))

        n_features = X_train.shape[1]
        n_qubits = min(n_features, 4)  # Limit for simulation

        # Truncate extra features (n_qubits never exceeds n_features here).
        if n_features > n_qubits:
            X_train = X_train[:, :n_qubits]
            X_test = X_test[:, :n_qubits]

        # L2-normalize each sample; epsilon guards all-zero rows.
        X_train = X_train / (np.linalg.norm(X_train, axis=1, keepdims=True) + 1e-8)
        X_test = X_test / (np.linalg.norm(X_test, axis=1, keepdims=True) + 1e-8)

        dev = qml.device('default.qubit', wires=n_qubits)

        def feature_map(x):
            """Quantum feature map."""
            if kernel_type == "basic":
                for i in range(n_qubits):
                    qml.RY(x[i] * np.pi, wires=i)
                for i in range(n_qubits - 1):
                    qml.CNOT(wires=[i, i + 1])
            elif kernel_type == "pauli":
                # Pauli feature map: single-qubit rotations plus pairwise
                # ZZ-style entangling interactions scaled by gamma.
                for i in range(n_qubits):
                    qml.RY(x[i] * np.pi, wires=i)
                for i in range(n_qubits):
                    for j in range(i + 1, n_qubits):
                        qml.CNOT(wires=[i, j])
                        qml.RZ(x[i] * x[j] * gamma, wires=j)
                        qml.CNOT(wires=[i, j])
            else:
                # Default feature map: rotations only.
                for i in range(n_qubits):
                    qml.RY(x[i] * np.pi, wires=i)

        @qml.qnode(dev)
        def kernel_circuit(x1, x2):
            """Compute quantum kernel between two data points."""
            feature_map(x1)
            qml.adjoint(feature_map)(x2)
            return qml.probs(wires=range(n_qubits))

        def quantum_kernel(x1, x2):
            """Quantum kernel value: probability of measuring |0...0>."""
            probs = kernel_circuit(x1, x2)
            return probs[0]

        # Compute the (symmetric) train and test kernel matrices.
        logger.info("Computing kernel matrix...")
        n_train = len(X_train)
        n_test = len(X_test)

        K_train = np.zeros((n_train, n_train))
        for i in range(n_train):
            for j in range(i, n_train):
                k_val = quantum_kernel(X_train[i], X_train[j])
                K_train[i, j] = k_val
                K_train[j, i] = k_val  # Kernel is symmetric

        K_test = np.zeros((n_test, n_train))
        for i in range(n_test):
            for j in range(n_train):
                K_test[i, j] = quantum_kernel(X_test[i], X_train[j])

        # Kernel ridge regression: solve (K + lambda*I) alpha = y.
        regularization = 1e-6
        K_reg = K_train + regularization * np.eye(n_train)

        try:
            alpha = np.linalg.solve(K_reg, y_train)
        except np.linalg.LinAlgError:
            # Fallback to pseudo-inverse for a singular kernel matrix.
            alpha = np.linalg.pinv(K_reg) @ y_train

        # Predictions: regression scores thresholded at 0.5 for 0/1 labels.
        test_predictions = K_test @ alpha
        test_classes = (test_predictions > 0.5).astype(int)

        train_predictions = K_train @ alpha
        train_classes = (train_predictions > 0.5).astype(int)
        train_accuracy = np.mean(train_classes == y_train)

        return {
            'success': True,
            'kernel_type': kernel_type,
            'n_qubits': n_qubits,
            'training_samples': n_train,
            'test_samples': n_test,
            'train_accuracy': float(train_accuracy),
            'test_predictions': test_classes.tolist(),
            'test_scores': test_predictions.tolist(),
            # int() cast: numpy integers are not JSON-serializable.
            'kernel_matrix_rank': int(np.linalg.matrix_rank(K_train)),
            'kernel_trace': float(np.trace(K_train))
        }

    except Exception as e:
        logger.error(f"Error in quantum kernel method: {e}")
        return {'success': False, 'error': str(e)}


async def quantum_generative_model(
    training_data: List[List[float]],
    n_qubits: int = 4,
    n_layers: int = 3,
    learning_rate: float = 0.1,
    epochs: int = 100
) -> Dict[str, Any]:
    """
    Train a quantum generative model.

    Args:
        training_data: Training data samples
        n_qubits: Number of qubits
        n_layers: Number of layers in generator
        learning_rate: Learning rate
        epochs: Training epochs

    Returns:
        Generative model results (loss history, generated samples);
        {'success': False, 'error': ...} on failure.
    """
    logger.info(f"Training quantum generative model with {n_qubits} qubits")

    try:
        # Validate before touching the optional pennylane dependency.
        if not training_data:
            raise ValueError("Training data cannot be empty")

        import pennylane as qml
        from pennylane import numpy as pnp

        X_train = np.array(training_data)
        n_samples, n_features = X_train.shape

        # Limit features to the available qubits.
        if n_features > n_qubits:
            X_train = X_train[:, :n_qubits]
            n_features = n_qubits

        # Normalize to [0, 1] using the global min/max over all entries.
        X_train = (X_train - X_train.min()) / (X_train.max() - X_train.min() + 1e-8)

        dev = qml.device('default.qubit', wires=n_qubits)

        def generator_circuit(params):
            """Quantum generator: uniform superposition + RY/RZ layers + CNOTs."""
            for i in range(n_qubits):
                qml.Hadamard(wires=i)
            for layer in range(n_layers):
                for i in range(n_qubits):
                    qml.RY(params[layer * n_qubits * 2 + i], wires=i)
                    qml.RZ(params[layer * n_qubits * 2 + n_qubits + i], wires=i)
                for i in range(n_qubits - 1):
                    qml.CNOT(wires=[i, i + 1])

        @qml.qnode(dev)
        def generate_sample(params):
            generator_circuit(params)
            return qml.probs(wires=range(n_features))

        # One RY and one RZ angle per qubit per layer.
        n_params = n_layers * n_qubits * 2
        params = pnp.random.uniform(0, 2 * pnp.pi, n_params, requires_grad=True)

        n_states = 2 ** n_features
        epsilon = 1e-8  # Smoothing to avoid log(0) / division by zero.

        # Empirical target distribution over computational-basis states,
        # binarizing each feature at 0.5. It is training-loop-invariant,
        # so it is computed once rather than in every loss evaluation.
        # NOTE: qml.probs orders basis states with the first wire as the
        # most significant bit, so feature i maps to bit (n_features-1-i);
        # this matches the MSB-first decoding used when sampling below.
        target_probs = np.zeros(n_states)
        for sample in X_train:
            state_idx = 0
            for i, val in enumerate(sample[:n_features]):
                if val > 0.5:
                    state_idx += 2 ** (n_features - 1 - i)
            target_probs[state_idx] += 1.0 / n_samples
        target_probs = (target_probs + epsilon) / np.sum(target_probs + epsilon)

        def kl_divergence_loss(params):
            """KL divergence between target and generated distributions."""
            generated = generate_sample(params)
            # Use pennylane's autograd-aware ops and no in-place updates so
            # gradients flow through the smoothing and normalisation.
            generated = generated + epsilon
            generated = generated / pnp.sum(generated)
            return pnp.sum(target_probs * pnp.log(target_probs / generated))

        optimizer = qml.AdamOptimizer(stepsize=learning_rate)
        losses = []

        for epoch in range(epochs):
            params, loss = optimizer.step_and_cost(kl_divergence_loss, params)
            losses.append(float(loss))

            if epoch % 20 == 0:
                logger.info(f"Epoch {epoch}: Loss = {loss:.4f}")

        # Final learned distribution; renormalise to guard against tiny
        # floating-point drift before handing it to np.random.choice.
        generated_probs = np.asarray(generate_sample(params), dtype=float)
        generated_probs = generated_probs / generated_probs.sum()

        # Draw samples from the learned distribution.
        n_generate = min(50, n_samples)
        generated_samples = []
        for _ in range(n_generate):
            state_idx = np.random.choice(len(generated_probs), p=generated_probs)
            # MSB-first binary string matches the qml.probs state ordering.
            binary = format(state_idx, f'0{n_features}b')
            generated_samples.append([int(bit) for bit in binary])

        return {
            'success': True,
            'n_qubits': n_qubits,
            'n_layers': n_layers,
            'epochs': epochs,
            'final_loss': float(losses[-1]),
            'loss_history': losses,
            'generated_samples': generated_samples,
            'generated_probabilities': generated_probs.tolist(),
            'training_samples': len(X_train),
            'optimal_parameters': params.tolist()
        }

    except Exception as e:
        logger.error(f"Error in quantum generative model: {e}")
        return {'success': False, 'error': str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/manasp21/Psi-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server