Skip to main content
Glama
gep_mcp_motor.py (25.3 kB)
"""
GEP-Native MCP Control Layer - Python Integration
================================================

Motor neuron pattern for tool execution with entropy-weighted routing.

Author: Gary W. Floyd / Lumiea Systems Research Division
Based on: Guided Entropy Principle (GEP) framework
Date: December 2025

Installation:
    pip install psycopg2-binary sentence-transformers numpy --break-system-packages

    The ``http`` and ``jsonrpc`` tool protocols additionally import
    ``requests`` lazily, at call time.

Usage:
    from gep_mcp_motor import GEPMotorNeuron

    motor = GEPMotorNeuron(
        db_conn_string="postgresql://nexus_mcp:password@localhost/gep_mcp_v2"
    )

    result = motor.execute_intent(
        session_id="session_123",
        operator_id="gary",
        intent="check system health",
        input_data={"verbose": True}
    )

    print(result['action'])       # ALLOW, THROTTLE, STEP_UP, or BLOCK
    print(result['gep_metrics'])  # entropy_before, entropy_after, delta_s, alignment
"""

import hashlib
import json
import subprocess
import time
import uuid
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import psycopg2
import psycopg2.extras
from sentence_transformers import SentenceTransformer


@dataclass
class GEPMetrics:
    """GEP decision metrics for a tool execution."""

    entropy_before: float
    entropy_after: float
    delta_s: float
    alignment: float
    routing_score: float

    def to_dict(self) -> Dict[str, float]:
        """Return the metrics as a plain dict keyed by field name."""
        return asdict(self)


