"""
MCP Tool Result Types - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: Result pattern with comprehensive status reporting
- Security Model: Secure result transmission with validation
- Performance Profile: Efficient result serialization with minimal overhead
Technical Decisions:
- Standardized Results: Consistent result structure across all MCP tools
- Rich Metadata: Comprehensive operation context and diagnostics
- Error Handling: Detailed error information with recovery guidance
- Serialization: JSON-compatible types for MCP protocol compliance
Dependencies & Integration:
- External: None (stdlib only for maximum portability)
- Internal: Foundation for all MCP tool implementations
Quality Assurance:
- Test Coverage: Property-based testing for all result operations
- Error Handling: Comprehensive validation with detailed diagnostics
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from .agent import AgentState, AgentStatus
from .communication import OperationResult, OperationStatus
from .ids import AgentId, ProcessId, SessionId, TabId
from .session import SessionState, SessionStatus
# ============================================================================
# MCP TOOL ENUMERATION - Tool Types and Result Categories
# ============================================================================
class MCPToolType(Enum):
    """Closed set of tool families exposed by the orchestration platform.

    Each member's value is the lowercase string form of its name.
    """

    AGENT_MANAGEMENT = "agent_management"
    SESSION_MANAGEMENT = "session_management"
    COMMUNICATION = "communication"
    MONITORING = "monitoring"
    CONFIGURATION = "configuration"
class ResultCategory(Enum):
    """Closed set of categories used to classify an MCP tool result.

    Each member's value is the lowercase string form of its name.
    """

    CREATION = "creation"
    DELETION = "deletion"
    UPDATE = "update"
    QUERY = "query"
    OPERATION = "operation"
# ============================================================================
# BASE MCP TOOL RESULT - Generic Result Type
# ============================================================================
@dataclass(frozen=True)
class MCPToolResult(OperationResult):
    """
    Generic result envelope for MCP tool operations.

    Adds three fields on top of ``OperationResult``: a boolean ``success``
    flag, an optional ``data`` payload and an optional ``operation_id``.
    The ``success`` flag is cross-checked against ``status`` on construction.
    """

    success: bool = False
    data: Optional[Any] = None
    operation_id: Optional[str] = None

    def __post_init__(self):
        """Run the base-class validation, then check flag/status agreement.

        Raises:
            ValueError: If ``success`` is truthy while ``status`` is FAILURE,
                or falsy while ``status`` is SUCCESS.
        """
        super().__post_init__()

        if self.success:
            if self.status == OperationStatus.FAILURE:
                raise ValueError("Success flag cannot be True with FAILURE status")
        elif self.status == OperationStatus.SUCCESS:
            raise ValueError("Success flag cannot be False with SUCCESS status")

    @classmethod
    def success_result(
        cls,
        message: str,
        data: Optional[Any] = None,
        operation_id: Optional[str] = None,
    ) -> "MCPToolResult":
        """Build a SUCCESS result.

        Args:
            message: Human-readable message; surrounding whitespace is stripped.
            data: Optional payload stored unchanged on the result.
            operation_id: Optional identifier; when truthy it is also copied
                into ``metadata`` under the ``"operation_id"`` key.

        Returns:
            MCPToolResult: Instance with ``success=True`` and SUCCESS status.
        """
        text = message.strip()
        extra = {"operation_id": operation_id} if operation_id else {}
        return cls(
            operation_type="mcp_tool",
            status=OperationStatus.SUCCESS,
            message=text,
            success=True,
            data=data,
            operation_id=operation_id,
            metadata=extra,
        )

    @classmethod
    def error_result(
        cls,
        message: str,
        error_code: str,
        error_details: Optional[str] = None,
        operation_id: Optional[str] = None,
    ) -> "MCPToolResult":
        """Build a FAILURE result.

        Args:
            message: Human-readable message; surrounding whitespace is stripped.
            error_code: Error code. It is stripped only when the stripped form
                is non-empty; otherwise it is passed through unchanged.
            error_details: Optional detail text; when truthy it is wrapped as
                ``{"details": error_details}``, otherwise an empty dict is used.
            operation_id: Optional identifier; when truthy it is also copied
                into ``metadata`` under the ``"operation_id"`` key.

        Returns:
            MCPToolResult: Instance with ``success=False``, ``data=None`` and
            FAILURE status.
        """
        text = message.strip()

        # Keep the caller's value as-is unless stripping leaves something.
        code = error_code
        if error_code:
            trimmed = error_code.strip()
            if trimmed:
                code = trimmed

        details = {"details": error_details} if error_details else {}
        extra = {"operation_id": operation_id} if operation_id else {}
        return cls(
            operation_type="mcp_tool",
            status=OperationStatus.FAILURE,
            message=text,
            error_code=code,
            error_details=details,
            success=False,
            data=None,
            operation_id=operation_id,
            metadata=extra,
        )
# ============================================================================
# HEALTH STATUS MONITORING - System Health Results
# ============================================================================
@dataclass(frozen=True)
class HealthStatus:
    """
    Immutable snapshot of system health metrics.

    All fields have defaults, so ``HealthStatus()`` yields an "unknown"
    snapshot with zeroed metrics. Every field is validated on construction.
    """

    overall_status: str = "unknown"
    cpu_usage_percent: float = 0.0
    memory_usage_percent: float = 0.0
    disk_usage_percent: float = 0.0
    active_sessions: int = 0
    active_agents: int = 0
    error_count: int = 0
    uptime_seconds: int = 0

    def __post_init__(self):
        """Validate health status metrics.

        Raises:
            ValueError: If ``overall_status`` is not one of "healthy",
                "warning", "critical" or "unknown"; if a percentage is
                outside its range (CPU 0-800, memory/disk 0-100) or is NaN;
                or if any counter is negative.
        """
        if self.overall_status not in {"healthy", "warning", "critical", "unknown"}:
            raise ValueError(f"Invalid overall status: {self.overall_status}")

        # Chained comparisons evaluate to False for NaN, so NaN is rejected
        # here; the "x < lo or x > hi" form would silently accept it.
        if not 0.0 <= self.cpu_usage_percent <= 800.0:
            raise ValueError(
                f"CPU usage must be between 0-800%: {self.cpu_usage_percent}"
            )
        if not 0.0 <= self.memory_usage_percent <= 100.0:
            raise ValueError(
                f"Memory usage must be between 0-100%: {self.memory_usage_percent}"
            )
        if not 0.0 <= self.disk_usage_percent <= 100.0:
            raise ValueError(
                f"Disk usage must be between 0-100%: {self.disk_usage_percent}"
            )

        # Counters are checked in declaration order so the first offending
        # field is the one reported.
        counters = (
            ("Active sessions", self.active_sessions),
            ("Active agents", self.active_agents),
            ("Error count", self.error_count),
            ("Uptime", self.uptime_seconds),
        )
        for label, value in counters:
            if value < 0:
                raise ValueError(f"{label} cannot be negative: {value}")

    def is_healthy(self) -> bool:
        """Return True only when ``overall_status`` is "healthy"."""
        return self.overall_status == "healthy"

    def needs_attention(self) -> bool:
        """Return True when ``overall_status`` is "warning" or "critical"."""
        return self.overall_status in {"warning", "critical"}
# ============================================================================
# AGENT MANAGEMENT RESULT TYPES - Agent Lifecycle Operations
# ============================================================================
@dataclass(frozen=True)
class AgentCreationResult(OperationResult):
    """
    Immutable result of an agent creation attempt.

    On top of the ``OperationResult`` fields it records the agent's identity,
    the process and iTerm tab it was given, which configuration steps were
    applied, and the resources allocated to it.

    Validation performed on construction:
        - A successful result must carry agent ID, agent name, process ID and
          iTerm tab ID.
        - On success, a supplied ``agent_state`` must agree with the result's
          agent ID, name and process ID.
        - ``allocated_memory_mb``, when given, must be positive.
        - ``allocated_cpu_percent``, when given, must lie in (0, 100].
    """

    # Agent identification and state
    agent_id: Optional[AgentId] = None
    agent_name: Optional[str] = None
    agent_state: Optional[AgentState] = None

    # Process and integration information
    process_id: Optional[ProcessId] = None
    iterm_tab_id: Optional[TabId] = None

    # Configuration and specialization
    specialization: Optional[str] = None
    system_prompt_applied: bool = False
    claude_config_applied: bool = False

    # Resource allocation information
    allocated_memory_mb: Optional[float] = None
    allocated_cpu_percent: Optional[float] = None
    working_directory: Optional[Path] = None

    def __post_init__(self):
        """Run base validation, then the creation-specific checks.

        Raises:
            ValueError: If any of the checks listed on the class fails.
        """
        super().__post_init__()

        if self.is_success():
            # Required identifiers, checked in this order so the first
            # missing one determines the error message.
            required = (
                (self.agent_id, "Successful agent creation must have agent ID"),
                (self.agent_name, "Successful agent creation must have agent name"),
                (self.process_id, "Successful agent creation must have process ID"),
                (
                    self.iterm_tab_id,
                    "Successful agent creation must have iTerm tab ID",
                ),
            )
            for value, complaint in required:
                if not value:
                    raise ValueError(complaint)

            # The embedded state, when present, must describe the same agent.
            state = self.agent_state
            if state:
                if state.agent_id != self.agent_id:
                    raise ValueError("Agent state ID must match result agent ID")
                if state.name != self.agent_name:
                    raise ValueError("Agent state name must match result agent name")
                if state.process_id != self.process_id:
                    raise ValueError(
                        "Agent state process ID must match result process ID"
                    )

        memory = self.allocated_memory_mb
        if memory is not None and memory <= 0:
            raise ValueError("Allocated memory must be positive")

        cpu = self.allocated_cpu_percent
        if cpu is not None and (cpu <= 0 or cpu > 100):
            raise ValueError("Allocated CPU percent must be in range (0, 100]")

    def get_agent_summary(self) -> Dict[str, Any]:
        """
        Summarise the creation outcome as a plain dictionary.

        Identifier and path fields are converted with ``str``/``int`` when
        truthy and reported as ``None`` otherwise.

        Returns:
            Dict[str, Any]: Agent creation summary
        """
        agent_id = str(self.agent_id) if self.agent_id else None
        process_id = int(self.process_id) if self.process_id else None
        tab_id = str(self.iterm_tab_id) if self.iterm_tab_id else None
        workdir = str(self.working_directory) if self.working_directory else None

        return {
            "agent_id": agent_id,
            "agent_name": self.agent_name,
            "status": self.status.value,
            "process_id": process_id,
            "tab_id": tab_id,
            "specialization": self.specialization,
            "system_prompt_applied": self.system_prompt_applied,
            "claude_config_applied": self.claude_config_applied,
            "allocated_memory_mb": self.allocated_memory_mb,
            "allocated_cpu_percent": self.allocated_cpu_percent,
            "working_directory": workdir,
            "duration_ms": self.duration_ms,
            "error_code": self.error_code,
            "message": self.message,
        }
@dataclass(frozen=True)
class AgentDeletionResult(OperationResult):
    """
    Immutable result of an agent deletion attempt.

    Records which agent was targeted, which cleanup steps completed, and how
    much was recovered.

    Validation performed on construction:
        - A successful result must carry agent ID and agent name.
        - Recovered memory / CPU, when given, must not be negative.
        - ``files_cleaned`` must not be negative.
    """

    # Agent identification
    agent_id: Optional[AgentId] = None
    agent_name: Optional[str] = None

    # Cleanup information
    process_terminated: bool = False
    tab_closed: bool = False
    resources_cleaned: bool = False
    state_removed: bool = False

    # Resource recovery information
    recovered_memory_mb: Optional[float] = None
    recovered_cpu_percent: Optional[float] = None
    files_cleaned: int = 0

    def __post_init__(self):
        """Run base validation, then the deletion-specific checks.

        Raises:
            ValueError: If any of the checks listed on the class fails.
        """
        super().__post_init__()

        if self.is_success():
            if not self.agent_id:
                raise ValueError("Successful agent deletion must have agent ID")
            if not self.agent_name:
                raise ValueError("Successful agent deletion must have agent name")

        memory = self.recovered_memory_mb
        if memory is not None and memory < 0:
            raise ValueError("Recovered memory cannot be negative")

        cpu = self.recovered_cpu_percent
        if cpu is not None and cpu < 0:
            raise ValueError("Recovered CPU percent cannot be negative")

        if self.files_cleaned < 0:
            raise ValueError("Files cleaned count cannot be negative")

    def get_cleanup_summary(self) -> Dict[str, Any]:
        """
        Summarise the cleanup outcome as a plain dictionary.

        Returns:
            Dict[str, Any]: Cleanup operation summary
        """
        agent_id = str(self.agent_id) if self.agent_id else None
        return {
            "agent_id": agent_id,
            "agent_name": self.agent_name,
            "status": self.status.value,
            "process_terminated": self.process_terminated,
            "tab_closed": self.tab_closed,
            "resources_cleaned": self.resources_cleaned,
            "state_removed": self.state_removed,
            "recovered_memory_mb": self.recovered_memory_mb,
            "recovered_cpu_percent": self.recovered_cpu_percent,
            "files_cleaned": self.files_cleaned,
            "duration_ms": self.duration_ms,
        }
# ============================================================================
# SESSION MANAGEMENT RESULT TYPES - Session Lifecycle Operations
# ============================================================================
@dataclass(frozen=True)
class SessionCreationResult(OperationResult):
    """
    Immutable result of a session creation attempt.

    Records the session's identity, its root path and detected project
    information, and flags describing which setup steps were carried out.

    Validation performed on construction:
        - A successful result must carry session ID, session name and root
          path.
        - On success, a supplied ``session_state`` must agree with the
          result's session ID, name and root path.
    """

    # Session identification and state
    session_id: Optional[SessionId] = None
    session_name: Optional[str] = None
    session_state: Optional[SessionState] = None

    # File system and project information
    root_path: Optional[Path] = None
    git_repository_detected: bool = False
    git_branch: Optional[str] = None
    project_type: Optional[str] = None

    # Security and isolation
    security_context_created: bool = False
    filesystem_boundaries_set: bool = False
    encryption_keys_generated: bool = False

    # Task and development integration
    todo_file_found: bool = False
    tasks_directory_created: bool = False
    development_structure_validated: bool = False

    def __post_init__(self):
        """Run base validation, then the session-creation checks.

        Raises:
            ValueError: If any of the checks listed on the class fails.
        """
        super().__post_init__()

        # Nothing further is checked for results that did not succeed.
        if not self.is_success():
            return

        if not self.session_id:
            raise ValueError("Successful session creation must have session ID")
        if not self.session_name:
            raise ValueError("Successful session creation must have session name")
        if not self.root_path:
            raise ValueError("Successful session creation must have root path")

        # The embedded state, when present, must describe the same session.
        state = self.session_state
        if not state:
            return
        if state.session_id != self.session_id:
            raise ValueError("Session state ID must match result session ID")
        if state.name != self.session_name:
            raise ValueError("Session state name must match result session name")
        if state.root_path != self.root_path:
            raise ValueError("Session state root path must match result root path")

    def get_session_summary(self) -> Dict[str, Any]:
        """
        Summarise the session creation outcome as a plain dictionary.

        The session ID and root path are converted with ``str`` when truthy
        and reported as ``None`` otherwise.

        Returns:
            Dict[str, Any]: Session creation summary
        """
        session_id = str(self.session_id) if self.session_id else None
        root = str(self.root_path) if self.root_path else None
        return {
            "session_id": session_id,
            "session_name": self.session_name,
            "status": self.status.value,
            "root_path": root,
            "git_repository_detected": self.git_repository_detected,
            "git_branch": self.git_branch,
            "project_type": self.project_type,
            "security_context_created": self.security_context_created,
            "filesystem_boundaries_set": self.filesystem_boundaries_set,
            "encryption_keys_generated": self.encryption_keys_generated,
            "todo_file_found": self.todo_file_found,
            "tasks_directory_created": self.tasks_directory_created,
            "development_structure_validated": self.development_structure_validated,
            "duration_ms": self.duration_ms,
        }
@dataclass(frozen=True)
class SessionDeletionResult(OperationResult):
    """
    Immutable result of a session deletion attempt.

    Records which session was targeted, how many agents, processes, tabs and
    temporary files were handled, and the security-cleanup flags.

    Validation performed on construction:
        - A successful result must carry session ID and session name.
        - No counter may be negative.
    """

    # Session identification
    session_id: Optional[SessionId] = None
    session_name: Optional[str] = None

    # Agent cleanup information
    agents_terminated: int = 0
    agents_failed_termination: int = 0

    # Resource cleanup
    processes_terminated: int = 0
    tabs_closed: int = 0
    files_preserved: bool = False
    temp_files_cleaned: int = 0

    # Security cleanup
    encryption_keys_destroyed: bool = False
    audit_trail_preserved: bool = False

    def __post_init__(self):
        """Run base validation, then the session-deletion checks.

        Raises:
            ValueError: If any of the checks listed on the class fails.
        """
        super().__post_init__()

        if self.is_success():
            if not self.session_id:
                raise ValueError("Successful session deletion must have session ID")
            if not self.session_name:
                raise ValueError("Successful session deletion must have session name")

        # Counters are grouped so each group reports its own message.
        if self.agents_terminated < 0 or self.agents_failed_termination < 0:
            raise ValueError("Agent counts cannot be negative")
        if self.processes_terminated < 0 or self.tabs_closed < 0:
            raise ValueError("Resource counts cannot be negative")
        if self.temp_files_cleaned < 0:
            raise ValueError("File counts cannot be negative")

    def get_deletion_summary(self) -> Dict[str, Any]:
        """
        Summarise the deletion outcome as a plain dictionary.

        Returns:
            Dict[str, Any]: Session deletion summary
        """
        session_id = str(self.session_id) if self.session_id else None
        return {
            "session_id": session_id,
            "session_name": self.session_name,
            "status": self.status.value,
            "agents_terminated": self.agents_terminated,
            "agents_failed_termination": self.agents_failed_termination,
            "processes_terminated": self.processes_terminated,
            "tabs_closed": self.tabs_closed,
            "files_preserved": self.files_preserved,
            "temp_files_cleaned": self.temp_files_cleaned,
            "encryption_keys_destroyed": self.encryption_keys_destroyed,
            "audit_trail_preserved": self.audit_trail_preserved,
            "duration_ms": self.duration_ms,
        }
# ============================================================================
# COMMUNICATION RESULT TYPES - Message and Communication Operations
# ============================================================================
@dataclass(frozen=True)
class MessageDeliveryResult(OperationResult):
    """
    Immutable result of a message delivery attempt.

    Records the message and target agent, delivery/response flags, timing
    measurements and processing flags.

    Validation performed on construction:
        - A successful result must carry message ID and agent name.
        - Delivery and response times, when given, must not be negative.
        - ``response_received`` requires a non-empty ``response_message``.
    """

    # Message identification
    message_id: Optional[str] = None
    agent_name: Optional[str] = None

    # Delivery information
    message_delivered: bool = False
    response_received: bool = False
    response_message: Optional[str] = None

    # Timing and performance
    delivery_time_ms: Optional[int] = None
    response_time_ms: Optional[int] = None

    # Processing information
    adder_prompt_prepended: bool = False
    message_sanitized: bool = False
    iterm_injection_successful: bool = False

    def __post_init__(self):
        """Run base validation, then the delivery-specific checks.

        Raises:
            ValueError: If any of the checks listed on the class fails.
        """
        super().__post_init__()

        if self.is_success():
            if not self.message_id:
                raise ValueError("Successful message delivery must have message ID")
            if not self.agent_name:
                raise ValueError("Successful message delivery must have agent name")

        delivery = self.delivery_time_ms
        if delivery is not None and delivery < 0:
            raise ValueError("Delivery time cannot be negative")

        response = self.response_time_ms
        if response is not None and response < 0:
            raise ValueError("Response time cannot be negative")

        if self.response_received and not self.response_message:
            raise ValueError("Response received flag requires response message")

    def get_delivery_summary(self) -> Dict[str, Any]:
        """
        Summarise the delivery outcome as a plain dictionary.

        The response message text itself is not included in the summary.

        Returns:
            Dict[str, Any]: Message delivery summary
        """
        summary: Dict[str, Any] = {
            "message_id": self.message_id,
            "agent_name": self.agent_name,
            "status": self.status.value,
            "message_delivered": self.message_delivered,
            "response_received": self.response_received,
            "delivery_time_ms": self.delivery_time_ms,
            "response_time_ms": self.response_time_ms,
            "adder_prompt_prepended": self.adder_prompt_prepended,
            "message_sanitized": self.message_sanitized,
            "iterm_injection_successful": self.iterm_injection_successful,
            "duration_ms": self.duration_ms,
        }
        return summary
@dataclass(frozen=True)
class ConversationManagementResult(OperationResult):
    """
    Immutable result of a conversation management operation.

    Records the agent and requested action, what happened to tabs and the
    process, how context was handled, and timing measurements.

    Validation performed on construction:
        - A successful result must carry agent name and conversation action.
        - Timing measurements, when given, must not be negative.
        - Action "clear" requires ``conversation_history_cleared``.
        - Action "start_new" requires ``new_tab_created``.
    """

    # Agent and conversation identification
    agent_name: Optional[str] = None
    conversation_action: Optional[str] = None  # "clear", "start_new", "restore"

    # Tab and process management
    old_tab_closed: bool = False
    new_tab_created: bool = False
    process_restarted: bool = False

    # Context management
    context_preserved: bool = False
    conversation_history_cleared: bool = False
    agent_configuration_retained: bool = False

    # Performance metrics
    tab_creation_time_ms: Optional[int] = None
    process_start_time_ms: Optional[int] = None
    context_restoration_time_ms: Optional[int] = None

    def __post_init__(self):
        """Run base validation, then the conversation-specific checks.

        Raises:
            ValueError: If any of the checks listed on the class fails.
        """
        super().__post_init__()

        if self.is_success():
            if not self.agent_name:
                raise ValueError(
                    "Successful conversation management must have agent name"
                )
            if not self.conversation_action:
                raise ValueError("Successful conversation management must have action")

        # All three measurements share one rule and one message.
        measurements = (
            self.tab_creation_time_ms,
            self.process_start_time_ms,
            self.context_restoration_time_ms,
        )
        if any(value is not None and value < 0 for value in measurements):
            raise ValueError("Timing measurements cannot be negative")

        # The flags must reflect what the named action is meant to do.
        action = self.conversation_action
        if action == "clear" and not self.conversation_history_cleared:
            raise ValueError("Clear action must clear conversation history")
        if action == "start_new" and not self.new_tab_created:
            raise ValueError("Start new action must create new tab")

    def get_conversation_summary(self) -> Dict[str, Any]:
        """
        Summarise the conversation management outcome as a plain dictionary.

        Returns:
            Dict[str, Any]: Conversation management summary
        """
        summary: Dict[str, Any] = {
            "agent_name": self.agent_name,
            "conversation_action": self.conversation_action,
            "status": self.status.value,
            "old_tab_closed": self.old_tab_closed,
            "new_tab_created": self.new_tab_created,
            "process_restarted": self.process_restarted,
            "context_preserved": self.context_preserved,
            "conversation_history_cleared": self.conversation_history_cleared,
            "agent_configuration_retained": self.agent_configuration_retained,
            "tab_creation_time_ms": self.tab_creation_time_ms,
            "process_start_time_ms": self.process_start_time_ms,
            "context_restoration_time_ms": self.context_restoration_time_ms,
            "duration_ms": self.duration_ms,
        }
        return summary
# ============================================================================
# MONITORING RESULT TYPES - Status and Health Operations
# ============================================================================
@dataclass(frozen=True)
class SessionStatusResult(OperationResult):
    """
    Immutable result of a session status query.

    Carries aggregate session/agent counts, per-session detail dictionaries,
    resource totals, and a health assessment with alert counts.

    Validation performed on construction:
        - No count may be negative.
        - active + idle + error agents may not exceed ``total_agents``.
        - ``total_cpu_usage`` must lie in [0, 800].
        - Memory and disk I/O totals must not be negative.
        - ``overall_health`` must be one of "healthy", "warning", "critical",
          "emergency" or "unknown".
        - ``critical_alerts_count`` may not exceed ``alerts_count``.
    """

    # Session overview
    session_count: int = 0
    total_agents: int = 0
    active_agents: int = 0
    idle_agents: int = 0
    error_agents: int = 0

    # Per-session information
    session_details: List[Dict[str, Any]] = field(default_factory=list)

    # System resource summary
    total_cpu_usage: float = 0.0
    total_memory_usage_mb: float = 0.0
    total_disk_io_mb: float = 0.0

    # Health assessment
    overall_health: str = "unknown"  # "healthy", "warning", "critical", "emergency"
    alerts_count: int = 0
    critical_alerts_count: int = 0

    def __post_init__(self):
        """Run base validation, then the status-specific checks.

        Raises:
            ValueError: If any of the checks listed on the class fails.
        """
        super().__post_init__()

        counters = (
            self.session_count,
            self.total_agents,
            self.active_agents,
            self.idle_agents,
            self.error_agents,
            self.alerts_count,
            self.critical_alerts_count,
        )
        for counter in counters:
            if counter < 0:
                raise ValueError("Counts cannot be negative")

        categorised = self.active_agents + self.idle_agents + self.error_agents
        if categorised > self.total_agents:
            raise ValueError("Sum of agent status counts cannot exceed total agents")

        cpu = self.total_cpu_usage
        if cpu < 0 or cpu > 800:  # Max 8 agents * 100%
            raise ValueError("Total CPU usage out of valid range")
        if self.total_memory_usage_mb < 0:
            raise ValueError("Memory usage cannot be negative")
        if self.total_disk_io_mb < 0:
            raise ValueError("Disk I/O usage cannot be negative")

        permitted = {"healthy", "warning", "critical", "emergency", "unknown"}
        if self.overall_health not in permitted:
            raise ValueError(f"Invalid health state: {self.overall_health}")

        if self.critical_alerts_count > self.alerts_count:
            raise ValueError("Critical alerts cannot exceed total alerts")

    def get_health_summary(self) -> Dict[str, Any]:
        """
        Summarise the aggregate health figures as a plain dictionary.

        The per-session details are not included; the operation duration is
        reported under the ``"query_duration_ms"`` key.

        Returns:
            Dict[str, Any]: System health summary
        """
        summary: Dict[str, Any] = {
            "overall_health": self.overall_health,
            "session_count": self.session_count,
            "total_agents": self.total_agents,
            "active_agents": self.active_agents,
            "idle_agents": self.idle_agents,
            "error_agents": self.error_agents,
            "total_cpu_usage": self.total_cpu_usage,
            "total_memory_usage_mb": self.total_memory_usage_mb,
            "total_disk_io_mb": self.total_disk_io_mb,
            "alerts_count": self.alerts_count,
            "critical_alerts_count": self.critical_alerts_count,
            "query_duration_ms": self.duration_ms,
        }
        return summary

    def get_agent_distribution(self) -> Dict[str, float]:
        """
        Express the active/idle/error agent counts as percentages of the total.

        Returns:
            Dict[str, float]: Agent status percentages; all zeros when there
            are no agents.
        """
        total = self.total_agents
        if total == 0:
            return {"active": 0.0, "idle": 0.0, "error": 0.0}

        def share(count: int) -> float:
            return (count / total) * 100

        return {
            "active": share(self.active_agents),
            "idle": share(self.idle_agents),
            "error": share(self.error_agents),
        }

    def needs_attention(self) -> bool:
        """
        Decide whether the reported state calls for immediate attention.

        Returns:
            bool: True when health is "critical" or "emergency", when any
            critical alert exists, or when more than 30% of agents are in
            error.
        """
        if self.overall_health in ("critical", "emergency"):
            return True
        if self.critical_alerts_count > 0:
            return True
        # More than 30% of agents in error.
        return self.error_agents > self.total_agents * 0.3
# ============================================================================
# UTILITY RESULT FUNCTIONS - Result Creation and Validation
# ============================================================================
def create_success_result(
    operation_type: str,
    duration_ms: Optional[int] = None,
    message: str = "Operation completed successfully",
    metadata: Optional[Dict[str, Any]] = None,
) -> OperationResult:
    """
    Create an ``OperationResult`` with SUCCESS status.

    Args:
        operation_type: Type of operation performed
        duration_ms: Optional operation duration
        message: Success message
        metadata: Optional metadata dictionary; ``None`` or an empty dict is
            replaced by a fresh empty dict

    Returns:
        OperationResult: Successful operation result
    """
    # The annotations are explicit Optional[...] because the defaults are
    # None; a bare "int = None" is an implicit Optional that PEP 484 rejects.
    return OperationResult(
        operation_type=operation_type,
        status=OperationStatus.SUCCESS,
        duration_ms=duration_ms,
        message=message,
        metadata=metadata or {},
    )
def create_failure_result(
    operation_type: str,
    error_code: str,
    error_message: str,
    duration_ms: Optional[int] = None,
    error_details: Optional[Dict[str, Any]] = None,
) -> OperationResult:
    """
    Create an ``OperationResult`` with FAILURE status.

    Args:
        operation_type: Type of operation that failed
        error_code: Error code for classification
        error_message: Human-readable error message, stored as ``message``
        duration_ms: Optional operation duration
        error_details: Optional detailed error information; ``None`` or an
            empty dict is replaced by a fresh empty dict

    Returns:
        OperationResult: Failed operation result
    """
    # The annotations are explicit Optional[...] because the defaults are
    # None; a bare "int = None" is an implicit Optional that PEP 484 rejects.
    return OperationResult(
        operation_type=operation_type,
        status=OperationStatus.FAILURE,
        duration_ms=duration_ms,
        message=error_message,
        error_code=error_code,
        error_details=error_details or {},
    )
def create_timeout_result(
    operation_type: str, timeout_seconds: int, duration_ms: Optional[int] = None
) -> OperationResult:
    """
    Create an ``OperationResult`` with TIMEOUT status.

    The result carries the fixed error code ``"OPERATION_TIMEOUT"`` and
    records the threshold under ``error_details["timeout_seconds"]``.

    Args:
        operation_type: Type of operation that timed out
        timeout_seconds: Timeout threshold that was exceeded
        duration_ms: Optional actual operation duration

    Returns:
        OperationResult: Timeout operation result
    """
    # The annotation is an explicit Optional[int] because the default is
    # None; a bare "int = None" is an implicit Optional that PEP 484 rejects.
    return OperationResult(
        operation_type=operation_type,
        status=OperationStatus.TIMEOUT,
        duration_ms=duration_ms,
        message=f"Operation timed out after {timeout_seconds} seconds",
        error_code="OPERATION_TIMEOUT",
        error_details={"timeout_seconds": timeout_seconds},
    )
def validate_result_consistency(result: OperationResult) -> None:
    """
    Check an operation result for internal consistency.

    Rules applied, in order:
        - FAILURE results need both an error code and a message.
        - SUCCESS results must not carry an error code.
        - ``duration_ms``, when set, must not be negative.
        - At most 100 metadata entries; keys must be strings of at most 100
          characters; values must be str/int/float/bool/None, or a list/dict
          whose ``str()`` form is at most 1000 characters.

    Args:
        result: Operation result to validate

    Raises:
        ValueError: If result is inconsistent or invalid
    """
    status = result.status
    if status == OperationStatus.FAILURE:
        if not result.error_code:
            raise ValueError("Failed results must have error code")
        if not result.message:
            raise ValueError("Failed results must have error message")
    elif status == OperationStatus.SUCCESS:
        if result.error_code:
            raise ValueError("Successful results should not have error code")

    duration = result.duration_ms
    if duration is not None and duration < 0:
        raise ValueError("Duration cannot be negative")

    entries = result.metadata
    if len(entries) > 100:
        raise ValueError("Too many metadata entries")

    scalar_types = (str, int, float, bool, type(None))
    for key, value in entries.items():
        if not isinstance(key, str) or len(key) > 100:
            raise ValueError(f"Invalid metadata key: {key}")

        if isinstance(value, scalar_types):
            continue
        # Containers are tolerated only while their text form stays small.
        if isinstance(value, (list, dict)) and len(str(value)) <= 1000:
            continue
        raise ValueError(f"Invalid metadata value type for key {key}")
def aggregate_results(results: List[OperationResult]) -> OperationResult:
    """
    Collapse several operation results into one summary result.

    The overall status is the first of failure, timeout, cancelled and
    partial_success that appears among the inputs, and SUCCESS otherwise.
    Durations that are set are summed; a sum of zero is reported as ``None``.
    An empty input list yields a success result of type
    ``"empty_aggregation"``.

    Args:
        results: List of operation results to aggregate

    Returns:
        OperationResult: Aggregated result summary
    """
    if not results:
        return create_success_result(
            "empty_aggregation", message="No operations to aggregate"
        )

    # One pass gathers everything needed; list order follows input order.
    status_counts = {}
    total_duration = 0
    error_codes = []
    error_messages = []
    for item in results:
        label = item.status.value
        status_counts[label] = status_counts.get(label, 0) + 1
        if item.duration_ms is not None:
            total_duration += item.duration_ms
        if item.error_code:
            error_codes.append(item.error_code)
        if item.status == OperationStatus.FAILURE:
            error_messages.append(item.message)

    def seen(label):
        return status_counts.get(label, 0) > 0

    # Most severe outcome wins.
    if seen("failure"):
        overall_status = OperationStatus.FAILURE
    elif seen("timeout"):
        overall_status = OperationStatus.TIMEOUT
    elif seen("cancelled"):
        overall_status = OperationStatus.CANCELLED
    elif seen("partial_success"):
        overall_status = OperationStatus.PARTIAL_SUCCESS
    else:
        overall_status = OperationStatus.SUCCESS

    first_code = error_codes[0] if error_codes else None
    reported_duration = total_duration if total_duration > 0 else None
    breakdown = {
        "operation_count": len(results),
        "status_breakdown": status_counts,
        "error_codes": error_codes,
        "error_messages": error_messages[:5],  # Limit to first 5 errors
    }
    return OperationResult(
        operation_type="batch_operation",
        status=overall_status,
        duration_ms=reported_duration,
        message=f"Aggregated {len(results)} operations: {status_counts}",
        error_code=first_code,
        error_details=breakdown,
    )