"""
Session State and Management Types - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: Aggregate Root pattern for session management
- Security Model: Session isolation with filesystem boundaries
- Performance Profile: O(1) agent lookup by ID (dict access); lookup by name is a linear scan
Technical Decisions:
- Session-Agent Aggregation: Session owns and manages agent lifecycles
- Git Integration: Automatic project analysis and state tracking
- Task File Monitoring: Real-time synchronization with development workflow
- Security Boundaries: Strict filesystem access control
Dependencies & Integration:
- External: pathlib for filesystem operations
- Internal: Depends on ids.py and agent.py for complete state model
Quality Assurance:
- Test Coverage: Property-based testing for session operations
- Error Handling: Comprehensive validation with filesystem security
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import hashlib
import os
from dataclasses import dataclass, field, replace
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

from .agent import AgentState, AgentStatus
from .ids import AgentId, SessionId
# ============================================================================
# SESSION STATUS ENUMERATION - Session Lifecycle Management
# ============================================================================
class SessionStatus(Enum):
    """
    Lifecycle states a session can be in.

    Intended transitions:
        CREATED -> INITIALIZING -> ACTIVE
        ACTIVE <-> IDLE
        any state -> ERROR -> RECOVERING -> ACTIVE
        any state -> TERMINATING -> TERMINATED

    This enum only names the states; it does not enforce the transitions.
    """

    # Session configuration has been created.
    CREATED = "created"
    # Filesystem and Git integration are being set up.
    INITIALIZING = "initializing"
    # Session is running and has active agents.
    ACTIVE = "active"
    # Session is running but has no active agents.
    IDLE = "idle"
    # Session has hit a critical error.
    ERROR = "error"
    # Automatic recovery is in progress.
    RECOVERING = "recovering"
    # Graceful shutdown is in progress.
    TERMINATING = "terminating"
    # Session has stopped and been cleaned up.
    TERMINATED = "terminated"
class SecurityLevel(Enum):
    """
    Named security tiers for a session, from least to most restrictive.

    The members carry no ordering of their own; code that needs to compare
    levels must define that ordering itself.
    """

    # Basic validation, minimal isolation.
    LOW = "low"
    # Standard validation, filesystem boundaries.
    MEDIUM = "medium"
    # Enhanced validation, strict isolation.
    HIGH = "high"
    # Paranoid validation, full sandboxing.
    MAXIMUM = "maximum"
# ============================================================================
# GIT INTEGRATION TYPES - Project Analysis and Tracking
# ============================================================================
@dataclass(frozen=True)
class GitState:
    """
    Immutable snapshot of a project's Git repository state.

    Invariants enforced at construction:
        - repository_path is absolute
        - when is_git_repo is True, current_branch and current_commit are
          set and current_commit is 40 hexadecimal characters (SHA-1)
        - total_commits and repository_size_mb are non-negative
    """

    repository_path: Path
    is_git_repo: bool = False
    current_branch: Optional[str] = None
    current_commit: Optional[str] = None
    has_uncommitted_changes: bool = False
    remote_url: Optional[str] = None
    last_commit_date: Optional[datetime] = None
    total_commits: int = 0
    repository_size_mb: float = 0.0

    def __post_init__(self):
        """
        Check the invariants listed on the class.

        Raises:
            ValueError: If any invariant is violated.
        """
        if not self.repository_path.is_absolute():
            raise ValueError(
                f"Repository path must be absolute: {self.repository_path}"
            )

        if self.is_git_repo:
            if not self.current_branch:
                raise ValueError("Git repo must have current branch")
            if not self.current_commit:
                raise ValueError("Git repo must have current commit")

            # A SHA-1 commit id is exactly 40 hex digits; case is ignored.
            hex_digits = set("0123456789abcdef")
            looks_like_sha1 = (
                len(self.current_commit) == 40
                and set(self.current_commit.lower()) <= hex_digits
            )
            if not looks_like_sha1:
                raise ValueError(f"Invalid commit hash format: {self.current_commit}")

        if self.total_commits < 0:
            raise ValueError(f"Total commits cannot be negative: {self.total_commits}")
        if self.repository_size_mb < 0:
            raise ValueError(
                f"Repository size cannot be negative: {self.repository_size_mb}"
            )

    def is_clean_repo(self) -> bool:
        """Return True only for a Git repository with no uncommitted changes."""
        if not self.is_git_repo:
            return False
        return not self.has_uncommitted_changes

    def get_project_identifier(self) -> str:
        """
        Build a short identifier for the project.

        Returns:
            str: ``project_<directory name>`` when this is not a Git
            repository; otherwise ``git_`` followed by the first 8 hex
            digits of an MD5 digest of the remote URL, or of
            ``<directory name>_<first 8 commit chars>`` when no remote URL
            is set.
        """
        directory_name = self.repository_path.name
        if not self.is_git_repo:
            return f"project_{directory_name}"

        # The remote URL is preferred; otherwise fall back to name + commit.
        if self.remote_url:
            seed = self.remote_url
        else:
            seed = f"{directory_name}_{self.current_commit[:8]}"

        short_digest = hashlib.md5(seed.encode()).hexdigest()[:8]
        return f"git_{short_digest}"
@dataclass(frozen=True)
class TaskFileTracker:
    """
    Immutable record of the TODO file and task files being monitored.

    Invariants enforced at construction:
        - todo_file_path and tasks_directory are absolute
        - every entry of tracked_task_files, joined onto tasks_directory and
          lexically normalised, stays inside tasks_directory
    """

    todo_file_path: Path
    tasks_directory: Path
    last_todo_modified: Optional[datetime] = None
    last_tasks_scan: Optional[datetime] = None
    tracked_task_files: Set[str] = field(default_factory=set)
    task_agent_assignments: Dict[str, str] = field(
        default_factory=dict
    )  # task_id -> agent_name

    def __post_init__(self):
        """
        Validate the tracked paths.

        Raises:
            ValueError: If a path is not absolute, or a tracked task file
                resolves outside the tasks directory.
        """
        if not self.todo_file_path.is_absolute():
            raise ValueError(f"TODO file path must be absolute: {self.todo_file_path}")
        if not self.tasks_directory.is_absolute():
            raise ValueError(
                f"Tasks directory must be absolute: {self.tasks_directory}"
            )

        # Compare normalised path components rather than raw string prefixes:
        # a prefix test accepts "../" escapes and sibling directories whose
        # name merely starts with the tasks directory name. normpath is purely
        # lexical, so no filesystem access happens here and symlinks are not
        # followed.
        tasks_root = Path(os.path.normpath(self.tasks_directory))
        for task_file in self.tracked_task_files:
            candidate = Path(os.path.normpath(self.tasks_directory / task_file))
            try:
                candidate.relative_to(tasks_root)
            except ValueError:
                raise ValueError(
                    f"Task file outside tasks directory: {task_file}"
                ) from None

    def is_todo_modified(self) -> bool:
        """
        Check whether the TODO file changed since ``last_todo_modified``.

        Returns:
            bool: False if the file does not exist; True if no previous
            modification time is recorded; otherwise whether the file's
            current mtime is later than ``last_todo_modified``.
        """
        # stat() directly instead of exists()-then-stat() so a file deleted
        # between the two calls cannot raise.
        try:
            mtime = self.todo_file_path.stat().st_mtime
        except (FileNotFoundError, NotADirectoryError):
            return False

        if self.last_todo_modified is None:
            return True
        return datetime.fromtimestamp(mtime) > self.last_todo_modified

    def needs_rescan(self, max_age_minutes: int = 5) -> bool:
        """
        Check whether the tasks directory should be scanned again.

        Args:
            max_age_minutes: Maximum age of the last scan before a rescan
                is needed.

        Returns:
            bool: True if no scan has been recorded or the last scan is
            older than ``max_age_minutes``.
        """
        if self.last_tasks_scan is None:
            return True
        age = datetime.now() - self.last_tasks_scan
        return age > timedelta(minutes=max_age_minutes)
# ============================================================================
# SESSION METRICS - Performance and Resource Tracking
# ============================================================================
@dataclass(frozen=True)
class SessionMetrics:
    """
    Immutable session-level performance and resource metrics.

    Invariants enforced at construction:
        - all agent counts, resource totals and counters are non-negative
        - active + idle + error agents does not exceed total_agents
        - total_cpu_percent lies in [0, 800]
    """

    last_updated: datetime = field(default_factory=datetime.now)
    total_agents: int = 0
    active_agents: int = 0
    idle_agents: int = 0
    error_agents: int = 0
    total_cpu_percent: float = 0.0
    total_memory_mb: float = 0.0
    total_disk_io_mb: float = 0.0
    total_network_bytes: int = 0
    total_messages_processed: int = 0
    total_errors: int = 0
    total_restarts: int = 0
    session_uptime_seconds: int = 0
    filesystem_size_mb: float = 0.0

    def __post_init__(self):
        """
        Validate the metrics.

        Raises:
            ValueError: If any invariant listed on the class is violated.
        """
        agent_counts = (
            self.total_agents,
            self.active_agents,
            self.idle_agents,
            self.error_agents,
        )
        if any(count < 0 for count in agent_counts):
            raise ValueError("Agent counts cannot be negative")

        # Agents in other statuses may exist, so the three tracked statuses
        # only need to fit within the total, not add up to it.
        if (
            self.active_agents + self.idle_agents + self.error_agents
            > self.total_agents
        ):
            raise ValueError("Sum of agent status counts exceeds total agents")

        # Upper bound assumes at most 8 agents at 100% each.
        if self.total_cpu_percent < 0 or self.total_cpu_percent > 800:
            raise ValueError(
                f"Total CPU percent {self.total_cpu_percent} out of valid range"
            )

        resource_totals = (
            self.total_memory_mb,
            self.total_disk_io_mb,
            self.filesystem_size_mb,
        )
        if any(value < 0 for value in resource_totals):
            raise ValueError("Resource metrics cannot be negative")

        # total_network_bytes is checked with the other counters so that
        # every metric is covered by the non-negative invariant.
        counters = (
            self.total_network_bytes,
            self.total_messages_processed,
            self.total_errors,
            self.total_restarts,
        )
        if any(count < 0 for count in counters):
            raise ValueError("Counter metrics cannot be negative")

        if self.session_uptime_seconds < 0:
            raise ValueError("Session uptime cannot be negative")

    def get_average_cpu_per_agent(self) -> float:
        """
        Average CPU usage per active agent.

        Returns:
            float: total_cpu_percent divided by active_agents, or 0.0 when
            there are no active agents.
        """
        if self.active_agents == 0:
            return 0.0
        return self.total_cpu_percent / self.active_agents

    def get_average_memory_per_agent(self) -> float:
        """
        Average memory usage per active agent.

        Returns:
            float: total_memory_mb divided by active_agents, or 0.0 when
            there are no active agents.
        """
        if self.active_agents == 0:
            return 0.0
        return self.total_memory_mb / self.active_agents

    def is_resource_stressed(self) -> bool:
        """
        Determine whether the session is under resource stress.

        Returns:
            bool: True if any total or per-active-agent threshold is
            exceeded.
        """
        return (
            self.total_cpu_percent > 600  # 75% of 8 agents
            or self.total_memory_mb > 3072  # 75% of 4GB
            or self.get_average_cpu_per_agent() > 80
            or self.get_average_memory_per_agent() > 400
        )

    def get_error_rate(self) -> float:
        """
        Error rate as a percentage of processed messages.

        Returns:
            float: total_errors / total_messages_processed * 100, or 0.0
            when no messages have been processed.
        """
        if self.total_messages_processed == 0:
            return 0.0
        return (self.total_errors / self.total_messages_processed) * 100
# ============================================================================
# SECURITY CONTEXT - Session Isolation and Boundaries
# ============================================================================
@dataclass(frozen=True)
class SecurityContext:
    """
    Immutable security policy for one session.

    Holds the session root, the filesystem boundaries inside it, an
    executable allow-list, file size/count limits and environment variable
    overrides, and offers checks against them.

    Invariants enforced at construction:
        - session_root and every filesystem boundary are absolute
        - every boundary, once resolved, lies inside the resolved
          session_root
        - max_file_size_mb is in (0, 1024]; max_total_files is in
          (0, 100000]
        - no allowed executable names a known-destructive command,
          compared by base name and case-insensitively

    Note: this class only answers policy questions; it does not itself
    sandbox processes or intercept filesystem access.
    """

    session_root: Path
    security_level: SecurityLevel
    filesystem_boundaries: Set[Path]
    allowed_executables: Set[str] = field(
        default_factory=lambda: {"claude", "python", "git"}
    )
    max_file_size_mb: int = 100
    max_total_files: int = 10000
    network_access_allowed: bool = False
    shell_access_allowed: bool = False
    environment_restrictions: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self):
        """
        Validate the security context configuration.

        Raises:
            ValueError: If any invariant listed on the class is violated.
        """
        if not self.session_root.is_absolute():
            raise ValueError(f"Session root must be absolute path: {self.session_root}")

        for boundary in self.filesystem_boundaries:
            if not boundary.is_absolute():
                raise ValueError(f"Filesystem boundary must be absolute: {boundary}")
            # resolve() collapses ".." and follows symlinks, so a boundary
            # cannot point outside the root through either.
            try:
                boundary.resolve().relative_to(self.session_root.resolve())
            except ValueError:
                raise ValueError(
                    f"Filesystem boundary outside session root: {boundary}"
                ) from None

        if self.max_file_size_mb <= 0 or self.max_file_size_mb > 1024:
            raise ValueError(
                f"Max file size {self.max_file_size_mb} must be in range (0, 1024]"
            )
        if self.max_total_files <= 0 or self.max_total_files > 100000:
            raise ValueError(
                f"Max total files {self.max_total_files} must be in range (0, 100000]"
            )

        # Normalise exactly as is_executable_allowed() does (base name,
        # lower-cased). Otherwise an entry such as "RM" or "/bin/rm" would
        # pass this check and then authorise "rm" at lookup time.
        dangerous_executables = {"rm", "dd", "mkfs", "format", "del", "deltree"}
        normalized_allowed = {
            Path(exe).name.lower() for exe in self.allowed_executables
        }
        if normalized_allowed & dangerous_executables:
            raise ValueError("Dangerous executables not allowed in security context")

    def is_path_allowed(self, path: Path) -> bool:
        """
        Check whether a path lies within the security boundaries.

        Args:
            path: Path to validate; it is resolved before comparison.

        Returns:
            bool: True if the resolved path is inside the session root and,
            when boundaries are configured, inside at least one of them.
            False otherwise, including when resolution raises OSError.
        """
        try:
            resolved_path = path.resolve()
            session_root_resolved = self.session_root.resolve()
            # Must be within the session root; raises ValueError if not.
            resolved_path.relative_to(session_root_resolved)

            # With no specific boundaries, the session root is the boundary.
            if not self.filesystem_boundaries:
                return True

            for boundary in self.filesystem_boundaries:
                try:
                    resolved_path.relative_to(boundary.resolve())
                    return True
                except ValueError:
                    continue
            return False
        except (ValueError, OSError):
            return False

    def is_executable_allowed(self, executable: str) -> bool:
        """
        Check whether an executable is on the allow-list.

        Args:
            executable: Executable name or path; only the final path
                component is compared, case-insensitively.

        Returns:
            bool: True if the executable is allowed.
        """
        exe_name = Path(executable).name.lower()
        return exe_name in {exe.lower() for exe in self.allowed_executables}

    def get_environment_restrictions(self) -> Dict[str, str]:
        """
        Build the environment variables to impose on agent processes.

        Returns:
            Dict[str, str]: Default PATH/HOME/TMPDIR values overlaid with
            ``environment_restrictions``; session-specific entries win on
            key collisions.
        """
        base_restrictions = {
            "PATH": "/usr/local/bin:/usr/bin:/bin",  # Restricted PATH
            "HOME": str(self.session_root),  # Isolated home directory
            "TMPDIR": str(self.session_root / "tmp"),  # Isolated temp directory
        }
        base_restrictions.update(self.environment_restrictions)
        return base_restrictions

    def validate_file_operation(self, file_path: Path, operation: str = "read") -> bool:
        """
        Validate a file operation against the security policy.

        Args:
            file_path: Path the operation targets.
            operation: Operation name. "write"/"append" trigger the size
                check and "execute" triggers the MAXIMUM-level extension
                check; any other value is only subject to the path check.

        Returns:
            bool: True if the operation is allowed.
        """
        if not self.is_path_allowed(file_path):
            return False

        # The size limit applies to the file as it currently exists on disk,
        # not to the data about to be written.
        if operation in ["write", "append"] and file_path.exists():
            file_size_mb = file_path.stat().st_size / (1024 * 1024)
            if file_size_mb > self.max_file_size_mb:
                return False

        # At MAXIMUM level, executing script/binary file types is refused.
        if self.security_level == SecurityLevel.MAXIMUM:
            restricted_extensions = {".exe", ".sh", ".bat", ".cmd", ".ps1"}
            if (
                file_path.suffix.lower() in restricted_extensions
                and operation == "execute"
            ):
                return False
        return True
# ============================================================================
# SESSION STATE - Complete Session Information
# ============================================================================
@dataclass(frozen=True)
class SessionState:
    """
    Immutable session state containing all session and agent information.

    Acts as the aggregate root of a session: it holds the agent states, the
    optional Git, task-tracking and security context, and the aggregated
    metrics. Every "modifying" method returns a new ``SessionState`` built
    with ``dataclasses.replace``; the receiver is never changed, and the new
    instance is re-validated by ``__post_init__``.

    Invariants enforced at construction:
        - root_path is absolute
        - the number of agents does not exceed max_agents
        - every agent's session_id equals this session's id, and every key
          of ``agents`` equals the agent_id of its value
        - security_context.session_root, when set, equals root_path
        - the task tracker's TODO file and tasks directory, when set, lie
          inside root_path (lexical comparison; symlinks are not resolved)
        - last_updated is not earlier than created_at
        - health_check_interval_seconds is in (0, 300] and
          max_idle_time_minutes is in (0, 1440]
    """

    # Core identity
    session_id: SessionId
    name: str
    root_path: Path
    # Session status and configuration
    status: SessionStatus = SessionStatus.CREATED
    security_level: SecurityLevel = SecurityLevel.HIGH
    created_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)
    # Agent management
    agents: Dict[AgentId, AgentState] = field(default_factory=dict)
    max_agents: int = 8
    # Project and development context (None until populated)
    git_state: Optional[GitState] = None
    task_tracker: Optional[TaskFileTracker] = None
    # Security and isolation (None until populated)
    security_context: Optional[SecurityContext] = None
    # Performance and monitoring
    metrics: SessionMetrics = field(default_factory=SessionMetrics)
    # Operational settings
    auto_restart_agents: bool = True
    health_check_interval_seconds: int = 30
    max_idle_time_minutes: int = 60

    def __post_init__(self):
        """
        Validate session state consistency.

        Raises:
            ValueError: If any invariant listed on the class is violated.
        """
        if not self.root_path.is_absolute():
            raise ValueError(f"Session root path must be absolute: {self.root_path}")
        if len(self.agents) > self.max_agents:
            raise ValueError(
                f"Agent count {len(self.agents)} exceeds maximum {self.max_agents}"
            )

        for agent_id, agent_state in self.agents.items():
            if agent_state.session_id != self.session_id:
                raise ValueError(f"Agent {agent_id} belongs to different session")
            if agent_state.agent_id != agent_id:
                raise ValueError(
                    f"Agent ID mismatch in agents dict: {agent_id} vs {agent_state.agent_id}"
                )

        if self.security_context is not None:
            if self.security_context.session_root != self.root_path:
                raise ValueError("Security context root must match session root")

        if self.task_tracker is not None:
            # Component-wise containment: a raw string-prefix test would
            # accept "/root2/..." for root "/root" as well as ".." escapes.
            if not self._is_within_root(self.task_tracker.todo_file_path):
                raise ValueError("TODO file outside session boundaries")
            if not self._is_within_root(self.task_tracker.tasks_directory):
                raise ValueError("Tasks directory outside session boundaries")

        if self.last_updated < self.created_at:
            raise ValueError("Last updated cannot be before creation time")

        if (
            self.health_check_interval_seconds <= 0
            or self.health_check_interval_seconds > 300
        ):
            raise ValueError("Health check interval must be in range (0, 300]")
        if self.max_idle_time_minutes <= 0 or self.max_idle_time_minutes > 1440:
            raise ValueError("Max idle time must be in range (0, 1440] minutes")

    def _is_within_root(self, candidate: Path) -> bool:
        """Return True if ``candidate``, lexically normalised, is inside root_path."""
        normalized_root = Path(os.path.normpath(self.root_path))
        normalized_candidate = Path(os.path.normpath(candidate))
        try:
            normalized_candidate.relative_to(normalized_root)
        except ValueError:
            return False
        return True

    def _with_agents(self, agents: Dict[AgentId, AgentState]) -> "SessionState":
        """Return a copy holding ``agents``, with metrics recomputed and last_updated refreshed."""
        new_metrics = self._calculate_metrics(agents)
        return replace(
            self,
            agents=agents,
            metrics=new_metrics,
            last_updated=datetime.now(),
        )

    def add_agent(self, agent_state: AgentState) -> "SessionState":
        """
        Create a new session state with an agent added.

        Args:
            agent_state: Agent to add to the session.

        Returns:
            SessionState: New state containing the agent, with metrics
            recalculated.

        Raises:
            ValueError: If the session is at capacity, the agent id is
                already present, or the agent belongs to another session.
        """
        if len(self.agents) >= self.max_agents:
            raise ValueError(
                f"Cannot add agent: session at maximum capacity {self.max_agents}"
            )
        if agent_state.agent_id in self.agents:
            raise ValueError(f"Agent {agent_state.agent_id} already exists in session")
        if agent_state.session_id != self.session_id:
            raise ValueError(
                f"Agent belongs to different session: {agent_state.session_id}"
            )

        # Copy first so the dict held by this instance is never mutated.
        new_agents = dict(self.agents)
        new_agents[agent_state.agent_id] = agent_state
        return self._with_agents(new_agents)

    def remove_agent(self, agent_id: AgentId) -> "SessionState":
        """
        Create a new session state with an agent removed.

        Args:
            agent_id: ID of the agent to remove.

        Returns:
            SessionState: New state without the agent, with metrics
            recalculated.

        Raises:
            ValueError: If the agent is not in the session.
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent {agent_id} not found in session")

        new_agents = dict(self.agents)
        del new_agents[agent_id]
        return self._with_agents(new_agents)

    def update_agent(self, agent_state: AgentState) -> "SessionState":
        """
        Create a new session state with an existing agent replaced.

        Args:
            agent_state: Updated agent state; its agent_id selects the
                entry to replace.

        Returns:
            SessionState: New state with the agent updated and metrics
            recalculated.

        Raises:
            ValueError: If the agent is not in the session or belongs to
                another session.
        """
        if agent_state.agent_id not in self.agents:
            raise ValueError(f"Agent {agent_state.agent_id} not found in session")
        if agent_state.session_id != self.session_id:
            raise ValueError(
                f"Agent belongs to different session: {agent_state.session_id}"
            )

        new_agents = dict(self.agents)
        new_agents[agent_state.agent_id] = agent_state
        return self._with_agents(new_agents)

    def with_status(self, new_status: SessionStatus) -> "SessionState":
        """
        Create a new session state with a different status.

        Args:
            new_status: New session status. Transitions are not validated.

        Returns:
            SessionState: New state with the status set and last_updated
            refreshed; agents and metrics are carried over unchanged.
        """
        return replace(self, status=new_status, last_updated=datetime.now())

    def with_git_state(self, git_state: GitState) -> "SessionState":
        """
        Create a new session state with updated Git information.

        Args:
            git_state: Updated Git state.

        Returns:
            SessionState: New state with git_state set and last_updated
            refreshed.
        """
        return replace(self, git_state=git_state, last_updated=datetime.now())

    def _calculate_metrics(
        self, agents_dict: Dict[AgentId, AgentState]
    ) -> SessionMetrics:
        """
        Aggregate session metrics from agent states.

        Also walks ``root_path`` recursively to total file sizes, so the cost
        grows with the number of files under the session root.

        Args:
            agents_dict: Agent states to aggregate.

        Returns:
            SessionMetrics: Freshly calculated metrics.
        """
        agents = list(agents_dict.values())

        def count_with_status(status: AgentStatus) -> int:
            return sum(1 for agent in agents if agent.status == status)

        total_disk_io = sum(
            agent.resource_usage.disk_io_read_mb + agent.resource_usage.disk_io_write_mb
            for agent in agents
        )
        total_network = sum(
            agent.resource_usage.network_bytes_sent
            + agent.resource_usage.network_bytes_recv
            for agent in agents
        )
        session_uptime = int((datetime.now() - self.created_at).total_seconds())

        # Best effort: any filesystem error leaves the size reported as 0.
        filesystem_size = 0.0
        try:
            if self.root_path.exists():
                total_bytes = sum(
                    f.stat().st_size for f in self.root_path.rglob("*") if f.is_file()
                )
                filesystem_size = total_bytes / (1024 * 1024)  # bytes -> MB
        except (OSError, PermissionError):
            pass

        return SessionMetrics(
            last_updated=datetime.now(),
            total_agents=len(agents),
            active_agents=count_with_status(AgentStatus.ACTIVE),
            idle_agents=count_with_status(AgentStatus.IDLE),
            error_agents=count_with_status(AgentStatus.ERROR),
            total_cpu_percent=sum(a.resource_usage.cpu_percent for a in agents),
            total_memory_mb=sum(a.resource_usage.memory_mb for a in agents),
            total_disk_io_mb=total_disk_io,
            total_network_bytes=total_network,
            total_messages_processed=sum(a.total_messages_processed for a in agents),
            total_errors=sum(a.error_count for a in agents),
            total_restarts=sum(a.restart_count for a in agents),
            session_uptime_seconds=session_uptime,
            filesystem_size_mb=filesystem_size,
        )

    def get_agent_by_name(self, agent_name: str) -> Optional[AgentState]:
        """
        Find an agent by name within the session.

        Args:
            agent_name: Agent name to search for.

        Returns:
            Optional[AgentState]: First agent whose name matches, or None.
        """
        for agent in self.agents.values():
            if agent.name == agent_name:
                return agent
        return None

    def _agents_with_status(self, status: AgentStatus) -> List[AgentState]:
        """Return the agents whose status equals ``status``."""
        return [agent for agent in self.agents.values() if agent.status == status]

    def get_active_agents(self) -> List[AgentState]:
        """
        Get all active agents in the session.

        Returns:
            List[AgentState]: Agents whose status is ACTIVE.
        """
        return self._agents_with_status(AgentStatus.ACTIVE)

    def get_idle_agents(self) -> List[AgentState]:
        """
        Get all idle agents in the session.

        Returns:
            List[AgentState]: Agents whose status is IDLE.
        """
        return self._agents_with_status(AgentStatus.IDLE)

    def get_error_agents(self) -> List[AgentState]:
        """
        Get all agents in the error state.

        Returns:
            List[AgentState]: Agents whose status is ERROR.
        """
        return self._agents_with_status(AgentStatus.ERROR)

    def needs_health_check(self) -> bool:
        """
        Determine whether a health check is due.

        Returns:
            bool: True if more than health_check_interval_seconds have
            passed since last_updated.
        """
        age = datetime.now() - self.last_updated
        return age.total_seconds() > self.health_check_interval_seconds

    def has_capacity_for_agents(self, count: int = 1) -> bool:
        """
        Check whether the session can take additional agents.

        Args:
            count: Number of agents to add.

        Returns:
            bool: True if adding ``count`` agents stays within max_agents.
        """
        return len(self.agents) + count <= self.max_agents

    def is_healthy(self) -> bool:
        """
        Determine whether the session is healthy.

        Returns:
            bool: False if the status is ERROR or TERMINATED, the metrics
            report resource stress, or more than half of the agents report
            themselves unhealthy; True otherwise.
        """
        if self.status in [SessionStatus.ERROR, SessionStatus.TERMINATED]:
            return False
        if self.metrics.is_resource_stressed():
            return False

        unhealthy_agents = sum(
            1 for agent in self.agents.values() if not agent.is_healthy()
        )
        # Integer division is exact here: for integer counts,
        # "more than half" is the same as "> n // 2".
        if unhealthy_agents > len(self.agents) // 2:
            return False
        return True

    def get_uptime(self) -> timedelta:
        """
        Get the time elapsed since the session was created.

        Returns:
            timedelta: Current time minus created_at.
        """
        return datetime.now() - self.created_at

    def get_project_summary(self) -> Dict[str, Any]:
        """
        Build a summary dictionary for debugging and monitoring.

        Returns:
            Dict[str, Any]: Identity, status, agent counts, Git information
            and headline metrics, with values rendered as plain
            strings/numbers/booleans.
        """
        return {
            "session_id": str(self.session_id),
            "name": self.name,
            "status": self.status.value,
            "uptime": str(self.get_uptime()),
            "agent_count": len(self.agents),
            "active_agents": len(self.get_active_agents()),
            "git_integrated": self.git_state is not None and self.git_state.is_git_repo,
            "current_branch": self.git_state.current_branch if self.git_state else None,
            "security_level": self.security_level.value,
            "resource_health": (
                "healthy" if not self.metrics.is_resource_stressed() else "stressed"
            ),
            "total_messages": self.metrics.total_messages_processed,
            "error_rate": f"{self.metrics.get_error_rate():.2f}%",
        }
