data_models.pyโข12.2 kB
"""
Data Models for ACP Phase 2
Centralized data models for feedback, learning, and performance tracking.
Provides consistent data structures across the ACP system.
"""
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict, field
import uuid
class FeedbackChannel(Enum):
"""Channels for feedback collection"""
DIRECT_USER = "direct_user"
AUTOMATIC = "automatic"
SYSTEM_MONITORING = "system_monitoring"
TEST_RESULTS = "test_results"
PERFORMANCE_METRICS = "performance_metrics"
class LearningSignal(Enum):
"""Types of learning signals"""
SUCCESS = "success"
FAILURE = "failure"
PERFORMANCE_DEGRADATION = "performance_degradation"
USER_DISSATISFACTION = "user_dissatisfaction"
ERROR_PATTERN = "error_pattern"
RESOURCE_EXHAUSTION = "resource_exhaustion"
class AdaptationType(Enum):
"""Types of heuristic adaptations"""
WEIGHT_ADJUSTMENT = "weight_adjustment"
THRESHOLD_CHANGE = "threshold_change"
RULE_MODIFICATION = "rule_modification"
TAG_RECLASSIFICATION = "tag_reclassification"
@dataclass
class FeedbackEvent:
"""Base feedback event"""
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
capability_id: str = ""
execution_id: Optional[str] = None
channel: FeedbackChannel = FeedbackChannel.AUTOMATIC
signal: LearningSignal = LearningSignal.SUCCESS
# Event data
success: bool = True
execution_time: float = 0.0
error_type: Optional[str] = None
error_message: Optional[str] = None
# User feedback
user_rating: Optional[int] = None # 1-5 scale
user_comment: Optional[str] = None
# Performance data
performance_metrics: Optional[Dict[str, Any]] = None
resource_usage: Optional[Dict[str, Any]] = None
# Context
heuristic_profile: Optional[Dict[str, Any]] = None
operation_context: Optional[Dict[str, Any]] = None
# Metadata
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LearningRecord:
"""Record of learning activity"""
record_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
adaptation_type: AdaptationType = AdaptationType.WEIGHT_ADJUSTMENT
# What triggered this learning
trigger_event_id: str = ""
trigger_signal: LearningSignal = LearningSignal.SUCCESS
capability_id: str = ""
# What was adapted
target_component: str = "" # e.g., "heuristic_engine", "router", "controller"
target_parameter: str = "" # e.g., "risk_weight", "approval_threshold"
# Adaptation details
old_value: Optional[Union[str, int, float, bool]] = None
new_value: Optional[Union[str, int, float, bool]] = None
adaptation_reason: str = ""
# Learning metadata
confidence: float = 0.0 # 0-1 confidence in this adaptation
based_on_samples: int = 0 # Number of samples that led to this
expected_impact: str = "" # Expected impact description
# Validation
validation_pending: bool = True
validation_result: Optional[bool] = None
validation_timestamp: Optional[datetime] = None
@dataclass
class PerformanceSnapshot:
"""Snapshot of system performance"""
snapshot_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
capability_id: str = ""
execution_id: Optional[str] = None
# Timing
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
duration: float = 0.0
# Resource usage
cpu_percent: float = 0.0
memory_mb: int = 0
memory_percent: float = 0.0
disk_io_read_bytes: int = 0
disk_io_write_bytes: int = 0
network_io_sent_bytes: int = 0
network_io_recv_bytes: int = 0
# Process metrics
thread_count: int = 0
open_files: int = 0
file_descriptors: int = 0
# Custom metrics
custom_metrics: Dict[str, Any] = field(default_factory=dict)
@dataclass
class CapabilityProfile:
"""Profile of a capability's behavior and performance"""
capability_id: str = ""
created_at: datetime = field(default_factory=datetime.now)
last_updated: datetime = field(default_factory=datetime.now)
# Execution statistics
total_executions: int = 0
successful_executions: int = 0
failed_executions: int = 0
success_rate: float = 0.0
# Performance statistics
avg_execution_time: float = 0.0
min_execution_time: float = 0.0
max_execution_time: float = 0.0
avg_memory_usage: float = 0.0
peak_memory_usage: int = 0
# Error analysis
error_frequency: Dict[str, int] = field(default_factory=dict)
common_errors: List[str] = field(default_factory=list)
error_patterns: List[str] = field(default_factory=list)
# User satisfaction
user_ratings: List[int] = field(default_factory=list)
avg_user_rating: Optional[float] = None
user_comments: List[str] = field(default_factory=list)
# Heuristic accuracy
heuristic_predictions: int = 0
heuristic_accuracy: float = 0.0
# Learning history
adaptations_applied: List[str] = field(default_factory=list) # Learning record IDs
learning_effectiveness: float = 0.0
# Health indicators
health_score: float = 0.0 # 0-100
performance_grade: str = "F" # A, B, C, D, F
stability_score: float = 0.0 # 0-100
# Metadata
tags: List[str] = field(default_factory=list)
dependencies: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AdaptationProposal:
"""Proposal for heuristic adaptation"""
proposal_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
# Proposal details
adaptation_type: AdaptationType = AdaptationType.WEIGHT_ADJUSTMENT
target_capability: str = ""
target_component: str = ""
target_parameter: str = ""
# Proposed change
current_value: Optional[Union[str, int, float, bool]] = None
proposed_value: Optional[Union[str, int, float, bool]] = None
change_magnitude: float = 0.0 # How big the change is
# Rationale
reasoning: str = ""
supporting_evidence: List[str] = field(default_factory=list)
confidence_score: float = 0.0 # 0-1
# Expected impact
expected_benefit: str = ""
expected_risk: str = ""
impact_confidence: float = 0.0
# Validation
validation_method: str = ""
validation_criteria: List[str] = field(default_factory=list)
rollback_plan: str = ""
# Status
status: str = "proposed" # proposed, approved, rejected, applied, rolled_back
reviewed_by: Optional[str] = None
reviewed_at: Optional[datetime] = None
applied_at: Optional[datetime] = None
# Results
actual_impact: Optional[str] = None
success_metrics: Optional[Dict[str, Any]] = None
side_effects: List[str] = field(default_factory=list)
@dataclass
class LearningSession:
"""A learning session that processes feedback and generates adaptations"""
session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
# Session scope
capability_filter: Optional[List[str]] = None
time_window_hours: int = 24
feedback_types: List[LearningSignal] = field(default_factory=list)
# Processing results
feedback_events_processed: int = 0
adaptations_generated: int = 0
adaptations_applied: int = 0
# Quality metrics
processing_accuracy: float = 0.0
adaptation_success_rate: float = 0.0
# Session data
processed_events: List[str] = field(default_factory=list) # Event IDs
generated_proposals: List[str] = field(default_factory=list) # Proposal IDs
applied_adaptations: List[str] = field(default_factory=list) # Record IDs
# Status
status: str = "running" # running, completed, failed, cancelled
error_message: Optional[str] = None
# Metadata
session_config: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
# Validation functions
def validate_feedback_event(event: FeedbackEvent) -> List[str]:
"""Validate a feedback event and return list of issues"""
issues = []
if not event.capability_id:
issues.append("capability_id is required")
if event.user_rating and (event.user_rating < 1 or event.user_rating > 5):
issues.append("user_rating must be between 1 and 5")
if event.execution_time < 0:
issues.append("execution_time cannot be negative")
# FeedbackEvent doesn't have confidence field, remove this check
return issues
def validate_learning_record(record: LearningRecord) -> List[str]:
"""Validate a learning record and return list of issues"""
issues = []
if not record.trigger_event_id:
issues.append("trigger_event_id is required")
if not record.capability_id:
issues.append("capability_id is required")
if not record.target_component:
issues.append("target_component is required")
if record.confidence < 0 or record.confidence > 1:
issues.append("confidence must be between 0 and 1")
if record.based_on_samples < 0:
issues.append("based_on_samples cannot be negative")
return issues
def validate_adaptation_proposal(proposal: AdaptationProposal) -> List[str]:
"""Validate an adaptation proposal and return list of issues"""
issues = []
if not proposal.target_capability:
issues.append("target_capability is required")
if not proposal.target_component:
issues.append("target_component is required")
if not proposal.target_parameter:
issues.append("target_parameter is required")
if proposal.confidence_score < 0 or proposal.confidence_score > 1:
issues.append("confidence_score must be between 0 and 1")
if proposal.impact_confidence < 0 or proposal.impact_confidence > 1:
issues.append("impact_confidence must be between 0 and 1")
if not proposal.reasoning:
issues.append("reasoning is required")
return issues
# Serialization helpers
def serialize_event(event: FeedbackEvent) -> Dict[str, Any]:
"""Serialize feedback event to dictionary"""
data = asdict(event)
data['timestamp'] = event.timestamp.isoformat()
data['channel'] = event.channel.value
data['signal'] = event.signal.value
return data
def deserialize_event(data: Dict[str, Any]) -> FeedbackEvent:
"""Deserialize feedback event from dictionary"""
data['timestamp'] = datetime.fromisoformat(data['timestamp'])
data['channel'] = FeedbackChannel(data['channel'])
data['signal'] = LearningSignal(data['signal'])
return FeedbackEvent(**data)
def serialize_learning_record(record: LearningRecord) -> Dict[str, Any]:
"""Serialize learning record to dictionary"""
data = asdict(record)
data['timestamp'] = record.timestamp.isoformat()
data['trigger_signal'] = record.trigger_signal.value
data['adaptation_type'] = record.adaptation_type.value
if record.validation_timestamp:
data['validation_timestamp'] = record.validation_timestamp.isoformat()
return data
def deserialize_learning_record(data: Dict[str, Any]) -> LearningRecord:
"""Deserialize learning record from dictionary"""
data['timestamp'] = datetime.fromisoformat(data['timestamp'])
data['trigger_signal'] = LearningSignal(data['trigger_signal'])
data['adaptation_type'] = AdaptationType(data['adaptation_type'])
if data.get('validation_timestamp'):
data['validation_timestamp'] = datetime.fromisoformat(data['validation_timestamp'])
return LearningRecord(**data)