class GEPMotorNeuron:
    """
    GEP-native MCP control layer.

    Each tool execution flows through:
    1. Compute current entropy + alignment
    2. DB routes to best tool + gates execution
    3. Execute with constraints
    4. Log outcome + update GEP state
    """

    def __init__(self, db_conn_string: str,
                 encoder_model: str = 'all-MiniLM-L6-v2'):
        """
        Initialize GEP motor neuron.

        Args:
            db_conn_string: PostgreSQL connection string
            encoder_model: SentenceTransformer model for intent encoding
        """
        self.conn = psycopg2.connect(db_conn_string)
        self.encoder = SentenceTransformer(encoder_model)
        print(f"✓ GEP Motor Neuron initialized with {encoder_model}")

    def _encode_intent(self, intent: str) -> List[float]:
        """Convert intent string to semantic vector (384-dim)."""
        return self.encoder.encode(intent).tolist()

    def _compute_entropy(self, state: Dict[str, Any],
                         history: List[Dict[str, Any]]) -> float:
        """
        Compute current entropy E(t) from system state + history.

        Full GEP formula: E(t) = H(p) - I(operator; system)

        Simplified heuristic based on:
        - Recent failure rate
        - Variance in recent latencies
        - Deviation from expected patterns

        Args:
            state: Current system state (not used by the simplified heuristic)
            history: Recent execution history in chronological order
                (oldest first); only the last 20 entries are considered

        Returns:
            Entropy value in [0.05, 0.95]; 0.20 when there is no history
        """
        if not history:
            return 0.20  # baseline for new sessions

        # Get last N outcomes
        recent = history[-20:]
        failure_rate = (
            sum(1 for h in recent if h.get('outcome') != 'ok') / len(recent)
        )

        # Latency variance (normalized). Events that never executed (e.g.
        # blocked ones) are logged with a NULL latency; the key is present
        # with value None, so .get()'s default does not apply and the value
        # must be dropped before it reaches np.std.
        latencies = [
            float(ms)
            for ms in (h.get('latency_ms', 1000) for h in recent)
            if ms is not None
        ]
        if len(latencies) > 1:
            latency_normalized = min(1.0, float(np.std(latencies)) / 5000.0)
        else:
            latency_normalized = 0.0

        # Combine metrics
        entropy = failure_rate * 0.6 + latency_normalized * 0.4
        return float(min(0.95, max(0.05, entropy)))

    def _compute_alignment(self, operator_id: str, intent: str,
                           session_history: List[Dict[str, Any]]) -> float:
        """
        Compute operator alignment A(t).

        Full GEP uses BES attention mechanism:
        A(t) = softmax(Q·K^T / √d_k) weighted by historical behavior

        Simplified: semantic similarity to past successful intents

        Args:
            operator_id: Operator identifier (not used by the simplified form)
            intent: Current intent string
            session_history: Recent session history

        Returns:
            Alignment value in [0.40, 0.98]; 0.85 for an empty history and
            0.70 when the history holds no successful intent
        """
        if not session_history:
            return 0.85  # default high alignment for new sessions

        # Get recent successful intents
        successful_intents = [
            h['intent'] for h in session_history[-50:]
            if h.get('outcome') == 'ok' and h.get('intent')
        ]

        if not successful_intents:
            return 0.70  # moderate alignment if no success history

        # Encode current + past intents
        current_vec = np.asarray(self.encoder.encode(intent), dtype=float)
        past_vecs = np.asarray(
            self.encoder.encode(successful_intents), dtype=float
        )

        # Cosine similarities, computed in one shot. A zero-norm vector
        # yields similarity 0 instead of a division-by-zero NaN.
        dots = past_vecs @ current_vec
        norms = np.linalg.norm(past_vecs, axis=1) * np.linalg.norm(current_vec)
        similarities = np.divide(
            dots, norms, out=np.zeros_like(dots), where=norms > 0
        )

        # Alignment = top-k average similarity
        top_k = min(10, similarities.size)
        alignment = np.sort(similarities)[::-1][:top_k].mean()

        return float(min(0.98, max(0.40, alignment)))

    def execute_intent(self, session_id: str, operator_id: str, intent: str,
                       input_data: Dict[str, Any],
                       context: Optional[Dict[str, Any]] = None
                       ) -> Dict[str, Any]:
        """
        Full motor neuron cycle with GEP routing.

        Args:
            session_id: Unique session identifier
            operator_id: Operator/user identifier
            intent: Natural language intent
            input_data: Tool input parameters
            context: Optional additional context

        Returns:
            {
                'action': 'ALLOW|THROTTLE|STEP_UP|BLOCK',
                'outcome': 'ok|fail|timeout|blocked|throttled',
                'tool_name': str,
                'output': Any,
                'latency_ms': int,
                'validated': bool,
                'validation_errors': str | None,
                'gep_metrics': {
                    'entropy_before': float,
                    'entropy_after': float,
                    'delta_s': float,
                    'alignment': float,
                    'routing_score': float
                }
            }

            When the request is blocked (or routing returns nothing) the
            dict carries 'action', 'outcome', 'tool_name', 'reason' and
            'gep_metrics' only.
        """
        event_id = uuid.uuid4()
        context = context or {}

        # Generate digests for audit trail
        input_digest = hashlib.sha256(
            str(sorted(input_data.items())).encode()
        ).hexdigest()
        input_nonce = hashlib.sha256(
            f"{session_id}:{time.time()}:{input_digest}".encode()
        ).hexdigest()

        # Encode intent to semantic vector
        intent_vec = self._encode_intent(intent)

        # -----------------------------------------------------------------
        # STEP 1: Compute GEP state
        # -----------------------------------------------------------------
        with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # Get recent session history
            cur.execute("""
                SELECT intent, outcome, latency_ms, entropy_after
                FROM gep.tool_events
                WHERE session_id = %s
                ORDER BY ts DESC
                LIMIT 50
            """, (session_id,))
            # Rows arrive newest-first; flip to chronological order so the
            # "last N" slices in the helpers pick the most recent events.
            session_history = cur.fetchall()[::-1]

            # Get recent system state for entropy calc
            cur.execute("""
                SELECT outcome, latency_ms, entropy_after
                FROM gep.tool_events
                WHERE operator_id = %s
                  AND ts > now() - interval '1 hour'
                ORDER BY ts DESC
                LIMIT 100
            """, (operator_id,))
            system_history = cur.fetchall()[::-1]

        entropy_current = self._compute_entropy(context, system_history)
        alignment_current = self._compute_alignment(
            operator_id, intent, session_history
        )

        print(f"GEP State: E={entropy_current:.3f}, A={alignment_current:.3f}")

        # -----------------------------------------------------------------
        # STEP 2: GEP routing + gating
        # -----------------------------------------------------------------
        with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT * FROM gep.gep_route_and_gate(
                    %s, %s, %s, %s::vector, %s, %s, %s::jsonb
                )
            """, (
                session_id,
                operator_id,
                intent,
                str(intent_vec),
                entropy_current,
                alignment_current,
                psycopg2.extras.Json(context)
            ))
            result = cur.fetchone()

        if not result:
            return self._build_error_response(
                "Routing failed: no result from gep_route_and_gate",
                event_id, session_id, operator_id, intent, intent_vec,
                None, input_digest, input_nonce,
                entropy_current, alignment_current
            )

        tool_id = result['tool_id']
        tool_name = result['tool_name']
        action = result['action']
        raw_score = result['decision_score']
        # Normalise to a plain float so formatting below cannot fail on NULL.
        routing_score = float(raw_score) if raw_score is not None else 0.0
        constraints = result['constraints']
        reason = result['reason']

        print(f"Routing: tool={tool_name}, action={action}, score={routing_score:.3f}")
        print(f"Reason: {reason}")

        # Block immediately if gated
        if action == 'BLOCK':
            self._log_event(
                event_id, session_id, operator_id, intent, intent_vec,
                tool_id, action, input_digest, input_nonce,
                'blocked', None, entropy_current, entropy_current,
                alignment_current, None, None, None,
                constraints, reason
            )
            return {
                'action': action,
                'outcome': 'blocked',
                'tool_name': tool_name,
                'reason': reason,
                'gep_metrics': GEPMetrics(
                    entropy_before=entropy_current,
                    entropy_after=entropy_current,
                    delta_s=0.0,
                    alignment=alignment_current,
                    routing_score=routing_score,
                ).to_dict()
            }

        # -----------------------------------------------------------------
        # STEP 3: Execute with constraints
        # -----------------------------------------------------------------
        start_time = time.time()
        outcome = 'ok'
        output = None
        output_digest = None
        validated = None
        validation_errors = None
        # A NULL constraints column must not crash the lookups below; the
        # original value is still what gets logged.
        limits = constraints or {}

        try:
            # Apply constraints
            timeout_ms = limits.get('max_runtime_ms', 30000)
            max_output = limits.get('max_output_bytes', 200000)

            print(f"Executing: {tool_name} (timeout={timeout_ms}ms, max_output={max_output}b)")

            # Execute tool (the helper expects its timeout in seconds)
            output = self._execute_tool(
                tool_name,
                limits['protocol'],
                limits['endpoint'],
                input_data,
                timeout_ms=timeout_ms / 1000.0
            )

            # Validate output
            if len(str(output)) > max_output:
                outcome = 'fail'
                validation_errors = f"Output exceeds max size: {len(str(output))} > {max_output}"
                validated = False
            else:
                validated, validation_errors = self._validate_output(
                    tool_id, output
                )
                if not validated:
                    outcome = 'fail'

            output_digest = hashlib.sha256(str(output).encode()).hexdigest()

        except (subprocess.TimeoutExpired, TimeoutError):
            # TimeoutError covers HTTP/JSON-RPC timeouts re-raised by
            # _post_json, so every protocol reports 'timeout' uniformly.
            outcome = 'timeout'
            output = None
            validated = False
            validation_errors = "Execution timeout"

        except Exception as e:
            # Boundary handler: any tool failure becomes a logged 'fail'.
            outcome = 'fail'
            output = str(e)
            validated = False
            validation_errors = f"Execution error: {e}"

        latency_ms = int((time.time() - start_time) * 1000)

        # -----------------------------------------------------------------
        # STEP 4: Compute entropy_after + log
        # -----------------------------------------------------------------
        entropy_after = self._compute_entropy_after(
            entropy_current, outcome, latency_ms, validated
        )

        print(f"Result: outcome={outcome}, latency={latency_ms}ms, E_after={entropy_after:.3f}")

        self._log_event(
            event_id, session_id, operator_id, intent, intent_vec,
            tool_id, action, input_digest, input_nonce,
            outcome, latency_ms, entropy_current, entropy_after,
            alignment_current, output_digest, validated, validation_errors,
            constraints, None
        )

        return {
            'action': action,
            'outcome': outcome,
            'tool_name': tool_name,
            'output': output,
            'latency_ms': latency_ms,
            'validated': validated,
            'validation_errors': validation_errors,
            'gep_metrics': GEPMetrics(
                entropy_before=entropy_current,
                entropy_after=entropy_after,
                delta_s=entropy_after - entropy_current,
                alignment=alignment_current,
                routing_score=routing_score,
            ).to_dict()
        }

    @staticmethod
    def _post_json(endpoint: str, payload: Any, timeout_s: float) -> Any:
        """POST *payload* as JSON and return the decoded JSON response.

        Raises:
            TimeoutError: if the request times out (translated from
                ``requests.exceptions.Timeout`` so callers need not import
                ``requests`` to recognise a timeout).
            requests.HTTPError: on a non-2xx response.
        """
        import requests

        try:
            resp = requests.post(endpoint, json=payload, timeout=timeout_s)
        except requests.exceptions.Timeout as exc:
            raise TimeoutError(f"Request to {endpoint} timed out") from exc
        resp.raise_for_status()
        return resp.json()

    def _execute_tool(self, name: str, protocol: str, endpoint: str,
                      input_data: Dict[str, Any], timeout_ms: float) -> Any:
        """
        Execute tool based on protocol.

        Implemented protocols:
        - local_exec: Execute local executable, JSON passed on stdin
        - http: HTTP POST request
        - jsonrpc: JSON-RPC call

        Any other protocol value (e.g. ssh, mcp) raises NotImplementedError.

        Args:
            name: Tool name (used as the JSON-RPC method)
            protocol: One of the protocols above
            endpoint: Executable path or URL
            input_data: Tool input parameters
            timeout_ms: Timeout in SECONDS despite the name; it is passed
                unchanged to ``subprocess.run`` / ``requests.post``, and the
                caller in this module divides milliseconds by 1000 first.

        Raises:
            subprocess.TimeoutExpired: local_exec exceeded the timeout
            TimeoutError: http/jsonrpc request timed out
            RuntimeError: non-zero exit code or JSON-RPC error response
            NotImplementedError: unknown protocol
        """
        if protocol == 'local_exec':
            # Pass JSON on stdin (deterministic, auditable, ordered)
            input_json = json.dumps(input_data)
            result = subprocess.run(
                [endpoint],
                input=input_json,
                capture_output=True,
                timeout=timeout_ms,
                text=True,
                check=False
            )
            if result.returncode != 0:
                raise RuntimeError(f"Tool failed with exit code {result.returncode}: {result.stderr}")
            return result.stdout.strip()

        if protocol == 'http':
            return self._post_json(endpoint, input_data, timeout_ms)

        if protocol == 'jsonrpc':
            payload = {
                'jsonrpc': '2.0',
                'method': name,
                'params': input_data,
                'id': str(uuid.uuid4())
            }
            result = self._post_json(endpoint, payload, timeout_ms)
            if 'error' in result:
                raise RuntimeError(f"JSON-RPC error: {result['error']}")
            return result.get('result')

        raise NotImplementedError(f"Protocol {protocol} not implemented")

    def _validate_output(self, tool_id: int,
                         output: Any) -> Tuple[bool, Optional[str]]:
        """
        Validate output against schema.

        Looks up the tool's row in ``gep.output_schema`` and checks the
        output against its ``expected_format``. A tool without a schema row,
        or with a format other than 'json' / 'exit_code', is accepted.

        Returns:
            (is_valid, error_message)
        """
        with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT * FROM gep.output_schema
                WHERE tool_id = %s
            """, (tool_id,))
            schema = cur.fetchone()

        if not schema:
            return True, None  # no schema = no validation

        expected_format = schema['expected_format']

        # Basic format validation
        if expected_format == 'json':
            try:
                if isinstance(output, (str, bytes, bytearray)):
                    json.loads(output)
                else:
                    # http/jsonrpc tools return already-decoded objects;
                    # str() of those is a Python repr, not JSON, so check
                    # that the object is JSON-serialisable instead.
                    json.dumps(output)
            except (TypeError, ValueError) as e:
                # json.JSONDecodeError is a ValueError subclass.
                return False, f"Invalid JSON: {e}"
            return True, None

        if expected_format == 'exit_code':
            # For shell scripts, success = 0
            if output in ('0', ''):
                return True, None
            return False, f"Unexpected exit code output: {output!r}"

        # Add more validation as needed
        return True, None

    def _compute_entropy_after(self, entropy_before: float, outcome: str,
                               latency_ms: int,
                               validated: Optional[bool]) -> float:
        """
        Compute entropy after execution.

        GEP principle:
        Successful execution reduces entropy (increases certainty)
        Failed/slow execution increases entropy (increases uncertainty)

        Args:
            entropy_before: Initial entropy
            outcome: Execution outcome
            latency_ms: Execution latency
            validated: Whether output was validated

        Returns:
            Updated entropy value, clamped to [0.05, 0.95]
        """
        delta = 0.0

        # Outcome impact
        if outcome == 'ok' and validated:
            delta = -0.05  # success reduces uncertainty
        elif outcome == 'fail':
            delta = +0.10  # failure increases uncertainty
        elif outcome == 'timeout':
            delta = +0.15  # timeout is very uncertain

        # Latency penalty
        if latency_ms > 5000:
            delta += 0.05  # slow execution adds uncertainty

        return min(0.95, max(0.05, entropy_before + delta))

    @staticmethod
    def _adapt_param(value: Any) -> Any:
        """Convert a Python value psycopg2 cannot adapt by default.

        psycopg2 has no built-in adapter for ``uuid.UUID`` (unless
        ``register_uuid()`` was called) or for ``dict``; numpy scalars are
        unwrapped to their Python equivalents. Everything else is returned
        unchanged.
        """
        if isinstance(value, uuid.UUID):
            return str(value)
        if isinstance(value, dict):
            return psycopg2.extras.Json(value)
        if isinstance(value, np.generic):
            return value.item()
        return value

    def _log_event(self, *args):
        """Log event to database and update GEP state.

        Expects the 19 positional values of ``gep.log_and_update`` in order;
        the 5th is cast to ``vector`` and the 18th to ``jsonb``. The
        transaction is committed on success and rolled back (then re-raised)
        on failure so the connection stays usable.
        """
        params = tuple(self._adapt_param(arg) for arg in args)
        try:
            with self.conn.cursor() as cur:
                cur.execute("""
                    SELECT gep.log_and_update(
                        %s, %s, %s, %s, %s::vector, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s
                    )
                """, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def _build_error_response(self, error_msg: str, event_id: uuid.UUID,
                              session_id: str, operator_id: str, intent: str,
                              intent_vec: List[float],
                              tool_id: Optional[int],
                              input_digest: str, input_nonce: str,
                              entropy_current: float,
                              alignment_current: float) -> Dict[str, Any]:
        """Build error response and log to database.

        The event is logged as a BLOCK with outcome 'blocked'. The returned
        dict includes ``'tool_name': None`` so callers can read the same
        keys as for a routed BLOCK response.
        """
        self._log_event(
            event_id, session_id, operator_id, intent, intent_vec,
            tool_id, 'BLOCK', input_digest, input_nonce,
            'blocked', None, entropy_current, entropy_current,
            alignment_current, None, None, error_msg,
            {}, error_msg
        )
        return {
            'action': 'BLOCK',
            'outcome': 'blocked',
            'tool_name': None,
            'reason': error_msg,
            'gep_metrics': GEPMetrics(
                entropy_before=entropy_current,
                entropy_after=entropy_current,
                delta_s=0.0,
                alignment=alignment_current,
                routing_score=0.0,
            ).to_dict()
        }

    def update_intent_embeddings(self, batch_size: int = 10):
        """
        Update intent_embedding vectors for all intent_map entries.

        Call this after initial setup to populate semantic vectors.
        Only rows whose ``intent_embedding`` is NULL are processed; each
        batch is committed separately.

        Args:
            batch_size: Number of intents to process at once
        """
        with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT intent_id, intent_pattern
                FROM gep.intent_map
                WHERE intent_embedding IS NULL
            """)
            intents = cur.fetchall()

            if not intents:
                print("✓ All intent embeddings already populated")
                return

            print(f"Updating {len(intents)} intent embeddings...")

            for i in range(0, len(intents), batch_size):
                batch = intents[i:i + batch_size]
                patterns = [row['intent_pattern'] for row in batch]

                # Encode batch
                embeddings = self.encoder.encode(patterns)

                # Update database
                for row, embedding in zip(batch, embeddings):
                    cur.execute("""
                        UPDATE gep.intent_map
                        SET intent_embedding = %s::vector
                        WHERE intent_id = %s
                    """, (str(embedding.tolist()), row['intent_id']))

                self.conn.commit()
                print(f"  Updated {min(i+batch_size, len(intents))}/{len(intents)}")

        print("✓ Intent embeddings updated successfully")

    def get_tool_stats(self) -> List[Dict[str, Any]]:
        """Get performance statistics for all tools."""
        with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM gep.v_tool_performance")
            return cur.fetchall()

    def get_recent_events(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get recent execution events."""
        with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM gep.v_recent_events LIMIT %s", (limit,))
            return cur.fetchall()

    def close(self):
        """Close database connection."""
        self.conn.close()


def main():
    """Example usage and testing."""
    import argparse

    parser = argparse.ArgumentParser(description='GEP Motor Neuron Test')
    parser.add_argument('--db',
                        default='postgresql://nexus_mcp:change_me_in_production@localhost/gep_mcp_v2',
                        help='Database connection string')
    parser.add_argument('--update-embeddings', action='store_true',
                        help='Update intent embeddings')
    parser.add_argument('--stats', action='store_true',
                        help='Show tool statistics')
    parser.add_argument('--test', action='store_true',
                        help='Run test execution')

    args = parser.parse_args()

    # Initialize motor neuron
    motor = GEPMotorNeuron(args.db)

    # Release the connection even if one of the actions below raises.
    try:
        if args.update_embeddings:
            motor.update_intent_embeddings()

        if args.stats:
            print("\n=== Tool Performance Statistics ===")
            stats = motor.get_tool_stats()
            for stat in stats:
                print(f"\n{stat['name']}:")
                print(f"  Uses: {stat['uses_total']}, Failures: {stat['failures_total']}")
                print(f"  Success Rate: {stat['success_rate']}")
                print(f"  Current Entropy: {stat['current_entropy']}")
                print(f"  Avg Latency: {stat['avg_latency_ms']}ms")

        if args.test:
            print("\n=== Test Execution ===")
            result = motor.execute_intent(
                session_id="test_session",
                operator_id="gary",
                intent="check system health",
                input_data={"verbose": "true"}
            )
            print("\nResult:")
            print(f"  Action: {result['action']}")
            print(f"  Outcome: {result['outcome']}")
            print(f"  Tool: {result.get('tool_name')}")
            print(f"  GEP Metrics: {result['gep_metrics']}")
    finally:
        motor.close()


if __name__ == '__main__':
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darkt22002/gep-mcp-motor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.