"""
Feedback Collection System for ACP
Phase 2: Implements comprehensive feedback collection from various sources
including user feedback, execution metrics, and automated monitoring.
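
Example usage (illustrative sketch, to be run inside an async context;
constructing `Config` and `AdaptiveLearningEngine` is outside this module,
and the capability name below is made up):

    collector = FeedbackCollector(config, learning_engine)
    await collector.start()
    await collector.collect_user_satisfaction(
        capability_id="example-capability",
        execution_id=None,
        rating=5,
        comment="Worked as expected",
    )
    summary = await collector.get_feedback_summary(days_back=7)
    await collector.stop()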
"""
import asyncio
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict, field
from enum import Enum
from ..utils.config import Config
from ..utils.error_handler import ErrorHandler
from .adaptive_learning import ExecutionFeedback, AdaptiveLearningEngine
logger = logging.getLogger(__name__)
class FeedbackType(Enum):
"""Types of feedback that can be collected"""
EXECUTION_RESULT = "execution_result"
USER_SATISFACTION = "user_satisfaction"
PERFORMANCE_METRICS = "performance_metrics"
ERROR_ANALYSIS = "error_analysis"
AUTOMATED_MONITORING = "automated_monitoring"
class FeedbackSource(Enum):
"""Sources of feedback"""
DIRECT_USER = "direct_user"
SYSTEM_MONITORING = "system_monitoring"
EXECUTION_ENGINE = "execution_engine"
TEST_RESULTS = "test_results"
GIT_ANALYSIS = "git_analysis"
@dataclass
class FeedbackSubmission:
"""User-submitted feedback"""
capability_id: str
execution_id: Optional[str]
feedback_type: FeedbackType
source: FeedbackSource
rating: Optional[int] = None # 1-5 scale
comment: Optional[str] = None
metrics: Optional[Dict[str, Any]] = None
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class AutomatedMetrics:
"""Automatically collected performance metrics"""
execution_time: float
memory_usage: Optional[int] = None # in MB
cpu_usage: Optional[float] = None # percentage
network_calls: Optional[int] = None
file_operations: Optional[int] = None
error_count: int = 0
warning_count: int = 0
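
# Illustrative example (not executed here): how an execution engine might
# report a finished run, assuming a started FeedbackCollector named
# `collector`; the capability and execution ids are made up.
#
#     metrics = AutomatedMetrics(execution_time=1.8, memory_usage=120)
#     await collector.collect_execution_feedback(
#         capability_id="example-capability",
#         execution_id="run-001",
#         success=True,
#         execution_time=metrics.execution_time,
#         automated_metrics=metrics,
#     )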
class FeedbackCollector:
"""Centralized feedback collection system"""
def __init__(self, config: Config, learning_engine: AdaptiveLearningEngine):
self.config = config
self.learning_engine = learning_engine
self.error_handler = ErrorHandler()
# Storage paths
workspace_root = config.get('workspace_root', os.getcwd())
self.feedback_dir = Path(workspace_root) / ".katamari" / "acp" / "feedback"
self.submissions_file = self.feedback_dir / "submissions.jsonl"
# Create directories
self.feedback_dir.mkdir(parents=True, exist_ok=True)
# Feedback processing queue
self._feedback_queue: asyncio.Queue = asyncio.Queue()
self._processing_task: Optional[asyncio.Task] = None
# Feedback validation
self._valid_ratings = set(range(1, 6)) # 1-5 scale
logger.info("Feedback collector initialized")
async def start(self) -> None:
"""Start the feedback processing system"""
if self._processing_task is None:
self._processing_task = asyncio.create_task(self._process_feedback_queue())
logger.info("Feedback processing started")
async def stop(self) -> None:
"""Stop the feedback processing system"""
if self._processing_task:
self._processing_task.cancel()
try:
await self._processing_task
except asyncio.CancelledError:
pass
self._processing_task = None
logger.info("Feedback processing stopped")
async def submit_feedback(self, submission: FeedbackSubmission) -> bool:
"""Submit feedback for processing"""
try:
# Validate feedback
if not await self._validate_feedback(submission):
logger.warning(f"Invalid feedback submission: {submission}")
return False
# Queue for processing
await self._feedback_queue.put(submission)
logger.debug(f"Feedback submitted for {submission.capability_id}")
return True
except Exception as e:
logger.error(f"Failed to submit feedback: {e}")
return False
async def collect_execution_feedback(
self,
capability_id: str,
execution_id: str,
success: bool,
execution_time: float,
error_type: Optional[str] = None,
error_message: Optional[str] = None,
automated_metrics: Optional[AutomatedMetrics] = None,
heuristic_profile: Optional[Any] = None
) -> bool:
"""Collect feedback from capability execution"""
try:
# Create execution feedback
feedback = ExecutionFeedback(
capability_id=capability_id,
execution_id=execution_id,
timestamp=datetime.now(),
success=success,
execution_time=execution_time,
error_type=error_type,
error_message=error_message,
performance_metrics=asdict(automated_metrics) if automated_metrics else None,
heuristic_profile=heuristic_profile
)
# Submit to learning engine
await self.learning_engine.collect_feedback(feedback)
            # Also create a user-facing feedback submission
submission = FeedbackSubmission(
capability_id=capability_id,
execution_id=execution_id,
feedback_type=FeedbackType.EXECUTION_RESULT,
source=FeedbackSource.EXECUTION_ENGINE,
metrics={
"success": success,
"execution_time": execution_time,
"error_type": error_type,
"error_message": error_message,
"automated_metrics": asdict(automated_metrics) if automated_metrics else None
}
)
await self.submit_feedback(submission)
return True
except Exception as e:
logger.error(f"Failed to collect execution feedback: {e}")
return False
async def collect_user_satisfaction(
self,
capability_id: str,
execution_id: Optional[str],
rating: int,
comment: Optional[str] = None
) -> bool:
"""Collect user satisfaction feedback"""
try:
submission = FeedbackSubmission(
capability_id=capability_id,
execution_id=execution_id,
feedback_type=FeedbackType.USER_SATISFACTION,
source=FeedbackSource.DIRECT_USER,
rating=rating,
comment=comment
)
return await self.submit_feedback(submission)
except Exception as e:
logger.error(f"Failed to collect user satisfaction: {e}")
return False
async def collect_performance_metrics(
self,
capability_id: str,
execution_id: Optional[str],
metrics: AutomatedMetrics
) -> bool:
"""Collect automated performance metrics"""
try:
submission = FeedbackSubmission(
capability_id=capability_id,
execution_id=execution_id,
feedback_type=FeedbackType.PERFORMANCE_METRICS,
source=FeedbackSource.SYSTEM_MONITORING,
metrics=asdict(metrics)
)
return await self.submit_feedback(submission)
except Exception as e:
logger.error(f"Failed to collect performance metrics: {e}")
return False
async def get_feedback_summary(
self,
capability_id: Optional[str] = None,
feedback_type: Optional[FeedbackType] = None,
days_back: int = 30
) -> Dict[str, Any]:
"""Get summary of collected feedback"""
try:
cutoff_date = datetime.now() - timedelta(days=days_back)
# Read feedback submissions
submissions = await self._read_submissions(cutoff_date, capability_id, feedback_type)
if not submissions:
return {"total_submissions": 0}
# Calculate summary statistics
summary = {
"total_submissions": len(submissions),
"capability_breakdown": {},
"type_breakdown": {},
"source_breakdown": {},
"average_rating": None,
"rating_distribution": {i: 0 for i in self._valid_ratings},
"recent_trend": []
}
ratings = []
for submission in submissions:
# Capability breakdown
cap = submission.capability_id
summary["capability_breakdown"][cap] = summary["capability_breakdown"].get(cap, 0) + 1
# Type breakdown
ft = submission.feedback_type.value
summary["type_breakdown"][ft] = summary["type_breakdown"].get(ft, 0) + 1
# Source breakdown
src = submission.source.value
summary["source_breakdown"][src] = summary["source_breakdown"].get(src, 0) + 1
# Rating statistics
                if submission.rating in summary["rating_distribution"]:
                    ratings.append(submission.rating)
                    summary["rating_distribution"][submission.rating] += 1
# Calculate average rating
if ratings:
summary["average_rating"] = sum(ratings) / len(ratings)
# Recent trend (last 7 days)
recent_cutoff = datetime.now() - timedelta(days=7)
recent_submissions = [s for s in submissions if s.timestamp > recent_cutoff]
if recent_submissions:
recent_ratings = [s.rating for s in recent_submissions if s.rating]
if recent_ratings:
summary["recent_trend"] = {
"count": len(recent_submissions),
"average_rating": sum(recent_ratings) / len(recent_ratings)
}
return summary
except Exception as e:
logger.error(f"Failed to get feedback summary: {e}")
return {"error": str(e)}
async def _validate_feedback(self, submission: FeedbackSubmission) -> bool:
"""Validate feedback submission"""
try:
# Check required fields
if not submission.capability_id:
return False
# Validate rating if provided
            if submission.rating is not None and submission.rating not in self._valid_ratings:
return False
# Validate comment length
if submission.comment and len(submission.comment) > 1000:
return False
return True
except Exception as e:
logger.error(f"Feedback validation error: {e}")
return False
    async def _process_feedback_queue(self) -> None:
        """Process feedback submissions from the queue"""
        while True:
            try:
                # Wait for the next submission
                submission = await self._feedback_queue.get()
            except asyncio.CancelledError:
                break
            try:
                # Store the submission, then route it by feedback type
                await self._store_submission(submission)
                await self._process_submission(submission)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error processing feedback: {e}")
            finally:
                # Always mark the item as done so queue.join() cannot hang
                self._feedback_queue.task_done()
async def _store_submission(self, submission: FeedbackSubmission) -> None:
"""Store feedback submission to disk"""
try:
submission_dict = asdict(submission)
submission_dict['timestamp'] = submission.timestamp.isoformat()
submission_dict['feedback_type'] = submission.feedback_type.value
submission_dict['source'] = submission.source.value
with open(self.submissions_file, 'a') as f:
f.write(json.dumps(submission_dict) + '\n')
except Exception as e:
logger.error(f"Failed to store submission: {e}")
async def _process_submission(self, submission: FeedbackSubmission) -> None:
"""Process feedback submission based on type"""
try:
if submission.feedback_type == FeedbackType.USER_SATISFACTION:
# Update user satisfaction in learning engine
await self._update_user_satisfaction(submission)
elif submission.feedback_type == FeedbackType.PERFORMANCE_METRICS:
# Update performance metrics
await self._update_performance_metrics(submission)
elif submission.feedback_type == FeedbackType.ERROR_ANALYSIS:
# Analyze error patterns
await self._analyze_error_patterns(submission)
except Exception as e:
logger.error(f"Failed to process submission: {e}")
async def _update_user_satisfaction(self, submission: FeedbackSubmission) -> None:
"""Update user satisfaction metrics"""
# This would update the learning engine with user satisfaction data
# Implementation depends on learning engine interface
pass
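        # Minimal sketch of one possible implementation, kept commented out
        # because the AdaptiveLearningEngine interface is not defined here;
        # `record_user_rating` is a hypothetical method name.
        #
        #     if submission.rating is not None:
        #         await self.learning_engine.record_user_rating(
        #             submission.capability_id,
        #             submission.rating,
        #             comment=submission.comment,
        #         )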
async def _update_performance_metrics(self, submission: FeedbackSubmission) -> None:
"""Update performance metrics"""
# This would update performance tracking
# Implementation depends on metrics system
pass
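        # One possible approach (sketch only, not wired in): append the raw
        # metrics to a per-capability JSONL file under self.feedback_dir so a
        # separate job can aggregate them; the file name is an assumption.
        #
        #     metrics_file = self.feedback_dir / f"metrics_{submission.capability_id}.jsonl"
        #     with open(metrics_file, 'a') as f:
        #         f.write(json.dumps(submission.metrics or {}) + '\n')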
async def _analyze_error_patterns(self, submission: FeedbackSubmission) -> None:
"""Analyze error patterns for learning"""
# This would analyze error patterns for heuristic adjustment
# Implementation depends on analysis requirements
pass
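        # Sketch of a simple tally (illustrative only): count error types seen
        # in execution metrics so recurring failures stand out; how the counts
        # would feed back into heuristic adjustment is left open here.
        #
        #     error_type = (submission.metrics or {}).get("error_type")
        #     if error_type:
        #         counts_file = self.feedback_dir / "error_counts.json"
        #         counts = json.loads(counts_file.read_text()) if counts_file.exists() else {}
        #         counts[error_type] = counts.get(error_type, 0) + 1
        #         counts_file.write_text(json.dumps(counts))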
async def _read_submissions(
self,
since: datetime,
capability_id: Optional[str] = None,
feedback_type: Optional[FeedbackType] = None
) -> List[FeedbackSubmission]:
"""Read feedback submissions from disk"""
try:
submissions = []
if not self.submissions_file.exists():
return submissions
with open(self.submissions_file, 'r') as f:
for line in f:
try:
data = json.loads(line.strip())
submission = FeedbackSubmission(
capability_id=data['capability_id'],
execution_id=data.get('execution_id'),
feedback_type=FeedbackType(data['feedback_type']),
source=FeedbackSource(data['source']),
rating=data.get('rating'),
comment=data.get('comment'),
metrics=data.get('metrics'),
timestamp=datetime.fromisoformat(data['timestamp'])
)
# Apply filters
if submission.timestamp < since:
continue
if capability_id and submission.capability_id != capability_id:
continue
if feedback_type and submission.feedback_type != feedback_type:
continue
submissions.append(submission)
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.warning(f"Skipping invalid feedback line: {e}")
continue
return submissions
except Exception as e:
logger.error(f"Failed to read submissions: {e}")
return []