Skip to main content
Glama

Katamari MCP Server

by ciphernaut
adaptive_learning.py • 18.7 kB
"""
Adaptive Learning Engine for ACP Heuristics

Phase 2: Implements dynamic heuristic adjustment based on execution history,
feedback collection, and performance tracking.
"""

import asyncio
import json
import logging
import os
import statistics
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, asdict
from collections import defaultdict, deque

from ..utils.config import Config
from ..utils.error_handler import ErrorHandler
from .heuristics import HeuristicEngine, HeuristicTags
from .data_models import CapabilityProfile

logger = logging.getLogger(__name__)


@dataclass
class ExecutionFeedback:
    """Feedback data from capability execution"""
    capability_id: str
    execution_id: str
    timestamp: datetime
    success: bool
    execution_time: float
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    user_satisfaction: Optional[int] = None  # 1-5 scale
    performance_metrics: Optional[Dict[str, Any]] = None
    heuristic_profile: Optional[CapabilityProfile] = None
    actual_impact: Optional[Dict[str, Any]] = None


@dataclass
class PerformanceMetrics:
    """Performance tracking for capabilities"""
    capability_id: str
    total_executions: int
    success_rate: float
    avg_execution_time: float
    last_execution: datetime
    error_frequency: Dict[str, int]
    performance_trend: List[float]  # Recent performance scores
    heuristic_accuracy: float  # How well heuristics predicted success


@dataclass
class HeuristicAdjustment:
    """Record of heuristic adjustments"""
    capability_id: str
    timestamp: datetime
    # NOTE(review): annotated as str, but _analyze_capability_patterns stores a
    # HeuristicTags member here; _record_adjustment copes with both.
    tag: str
    old_value: int
    new_value: int
    reason: str
    confidence: float
    based_on_executions: int


