heuristics.pyโข14.7 kB
"""
Heuristic engine for intelligent ACP decision making.
"""
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class Complexity(Enum):
SIMPLE = "simple"
MODERATE = "moderate"
COMPLEX = "complex"
ARCHITECTURAL = "architectural"
class Scope(Enum):
CAPABILITY = "capability"
WORKFLOW = "workflow"
CORE = "core"
SECURITY = "security"
class Persistence(Enum):
EPHEMERAL = "ephemeral"
SESSION = "session"
PERSISTENT = "persistent"
PERMANENT = "permanent"
class Testing(Enum):
NONE = "none"
BASIC = "basic"
COMPREHENSIVE = "comprehensive"
PARALLEL = "parallel"
class Approval(Enum):
AUTO = "auto"
NOTIFY = "notify"
CONFIRM = "confirm"
MANUAL = "manual"
class DataExposure(Enum):
NONE = "none"
LOCAL = "local"
INTERNAL = "internal"
EXTERNAL = "external"
EXFILTRATION = "exfiltration"
@dataclass
class HeuristicTags:
"""Heuristic tags for ACP operations."""
risk: RiskLevel
complexity: Complexity
scope: Scope
persistence: Persistence
testing: Testing
approval: Approval
data_exposure: DataExposure
def to_dict(self) -> Dict[str, str]:
"""Convert to dictionary representation."""
return {
"risk": self.risk.value,
"complexity": self.complexity.value,
"scope": self.scope.value,
"persistence": self.persistence.value,
"testing": self.testing.value,
"approval": self.approval.value,
"data_exposure": self.data_exposure.value,
}
@dataclass
class HeuristicProfile:
"""Heuristic profile for components with learning capabilities."""
tags: HeuristicTags
performance_history: List[Dict[str, Any]] = None
learned_patterns: List[Dict[str, Any]] = None
feedback_history: List[Dict[str, Any]] = None
def __post_init__(self):
if self.performance_history is None:
self.performance_history = []
if self.learned_patterns is None:
self.learned_patterns = []
if self.feedback_history is None:
self.feedback_history = []
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
'tags': self.tags.to_dict(),
'performance_history': self.performance_history,
'learned_patterns': self.learned_patterns,
'feedback_history': self.feedback_history
}
@classmethod
def default(cls) -> 'HeuristicProfile':
"""Create default heuristic profile."""
tags = HeuristicTags(
risk=RiskLevel.LOW,
complexity=Complexity.SIMPLE,
scope=Scope.CAPABILITY,
persistence=Persistence.EPHEMERAL,
testing=Testing.BASIC,
approval=Approval.AUTO,
data_exposure=DataExposure.NONE
)
return cls(tags=tags)
@property
def risk(self) -> RiskLevel:
return self.tags.risk
@property
def complexity(self) -> Complexity:
return self.tags.complexity
@property
def scope(self) -> Scope:
return self.tags.scope
@property
def data_exposure(self) -> DataExposure:
return self.tags.data_exposure
class HeuristicEngine:
"""Static heuristic engine for ACP decision making."""
def __init__(self):
self._rules = self._initialize_rules()
def _initialize_rules(self) -> Dict[str, Any]:
"""Initialize static heuristic rules."""
return {
# Auto-approval conditions (more permissive for common operations)
"auto_approve": {
"risk": [RiskLevel.LOW, RiskLevel.MEDIUM],
"complexity": [Complexity.SIMPLE, Complexity.MODERATE],
"scope": [Scope.CAPABILITY, Scope.WORKFLOW]
},
# Parallel testing requirements
"parallel_testing": {
"scope": [Scope.CORE, Scope.SECURITY],
"risk": [RiskLevel.HIGH, RiskLevel.CRITICAL],
"complexity": [Complexity.ARCHITECTURAL]
},
# Git commit requirements
"git_commit": {
"persistence": [Persistence.PERSISTENT, Persistence.PERMANENT],
"scope": [Scope.CAPABILITY, Scope.WORKFLOW, Scope.CORE, Scope.SECURITY]
},
# Manual approval requirements
"manual_approval": {
"risk": [RiskLevel.CRITICAL],
"scope": [Scope.SECURITY],
"complexity": [Complexity.ARCHITECTURAL]
}
}
def evaluate_operation(self, operation: Dict[str, Any]) -> HeuristicTags:
"""Evaluate an operation and assign heuristic tags."""
tags = HeuristicTags(
risk=self._assess_risk(operation),
complexity=self._assess_complexity(operation),
scope=self._assess_scope(operation),
persistence=self._assess_persistence(operation),
testing=self._determine_testing(operation),
approval=self._determine_approval(operation),
data_exposure=self._assess_data_exposure(operation)
)
# Apply cross-tag consistency rules
return self._apply_consistency_rules(tags)
def _assess_risk(self, operation: Dict[str, Any]) -> RiskLevel:
"""Assess risk level of operation."""
op_type = operation.get("type", "")
target = operation.get("target", "")
# Critical-risk operations (core system changes)
if any(x in target.lower() for x in ["router", "security", "validator", "registry"]):
return RiskLevel.CRITICAL
if op_type in ["delete", "modify_core", "security_change"]:
return RiskLevel.HIGH
# Medium-risk operations (system modifications)
if op_type in ["modify", "compose", "heal"]:
return RiskLevel.MEDIUM
# Low-risk operations (including web operations)
if op_type in ["create", "analyze", "document", "execute"]:
return RiskLevel.LOW
# Web operations are typically low risk
if target.startswith("web_"):
return RiskLevel.LOW
return RiskLevel.MEDIUM # Default
def _assess_complexity(self, operation: Dict[str, Any]) -> Complexity:
"""Assess complexity of operation."""
op_type = operation.get("type", "")
dependencies = operation.get("dependencies", [])
target = operation.get("target", "")
# Architectural complexity (core system changes)
if op_type in ["modify_core", "security_change"] or target.startswith("router"):
return Complexity.ARCHITECTURAL
# Complex operations (multi-step workflows)
if len(dependencies) > 3 or op_type in ["compose", "heal"]:
return Complexity.COMPLEX
# Moderate complexity (single modifications)
if len(dependencies) > 0 or op_type in ["modify"]:
return Complexity.MODERATE
# Web operations are typically simple
if target.startswith("web_"):
return Complexity.SIMPLE
# Simple operations (read/execute)
if op_type in ["create", "analyze", "document", "execute"]:
return Complexity.SIMPLE
return Complexity.SIMPLE # Default to simple for better UX
def _assess_scope(self, operation: Dict[str, Any]) -> Scope:
"""Assess scope of operation."""
target = operation.get("target", "")
op_type = operation.get("type", "")
# Security scope
if any(x in target.lower() for x in ["security", "validator", "auth"]):
return Scope.SECURITY
# Core scope
if any(x in target.lower() for x in ["router", "core", "server"]):
return Scope.CORE
# Workflow scope
if op_type in ["compose", "workflow"]:
return Scope.WORKFLOW
# Default to capability scope
return Scope.CAPABILITY
def _assess_persistence(self, operation: Dict[str, Any]) -> Persistence:
"""Assess persistence level of operation."""
op_type = operation.get("type", "")
if op_type in ["analyze", "inspect"]:
return Persistence.EPHEMERAL
if op_type in ["test", "experiment"]:
return Persistence.SESSION
if op_type in ["create", "modify"]:
return Persistence.PERSISTENT
return Persistence.PERMANENT
def _determine_testing(self, operation: Dict[str, Any]) -> Testing:
"""Determine testing requirements."""
scope = self._assess_scope(operation)
risk = self._assess_risk(operation)
# Parallel testing for core/security changes
if scope in [Scope.CORE, Scope.SECURITY]:
return Testing.PARALLEL
# Comprehensive testing for high-risk operations
if risk in [RiskLevel.HIGH, RiskLevel.CRITICAL]:
return Testing.COMPREHENSIVE
# Basic testing for modifications
if operation.get("type") in ["modify", "compose"]:
return Testing.BASIC
return Testing.NONE
def _determine_approval(self, operation: Dict[str, Any]) -> Approval:
"""Determine approval requirements."""
risk = self._assess_risk(operation)
scope = self._assess_scope(operation)
complexity = self._assess_complexity(operation)
# Manual approval for critical operations
if risk == RiskLevel.CRITICAL or scope == Scope.SECURITY:
return Approval.MANUAL
# Confirmation for high-risk or complex operations
if risk == RiskLevel.HIGH or complexity == Complexity.ARCHITECTURAL:
return Approval.CONFIRM
# Notification for medium-risk operations
if risk == RiskLevel.MEDIUM:
return Approval.NOTIFY
# Auto-approval for low-risk operations
return Approval.AUTO
def _assess_data_exposure(self, operation: Dict[str, Any]) -> DataExposure:
"""Assess data exposure risk of operation."""
op_type = operation.get("type", "")
target = operation.get("target", "")
description = operation.get("description", "").lower()
# High exfiltration risk - system information gathering
if any(keyword in target.lower() for keyword in ["facter", "system_info", "env", "processes", "network", "filesystem"]):
return DataExposure.EXFILTRATION
if any(keyword in description for keyword in ["system information", "environment variables", "process list", "network scan", "running processes"]):
return DataExposure.EXFILTRATION
# External data access
if target.startswith(("web_", "api_", "http")) or op_type in ["web_scrape", "api_call"]:
return DataExposure.EXTERNAL
# Internal system data
if any(keyword in target.lower() for keyword in ["database", "cache", "logs", "config"]):
return DataExposure.INTERNAL
# Local file access
if op_type in ["read_file", "write_file", "list_dir"] or "file" in target.lower():
return DataExposure.LOCAL
# No data exposure
if op_type in ["analyze", "validate", "test", "document"]:
return DataExposure.NONE
return DataExposure.NONE # Default to no exposure
def _apply_consistency_rules(self, tags: HeuristicTags) -> HeuristicTags:
"""Apply cross-tag consistency rules."""
# Critical risk always requires manual approval
if tags.risk == RiskLevel.CRITICAL:
tags.approval = Approval.MANUAL
tags.testing = Testing.PARALLEL
# Core scope always requires git commit
if tags.scope == Scope.CORE:
tags.persistence = Persistence.PERSISTENT
# Security scope always requires comprehensive testing
if tags.scope == Scope.SECURITY:
tags.testing = Testing.COMPREHENSIVE
# High data exposure requires manual approval and comprehensive testing
if tags.data_exposure in [DataExposure.EXTERNAL, DataExposure.EXFILTRATION]:
tags.approval = Approval.MANUAL
tags.testing = Testing.COMPREHENSIVE
if tags.data_exposure == DataExposure.EXFILTRATION:
tags.risk = RiskLevel.CRITICAL
return tags
def can_auto_approve(self, tags: HeuristicTags) -> bool:
"""Check if operation can be auto-approved."""
return (
tags.risk in self._rules["auto_approve"]["risk"] and
tags.complexity in self._rules["auto_approve"]["complexity"] and
tags.scope in self._rules["auto_approve"]["scope"]
)
def needs_parallel_testing(self, tags: HeuristicTags) -> bool:
"""Check if operation needs parallel testing."""
return (
tags.scope in self._rules["parallel_testing"]["scope"] or
tags.risk in self._rules["parallel_testing"]["risk"] or
tags.complexity in self._rules["parallel_testing"]["complexity"]
)
def needs_git_commit(self, tags: HeuristicTags) -> bool:
"""Check if operation needs git commit."""
return (
tags.persistence in self._rules["git_commit"]["persistence"] or
tags.scope in self._rules["git_commit"]["scope"]
)
def needs_manual_approval(self, tags: HeuristicTags) -> bool:
"""Check if operation needs manual approval."""
return (
tags.risk in self._rules["manual_approval"]["risk"] or
tags.scope in self._rules["manual_approval"]["scope"] or
tags.complexity in self._rules["manual_approval"]["complexity"]
)
async def update_tag_weight(
self,
tag: str,
new_value: int,
reason: str
) -> bool:
"""Update heuristic tag weight for adaptive learning."""
# For Phase 2, this is a placeholder for dynamic weight updates
# In a full implementation, this would modify the internal rules
# based on learning feedback
return True