Skip to main content
Glama

Katamari MCP Server

by ciphernaut
feedback.pyโ€ข16.5 kB
""" Feedback Collection System for ACP Phase 2: Implements comprehensive feedback collection from various sources including user feedback, execution metrics, and automated monitoring. """ import asyncio import json import logging import os from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Any, Union from dataclasses import dataclass, asdict, field from enum import Enum from ..utils.config import Config from ..utils.error_handler import ErrorHandler from .adaptive_learning import ExecutionFeedback, AdaptiveLearningEngine logger = logging.getLogger(__name__) class FeedbackType(Enum): """Types of feedback that can be collected""" EXECUTION_RESULT = "execution_result" USER_SATISFACTION = "user_satisfaction" PERFORMANCE_METRICS = "performance_metrics" ERROR_ANALYSIS = "error_analysis" AUTOMATED_MONITORING = "automated_monitoring" class FeedbackSource(Enum): """Sources of feedback""" DIRECT_USER = "direct_user" SYSTEM_MONITORING = "system_monitoring" EXECUTION_ENGINE = "execution_engine" TEST_RESULTS = "test_results" GIT_ANALYSIS = "git_analysis" @dataclass class FeedbackSubmission: """User-submitted feedback""" capability_id: str execution_id: Optional[str] feedback_type: FeedbackType source: FeedbackSource rating: Optional[int] = None # 1-5 scale comment: Optional[str] = None metrics: Optional[Dict[str, Any]] = None timestamp: datetime = field(default_factory=datetime.now) @dataclass class AutomatedMetrics: """Automatically collected performance metrics""" execution_time: float memory_usage: Optional[int] = None # in MB cpu_usage: Optional[float] = None # percentage network_calls: Optional[int] = None file_operations: Optional[int] = None error_count: int = 0 warning_count: int = 0 class FeedbackCollector: """Centralized feedback collection system""" def __init__(self, config: Config, learning_engine: AdaptiveLearningEngine): self.config = config self.learning_engine = learning_engine self.error_handler = ErrorHandler() # Storage paths workspace_root = config.get('workspace_root', os.getcwd()) self.feedback_dir = Path(workspace_root) / ".katamari" / "acp" / "feedback" self.submissions_file = self.feedback_dir / "submissions.jsonl" # Create directories self.feedback_dir.mkdir(parents=True, exist_ok=True) # Feedback processing queue self._feedback_queue: asyncio.Queue = asyncio.Queue() self._processing_task: Optional[asyncio.Task] = None # Feedback validation self._valid_ratings = set(range(1, 6)) # 1-5 scale logger.info("Feedback collector initialized") async def start(self) -> None: """Start the feedback processing system""" if self._processing_task is None: self._processing_task = asyncio.create_task(self._process_feedback_queue()) logger.info("Feedback processing started") async def stop(self) -> None: """Stop the feedback processing system""" if self._processing_task: self._processing_task.cancel() try: await self._processing_task except asyncio.CancelledError: pass self._processing_task = None logger.info("Feedback processing stopped") async def submit_feedback(self, submission: FeedbackSubmission) -> bool: """Submit feedback for processing""" try: # Validate feedback if not await self._validate_feedback(submission): logger.warning(f"Invalid feedback submission: {submission}") return False # Queue for processing await self._feedback_queue.put(submission) logger.debug(f"Feedback submitted for {submission.capability_id}") return True except Exception as e: logger.error(f"Failed to submit feedback: {e}") return False async def collect_execution_feedback( self, capability_id: str, execution_id: str, success: bool, execution_time: float, error_type: Optional[str] = None, error_message: Optional[str] = None, automated_metrics: Optional[AutomatedMetrics] = None, heuristic_profile: Optional[Any] = None ) -> bool: """Collect feedback from capability execution""" try: # Create execution feedback feedback = ExecutionFeedback( capability_id=capability_id, execution_id=execution_id, timestamp=datetime.now(), success=success, execution_time=execution_time, error_type=error_type, error_message=error_message, performance_metrics=asdict(automated_metrics) if automated_metrics else None, heuristic_profile=heuristic_profile ) # Submit to learning engine await self.learning_engine.collect_feedback(feedback) # Also create user-facing feedback submission submission = FeedbackSubmission( capability_id=capability_id, execution_id=execution_id, feedback_type=FeedbackType.EXECUTION_RESULT, source=FeedbackSource.EXECUTION_ENGINE, metrics={ "success": success, "execution_time": execution_time, "error_type": error_type, "error_message": error_message, "automated_metrics": asdict(automated_metrics) if automated_metrics else None } ) await self.submit_feedback(submission) return True except Exception as e: logger.error(f"Failed to collect execution feedback: {e}") return False async def collect_user_satisfaction( self, capability_id: str, execution_id: Optional[str], rating: int, comment: Optional[str] = None ) -> bool: """Collect user satisfaction feedback""" try: submission = FeedbackSubmission( capability_id=capability_id, execution_id=execution_id, feedback_type=FeedbackType.USER_SATISFACTION, source=FeedbackSource.DIRECT_USER, rating=rating, comment=comment ) return await self.submit_feedback(submission) except Exception as e: logger.error(f"Failed to collect user satisfaction: {e}") return False async def collect_performance_metrics( self, capability_id: str, execution_id: Optional[str], metrics: AutomatedMetrics ) -> bool: """Collect automated performance metrics""" try: submission = FeedbackSubmission( capability_id=capability_id, execution_id=execution_id, feedback_type=FeedbackType.PERFORMANCE_METRICS, source=FeedbackSource.SYSTEM_MONITORING, metrics=asdict(metrics) ) return await self.submit_feedback(submission) except Exception as e: logger.error(f"Failed to collect performance metrics: {e}") return False async def get_feedback_summary( self, capability_id: Optional[str] = None, feedback_type: Optional[FeedbackType] = None, days_back: int = 30 ) -> Dict[str, Any]: """Get summary of collected feedback""" try: cutoff_date = datetime.now() - timedelta(days=days_back) # Read feedback submissions submissions = await self._read_submissions(cutoff_date, capability_id, feedback_type) if not submissions: return {"total_submissions": 0} # Calculate summary statistics summary = { "total_submissions": len(submissions), "capability_breakdown": {}, "type_breakdown": {}, "source_breakdown": {}, "average_rating": None, "rating_distribution": {i: 0 for i in self._valid_ratings}, "recent_trend": [] } ratings = [] for submission in submissions: # Capability breakdown cap = submission.capability_id summary["capability_breakdown"][cap] = summary["capability_breakdown"].get(cap, 0) + 1 # Type breakdown ft = submission.feedback_type.value summary["type_breakdown"][ft] = summary["type_breakdown"].get(ft, 0) + 1 # Source breakdown src = submission.source.value summary["source_breakdown"][src] = summary["source_breakdown"].get(src, 0) + 1 # Rating statistics if submission.rating: ratings.append(submission.rating) summary["rating_distribution"][submission.rating] += 1 # Calculate average rating if ratings: summary["average_rating"] = sum(ratings) / len(ratings) # Recent trend (last 7 days) recent_cutoff = datetime.now() - timedelta(days=7) recent_submissions = [s for s in submissions if s.timestamp > recent_cutoff] if recent_submissions: recent_ratings = [s.rating for s in recent_submissions if s.rating] if recent_ratings: summary["recent_trend"] = { "count": len(recent_submissions), "average_rating": sum(recent_ratings) / len(recent_ratings) } return summary except Exception as e: logger.error(f"Failed to get feedback summary: {e}") return {"error": str(e)} async def _validate_feedback(self, submission: FeedbackSubmission) -> bool: """Validate feedback submission""" try: # Check required fields if not submission.capability_id: return False # Validate rating if provided if submission.rating and submission.rating not in self._valid_ratings: return False # Validate comment length if submission.comment and len(submission.comment) > 1000: return False return True except Exception as e: logger.error(f"Feedback validation error: {e}") return False async def _process_feedback_queue(self) -> None: """Process feedback submissions from queue""" while True: try: # Get submission from queue submission = await self._feedback_queue.get() # Store submission await self._store_submission(submission) # Process based on type await self._process_submission(submission) # Mark task as done self._feedback_queue.task_done() except asyncio.CancelledError: break except Exception as e: logger.error(f"Error processing feedback: {e}") async def _store_submission(self, submission: FeedbackSubmission) -> None: """Store feedback submission to disk""" try: submission_dict = asdict(submission) submission_dict['timestamp'] = submission.timestamp.isoformat() submission_dict['feedback_type'] = submission.feedback_type.value submission_dict['source'] = submission.source.value with open(self.submissions_file, 'a') as f: f.write(json.dumps(submission_dict) + '\n') except Exception as e: logger.error(f"Failed to store submission: {e}") async def _process_submission(self, submission: FeedbackSubmission) -> None: """Process feedback submission based on type""" try: if submission.feedback_type == FeedbackType.USER_SATISFACTION: # Update user satisfaction in learning engine await self._update_user_satisfaction(submission) elif submission.feedback_type == FeedbackType.PERFORMANCE_METRICS: # Update performance metrics await self._update_performance_metrics(submission) elif submission.feedback_type == FeedbackType.ERROR_ANALYSIS: # Analyze error patterns await self._analyze_error_patterns(submission) except Exception as e: logger.error(f"Failed to process submission: {e}") async def _update_user_satisfaction(self, submission: FeedbackSubmission) -> None: """Update user satisfaction metrics""" # This would update the learning engine with user satisfaction data # Implementation depends on learning engine interface pass async def _update_performance_metrics(self, submission: FeedbackSubmission) -> None: """Update performance metrics""" # This would update performance tracking # Implementation depends on metrics system pass async def _analyze_error_patterns(self, submission: FeedbackSubmission) -> None: """Analyze error patterns for learning""" # This would analyze error patterns for heuristic adjustment # Implementation depends on analysis requirements pass async def _read_submissions( self, since: datetime, capability_id: Optional[str] = None, feedback_type: Optional[FeedbackType] = None ) -> List[FeedbackSubmission]: """Read feedback submissions from disk""" try: submissions = [] if not self.submissions_file.exists(): return submissions with open(self.submissions_file, 'r') as f: for line in f: try: data = json.loads(line.strip()) submission = FeedbackSubmission( capability_id=data['capability_id'], execution_id=data.get('execution_id'), feedback_type=FeedbackType(data['feedback_type']), source=FeedbackSource(data['source']), rating=data.get('rating'), comment=data.get('comment'), metrics=data.get('metrics'), timestamp=datetime.fromisoformat(data['timestamp']) ) # Apply filters if submission.timestamp < since: continue if capability_id and submission.capability_id != capability_id: continue if feedback_type and submission.feedback_type != feedback_type: continue submissions.append(submission) except (json.JSONDecodeError, KeyError, ValueError) as e: logger.warning(f"Skipping invalid feedback line: {e}") continue return submissions except Exception as e: logger.error(f"Failed to read submissions: {e}") return []

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server