"""
Agent State and Lifecycle Types - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: Immutable State pattern for thread safety
- Security Model: State isolation with encrypted persistence
- Performance Profile: O(1) state transitions with minimal copying
Technical Decisions:
- Immutable Dataclasses: Prevent accidental state mutation
- Enum-based Status: Type-safe state machine transitions
- Resource Tracking: Comprehensive monitoring for performance
- Message History: Bounded conversation tracking
Dependencies & Integration:
- External: None (stdlib only for maximum portability)
- Internal: Depends on ids.py for branded types
Quality Assurance:
- Test Coverage: Property-based testing for state transitions
- Error Handling: Comprehensive validation with state consistency
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import shlex
import time
from dataclasses import dataclass, field, replace
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

from .ids import AgentId, MessageId, ProcessId, SessionId, TabId
# ============================================================================
# AGENT STATUS ENUMERATION - Type-Safe State Machine
# ============================================================================
class AgentStatus(Enum):
    """
    Lifecycle status values for an agent.

    This enum only names the states; it does not enforce transitions itself.
    The intended transitions are:
        CREATED → STARTING → ACTIVE
        ACTIVE ↔ IDLE
        Any State → ERROR → RESTARTING → ACTIVE
        Any State → TERMINATING → TERMINATED
    """

    # Agent configuration created, not yet started
    CREATED = "created"
    # iTerm2 tab created, Claude Code process starting
    STARTING = "starting"
    # Claude Code process running and responsive
    ACTIVE = "active"
    # Process running but no active tasks
    IDLE = "idle"
    # Process crashed or unresponsive
    ERROR = "error"
    # Automatic recovery in progress
    RESTARTING = "restarting"
    # Graceful shutdown in progress
    TERMINATING = "terminating"
    # Process stopped and resources cleaned
    TERMINATED = "terminated"
class AgentSpecialization(Enum):
    """
    Focus areas an agent can be specialized for (used to tailor system prompts).
    """

    # Default ADDER+ capabilities
    GENERAL = "general"
    # UI/UX development focus
    FRONTEND = "frontend"
    # API and server development focus
    BACKEND = "backend"
    # Data modeling and query optimization
    DATABASE = "database"
    # Infrastructure and deployment focus
    DEVOPS = "devops"
    # Quality assurance and test automation
    TESTING = "testing"
    # Security analysis and implementation
    SECURITY = "security"
    # System design and documentation
    ARCHITECTURE = "architecture"
class AgentHealthStatus(Enum):
    """
    Coarse health levels reported for an agent by monitoring and diagnostics.
    """

    # Agent is operating normally
    HEALTHY = "healthy"
    # Agent has minor issues but functional
    DEGRADED = "degraded"
    # Agent has significant issues
    UNHEALTHY = "unhealthy"
    # Agent is in critical state
    CRITICAL = "critical"
    # Health status cannot be determined
    UNKNOWN = "unknown"
# ============================================================================
# AGENT REQUEST TYPES - Agent Creation and Management Requests
# ============================================================================
@dataclass(frozen=True)
class AgentCreationRequest:
    """
    Frozen request object describing an agent to be created.

    Checks enforced at construction (see ``__post_init__``):
        - ``name`` is non-blank and starts with ``"Agent_"``
        - ``system_prompt_suffix`` is at most 5000 characters
        - ``work_directory``, when given, is an absolute path
    """

    name: str
    session_id: SessionId
    specialization: AgentSpecialization = AgentSpecialization.GENERAL
    system_prompt_suffix: str = ""
    claude_config: Optional["ClaudeConfig"] = None
    work_directory: Optional[Path] = None

    def __post_init__(self):
        """
        Reject malformed requests.

        Raises:
            ValueError: If the name is blank or lacks the ``Agent_`` prefix,
                the prompt suffix is too long, or the work directory is
                relative.
        """
        requested_name = self.name

        # A usable name must be present and contain more than whitespace.
        if not (requested_name and requested_name.strip()):
            raise ValueError("Agent name cannot be empty")

        # Only the prefix is checked here; the part after it is not inspected.
        if not requested_name.startswith("Agent_"):
            raise ValueError("Agent name must follow Agent_# format")

        suffix_length = len(self.system_prompt_suffix)
        if suffix_length > 5000:
            raise ValueError("System prompt suffix too long")

        directory = self.work_directory
        if directory and not directory.is_absolute():
            raise ValueError("Work directory must be absolute path")
# ============================================================================
# AGENT CONFIGURATION TYPES - Immutable Configuration Management
# ============================================================================
@dataclass(frozen=True)
class ClaudeConfig:
    """
    Frozen configuration for launching a Claude Code process.

    Checks enforced at construction (see ``__post_init__``):
        - ``model`` is one of the supported model names
        - ``timeout_seconds`` is in (0, 3600]
        - ``max_memory_mb`` is in (0, 4096]
        - ``max_cpu_percent`` is in (0, 100]
        - ``working_directory``, when given, is an absolute path
        - no entry of ``custom_commands`` contains a shell metacharacter
    """

    model: str = "sonnet-3.5"
    no_color: bool = True
    skip_permissions: bool = False  # Security-conscious default
    verbose: bool = False
    output_format: str = "text"
    working_directory: Optional[Path] = None
    custom_commands: List[str] = field(default_factory=list)
    environment_vars: Dict[str, str] = field(default_factory=dict)
    timeout_seconds: int = 300  # 5 minutes default timeout
    max_memory_mb: int = 512  # Memory limit per agent
    max_cpu_percent: float = 25.0  # CPU limit percentage

    def __post_init__(self):
        """
        Validate configuration values.

        Raises:
            ValueError: If the model is unknown, a resource limit is out of
                range, the working directory is relative, or a custom command
                contains a shell metacharacter.
        """
        # Model validation
        valid_models = ["sonnet-3.5", "opus-3", "haiku-3"]
        if self.model not in valid_models:
            raise ValueError(
                f"Invalid model {self.model}, must be one of {valid_models}"
            )

        # Resource limit validation
        if self.timeout_seconds <= 0 or self.timeout_seconds > 3600:
            raise ValueError(
                f"Timeout {self.timeout_seconds} must be in range (0, 3600]"
            )
        if self.max_memory_mb <= 0 or self.max_memory_mb > 4096:
            raise ValueError(
                f"Memory limit {self.max_memory_mb} must be in range (0, 4096]"
            )
        if self.max_cpu_percent <= 0 or self.max_cpu_percent > 100:
            raise ValueError(
                f"CPU limit {self.max_cpu_percent} must be in range (0, 100]"
            )

        # Working directory validation
        if self.working_directory and not self.working_directory.is_absolute():
            raise ValueError(
                f"Working directory must be absolute path: {self.working_directory}"
            )

        # Command security validation: reject shell metacharacters outright.
        dangerous_patterns = ["|", "&", ";", "`", "$", "(", ")", "<", ">", '"', "'"]
        for cmd in self.custom_commands:
            if any(pattern in cmd for pattern in dangerous_patterns):
                raise ValueError(f"Custom command contains dangerous pattern: {cmd}")

    def get_activation_command(self) -> str:
        """
        Build the ``claude`` command line for this configuration.

        Flags are only emitted for non-default settings (``--model`` when the
        model is not ``sonnet-3.5``, ``--format`` when the output format is
        not ``text``).

        Returns:
            str: Space-joined command string.
        """
        cmd_parts = ["claude"]
        if self.model != "sonnet-3.5":
            # model is restricted to a fixed whitelist in __post_init__.
            cmd_parts.append(f"--model {self.model}")
        if self.no_color:
            cmd_parts.append("--no-color")
        if self.skip_permissions:
            cmd_parts.append("--skip-permissions")
        if self.verbose:
            cmd_parts.append("--verbose")
        if self.output_format != "text":
            # output_format is free-form and not validated anywhere, so quote
            # it before placing it in a command string. shlex.quote returns
            # plain tokens such as "json" unchanged.
            cmd_parts.append(f"--format {shlex.quote(self.output_format)}")
        return " ".join(cmd_parts)
@dataclass(frozen=True)
class ResourceMetrics:
    """
    Frozen snapshot of an agent's resource usage.

    Checks enforced at construction (see ``__post_init__``):
        - ``cpu_percent`` and ``memory_percent`` are in [0, 100]
        - all other numeric metrics are non-negative
    """

    last_updated: datetime = field(default_factory=datetime.now)
    cpu_percent: float = 0.0
    memory_mb: float = 0.0
    memory_percent: float = 0.0
    disk_io_read_mb: float = 0.0
    disk_io_write_mb: float = 0.0
    network_bytes_sent: int = 0
    network_bytes_recv: int = 0
    process_uptime_seconds: int = 0
    thread_count: int = 0
    file_descriptor_count: int = 0

    def __post_init__(self):
        """
        Validate metric ranges.

        Raises:
            ValueError: If a percentage is outside [0, 100] or any other
                metric is negative. The message names the offending metric.
        """
        # Percentage validation
        if not (0 <= self.cpu_percent <= 100):
            raise ValueError(
                f"CPU percent {self.cpu_percent} must be in range [0, 100]"
            )
        if not (0 <= self.memory_percent <= 100):
            raise ValueError(
                f"Memory percent {self.memory_percent} must be in range [0, 100]"
            )

        # Non-negative validation; checked by name so the error can say
        # which metric was rejected rather than only its value.
        non_negative_fields = (
            "memory_mb",
            "disk_io_read_mb",
            "disk_io_write_mb",
            "network_bytes_sent",
            "network_bytes_recv",
            "process_uptime_seconds",
            "thread_count",
            "file_descriptor_count",
        )
        for field_name in non_negative_fields:
            field_value = getattr(self, field_name)
            if field_value < 0:
                raise ValueError(
                    f"Resource metric {field_name}={field_value} cannot be negative"
                )

    def is_healthy(self) -> bool:
        """
        Report whether every tracked metric is below its healthy threshold.

        Returns:
            bool: True when CPU and memory are under 80%, thread count is
                under 100 and file descriptor count is under 500.
        """
        return (
            self.cpu_percent < 80.0
            and self.memory_percent < 80.0
            and self.thread_count < 100
            and self.file_descriptor_count < 500
        )

    def is_resource_stressed(self) -> bool:
        """
        Report whether any tracked metric is above its stress threshold.

        Returns:
            bool: True when CPU or memory exceeds 90%, thread count exceeds
                200, or file descriptor count exceeds 800.
        """
        return (
            self.cpu_percent > 90.0
            or self.memory_percent > 90.0
            or self.thread_count > 200
            or self.file_descriptor_count > 800
        )
# ============================================================================
# MESSAGE TRACKING TYPES - Conversation Management
# ============================================================================
@dataclass(frozen=True)
class Message:
    """
    Frozen record of a single conversation message.

    Checks enforced at construction (see ``__post_init__``):
        - ``role`` is "user", "assistant" or "system"
        - ``content`` is non-blank and at most 100,000 characters
        - ``token_count``, when given, is positive
        - ``processing_time_ms``, when given, is non-negative
    """

    message_id: MessageId
    timestamp: datetime
    role: str  # "user", "assistant", "system"
    content: str
    token_count: Optional[int] = None
    processing_time_ms: Optional[int] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """
        Validate message structure and content.

        Raises:
            ValueError: If the role is unknown, the content is blank or too
                long, the token count is not positive, or the processing
                time is negative.
        """
        # Role validation
        valid_roles = {"user", "assistant", "system"}
        if self.role not in valid_roles:
            raise ValueError(f"Invalid role {self.role}, must be one of {valid_roles}")

        # Content validation
        if not self.content or not self.content.strip():
            raise ValueError("Message content cannot be empty or whitespace only")
        if len(self.content) > 100_000:  # limit is counted in characters
            raise ValueError(
                f"Message content too large: {len(self.content)} characters"
            )

        # Token count validation
        if self.token_count is not None and self.token_count <= 0:
            raise ValueError(f"Token count {self.token_count} must be positive")

        # Processing time validation
        if self.processing_time_ms is not None and self.processing_time_ms < 0:
            raise ValueError(
                f"Processing time {self.processing_time_ms} cannot be negative"
            )

    def is_user_message(self) -> bool:
        """Check if message is from user."""
        return self.role == "user"

    def is_assistant_message(self) -> bool:
        """Check if message is from assistant."""
        return self.role == "assistant"

    def is_system_message(self) -> bool:
        """Check if message is system message."""
        return self.role == "system"

    def get_content_preview(self, max_length: int = 100) -> str:
        """
        Get a preview of the content that is never longer than ``max_length``.

        Args:
            max_length: Maximum length of the preview, ellipsis included.

        Returns:
            str: The full content when it fits; otherwise the leading part of
                the content followed by "...". When ``max_length`` is 3 or
                less there is no room for content, so only as much of the
                ellipsis as fits is returned (empty for non-positive values).
        """
        ellipsis = "..."
        if len(self.content) <= max_length:
            return self.content
        if max_length <= len(ellipsis):
            # A negative or tiny slice bound would otherwise keep almost the
            # whole content and exceed the requested length.
            return ellipsis[: max(max_length, 0)]
        return self.content[: max_length - len(ellipsis)] + ellipsis
@dataclass(frozen=True)
class ConversationHistory:
    """
    Frozen, size-bounded list of conversation messages.

    Checks enforced at construction (see ``__post_init__``):
        - ``max_messages`` and ``max_total_tokens`` are positive
        - the number of messages does not exceed ``max_messages``
        - messages are in non-decreasing timestamp order
        - message IDs are unique
    """

    messages: List[Message] = field(default_factory=list)
    max_messages: int = 1000
    max_total_tokens: int = 50_000
    created_at: datetime = field(default_factory=datetime.now)

    def __post_init__(self):
        """
        Validate conversation history constraints.

        Raises:
            ValueError: If a limit is not positive, there are too many
                messages, messages are out of timestamp order, or two
                messages share an ID.
        """
        if self.max_messages <= 0:
            raise ValueError(f"Max messages {self.max_messages} must be positive")
        if self.max_total_tokens <= 0:
            raise ValueError(
                f"Max total tokens {self.max_total_tokens} must be positive"
            )
        if len(self.messages) > self.max_messages:
            raise ValueError(
                f"Message count {len(self.messages)} exceeds limit {self.max_messages}"
            )

        # Validate message ordering (equal timestamps are allowed)
        for earlier, later in zip(self.messages, self.messages[1:]):
            if later.timestamp < earlier.timestamp:
                raise ValueError("Messages must be ordered by timestamp")

        # Validate unique message IDs
        message_ids = [msg.message_id for msg in self.messages]
        if len(message_ids) != len(set(message_ids)):
            raise ValueError("Duplicate message IDs found in history")

    def add_message(self, message: Message) -> "ConversationHistory":
        """
        Return a new history with ``message`` appended, trimming old entries.

        Oldest messages are dropped first until the message-count limit and
        then the token budget are respected. Messages without a token count
        contribute zero tokens. If the new message's own token count exceeds
        the budget, trimming removes it as well and the result is empty.

        Args:
            message: Message to add

        Returns:
            ConversationHistory: New history with message added

        Raises:
            ValueError: If the resulting history fails validation, e.g. the
                new message is older than the last one or reuses an ID.
        """
        new_messages = [*self.messages, message]

        # Enforce the count limit with one slice instead of repeated pop(0).
        overflow = len(new_messages) - self.max_messages
        if overflow > 0:
            new_messages = new_messages[overflow:]

        # Enforce the token budget by counting how many of the oldest
        # messages must go, then slicing once.
        total_tokens = sum(msg.token_count or 0 for msg in new_messages)
        drop_count = 0
        while total_tokens > self.max_total_tokens and drop_count < len(new_messages):
            total_tokens -= new_messages[drop_count].token_count or 0
            drop_count += 1
        if drop_count:
            new_messages = new_messages[drop_count:]

        return ConversationHistory(
            messages=new_messages,
            max_messages=self.max_messages,
            max_total_tokens=self.max_total_tokens,
            created_at=self.created_at,
        )

    def get_recent_messages(self, count: int = 10) -> List[Message]:
        """
        Get the most recent messages from history.

        Args:
            count: Number of recent messages to return

        Returns:
            List[Message]: Up to ``count`` most recent messages in stored
                order. Always a new list, so mutating it cannot alter this
                history.
        """
        if count <= 0:
            return []
        # Slicing copies and already copes with count > len(messages).
        return self.messages[-count:]

    def get_message_count(self) -> int:
        """Get total message count in history."""
        return len(self.messages)

    def get_total_tokens(self) -> int:
        """Get total token count; messages without a count contribute zero."""
        return sum(msg.token_count or 0 for msg in self.messages)
# ============================================================================
# AGENT INFO - Basic Agent Information
# ============================================================================
@dataclass(frozen=True)
class AgentInfo:
    """
    Lightweight identification record for an agent.

    Carries only the fields needed for display/identification, for callers
    that do not need a complete AgentState.

    NOTE(review): a second class named ``AgentInfo`` is defined later in this
    module; that later definition rebinds the module-level name, so this one
    is shadowed after import. Confirm which definition is intended.
    """

    agent_id: AgentId
    name: str
    status: AgentStatus
    specialization: AgentSpecialization
    session_id: SessionId
    created_at: datetime = field(default_factory=datetime.now)
    last_activity: datetime = field(default_factory=datetime.now)

    def __post_init__(self):
        """
        Check the name prefix and timestamp ordering.

        Raises:
            ValueError: If the name is empty or lacks the ``Agent_`` prefix,
                or ``last_activity`` precedes ``created_at``.
        """
        agent_name = self.name
        has_valid_prefix = bool(agent_name) and agent_name.startswith("Agent_")
        if not has_valid_prefix:
            raise ValueError(f"Agent name must start with 'Agent_': {self.name}")

        created, active = self.created_at, self.last_activity
        if active < created:
            raise ValueError("Last activity cannot be before creation time")
# ============================================================================
# AGENT STATE - Complete Agent Information
# ============================================================================
@dataclass(frozen=True)
class AgentState:
    """
    Immutable snapshot of everything tracked about a single agent.

    Instances are frozen: every ``with_*`` method returns a new state (which
    is re-validated by ``__post_init__``) instead of mutating this one.

    Checks enforced at construction (see ``__post_init__``):
        - ``agent_id`` is accepted by ``ids.extract_agent_info`` and its
          display name equals ``name``
        - ACTIVE and IDLE agents have both ``process_id`` and ``iterm_tab_id``
        - heartbeat, status-change and task-assignment timestamps are not
          earlier than ``created_at``
        - a current task always has an assignment time
        - ``work_directory``, when set, is an absolute path
        - counters are non-negative
    """

    # Core Identity
    agent_id: AgentId
    session_id: SessionId
    name: str  # Agent_# format

    # Process Information
    process_id: Optional[ProcessId] = None
    iterm_tab_id: Optional[TabId] = None

    # Status and Configuration
    status: AgentStatus = AgentStatus.CREATED
    specialization: AgentSpecialization = AgentSpecialization.GENERAL
    system_prompt_suffix: str = ""
    claude_config: ClaudeConfig = field(default_factory=ClaudeConfig)

    # Timing Information
    created_at: datetime = field(default_factory=datetime.now)
    last_heartbeat: datetime = field(default_factory=datetime.now)
    last_status_change: datetime = field(default_factory=datetime.now)

    # Communication and History
    conversation_history: ConversationHistory = field(
        default_factory=ConversationHistory
    )

    # Performance and Monitoring
    resource_usage: ResourceMetrics = field(default_factory=ResourceMetrics)
    error_count: int = 0
    restart_count: int = 0
    total_messages_processed: int = 0

    # Task and Work Tracking
    current_task: Optional[str] = None
    task_assignment_time: Optional[datetime] = None
    work_directory: Optional[Path] = None

    def __post_init__(self):
        """
        Validate agent state consistency.

        Raises:
            ValueError: If the agent ID is rejected by ``extract_agent_info``,
                the name does not match the ID, an ACTIVE/IDLE agent lacks
                process or tab IDs, a timestamp precedes creation, a task has
                no assignment time, the work directory is relative, or a
                counter is negative.
        """
        # Imported here rather than at module top, as in the original code.
        from .ids import extract_agent_info

        # Only the ID parsing is guarded, so a name mismatch below is not
        # misreported as an invalid ID.
        try:
            agent_info = extract_agent_info(self.agent_id)
        except ValueError as e:
            raise ValueError(f"Invalid agent ID: {e}") from e
        if agent_info.display_name != self.name:
            raise ValueError(
                f"Agent name {self.name} inconsistent with ID {self.agent_id}"
            )

        # Process validation
        if self.status in (AgentStatus.ACTIVE, AgentStatus.IDLE):
            if not self.process_id or not self.iterm_tab_id:
                raise ValueError(
                    f"Active agent {self.name} must have process and tab IDs"
                )

        # Timestamp consistency
        if self.last_heartbeat < self.created_at:
            raise ValueError("Last heartbeat cannot be before creation time")
        if self.last_status_change < self.created_at:
            raise ValueError("Last status change cannot be before creation time")

        # Task assignment validation
        if self.current_task and not self.task_assignment_time:
            raise ValueError("Current task requires assignment time")
        if self.task_assignment_time and self.task_assignment_time < self.created_at:
            raise ValueError("Task assignment time cannot be before creation time")

        # Work directory validation
        if self.work_directory and not self.work_directory.is_absolute():
            raise ValueError(
                f"Work directory must be absolute path: {self.work_directory}"
            )

        # Counter validation
        if (
            self.error_count < 0
            or self.restart_count < 0
            or self.total_messages_processed < 0
        ):
            raise ValueError("Counters cannot be negative")

    def _evolve(self, **changes: Any) -> "AgentState":
        """Return a copy of this state with the given fields replaced."""
        # dataclasses.replace builds a new instance through __init__, so the
        # copy goes through __post_init__ validation again.
        return replace(self, **changes)

    def with_status(self, new_status: AgentStatus) -> "AgentState":
        """
        Create new state with updated status and status-change timestamp.

        Args:
            new_status: New agent status

        Returns:
            AgentState: New state with updated status

        Raises:
            ValueError: If the resulting state fails validation (for example
                moving to ACTIVE without process and tab IDs).
        """
        return self._evolve(status=new_status, last_status_change=datetime.now())

    def with_process_info(self, process_id: ProcessId, tab_id: TabId) -> "AgentState":
        """
        Create new state with process and tab information.

        Also refreshes ``last_heartbeat``.

        Args:
            process_id: Claude Code process ID
            tab_id: iTerm2 tab ID

        Returns:
            AgentState: New state with process information
        """
        return self._evolve(
            process_id=process_id,
            iterm_tab_id=tab_id,
            last_heartbeat=datetime.now(),
        )

    def with_message(self, message: Message) -> "AgentState":
        """
        Create new state with a message added to the conversation history.

        Also increments ``total_messages_processed`` and refreshes
        ``last_heartbeat``.

        Args:
            message: Message to add to history

        Returns:
            AgentState: New state with updated conversation
        """
        return self._evolve(
            conversation_history=self.conversation_history.add_message(message),
            total_messages_processed=self.total_messages_processed + 1,
            last_heartbeat=datetime.now(),
        )

    def with_resource_update(self, metrics: ResourceMetrics) -> "AgentState":
        """
        Create new state with updated resource metrics.

        Also refreshes ``last_heartbeat``.

        Args:
            metrics: New resource metrics

        Returns:
            AgentState: New state with updated metrics
        """
        return self._evolve(resource_usage=metrics, last_heartbeat=datetime.now())

    def with_task_assignment(self, task: str) -> "AgentState":
        """
        Create new state with a task assigned at the current time.

        Args:
            task: Task description to assign

        Returns:
            AgentState: New state with task assignment
        """
        return self._evolve(current_task=task, task_assignment_time=datetime.now())

    def with_error_increment(self) -> "AgentState":
        """
        Create new state with incremented error count.

        Returns:
            AgentState: New state with error count incremented
        """
        return self._evolve(error_count=self.error_count + 1)

    def is_active(self) -> bool:
        """Check whether the agent status is ACTIVE or IDLE."""
        return self.status in (AgentStatus.ACTIVE, AgentStatus.IDLE)

    def is_healthy(self) -> bool:
        """
        Determine if agent is healthy based on status and metrics.

        Returns:
            bool: False when the status is ERROR or TERMINATED, the last
                heartbeat is older than 5 minutes, resource metrics are not
                healthy, or more than 5 errors were recorded; True otherwise.
        """
        if self.status in (AgentStatus.ERROR, AgentStatus.TERMINATED):
            return False

        # Check heartbeat freshness (within last 5 minutes)
        heartbeat_age = datetime.now() - self.last_heartbeat
        if heartbeat_age > timedelta(minutes=5):
            return False

        # Check resource health
        if not self.resource_usage.is_healthy():
            return False

        # Check error rate (max 5 errors before unhealthy)
        if self.error_count > 5:
            return False
        return True

    def needs_restart(self) -> bool:
        """
        Determine if agent needs restart based on status and metrics.

        Returns:
            bool: False once more than 3 restarts have been recorded, to
                avoid restart loops. Otherwise True when the status is ERROR,
                resources are stressed, or more than 10 errors were recorded.
        """
        # Excessive restarts (avoid restart loops). This must come first:
        # checked after the conditions below it could never change the result.
        if self.restart_count > 3:
            return False

        # Status-based restart conditions
        if self.status == AgentStatus.ERROR:
            return True

        # Resource-based restart conditions
        if self.resource_usage.is_resource_stressed():
            return True

        # High error rate
        return self.error_count > 10

    def get_uptime(self) -> timedelta:
        """
        Get agent uptime since creation.

        Returns:
            timedelta: Time since agent creation
        """
        return datetime.now() - self.created_at

    def get_idle_time(self) -> timedelta:
        """
        Get time since last heartbeat.

        Returns:
            timedelta: Time since last heartbeat
        """
        return datetime.now() - self.last_heartbeat
# Result types for agent operations
@dataclass(frozen=True)
class AgentCreationResult:
    """
    Outcome of an agent creation operation.

    Bundles the resulting agent state with a success flag, an optional error
    message and any warnings collected along the way.
    """

    agent_state: AgentState
    success: bool
    error_message: Optional[str] = None
    warnings: List[str] = field(default_factory=list)

    @property
    def agent_id(self) -> AgentId:
        """ID of the agent, read from the wrapped agent state."""
        state = self.agent_state
        return state.agent_id

    @property
    def agent_name(self) -> str:
        """Name of the agent, read from the wrapped agent state."""
        state = self.agent_state
        return state.name
@dataclass(frozen=True)
class AgentDeletionResult:
    """
    Outcome of an agent deletion operation.

    Both cleanup flags default to True; callers are expected to pass False
    explicitly when a step did not happen.
    """

    # Identifier of the agent the deletion was attempted on
    agent_id: AgentId
    # Whether the deletion succeeded
    success: bool
    # Optional description of what went wrong
    error_message: Optional[str] = None
    # Whether cleanup was performed
    cleanup_performed: bool = True
    # Whether resources were released
    resources_released: bool = True
@dataclass(frozen=True)
class AgentInfo:
    """
    Lightweight summary of an agent for operations that do not need full state.

    NOTE(review): an earlier class with the same name exists in this module;
    this later definition is the one bound to the module-level ``AgentInfo``.
    """

    agent_id: AgentId
    name: str
    status: AgentStatus
    specialization: AgentSpecialization
    created_at: datetime
    last_heartbeat: datetime
    session_id: SessionId

    @classmethod
    def from_agent_state(cls, agent_state: "AgentState") -> "AgentInfo":
        """
        Build an AgentInfo by copying the matching attributes of an AgentState.

        Args:
            agent_state: Full state to summarize

        Returns:
            AgentInfo: Summary holding the same seven attribute values
        """
        copied_attributes = (
            "agent_id",
            "name",
            "status",
            "specialization",
            "created_at",
            "last_heartbeat",
            "session_id",
        )
        values = {attr: getattr(agent_state, attr) for attr in copied_attributes}
        return cls(**values)
# Alias for backwards compatibility: ConversationMessage refers to the same
# class object as Message.
ConversationMessage = Message