@dataclass(frozen=True)
class SessionCreationRequest:
    """
    Immutable parameters for creating a new session.

    Only ``name`` is required; every other field has a default.
    """

    # Name for the new session.
    name: str
    # Directory for the session; None leaves the choice to the handler.
    session_directory: Optional[Path] = None
    # Requested security tier.
    security_level: SecurityLevel = SecurityLevel.MEDIUM
    # Requested agent capacity.
    max_agents: int = 10
    # Whether Git integration is requested.
    enable_git_integration: bool = True
    # Free-form extra data; each instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class SessionCreationResult:
    """
    Immutable outcome of a session creation operation.

    ``session_state`` may be None when creation failed; the convenience
    properties then return None instead of raising.
    """

    # Created session state, or None when creation failed.
    session_state: Optional[SessionState]
    # Whether creation succeeded.
    success: bool
    # Description of the failure, if any.
    error_message: Optional[str] = None
    # Non-fatal issues; each instance gets its own list.
    warnings: List[str] = field(default_factory=list)

    @property
    def session_id(self) -> Optional[SessionId]:
        """Session ID of the created session, or None if no state is attached."""
        if self.session_state is None:
            return None
        return self.session_state.session_id

    @property
    def session_name(self) -> Optional[str]:
        """Name of the created session, or None if no state is attached."""
        if self.session_state is None:
            return None
        return self.session_state.name
