"""
GEP-Native MCP Control Layer - Python Integration
================================================
Motor neuron pattern for tool execution with entropy-weighted routing.
Author: Gary W. Floyd / Lumiea Systems Research Division
Based on: Guided Entropy Principle (GEP) framework
Date: December 2025
Installation:
pip install psycopg2-binary sentence-transformers numpy --break-system-packages
Usage:
from gep_mcp_motor import GEPMotorNeuron
motor = GEPMotorNeuron(
db_conn_string="postgresql://nexus_mcp:password@localhost/gep_mcp_v2"
)
result = motor.execute_intent(
session_id="session_123",
operator_id="gary",
intent="check system health",
input_data={"verbose": True}
)
print(result['action']) # ALLOW, THROTTLE, STEP_UP, or BLOCK
print(result['gep_metrics']) # entropy_before, entropy_after, delta_s, alignment, routing_score
"""
import psycopg2
import psycopg2.extras
import hashlib
import uuid
import time
import numpy as np
import subprocess
import json
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from sentence_transformers import SentenceTransformer
@dataclass
class GEPMetrics:
    """Bundle of the GEP decision values recorded for one tool execution."""

    entropy_before: float
    entropy_after: float
    delta_s: float
    alignment: float
    routing_score: float

    def to_dict(self) -> Dict[str, float]:
        """Return the five metrics as a plain dict keyed by field name."""
        # Spelled out explicitly so the key set and order stay fixed.
        metric_names = (
            'entropy_before',
            'entropy_after',
            'delta_s',
            'alignment',
            'routing_score',
        )
        return {metric: getattr(self, metric) for metric in metric_names}
