"""
Adaptive Learning Engine for ACP Heuristics
Phase 2: Implements dynamic heuristic adjustment based on execution history,
feedback collection, and performance tracking.
"""
import asyncio
import json
import logging
import os
import statistics
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, asdict
from collections import defaultdict, deque
from ..utils.config import Config
from ..utils.error_handler import ErrorHandler
from .heuristics import HeuristicEngine, HeuristicTags
from .data_models import CapabilityProfile
logger = logging.getLogger(__name__)
@dataclass
class ExecutionFeedback:
"""Feedback data from capability execution"""
capability_id: str
execution_id: str
timestamp: datetime
success: bool
execution_time: float
error_type: Optional[str] = None
error_message: Optional[str] = None
user_satisfaction: Optional[int] = None # 1-5 scale
performance_metrics: Optional[Dict[str, Any]] = None
heuristic_profile: Optional[CapabilityProfile] = None
actual_impact: Optional[Dict[str, Any]] = None
@dataclass
class PerformanceMetrics:
"""Performance tracking for capabilities"""
capability_id: str
total_executions: int
success_rate: float
avg_execution_time: float
last_execution: datetime
error_frequency: Dict[str, int]
performance_trend: List[float] # Recent performance scores
heuristic_accuracy: float # How well heuristics predicted success
@dataclass
class HeuristicAdjustment:
"""Record of heuristic adjustments"""
capability_id: str
timestamp: datetime
tag: str
old_value: int
new_value: int
reason: str
confidence: float
based_on_executions: int
class AdaptiveLearningEngine:
"""Adaptive learning system for dynamic heuristic adjustment"""
def __init__(self, config: Config):
self.config = config
self.error_handler = ErrorHandler()
self.heuristic_engine = HeuristicEngine()
# Storage paths
workspace_root = config.get('workspace_root', os.getcwd())
self.feedback_dir = Path(workspace_root) / ".katamari" / "acp" / "feedback"
self.metrics_dir = Path(workspace_root) / ".katamari" / "acp" / "metrics"
self.adjustments_dir = Path(workspace_root) / ".katamari" / "acp" / "adjustments"
# Create directories
for dir_path in [self.feedback_dir, self.metrics_dir, self.adjustments_dir]:
dir_path.mkdir(parents=True, exist_ok=True)
# In-memory caches
self._feedback_cache: Dict[str, List[ExecutionFeedback]] = defaultdict(list)
self._metrics_cache: Dict[str, PerformanceMetrics] = {}
self._adjustment_history: List[HeuristicAdjustment] = []
# Learning parameters
self.min_feedback_for_adjustment = 5
self.adjustment_confidence_threshold = 0.7
self.performance_window = 20 # Number of recent executions to consider
self.learning_rate = 0.1 # How much to adjust heuristics
logger.info("Adaptive learning engine initialized")
async def collect_feedback(self, feedback: ExecutionFeedback) -> None:
"""Collect execution feedback for learning"""
try:
# Store feedback
await self._store_feedback(feedback)
# Update metrics
await self._update_metrics(feedback)
# Check if adjustment is needed
await self._evaluate_adjustment_need(feedback.capability_id)
logger.debug(f"Collected feedback for {feedback.capability_id}: success={feedback.success}")
except Exception as e:
logger.error(f"Failed to collect feedback: {e}")
await self.error_handler.handle_error(e, "adaptive_learning.collect_feedback")
async def get_capability_metrics(self, capability_id: str) -> Optional[PerformanceMetrics]:
"""Get performance metrics for a capability"""
try:
if capability_id in self._metrics_cache:
return self._metrics_cache[capability_id]
# Load from disk
metrics_file = self.metrics_dir / f"{capability_id}.json"
if metrics_file.exists():
with open(metrics_file, 'r') as f:
data = json.load(f)
metrics = PerformanceMetrics(
capability_id=data['capability_id'],
total_executions=data['total_executions'],
success_rate=data['success_rate'],
avg_execution_time=data['avg_execution_time'],
last_execution=datetime.fromisoformat(data['last_execution']),
error_frequency=dict(data['error_frequency']),
performance_trend=data['performance_trend'],
heuristic_accuracy=data['heuristic_accuracy']
)
self._metrics_cache[capability_id] = metrics
return metrics
return None
except Exception as e:
logger.error(f"Failed to get capability metrics: {e}")
return None
async def analyze_heuristic_accuracy(self) -> Dict[str, float]:
"""Analyze how accurately heuristics predict execution success"""
try:
accuracy_scores = {}
for capability_id, feedback_list in self._feedback_cache.items():
if len(feedback_list) < 3:
continue
correct_predictions = 0
total_predictions = 0
for feedback in feedback_list:
if feedback.heuristic_profile:
# Predict success based on heuristic profile
predicted_success = await self._predict_success_from_heuristics(
feedback.heuristic_profile
)
if predicted_success is not None:
total_predictions += 1
if predicted_success == feedback.success:
correct_predictions += 1
if total_predictions > 0:
accuracy_scores[capability_id] = correct_predictions / total_predictions
return accuracy_scores
except Exception as e:
logger.error(f"Failed to analyze heuristic accuracy: {e}")
return {}
async def recommend_heuristic_adjustments(self) -> List[HeuristicAdjustment]:
"""Recommend heuristic adjustments based on learning"""
try:
adjustments = []
for capability_id, metrics in self._metrics_cache.items():
# Get recent feedback for this capability
recent_feedback = self._feedback_cache[capability_id][-self.performance_window:]
if len(recent_feedback) < self.min_feedback_for_adjustment:
continue
# Analyze patterns and recommend adjustments
capability_adjustments = await self._analyze_capability_patterns(
capability_id, recent_feedback, metrics
)
adjustments.extend(capability_adjustments)
# Sort by confidence
adjustments.sort(key=lambda x: x.confidence, reverse=True)
return adjustments
except Exception as e:
logger.error(f"Failed to recommend heuristic adjustments: {e}")
return []
async def apply_heuristic_adjustment(self, adjustment: HeuristicAdjustment) -> bool:
"""Apply a heuristic adjustment"""
try:
# Update heuristic engine weights
await self.heuristic_engine.update_tag_weight(
adjustment.tag,
adjustment.new_value,
reason=adjustment.reason
)
# Record adjustment
await self._record_adjustment(adjustment)
logger.info(f"Applied heuristic adjustment: {adjustment.tag} {adjustment.old_value}โ{adjustment.new_value}")
return True
except Exception as e:
logger.error(f"Failed to apply heuristic adjustment: {e}")
return False
async def get_learning_summary(self) -> Dict[str, Any]:
"""Get summary of learning progress"""
try:
total_feedback = sum(len(feedback_list) for feedback_list in self._feedback_cache.values())
total_adjustments = len(self._adjustment_history)
# Calculate average heuristic accuracy
accuracy_scores = await self.analyze_heuristic_accuracy()
avg_accuracy = statistics.mean(accuracy_scores.values()) if accuracy_scores else 0.0
# Recent adjustments
recent_adjustments = [
adj for adj in self._adjustment_history
if adj.timestamp > datetime.now() - timedelta(days=7)
]
return {
"total_feedback_collected": total_feedback,
"capabilities_tracked": len(self._metrics_cache),
"total_adjustments_made": total_adjustments,
"recent_adjustments": len(recent_adjustments),
"average_heuristic_accuracy": avg_accuracy,
"learning_rate": self.learning_rate,
"adjustment_confidence_threshold": self.adjustment_confidence_threshold
}
except Exception as e:
logger.error(f"Failed to get learning summary: {e}")
return {}
async def _store_feedback(self, feedback: ExecutionFeedback) -> None:
"""Store feedback to disk and cache"""
# Add to cache
self._feedback_cache[feedback.capability_id].append(feedback)
# Keep only recent feedback in cache
if len(self._feedback_cache[feedback.capability_id]) > self.performance_window * 2:
self._feedback_cache[feedback.capability_id] = self._feedback_cache[feedback.capability_id][-self.performance_window * 2:]
# Store to disk
feedback_file = self.feedback_dir / f"{feedback.capability_id}.jsonl"
with open(feedback_file, 'a') as f:
feedback_dict = asdict(feedback)
feedback_dict['timestamp'] = feedback.timestamp.isoformat()
if feedback.heuristic_profile:
feedback_dict['heuristic_profile'] = asdict(feedback.heuristic_profile)
f.write(json.dumps(feedback_dict) + '\n')
async def _update_metrics(self, feedback: ExecutionFeedback) -> None:
"""Update performance metrics for a capability"""
capability_id = feedback.capability_id
# Get existing metrics or create new
metrics = await self.get_capability_metrics(capability_id)
if not metrics:
metrics = PerformanceMetrics(
capability_id=capability_id,
total_executions=0,
success_rate=0.0,
avg_execution_time=0.0,
last_execution=feedback.timestamp,
                error_frequency={},  # plain dict so the metrics serialize cleanly via asdict()
performance_trend=[],
heuristic_accuracy=0.0
)
# Update metrics
metrics.total_executions += 1
metrics.last_execution = feedback.timestamp
# Update success rate
if feedback.success:
metrics.performance_trend.append(1.0)
else:
metrics.performance_trend.append(0.0)
        if feedback.error_type:
            # Use .get() so a new error type doesn't raise KeyError when the
            # metrics were loaded from disk as a plain dict.
            metrics.error_frequency[feedback.error_type] = (
                metrics.error_frequency.get(feedback.error_type, 0) + 1
            )
# Keep performance trend window limited
if len(metrics.performance_trend) > self.performance_window:
metrics.performance_trend = metrics.performance_trend[-self.performance_window:]
# Calculate updated metrics
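        # Note: success_rate is computed over the recent window only, while
        # avg_execution_time is an all-time running mean updated incrementally,
        # so no per-execution history needs to be kept for it.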
metrics.success_rate = statistics.mean(metrics.performance_trend)
metrics.avg_execution_time = (
(metrics.avg_execution_time * (metrics.total_executions - 1) + feedback.execution_time) /
metrics.total_executions
)
# Cache and store
self._metrics_cache[capability_id] = metrics
metrics_file = self.metrics_dir / f"{capability_id}.json"
with open(metrics_file, 'w') as f:
metrics_dict = asdict(metrics)
metrics_dict['last_execution'] = metrics.last_execution.isoformat()
json.dump(metrics_dict, f, indent=2)
async def _evaluate_adjustment_need(self, capability_id: str) -> None:
"""Evaluate if heuristic adjustment is needed"""
try:
metrics = await self.get_capability_metrics(capability_id)
if not metrics or metrics.total_executions < self.min_feedback_for_adjustment:
return
# Check if success rate is below threshold
if metrics.success_rate < 0.7: # 70% success rate threshold
adjustments = await self.recommend_heuristic_adjustments()
# Apply high-confidence adjustments automatically
for adjustment in adjustments:
if (adjustment.confidence >= self.adjustment_confidence_threshold and
adjustment.capability_id == capability_id):
await self.apply_heuristic_adjustment(adjustment)
break # Apply one adjustment at a time
except Exception as e:
logger.error(f"Failed to evaluate adjustment need: {e}")
async def _analyze_capability_patterns(
self,
capability_id: str,
feedback_list: List[ExecutionFeedback],
metrics: PerformanceMetrics
) -> List[HeuristicAdjustment]:
"""Analyze patterns in capability execution and recommend adjustments"""
adjustments = []
# Group feedback by success/failure
successful = [f for f in feedback_list if f.success]
failed = [f for f in feedback_list if not f.success]
if not failed:
return adjustments # No failures to analyze
# Analyze heuristic patterns in failures
failure_heuristics = []
for feedback in failed:
if feedback.heuristic_profile:
failure_heuristics.append(feedback.heuristic_profile)
if not failure_heuristics:
return adjustments
# Calculate average heuristic values for failures
avg_failure_heuristics = await self._average_heuristic_profiles(failure_heuristics)
# Compare with successful executions if available
if successful:
success_heuristics = [f.heuristic_profile for f in successful if f.heuristic_profile]
if success_heuristics:
avg_success_heuristics = await self._average_heuristic_profiles(success_heuristics)
# Identify tags that differ significantly
for tag in HeuristicTags:
failure_val = getattr(avg_failure_heuristics, tag.name)
success_val = getattr(avg_success_heuristics, tag.name)
# If failures have higher values, recommend reduction
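                    # A gap of more than one point between the two averages is
                    # treated as significant enough to propose lowering the tag.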
if failure_val > success_val + 1:
adjustment = HeuristicAdjustment(
capability_id=capability_id,
timestamp=datetime.now(),
                            tag=tag.name,  # store the tag name so the record stays a JSON-serializable string
old_value=failure_val,
new_value=max(1, failure_val - 1),
reason=f"High values in failed executions (avg: {failure_val:.1f} vs success: {success_val:.1f})",
confidence=min(0.9, len(failed) / 10),
based_on_executions=len(failed)
)
adjustments.append(adjustment)
return adjustments
async def _average_heuristic_profiles(self, profiles: List[CapabilityProfile]) -> CapabilityProfile:
"""Calculate average heuristic profile from list"""
if not profiles:
return CapabilityProfile()
avg_profile = CapabilityProfile()
for tag in HeuristicTags:
values = [getattr(profile, tag.name) for profile in profiles]
avg_value = statistics.mean(values)
setattr(avg_profile, tag.name, round(avg_value))
return avg_profile
async def _predict_success_from_heuristics(self, profile: CapabilityProfile) -> Optional[bool]:
"""Predict execution success based on heuristic profile"""
try:
# Simple prediction: low risk, complexity, and scope values predict success
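            # Assumes tag values share a roughly 1-10 scale, as implied by the
            # thresholds below; recalibrate these cutoffs if the scale differs.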
risk_score = (profile.risk + profile.complexity + profile.scope) / 3
# High risk score predicts failure
if risk_score > 7:
return False
elif risk_score < 4:
return True
else:
return None # Uncertain
except Exception as e:
logger.error(f"Failed to predict success from heuristics: {e}")
return None
async def _record_adjustment(self, adjustment: HeuristicAdjustment) -> None:
"""Record heuristic adjustment"""
self._adjustment_history.append(adjustment)
# Keep history manageable
if len(self._adjustment_history) > 1000:
self._adjustment_history = self._adjustment_history[-1000:]
# Store to disk
adjustments_file = self.adjustments_dir / "history.jsonl"
with open(adjustments_file, 'a') as f:
adjustment_dict = asdict(adjustment)
adjustment_dict['timestamp'] = adjustment.timestamp.isoformat()
adjustment_dict['tag'] = adjustment.tag
f.write(json.dumps(adjustment_dict) + '\n')
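# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not executed on import). It assumes `Config`
# can be built from a plain settings dict exposing `.get()`; the real
# constructor may differ, so treat this as a hedged example of the feedback
# loop rather than the canonical API.
#
#     import asyncio
#     from datetime import datetime
#
#     async def _demo() -> None:
#         engine = AdaptiveLearningEngine(Config({"workspace_root": "/tmp/acp-demo"}))
#         await engine.collect_feedback(ExecutionFeedback(
#             capability_id="demo.capability",
#             execution_id="run-001",
#             timestamp=datetime.now(),
#             success=True,
#             execution_time=1.25,
#         ))
#         print(await engine.get_learning_summary())
#
#     asyncio.run(_demo())
# ---------------------------------------------------------------------------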