Skip to main content
Glama

Katamari MCP Server

by ciphernaut
heuristics.pyโ€ข14.7 kB
""" Heuristic engine for intelligent ACP decision making. """ from enum import Enum from typing import Dict, List, Optional, Any from dataclasses import dataclass class RiskLevel(Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class Complexity(Enum): SIMPLE = "simple" MODERATE = "moderate" COMPLEX = "complex" ARCHITECTURAL = "architectural" class Scope(Enum): CAPABILITY = "capability" WORKFLOW = "workflow" CORE = "core" SECURITY = "security" class Persistence(Enum): EPHEMERAL = "ephemeral" SESSION = "session" PERSISTENT = "persistent" PERMANENT = "permanent" class Testing(Enum): NONE = "none" BASIC = "basic" COMPREHENSIVE = "comprehensive" PARALLEL = "parallel" class Approval(Enum): AUTO = "auto" NOTIFY = "notify" CONFIRM = "confirm" MANUAL = "manual" class DataExposure(Enum): NONE = "none" LOCAL = "local" INTERNAL = "internal" EXTERNAL = "external" EXFILTRATION = "exfiltration" @dataclass class HeuristicTags: """Heuristic tags for ACP operations.""" risk: RiskLevel complexity: Complexity scope: Scope persistence: Persistence testing: Testing approval: Approval data_exposure: DataExposure def to_dict(self) -> Dict[str, str]: """Convert to dictionary representation.""" return { "risk": self.risk.value, "complexity": self.complexity.value, "scope": self.scope.value, "persistence": self.persistence.value, "testing": self.testing.value, "approval": self.approval.value, "data_exposure": self.data_exposure.value, } @dataclass class HeuristicProfile: """Heuristic profile for components with learning capabilities.""" tags: HeuristicTags performance_history: List[Dict[str, Any]] = None learned_patterns: List[Dict[str, Any]] = None feedback_history: List[Dict[str, Any]] = None def __post_init__(self): if self.performance_history is None: self.performance_history = [] if self.learned_patterns is None: self.learned_patterns = [] if self.feedback_history is None: self.feedback_history = [] def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { 'tags': self.tags.to_dict(), 'performance_history': self.performance_history, 'learned_patterns': self.learned_patterns, 'feedback_history': self.feedback_history } @classmethod def default(cls) -> 'HeuristicProfile': """Create default heuristic profile.""" tags = HeuristicTags( risk=RiskLevel.LOW, complexity=Complexity.SIMPLE, scope=Scope.CAPABILITY, persistence=Persistence.EPHEMERAL, testing=Testing.BASIC, approval=Approval.AUTO, data_exposure=DataExposure.NONE ) return cls(tags=tags) @property def risk(self) -> RiskLevel: return self.tags.risk @property def complexity(self) -> Complexity: return self.tags.complexity @property def scope(self) -> Scope: return self.tags.scope @property def data_exposure(self) -> DataExposure: return self.tags.data_exposure class HeuristicEngine: """Static heuristic engine for ACP decision making.""" def __init__(self): self._rules = self._initialize_rules() def _initialize_rules(self) -> Dict[str, Any]: """Initialize static heuristic rules.""" return { # Auto-approval conditions (more permissive for common operations) "auto_approve": { "risk": [RiskLevel.LOW, RiskLevel.MEDIUM], "complexity": [Complexity.SIMPLE, Complexity.MODERATE], "scope": [Scope.CAPABILITY, Scope.WORKFLOW] }, # Parallel testing requirements "parallel_testing": { "scope": [Scope.CORE, Scope.SECURITY], "risk": [RiskLevel.HIGH, RiskLevel.CRITICAL], "complexity": [Complexity.ARCHITECTURAL] }, # Git commit requirements "git_commit": { "persistence": [Persistence.PERSISTENT, Persistence.PERMANENT], "scope": [Scope.CAPABILITY, Scope.WORKFLOW, Scope.CORE, Scope.SECURITY] }, # Manual approval requirements "manual_approval": { "risk": [RiskLevel.CRITICAL], "scope": [Scope.SECURITY], "complexity": [Complexity.ARCHITECTURAL] } } def evaluate_operation(self, operation: Dict[str, Any]) -> HeuristicTags: """Evaluate an operation and assign heuristic tags.""" tags = HeuristicTags( risk=self._assess_risk(operation), complexity=self._assess_complexity(operation), scope=self._assess_scope(operation), persistence=self._assess_persistence(operation), testing=self._determine_testing(operation), approval=self._determine_approval(operation), data_exposure=self._assess_data_exposure(operation) ) # Apply cross-tag consistency rules return self._apply_consistency_rules(tags) def _assess_risk(self, operation: Dict[str, Any]) -> RiskLevel: """Assess risk level of operation.""" op_type = operation.get("type", "") target = operation.get("target", "") # Critical-risk operations (core system changes) if any(x in target.lower() for x in ["router", "security", "validator", "registry"]): return RiskLevel.CRITICAL if op_type in ["delete", "modify_core", "security_change"]: return RiskLevel.HIGH # Medium-risk operations (system modifications) if op_type in ["modify", "compose", "heal"]: return RiskLevel.MEDIUM # Low-risk operations (including web operations) if op_type in ["create", "analyze", "document", "execute"]: return RiskLevel.LOW # Web operations are typically low risk if target.startswith("web_"): return RiskLevel.LOW return RiskLevel.MEDIUM # Default def _assess_complexity(self, operation: Dict[str, Any]) -> Complexity: """Assess complexity of operation.""" op_type = operation.get("type", "") dependencies = operation.get("dependencies", []) target = operation.get("target", "") # Architectural complexity (core system changes) if op_type in ["modify_core", "security_change"] or target.startswith("router"): return Complexity.ARCHITECTURAL # Complex operations (multi-step workflows) if len(dependencies) > 3 or op_type in ["compose", "heal"]: return Complexity.COMPLEX # Moderate complexity (single modifications) if len(dependencies) > 0 or op_type in ["modify"]: return Complexity.MODERATE # Web operations are typically simple if target.startswith("web_"): return Complexity.SIMPLE # Simple operations (read/execute) if op_type in ["create", "analyze", "document", "execute"]: return Complexity.SIMPLE return Complexity.SIMPLE # Default to simple for better UX def _assess_scope(self, operation: Dict[str, Any]) -> Scope: """Assess scope of operation.""" target = operation.get("target", "") op_type = operation.get("type", "") # Security scope if any(x in target.lower() for x in ["security", "validator", "auth"]): return Scope.SECURITY # Core scope if any(x in target.lower() for x in ["router", "core", "server"]): return Scope.CORE # Workflow scope if op_type in ["compose", "workflow"]: return Scope.WORKFLOW # Default to capability scope return Scope.CAPABILITY def _assess_persistence(self, operation: Dict[str, Any]) -> Persistence: """Assess persistence level of operation.""" op_type = operation.get("type", "") if op_type in ["analyze", "inspect"]: return Persistence.EPHEMERAL if op_type in ["test", "experiment"]: return Persistence.SESSION if op_type in ["create", "modify"]: return Persistence.PERSISTENT return Persistence.PERMANENT def _determine_testing(self, operation: Dict[str, Any]) -> Testing: """Determine testing requirements.""" scope = self._assess_scope(operation) risk = self._assess_risk(operation) # Parallel testing for core/security changes if scope in [Scope.CORE, Scope.SECURITY]: return Testing.PARALLEL # Comprehensive testing for high-risk operations if risk in [RiskLevel.HIGH, RiskLevel.CRITICAL]: return Testing.COMPREHENSIVE # Basic testing for modifications if operation.get("type") in ["modify", "compose"]: return Testing.BASIC return Testing.NONE def _determine_approval(self, operation: Dict[str, Any]) -> Approval: """Determine approval requirements.""" risk = self._assess_risk(operation) scope = self._assess_scope(operation) complexity = self._assess_complexity(operation) # Manual approval for critical operations if risk == RiskLevel.CRITICAL or scope == Scope.SECURITY: return Approval.MANUAL # Confirmation for high-risk or complex operations if risk == RiskLevel.HIGH or complexity == Complexity.ARCHITECTURAL: return Approval.CONFIRM # Notification for medium-risk operations if risk == RiskLevel.MEDIUM: return Approval.NOTIFY # Auto-approval for low-risk operations return Approval.AUTO def _assess_data_exposure(self, operation: Dict[str, Any]) -> DataExposure: """Assess data exposure risk of operation.""" op_type = operation.get("type", "") target = operation.get("target", "") description = operation.get("description", "").lower() # High exfiltration risk - system information gathering if any(keyword in target.lower() for keyword in ["facter", "system_info", "env", "processes", "network", "filesystem"]): return DataExposure.EXFILTRATION if any(keyword in description for keyword in ["system information", "environment variables", "process list", "network scan", "running processes"]): return DataExposure.EXFILTRATION # External data access if target.startswith(("web_", "api_", "http")) or op_type in ["web_scrape", "api_call"]: return DataExposure.EXTERNAL # Internal system data if any(keyword in target.lower() for keyword in ["database", "cache", "logs", "config"]): return DataExposure.INTERNAL # Local file access if op_type in ["read_file", "write_file", "list_dir"] or "file" in target.lower(): return DataExposure.LOCAL # No data exposure if op_type in ["analyze", "validate", "test", "document"]: return DataExposure.NONE return DataExposure.NONE # Default to no exposure def _apply_consistency_rules(self, tags: HeuristicTags) -> HeuristicTags: """Apply cross-tag consistency rules.""" # Critical risk always requires manual approval if tags.risk == RiskLevel.CRITICAL: tags.approval = Approval.MANUAL tags.testing = Testing.PARALLEL # Core scope always requires git commit if tags.scope == Scope.CORE: tags.persistence = Persistence.PERSISTENT # Security scope always requires comprehensive testing if tags.scope == Scope.SECURITY: tags.testing = Testing.COMPREHENSIVE # High data exposure requires manual approval and comprehensive testing if tags.data_exposure in [DataExposure.EXTERNAL, DataExposure.EXFILTRATION]: tags.approval = Approval.MANUAL tags.testing = Testing.COMPREHENSIVE if tags.data_exposure == DataExposure.EXFILTRATION: tags.risk = RiskLevel.CRITICAL return tags def can_auto_approve(self, tags: HeuristicTags) -> bool: """Check if operation can be auto-approved.""" return ( tags.risk in self._rules["auto_approve"]["risk"] and tags.complexity in self._rules["auto_approve"]["complexity"] and tags.scope in self._rules["auto_approve"]["scope"] ) def needs_parallel_testing(self, tags: HeuristicTags) -> bool: """Check if operation needs parallel testing.""" return ( tags.scope in self._rules["parallel_testing"]["scope"] or tags.risk in self._rules["parallel_testing"]["risk"] or tags.complexity in self._rules["parallel_testing"]["complexity"] ) def needs_git_commit(self, tags: HeuristicTags) -> bool: """Check if operation needs git commit.""" return ( tags.persistence in self._rules["git_commit"]["persistence"] or tags.scope in self._rules["git_commit"]["scope"] ) def needs_manual_approval(self, tags: HeuristicTags) -> bool: """Check if operation needs manual approval.""" return ( tags.risk in self._rules["manual_approval"]["risk"] or tags.scope in self._rules["manual_approval"]["scope"] or tags.complexity in self._rules["manual_approval"]["complexity"] ) async def update_tag_weight( self, tag: str, new_value: int, reason: str ) -> bool: """Update heuristic tag weight for adaptive learning.""" # For Phase 2, this is a placeholder for dynamic weight updates # In a full implementation, this would modify the internal rules # based on learning feedback return True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server