class GEPMotorNeuron:
    """
    GEP-native MCP control layer.

    Each tool execution flows through:
    1. Compute current entropy + alignment
    2. DB routes to best tool + gates execution
    3. Execute with constraints
    4. Log outcome + update GEP state
    """

    def __init__(self,
                 db_conn_string: str,
                 encoder_model: str = 'all-MiniLM-L6-v2'):
        """
        Initialize GEP motor neuron.

        Args:
            db_conn_string: PostgreSQL connection string
            encoder_model: SentenceTransformer model for intent encoding
        """
        self.conn = psycopg2.connect(db_conn_string)
        try:
            self.encoder = SentenceTransformer(encoder_model)
        except Exception:
            # Do not leak the freshly opened connection if the model fails to load.
            self.conn.close()
            raise
        print(f"✓ GEP Motor Neuron initialized with {encoder_model}")

    def _encode_intent(self, intent: str) -> List[float]:
        """Convert intent string to a semantic vector (list of floats)."""
        return self.encoder.encode(intent).tolist()

    def _compute_entropy(self,
                         state: Dict[str, Any],
                         history: List[Dict[str, Any]]) -> float:
        """
        Compute current entropy E(t) from system state + history.

        Full GEP formula:
            E(t) = H(p) - I(operator; system)

        Simplified heuristic used here:
            0.6 * (share of recent events whose outcome is not 'ok')
          + 0.4 * (std-dev of recent latencies / 5000 ms, capped at 1.0)

        Args:
            state: Current system state (currently unused by the heuristic)
            history: Execution history in chronological order (oldest first);
                the last 20 entries are treated as "recent"

        Returns:
            Entropy as a plain Python float clamped to [0.05, 0.95], or the
            0.20 baseline when there is no history.
        """
        if not history:
            return 0.20  # baseline for new sessions

        # Last N outcomes (history is oldest -> newest)
        recent = history[-20:]
        failure_rate = sum(1 for h in recent if h.get('outcome') != 'ok') / len(recent)

        # Latency variance (normalized). Events that never executed (e.g.
        # blocked ones) are stored with a NULL latency; skip those instead of
        # feeding None into np.std.
        latencies = [
            latency
            for latency in (h.get('latency_ms', 1000) for h in recent)
            if latency is not None
        ]
        if len(latencies) > 1:
            latency_std = float(np.std(latencies))
            latency_normalized = min(1.0, latency_std / 5000.0)
        else:
            latency_normalized = 0.0

        # Combine metrics
        entropy = failure_rate * 0.6 + latency_normalized * 0.4
        # Plain float (not a NumPy scalar) because the value is later bound as
        # a SQL parameter.
        return float(min(0.95, max(0.05, entropy)))

    def _compute_alignment(self,
                           operator_id: str,
                           intent: str,
                           session_history: List[Dict[str, Any]]) -> float:
        """
        Compute operator alignment A(t).

        Full GEP uses BES attention mechanism:
            A(t) = softmax(Q·K^T / √d_k) weighted by historical behavior
        Simplified: mean cosine similarity between the current intent and the
        (up to) 10 most similar past successful intents.

        Args:
            operator_id: Operator identifier (currently unused)
            intent: Current intent string
            session_history: Recent session history; the last 50 entries are
                considered

        Returns:
            Alignment clamped to [0.40, 0.98]; 0.85 when there is no history,
            0.70 when the history holds no successful intents.
        """
        if not session_history:
            return 0.85  # default high alignment for new sessions

        # Recent successful intents (rows without an intent text are skipped)
        successful_intents = [
            h['intent'] for h in session_history[-50:]
            if h.get('outcome') == 'ok' and h.get('intent')
        ]
        if not successful_intents:
            return 0.70  # moderate alignment if no success history

        # Encode current + past intents
        current_vec = np.asarray(self.encoder.encode(intent), dtype=float)
        past_vecs = np.atleast_2d(
            np.asarray(self.encoder.encode(successful_intents), dtype=float)
        )

        # Cosine similarities, ignoring zero-norm vectors that would divide by 0
        norms = np.linalg.norm(past_vecs, axis=1) * np.linalg.norm(current_vec)
        usable = norms > 0
        if not usable.any():
            return 0.40  # nothing comparable: fall back to the lower clamp
        similarities = (past_vecs[usable] @ current_vec) / norms[usable]

        # Alignment = top-k average similarity
        top_k = min(10, similarities.size)
        alignment = float(np.mean(np.sort(similarities)[::-1][:top_k]))
        return float(min(0.98, max(0.40, alignment)))

    def execute_intent(self,
                       session_id: str,
                       operator_id: str,
                       intent: str,
                       input_data: Dict[str, Any],
                       context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Full motor neuron cycle with GEP routing.

        Only a BLOCK decision short-circuits execution; every other action
        returned by the router proceeds to tool execution under the returned
        constraints.

        Args:
            session_id: Unique session identifier
            operator_id: Operator/user identifier
            intent: Natural language intent
            input_data: Tool input parameters
            context: Optional additional context

        Returns:
            {
                'action': 'ALLOW|THROTTLE|STEP_UP|BLOCK',
                'outcome': 'ok|fail|timeout|blocked|throttled',
                'tool_name': str,
                'output': Any,
                'latency_ms': int,
                'validated': bool,
                'validation_errors': str | None,
                'gep_metrics': {
                    'entropy_before': float,
                    'entropy_after': float,
                    'delta_s': float,
                    'alignment': float,
                    'routing_score': float
                }
            }
            Blocked responses carry 'reason' instead of the output/latency/
            validation keys.

        Raises:
            psycopg2.Error: if a state/routing query or the event logging
                fails (the transaction is rolled back first).
        """
        event_id = uuid.uuid4()
        context = context or {}

        # Generate digests for audit trail
        input_digest = hashlib.sha256(
            str(sorted(input_data.items())).encode()
        ).hexdigest()
        input_nonce = hashlib.sha256(
            f"{session_id}:{time.time()}:{input_digest}".encode()
        ).hexdigest()

        # Encode intent to semantic vector
        intent_vec = self._encode_intent(intent)

        try:
            # ---------------------------------------------------------------
            # STEP 1: Compute GEP state
            # ---------------------------------------------------------------
            with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                # Get recent session history
                cur.execute("""
                    SELECT intent, outcome, latency_ms, entropy_after
                    FROM gep.tool_events
                    WHERE session_id = %s
                    ORDER BY ts DESC
                    LIMIT 50
                """, (session_id,))
                # Rows arrive newest-first; the heuristics slice the tail of
                # the list as "recent", so flip to chronological order.
                session_history = cur.fetchall()[::-1]

                # Get recent system state for entropy calc
                cur.execute("""
                    SELECT outcome, latency_ms, entropy_after
                    FROM gep.tool_events
                    WHERE operator_id = %s
                    AND ts > now() - interval '1 hour'
                    ORDER BY ts DESC
                    LIMIT 100
                """, (operator_id,))
                system_history = cur.fetchall()[::-1]

            entropy_current = self._compute_entropy(context, system_history)
            alignment_current = self._compute_alignment(
                operator_id, intent, session_history
            )
            print(f"GEP State: E={entropy_current:.3f}, A={alignment_current:.3f}")

            # ---------------------------------------------------------------
            # STEP 2: GEP routing + gating
            # ---------------------------------------------------------------
            with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute("""
                    SELECT * FROM gep.gep_route_and_gate(
                        %s, %s, %s, %s::vector, %s, %s, %s::jsonb
                    )
                """, (
                    session_id,
                    operator_id,
                    intent,
                    str(intent_vec),
                    entropy_current,
                    alignment_current,
                    psycopg2.extras.Json(context)
                ))
                result = cur.fetchone()
        except psycopg2.Error:
            # A failed statement leaves the transaction aborted; reset it so
            # the connection stays usable for the next call.
            self.conn.rollback()
            raise

        if not result:
            return self._build_error_response(
                "Routing failed: no result from gep_route_and_gate",
                event_id, session_id, operator_id,
                intent, intent_vec, None, input_digest, input_nonce,
                entropy_current, alignment_current
            )

        tool_id = result['tool_id']
        tool_name = result['tool_name']
        action = result['action']
        routing_score = result['decision_score']
        constraints = result['constraints']
        reason = result['reason']
        print(f"Routing: tool={tool_name}, action={action}, score={routing_score:.3f}")
        print(f"Reason: {reason}")

        # Block immediately if gated
        if action == 'BLOCK':
            self._log_event(
                event_id, session_id, operator_id, intent, intent_vec,
                tool_id, action, input_digest, input_nonce,
                'blocked', None, entropy_current, entropy_current,
                alignment_current, None, None, None, constraints, reason
            )
            return {
                'action': action,
                'outcome': 'blocked',
                'tool_name': tool_name,
                'reason': reason,
                'gep_metrics': {
                    'entropy_before': entropy_current,
                    'entropy_after': entropy_current,
                    'delta_s': 0.0,
                    'alignment': alignment_current,
                    'routing_score': routing_score
                }
            }

        # -------------------------------------------------------------------
        # STEP 3: Execute with constraints
        # -------------------------------------------------------------------
        start_time = time.time()
        outcome = 'ok'
        output = None
        output_digest = None
        validated = None
        validation_errors = None
        try:
            # Apply constraints
            timeout_ms = constraints.get('max_runtime_ms', 30000)
            max_output = constraints.get('max_output_bytes', 200000)
            print(f"Executing: {tool_name} (timeout={timeout_ms}ms, max_output={max_output}b)")

            # Execute tool (the helper expects its timeout in seconds)
            output = self._execute_tool(
                tool_name,
                constraints['protocol'],
                constraints['endpoint'],
                input_data,
                timeout_ms=timeout_ms / 1000.0
            )

            # Validate output; the limit is in bytes, so measure the encoded form
            output_bytes = str(output).encode()
            if len(output_bytes) > max_output:
                outcome = 'fail'
                validation_errors = f"Output exceeds max size: {len(output_bytes)} > {max_output}"
                validated = False
            else:
                validated, validation_errors = self._validate_output(
                    tool_id, output
                )
                if not validated:
                    outcome = 'fail'
            output_digest = hashlib.sha256(output_bytes).hexdigest()
        except (subprocess.TimeoutExpired, TimeoutError):
            # TimeoutError is what _execute_tool raises for HTTP/JSON-RPC timeouts
            outcome = 'timeout'
            output = None
            validated = False
            validation_errors = "Execution timeout"
        except Exception as e:
            # Boundary handler: any tool failure becomes a logged 'fail' outcome
            outcome = 'fail'
            output = str(e)
            validated = False
            validation_errors = f"Execution error: {e}"
        latency_ms = int((time.time() - start_time) * 1000)

        # -------------------------------------------------------------------
        # STEP 4: Compute entropy_after + log
        # -------------------------------------------------------------------
        entropy_after = self._compute_entropy_after(
            entropy_current, outcome, latency_ms, validated
        )
        print(f"Result: outcome={outcome}, latency={latency_ms}ms, E_after={entropy_after:.3f}")
        self._log_event(
            event_id, session_id, operator_id, intent, intent_vec,
            tool_id, action, input_digest, input_nonce,
            outcome, latency_ms, entropy_current, entropy_after,
            alignment_current, output_digest, validated, validation_errors,
            constraints, None
        )
        return {
            'action': action,
            'outcome': outcome,
            'tool_name': tool_name,
            'output': output,
            'latency_ms': latency_ms,
            'validated': validated,
            'validation_errors': validation_errors,
            'gep_metrics': {
                'entropy_before': entropy_current,
                'entropy_after': entropy_after,
                'delta_s': entropy_after - entropy_current,
                'alignment': alignment_current,
                'routing_score': routing_score
            }
        }

    def _execute_tool(self,
                      name: str,
                      protocol: str,
                      endpoint: str,
                      input_data: Dict[str, Any],
                      timeout_ms: float) -> Any:
        """
        Execute tool based on protocol.

        Implemented protocols:
        - local_exec: run `endpoint` as a program, JSON input on stdin
        - http: HTTP POST of the input as JSON; returns the decoded JSON body
        - jsonrpc: JSON-RPC 2.0 call with method `name`; returns `result`

        Args:
            name: Tool name (used as the JSON-RPC method)
            protocol: One of the protocols above
            endpoint: Executable path or URL
            input_data: Tool input parameters
            timeout_ms: Timeout in SECONDS despite the name (the caller
                divides the millisecond budget by 1000 before passing it)

        Raises:
            subprocess.TimeoutExpired: local_exec exceeded the timeout
            TimeoutError: http/jsonrpc request timed out
            RuntimeError: non-zero exit code or JSON-RPC error response
            NotImplementedError: any other protocol (e.g. ssh, mcp)
        """
        if protocol == 'local_exec':
            # Pass JSON on stdin (deterministic, auditable, ordered)
            input_json = json.dumps(input_data)
            result = subprocess.run(
                [endpoint],
                input=input_json,
                capture_output=True,
                timeout=timeout_ms,
                text=True,
                check=False
            )
            if result.returncode != 0:
                raise RuntimeError(f"Tool failed with exit code {result.returncode}: {result.stderr}")
            return result.stdout.strip()

        if protocol in ('http', 'jsonrpc'):
            import requests

            if protocol == 'http':
                payload = input_data
            else:
                payload = {
                    'jsonrpc': '2.0',
                    'method': name,
                    'params': input_data,
                    'id': str(uuid.uuid4())
                }
            try:
                resp = requests.post(endpoint, json=payload, timeout=timeout_ms)
            except requests.exceptions.Timeout as exc:
                # Surface as a builtin timeout so callers can classify it
                # without importing requests.
                raise TimeoutError(f"{protocol} request to {endpoint} timed out") from exc
            resp.raise_for_status()
            body = resp.json()
            if protocol == 'http':
                return body
            # A null "error" member is not an error
            if body.get('error') is not None:
                raise RuntimeError(f"JSON-RPC error: {body['error']}")
            return body.get('result')

        raise NotImplementedError(f"Protocol {protocol} not implemented")

    def _validate_output(self,
                         tool_id: int,
                         output: Any) -> Tuple[bool, Optional[str]]:
        """
        Validate output against the tool's row in gep.output_schema.

        Returns:
            (is_valid, error_message); (True, None) when the tool has no
            schema row.
        """
        with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT * FROM gep.output_schema WHERE tool_id = %s
            """, (tool_id,))
            schema = cur.fetchone()
        if not schema:
            return True, None  # no schema = no validation
        return self._check_output_format(schema['expected_format'], output)

    @staticmethod
    def _check_output_format(expected_format: str,
                             output: Any) -> Tuple[bool, Optional[str]]:
        """
        Check `output` against a basic format name (no database access).

        - 'json': text must parse as JSON; already-decoded objects (as
          returned by the http/jsonrpc protocols) must be JSON-serializable
        - 'exit_code': output must be '0' or ''
        - anything else: accepted

        Returns:
            (is_valid, error_message)
        """
        if expected_format == 'json':
            try:
                if isinstance(output, (str, bytes, bytearray)):
                    json.loads(output)
                else:
                    # str(dict) is a Python repr, not JSON, so decoded
                    # objects are checked by serializing them instead.
                    json.dumps(output)
            except (TypeError, ValueError) as e:
                return False, f"Invalid JSON: {e}"
            return True, None
        if expected_format == 'exit_code':
            # For shell scripts, success = 0
            if output == '0' or output == '':
                return True, None
            return False, f"Unexpected exit code output: {output!r}"
        # Add more validation as needed
        return True, None

    def _compute_entropy_after(self,
                               entropy_before: float,
                               outcome: str,
                               latency_ms: int,
                               validated: Optional[bool]) -> float:
        """
        Compute entropy after execution.

        GEP principle: Successful execution reduces entropy (increases certainty)
        Failed/slow execution increases entropy (increases uncertainty)

        Args:
            entropy_before: Initial entropy
            outcome: Execution outcome
            latency_ms: Execution latency
            validated: Whether output was validated (None counts as not validated)

        Returns:
            Updated entropy value clamped to [0.05, 0.95]
        """
        delta = 0.0
        # Outcome impact
        if outcome == 'ok' and validated:
            delta = -0.05  # success reduces uncertainty
        elif outcome == 'fail':
            delta = +0.10  # failure increases uncertainty
        elif outcome == 'timeout':
            delta = +0.15  # timeout is very uncertain
        # Latency penalty
        if latency_ms > 5000:
            delta += 0.05  # slow execution adds uncertainty
        return min(0.95, max(0.05, entropy_before + delta))

    @staticmethod
    def _adapt_param(value: Any) -> Any:
        """Convert a value psycopg2 cannot bind by default into one it can."""
        if isinstance(value, dict):
            return psycopg2.extras.Json(value)
        if isinstance(value, uuid.UUID):
            return str(value)
        if isinstance(value, np.generic):
            return value.item()
        return value

    def _log_event(self, *args):
        """
        Log event to database and update GEP state.

        Passes the 19 positional values to gep.log_and_update and commits.
        Dicts, UUIDs and NumPy scalars are adapted first; on failure the
        transaction is rolled back and the error re-raised.
        """
        params = tuple(self._adapt_param(arg) for arg in args)
        try:
            with self.conn.cursor() as cur:
                cur.execute("""
                    SELECT gep.log_and_update(
                        %s, %s, %s, %s, %s::vector, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s::jsonb, %s
                    )
                """, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def _build_error_response(self,
                              error_msg: str,
                              event_id: uuid.UUID,
                              session_id: str,
                              operator_id: str,
                              intent: str,
                              intent_vec: List[float],
                              tool_id: Optional[int],
                              input_digest: str,
                              input_nonce: str,
                              entropy_current: float,
                              alignment_current: float) -> Dict[str, Any]:
        """
        Build error response and log to database.

        The event is logged as a BLOCK/'blocked' event with unchanged entropy.
        The response carries 'tool_name': None so it exposes the same key as
        the other BLOCK response.
        """
        self._log_event(
            event_id, session_id, operator_id, intent, intent_vec,
            tool_id, 'BLOCK', input_digest, input_nonce,
            'blocked', None, entropy_current, entropy_current,
            alignment_current, None, None, error_msg, {}, error_msg
        )
        return {
            'action': 'BLOCK',
            'outcome': 'blocked',
            'tool_name': None,
            'reason': error_msg,
            'gep_metrics': {
                'entropy_before': entropy_current,
                'entropy_after': entropy_current,
                'delta_s': 0.0,
                'alignment': alignment_current,
                'routing_score': 0.0
            }
        }

    def update_intent_embeddings(self, batch_size: int = 10):
        """
        Update intent_embedding vectors for intent_map entries that lack one.

        Call this after initial setup to populate semantic vectors. Each
        batch is committed separately.

        Args:
            batch_size: Number of intents to process at once (must be >= 1)

        Raises:
            ValueError: if batch_size is less than 1
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT intent_id, intent_pattern
                FROM gep.intent_map
                WHERE intent_embedding IS NULL
            """)
            intents = cur.fetchall()
            if not intents:
                print("✓ All intent embeddings already populated")
                return

            print(f"Updating {len(intents)} intent embeddings...")
            for i in range(0, len(intents), batch_size):
                batch = intents[i:i+batch_size]
                patterns = [intent['intent_pattern'] for intent in batch]
                # Encode batch
                embeddings = self.encoder.encode(patterns)
                # Update database
                for intent, embedding in zip(batch, embeddings):
                    cur.execute("""
                        UPDATE gep.intent_map
                        SET intent_embedding = %s::vector
                        WHERE intent_id = %s
                    """, (str(embedding.tolist()), intent['intent_id']))
                self.conn.commit()
                print(f" Updated {min(i+batch_size, len(intents))}/{len(intents)}")
        print("✓ Intent embeddings updated successfully")

    def get_tool_stats(self) -> List[Dict[str, Any]]:
        """Get performance statistics for all tools (rows of gep.v_tool_performance)."""
        with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM gep.v_tool_performance")
            return cur.fetchall()

    def get_recent_events(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get up to `limit` rows from gep.v_recent_events."""
        with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM gep.v_recent_events LIMIT %s", (limit,))
            return cur.fetchall()

    def close(self):
        """Close database connection."""
        self.conn.close()
def main():
    """
    Example usage and testing (command-line entry point).

    Flags:
        --db: database connection string
        --update-embeddings: populate missing intent embeddings
        --stats: print per-tool performance statistics
        --test: run one sample intent through the motor neuron

    The database connection is closed even if one of the actions raises.
    """
    import argparse

    parser = argparse.ArgumentParser(description='GEP Motor Neuron Test')
    parser.add_argument('--db', default='postgresql://nexus_mcp:change_me_in_production@localhost/gep_mcp_v2',
                        help='Database connection string')
    parser.add_argument('--update-embeddings', action='store_true',
                        help='Update intent embeddings')
    parser.add_argument('--stats', action='store_true',
                        help='Show tool statistics')
    parser.add_argument('--test', action='store_true',
                        help='Run test execution')
    args = parser.parse_args()

    # Initialize motor neuron
    motor = GEPMotorNeuron(args.db)
    try:
        if args.update_embeddings:
            motor.update_intent_embeddings()

        if args.stats:
            print("\n=== Tool Performance Statistics ===")
            stats = motor.get_tool_stats()
            for stat in stats:
                print(f"\n{stat['name']}:")
                print(f" Uses: {stat['uses_total']}, Failures: {stat['failures_total']}")
                print(f" Success Rate: {stat['success_rate']}")
                print(f" Current Entropy: {stat['current_entropy']}")
                print(f" Avg Latency: {stat['avg_latency_ms']}ms")

        if args.test:
            print("\n=== Test Execution ===")
            result = motor.execute_intent(
                session_id="test_session",
                operator_id="gary",
                intent="check system health",
                input_data={"verbose": "true"}
            )
            print("\nResult:")
            print(f" Action: {result['action']}")
            print(f" Outcome: {result['outcome']}")
            # The routing-failure response has no 'tool_name' key, so avoid
            # a KeyError when no tool was selected.
            print(f" Tool: {result.get('tool_name')}")
            print(f" GEP Metrics: {result['gep_metrics']}")
    finally:
        # Always release the DB connection, including on errors above.
        motor.close()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()