class AdaptiveLearningEngine:
    """Adaptive learning system for dynamic heuristic adjustment"""

    def __init__(self, config: Config):
        """Set up storage directories, in-memory caches and learning parameters.

        Args:
            config: Project configuration; only ``workspace_root`` is read
                (defaulting to the current working directory).
        """
        self.config = config
        self.error_handler = ErrorHandler()
        self.heuristic_engine = HeuristicEngine()

        # Storage paths
        workspace_root = config.get('workspace_root', os.getcwd())
        acp_root = Path(workspace_root) / ".katamari" / "acp"
        self.feedback_dir = acp_root / "feedback"
        self.metrics_dir = acp_root / "metrics"
        self.adjustments_dir = acp_root / "adjustments"

        # Create directories
        for dir_path in (self.feedback_dir, self.metrics_dir, self.adjustments_dir):
            dir_path.mkdir(parents=True, exist_ok=True)

        # In-memory caches
        self._feedback_cache: Dict[str, List[ExecutionFeedback]] = defaultdict(list)
        self._metrics_cache: Dict[str, PerformanceMetrics] = {}
        self._adjustment_history: List[HeuristicAdjustment] = []

        # Learning parameters
        self.min_feedback_for_adjustment = 5
        self.adjustment_confidence_threshold = 0.7
        self.performance_window = 20  # Number of recent executions to consider
        self.learning_rate = 0.1  # How much to adjust heuristics

        logger.info("Adaptive learning engine initialized")

    async def collect_feedback(self, feedback: ExecutionFeedback) -> None:
        """Collect execution feedback for learning.

        Stores the feedback, updates the capability's metrics and then checks
        whether a heuristic adjustment should be applied. Any exception is
        logged and forwarded to the error handler rather than raised.
        """
        try:
            # Store feedback
            await self._store_feedback(feedback)

            # Update metrics
            await self._update_metrics(feedback)

            # Check if adjustment is needed
            await self._evaluate_adjustment_need(feedback.capability_id)

            logger.debug(f"Collected feedback for {feedback.capability_id}: success={feedback.success}")

        except Exception as e:
            logger.error(f"Failed to collect feedback: {e}")
            await self.error_handler.handle_error(e, "adaptive_learning.collect_feedback")

    async def get_capability_metrics(self, capability_id: str) -> Optional[PerformanceMetrics]:
        """Get performance metrics for a capability.

        Looks in the in-memory cache first, then in
        ``<metrics_dir>/<capability_id>.json``.

        Returns:
            The metrics, or ``None`` when none exist or loading fails
            (failures are logged).
        """
        try:
            if capability_id in self._metrics_cache:
                return self._metrics_cache[capability_id]

            # Load from disk
            metrics_file = self.metrics_dir / f"{capability_id}.json"
            if metrics_file.exists():
                with open(metrics_file, 'r', encoding="utf-8") as f:
                    data = json.load(f)

                metrics = PerformanceMetrics(
                    capability_id=data['capability_id'],
                    total_executions=data['total_executions'],
                    success_rate=data['success_rate'],
                    avg_execution_time=data['avg_execution_time'],
                    last_execution=datetime.fromisoformat(data['last_execution']),
                    error_frequency=dict(data['error_frequency']),
                    performance_trend=data['performance_trend'],
                    heuristic_accuracy=data['heuristic_accuracy']
                )
                self._metrics_cache[capability_id] = metrics
                return metrics

            return None

        except Exception as e:
            logger.error(f"Failed to get capability metrics: {e}")
            return None

    async def analyze_heuristic_accuracy(self) -> Dict[str, float]:
        """Analyze how accurately heuristics predict execution success.

        Only capabilities with at least three cached feedback entries are
        considered, and only entries that carry a heuristic profile and yield
        a definite (non-``None``) prediction are counted.

        Returns:
            Mapping of capability id to the fraction of correct predictions;
            empty on failure.
        """
        try:
            accuracy_scores = {}

            for capability_id, feedback_list in self._feedback_cache.items():
                if len(feedback_list) < 3:
                    continue

                correct_predictions = 0
                total_predictions = 0

                for feedback in feedback_list:
                    if not feedback.heuristic_profile:
                        continue
                    # Predict success based on heuristic profile
                    predicted_success = await self._predict_success_from_heuristics(
                        feedback.heuristic_profile
                    )
                    if predicted_success is None:
                        continue
                    total_predictions += 1
                    if predicted_success == feedback.success:
                        correct_predictions += 1

                if total_predictions > 0:
                    accuracy_scores[capability_id] = correct_predictions / total_predictions

            return accuracy_scores

        except Exception as e:
            logger.error(f"Failed to analyze heuristic accuracy: {e}")
            return {}

    async def recommend_heuristic_adjustments(self) -> List[HeuristicAdjustment]:
        """Recommend heuristic adjustments based on learning.

        Returns:
            Adjustments for every capability with cached metrics and at least
            ``min_feedback_for_adjustment`` recent feedback entries, sorted by
            descending confidence; empty on failure.
        """
        try:
            adjustments = []

            for capability_id, metrics in self._metrics_cache.items():
                # Get recent feedback for this capability. ``.get`` is used so
                # that merely asking does not insert an empty list into the
                # defaultdict cache.
                recent_feedback = self._feedback_cache.get(capability_id, [])[-self.performance_window:]

                if len(recent_feedback) < self.min_feedback_for_adjustment:
                    continue

                # Analyze patterns and recommend adjustments
                capability_adjustments = await self._analyze_capability_patterns(
                    capability_id, recent_feedback, metrics
                )
                adjustments.extend(capability_adjustments)

            # Sort by confidence
            adjustments.sort(key=lambda x: x.confidence, reverse=True)

            return adjustments

        except Exception as e:
            logger.error(f"Failed to recommend heuristic adjustments: {e}")
            return []

    async def apply_heuristic_adjustment(self, adjustment: HeuristicAdjustment) -> bool:
        """Apply a heuristic adjustment.

        Returns:
            ``True`` when the heuristic engine was updated and the adjustment
            recorded, ``False`` if either step raised (the error is logged).
        """
        try:
            # Update heuristic engine weights
            await self.heuristic_engine.update_tag_weight(
                adjustment.tag,
                adjustment.new_value,
                reason=adjustment.reason
            )

            # Record adjustment
            await self._record_adjustment(adjustment)

            logger.info(f"Applied heuristic adjustment: {adjustment.tag} {adjustment.old_value}→{adjustment.new_value}")
            return True

        except Exception as e:
            logger.error(f"Failed to apply heuristic adjustment: {e}")
            return False

    async def get_learning_summary(self) -> Dict[str, Any]:
        """Get summary of learning progress.

        Returns:
            A dict of counters and parameters (see keys below); empty on
            failure.
        """
        try:
            total_feedback = sum(len(feedback_list) for feedback_list in self._feedback_cache.values())
            total_adjustments = len(self._adjustment_history)

            # Calculate average heuristic accuracy
            accuracy_scores = await self.analyze_heuristic_accuracy()
            avg_accuracy = statistics.mean(accuracy_scores.values()) if accuracy_scores else 0.0

            # Recent adjustments (last 7 days)
            cutoff = datetime.now() - timedelta(days=7)
            recent_adjustments = [
                adj for adj in self._adjustment_history
                if adj.timestamp > cutoff
            ]

            return {
                "total_feedback_collected": total_feedback,
                "capabilities_tracked": len(self._metrics_cache),
                "total_adjustments_made": total_adjustments,
                "recent_adjustments": len(recent_adjustments),
                "average_heuristic_accuracy": avg_accuracy,
                "learning_rate": self.learning_rate,
                "adjustment_confidence_threshold": self.adjustment_confidence_threshold
            }

        except Exception as e:
            logger.error(f"Failed to get learning summary: {e}")
            return {}

    async def _store_feedback(self, feedback: ExecutionFeedback) -> None:
        """Store feedback to disk and cache.

        The cache keeps at most ``2 * performance_window`` entries per
        capability; the on-disk ``<capability_id>.jsonl`` file is append-only.
        """
        # Add to cache
        cached = self._feedback_cache[feedback.capability_id]
        cached.append(feedback)

        # Keep only recent feedback in cache
        max_cached = self.performance_window * 2
        if len(cached) > max_cached:
            self._feedback_cache[feedback.capability_id] = cached[-max_cached:]

        # Serialize before touching the file so a failure cannot leave a
        # partially written line behind.
        feedback_dict = asdict(feedback)
        feedback_dict['timestamp'] = feedback.timestamp.isoformat()
        if feedback.heuristic_profile:
            feedback_dict['heuristic_profile'] = asdict(feedback.heuristic_profile)
        line = json.dumps(feedback_dict) + '\n'

        # Store to disk
        feedback_file = self.feedback_dir / f"{feedback.capability_id}.jsonl"
        with open(feedback_file, 'a', encoding="utf-8") as f:
            f.write(line)

    async def _update_metrics(self, feedback: ExecutionFeedback) -> None:
        """Update performance metrics for a capability and persist them.

        ``success_rate`` is the mean of the last ``performance_window``
        outcomes; ``avg_execution_time`` is a running mean over all
        executions.
        """
        capability_id = feedback.capability_id

        # Get existing metrics or create new
        metrics = await self.get_capability_metrics(capability_id)
        if not metrics:
            metrics = PerformanceMetrics(
                capability_id=capability_id,
                total_executions=0,
                success_rate=0.0,
                avg_execution_time=0.0,
                last_execution=feedback.timestamp,
                # A plain dict, not defaultdict: dataclasses.asdict() raises
                # TypeError on defaultdict fields before Python 3.12, and
                # metrics reloaded from disk are plain dicts anyway.
                error_frequency={},
                performance_trend=[],
                heuristic_accuracy=0.0
            )

        # Update metrics
        metrics.total_executions += 1
        metrics.last_execution = feedback.timestamp

        # Update success rate
        if feedback.success:
            metrics.performance_trend.append(1.0)
        else:
            metrics.performance_trend.append(0.0)
            if feedback.error_type:
                # .get() so unseen error types work on plain dicts too.
                metrics.error_frequency[feedback.error_type] = (
                    metrics.error_frequency.get(feedback.error_type, 0) + 1
                )

        # Keep performance trend window limited
        if len(metrics.performance_trend) > self.performance_window:
            metrics.performance_trend = metrics.performance_trend[-self.performance_window:]

        # Calculate updated metrics
        metrics.success_rate = statistics.mean(metrics.performance_trend)
        metrics.avg_execution_time = (
            (metrics.avg_execution_time * (metrics.total_executions - 1) + feedback.execution_time)
            / metrics.total_executions
        )

        # Cache and store. Serialize first: opening with 'w' truncates the
        # file, so a serialization error must happen before the open().
        self._metrics_cache[capability_id] = metrics
        metrics_dict = asdict(metrics)
        metrics_dict['last_execution'] = metrics.last_execution.isoformat()
        payload = json.dumps(metrics_dict, indent=2)

        metrics_file = self.metrics_dir / f"{capability_id}.json"
        with open(metrics_file, 'w', encoding="utf-8") as f:
            f.write(payload)

    async def _evaluate_adjustment_need(self, capability_id: str) -> None:
        """Evaluate if heuristic adjustment is needed.

        When the capability has enough executions and its success rate is
        below 70%, applies the single highest-confidence recommended
        adjustment for it that meets the confidence threshold. Errors are
        logged, not raised.
        """
        try:
            metrics = await self.get_capability_metrics(capability_id)
            if not metrics or metrics.total_executions < self.min_feedback_for_adjustment:
                return

            # Check if success rate is below threshold
            if metrics.success_rate < 0.7:  # 70% success rate threshold
                adjustments = await self.recommend_heuristic_adjustments()

                # Apply high-confidence adjustments automatically
                for adjustment in adjustments:
                    if (adjustment.confidence >= self.adjustment_confidence_threshold and
                            adjustment.capability_id == capability_id):
                        await self.apply_heuristic_adjustment(adjustment)
                        break  # Apply one adjustment at a time

        except Exception as e:
            logger.error(f"Failed to evaluate adjustment need: {e}")

    async def _analyze_capability_patterns(
        self,
        capability_id: str,
        feedback_list: List[ExecutionFeedback],
        metrics: PerformanceMetrics
    ) -> List[HeuristicAdjustment]:
        """Analyze patterns in capability execution and recommend adjustments.

        Compares the averaged heuristic profile of failed executions with that
        of successful ones and proposes lowering every tag whose failure
        average exceeds the success average by more than 1.

        Returns:
            Proposed adjustments; empty when there are no failures, or when
            either group has no heuristic profiles to compare.
        """
        adjustments: List[HeuristicAdjustment] = []

        # Group feedback by success/failure
        successful = [f for f in feedback_list if f.success]
        failed = [f for f in feedback_list if not f.success]

        if not failed:
            return adjustments  # No failures to analyze

        # Analyze heuristic patterns in failures
        failure_heuristics = [f.heuristic_profile for f in failed if f.heuristic_profile]
        if not failure_heuristics:
            return adjustments

        # Compare with successful executions if available
        success_heuristics = [f.heuristic_profile for f in successful if f.heuristic_profile]
        if not success_heuristics:
            return adjustments

        # Calculate average heuristic values for both groups
        avg_failure_heuristics = await self._average_heuristic_profiles(failure_heuristics)
        avg_success_heuristics = await self._average_heuristic_profiles(success_heuristics)

        # Identify tags that differ significantly
        for tag in HeuristicTags:
            failure_val = getattr(avg_failure_heuristics, tag.name)
            success_val = getattr(avg_success_heuristics, tag.name)

            # If failures have higher values, recommend reduction
            if failure_val > success_val + 1:
                adjustments.append(HeuristicAdjustment(
                    capability_id=capability_id,
                    timestamp=datetime.now(),
                    tag=tag,
                    old_value=failure_val,
                    new_value=max(1, failure_val - 1),
                    reason=f"High values in failed executions (avg: {failure_val:.1f} vs success: {success_val:.1f})",
                    confidence=min(0.9, len(failed) / 10),
                    based_on_executions=len(failed)
                ))

        return adjustments

    async def _average_heuristic_profiles(self, profiles: List[CapabilityProfile]) -> CapabilityProfile:
        """Calculate average heuristic profile from list.

        Each ``HeuristicTags`` attribute of the result is the rounded mean of
        that attribute across ``profiles``; an empty list yields a default
        ``CapabilityProfile()``.
        """
        avg_profile = CapabilityProfile()
        if not profiles:
            return avg_profile

        for tag in HeuristicTags:
            values = [getattr(profile, tag.name) for profile in profiles]
            setattr(avg_profile, tag.name, round(statistics.mean(values)))

        return avg_profile

    async def _predict_success_from_heuristics(self, profile: CapabilityProfile) -> Optional[bool]:
        """Predict execution success based on heuristic profile.

        Returns:
            ``False`` when the mean of risk, complexity and scope is above 7,
            ``True`` when it is below 4, and ``None`` (uncertain) otherwise or
            when the computation fails.
        """
        try:
            # Simple prediction: low risk, complexity, and scope values predict success
            risk_score = (profile.risk + profile.complexity + profile.scope) / 3

            # High risk score predicts failure
            if risk_score > 7:
                return False
            if risk_score < 4:
                return True
            return None  # Uncertain

        except Exception as e:
            logger.error(f"Failed to predict success from heuristics: {e}")
            return None

    async def _record_adjustment(self, adjustment: HeuristicAdjustment) -> None:
        """Record heuristic adjustment in memory (last 1000) and in history.jsonl."""
        self._adjustment_history.append(adjustment)

        # Keep history manageable
        if len(self._adjustment_history) > 1000:
            self._adjustment_history = self._adjustment_history[-1000:]

        # Serialize before opening the file.
        adjustment_dict = asdict(adjustment)
        adjustment_dict['timestamp'] = adjustment.timestamp.isoformat()
        tag = adjustment.tag
        if not isinstance(tag, str):
            # A non-string tag (e.g. a plain Enum member) is not JSON
            # serializable; store its name, falling back to str().
            tag = getattr(tag, "name", str(tag))
        adjustment_dict['tag'] = tag
        line = json.dumps(adjustment_dict) + '\n'

        # Store to disk
        adjustments_file = self.adjustments_dir / "history.jsonl"
        with open(adjustments_file, 'a', encoding="utf-8") as f:
            f.write(line)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.