@dataclass(frozen=True)
class SessionDeletionResult:
    """
    Immutable outcome of a session deletion operation.

    ``session_id`` and ``success`` are required; the remaining fields
    default to a deletion with cleanup and no agents terminated.
    """

    # ID of the session the deletion targeted.
    session_id: SessionId
    # Whether deletion succeeded.
    success: bool
    # Description of the failure, if any.
    error_message: Optional[str] = None
    # Whether cleanup was performed.
    cleanup_performed: bool = True
    # Number of agents terminated during deletion.
    agents_terminated: int = 0
@dataclass(frozen=True)
class SessionStatusResult:
    """
    Immutable outcome of a session status query.

    ``session_state`` may be None; the convenience properties then return
    None.
    """

    # State of the queried session, or None when unavailable.
    session_state: Optional[SessionState]
    # Whether the query succeeded.
    success: bool
    # Description of the failure, if any.
    error_message: Optional[str] = None
    # Defaults to the time this result object was created.
    last_updated: datetime = field(default_factory=datetime.now)

    @property
    def session_id(self) -> Optional[SessionId]:
        """Session ID from the attached state, or None without one."""
        state = self.session_state
        if not state:
            return None
        return state.session_id

    @property
    def status(self) -> Optional[SessionStatus]:
        """Session status from the attached state, or None without one."""
        state = self.session_state
        if not state:
            return None
        return state.status