"""
Manager Protocol Definitions
Architecture Integration:
- Design Patterns: Protocol pattern for dependency injection and interface segregation
- Security Model: Protocol-level security constraints with comprehensive validation
- Performance Profile: O(1) protocol method dispatch, O(log n) resource operations
Technical Decisions:
- Protocol over ABC: Structural typing for better flexibility and testing
- Async-first design: All operations are async for non-blocking concurrent execution
- Immutable parameters: All input parameters are immutable types for thread safety
Dependencies & Integration:
- External: typing_extensions for Protocol support
- Internal: Types module for all domain objects and security contexts
Quality Assurance:
- Test Coverage: Protocol compliance testing with mock implementations
- Error Handling: Comprehensive error protocols with security-focused recovery
Author: Adder_4 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
from __future__ import annotations
import asyncio
from abc import abstractmethod
from datetime import datetime
from pathlib import Path
from typing import (
AsyncContextManager,
AsyncIterator,
Dict,
List,
Optional,
Protocol,
Set,
)
from src.models.agent import (
AgentCreationRequest,
AgentId,
AgentState,
AgentStatus,
ResourceMetrics,
)
from src.models.communication import Message, MessageResult
from src.models.ids import SessionId
from src.models.mcp_results import AgentCreationResult, SessionCreationResult
from src.models.monitoring import HealthStatus, PerformanceMetrics
from src.models.security import SecurityContext, SecurityLevel
from src.models.session import SessionCreationRequest, SessionState
class AgentManagerProtocol(Protocol):
"""
Protocol for agent lifecycle management with comprehensive security and monitoring.
Contracts:
Preconditions:
- All agent_id parameters must be valid branded types
- Security context must be authenticated and authorized
- Resource limits must be enforced for all operations
Postconditions:
- All operations maintain audit trails with cryptographic integrity
- Agent state changes are persisted atomically
- Resource cleanup occurs on all error paths
Invariants:
- Agent count never exceeds configured limits
- Each agent has unique ID within system scope
- All agents are associated with exactly one session
Security Implementation:
- Input Validation: All parameters validated against security schema
- Authorization: Session-scoped permissions required for agent operations
- Resource Limits: CPU, memory, and process limits enforced per agent
- Audit Trail: All operations logged with tamper-resistant signatures
"""
@abstractmethod
async def create_agent(
self, request: AgentCreationRequest, security_context: SecurityContext
) -> AgentCreationResult:
"""
Create new agent with comprehensive validation and resource allocation.
Args:
request: Validated agent creation parameters with security constraints
security_context: Authenticated session context with permissions
Returns:
AgentCreationResult with agent_id, status, and resource allocation
Raises:
SecurityError: Authorization failure or security rule violation
ResourceLimitError: Agent limit exceeded or insufficient resources
ValidationError: Invalid request parameters or configuration
"""
...
@abstractmethod
async def get_agent_state(
self, agent_id: AgentId, security_context: SecurityContext
) -> Optional[AgentState]:
"""
Retrieve current agent state with security filtering.
Args:
agent_id: Unique agent identifier
security_context: Security context for permission validation
Returns:
AgentState if agent exists and accessible, None otherwise
"""
...
@abstractmethod
async def update_agent_config(
self, agent_id: AgentId, config: AgentConfig, security_context: SecurityContext
) -> bool:
"""
Update agent configuration with validation and persistence.
Args:
agent_id: Target agent identifier
config: New configuration with validated parameters
security_context: Security context for authorization
Returns:
True if update successful, False if agent not found
"""
...
@abstractmethod
async def delete_agent(
self, agent_id: AgentId, force: bool, security_context: SecurityContext
) -> bool:
"""
Remove agent with graceful shutdown and resource cleanup.
Args:
agent_id: Target agent identifier
force: Skip graceful shutdown if True
security_context: Security context for authorization
Returns:
True if deletion successful, False if agent not found
"""
...
@abstractmethod
async def list_agents(
self, session_id: Optional[SessionId], security_context: SecurityContext
) -> List[AgentState]:
"""
List agents with security filtering and permission enforcement.
Args:
session_id: Optional session filter
security_context: Security context for permission validation
Returns:
List of AgentState objects accessible to security context
"""
...
@abstractmethod
async def get_agent_metrics(
self, agent_id: AgentId, security_context: SecurityContext
) -> Optional[ResourceMetrics]:
"""
Retrieve real-time resource metrics for agent.
Args:
agent_id: Target agent identifier
security_context: Security context for authorization
Returns:
ResourceMetrics if agent exists and accessible, None otherwise
"""
...
@abstractmethod
async def health_check(self) -> HealthStatus:
"""
Perform comprehensive health check of agent management system.
Returns:
HealthStatus with system state and diagnostic information
"""
...
class SessionManagerProtocol(Protocol):
"""
Protocol for session management with codebase association and security boundaries.
Contracts:
Preconditions:
- All path parameters must be validated and within boundaries
- Session operations require appropriate security clearance
- Resource limits enforced for session creation and management
Postconditions:
- Session state persisted with encryption and integrity protection
- All associated agents properly managed within session boundaries
- Filesystem boundaries enforced throughout session lifecycle
Invariants:
- Each session associated with exactly one root directory
- Session count never exceeds configured limits
- All sessions have unique identifiers system-wide
"""
@abstractmethod
async def create_session(
self, request: SessionCreationRequest, security_context: SecurityContext
) -> SessionCreationResult:
"""
Create new session with codebase association and security boundaries.
Args:
request: Validated session creation parameters
security_context: Security context with directory permissions
Returns:
SessionCreationResult with session_id and configuration
"""
...
@abstractmethod
async def get_session_state(
self, session_id: SessionId, security_context: SecurityContext
) -> Optional[SessionState]:
"""
Retrieve session state with security filtering.
Args:
session_id: Target session identifier
security_context: Security context for permission validation
Returns:
SessionState if session exists and accessible, None otherwise
"""
...
@abstractmethod
async def delete_session(
self,
session_id: SessionId,
cleanup_agents: bool,
security_context: SecurityContext,
) -> bool:
"""
Remove session with comprehensive cleanup and state preservation.
Args:
session_id: Target session identifier
cleanup_agents: Whether to remove associated agents
security_context: Security context for authorization
Returns:
True if deletion successful, False if session not found
"""
...
@abstractmethod
async def list_sessions(
self, security_context: SecurityContext
) -> List[SessionState]:
"""
List sessions with security filtering.
Args:
security_context: Security context for permission validation
Returns:
List of SessionState objects accessible to security context
"""
...
class ITermManagerProtocol(Protocol):
"""
Protocol for iTerm2 integration with tab management and health monitoring.
Contracts:
Preconditions:
- iTerm2 application must be running and accessible
- All tab operations require valid session context
- Resource limits enforced for tab creation and management
Postconditions:
- Tab state synchronized with internal agent state
- All operations maintain iTerm2 connectivity health
- Recovery mechanisms activated on connection failures
Invariants:
- One tab per agent maximum
- Tab count never exceeds system limits
- All tabs associated with valid sessions
"""
@abstractmethod
async def create_tab(
self,
agent_id: AgentId,
session_id: SessionId,
working_directory: Path,
security_context: SecurityContext,
) -> str:
"""
Create iTerm2 tab for agent with security boundaries.
Args:
agent_id: Agent identifier for tab association
session_id: Session context for directory boundaries
working_directory: Validated working directory path
security_context: Security context for resource allocation
Returns:
iTerm2 tab identifier string
"""
...
@abstractmethod
async def send_text(
self, tab_id: str, text: str, security_context: SecurityContext
) -> MessageResult:
"""
Send text to iTerm2 tab with input validation.
Args:
tab_id: Target iTerm2 tab identifier
text: Validated text content to send
security_context: Security context for authorization
Returns:
MessageResult with delivery status and metadata
"""
...
@abstractmethod
async def close_tab(self, tab_id: str, security_context: SecurityContext) -> bool:
"""
Close iTerm2 tab with graceful cleanup.
Args:
tab_id: Target iTerm2 tab identifier
security_context: Security context for authorization
Returns:
True if closure successful, False if tab not found
"""
...
@abstractmethod
async def get_tab_status(
self, tab_id: str, security_context: SecurityContext
) -> Optional[Dict[str, str]]:
"""
Retrieve iTerm2 tab status and health information.
Args:
tab_id: Target iTerm2 tab identifier
security_context: Security context for authorization
Returns:
Dictionary with tab status information if accessible
"""
...
@abstractmethod
async def health_check(self) -> HealthStatus:
"""
Check iTerm2 connectivity and system health.
Returns:
HealthStatus with connectivity and diagnostic information
"""
...
class ClaudeManagerProtocol(Protocol):
"""
Protocol for Claude Code process orchestration with resource management.
Contracts:
Preconditions:
- Claude Code CLI must be available and accessible
- All process operations require security authorization
- Resource limits enforced for process creation and execution
Postconditions:
- Process state synchronized with agent lifecycle
- Resource cleanup performed on all termination paths
- Audit trails maintained for all process operations
Invariants:
- One Claude Code process per agent maximum
- Process count never exceeds system limits
- All processes confined to session directory boundaries
"""
@abstractmethod
async def spawn_process(
self,
agent_id: AgentId,
config: AgentConfig,
working_directory: Path,
security_context: SecurityContext,
) -> int:
"""
Spawn Claude Code process with configuration and monitoring.
Args:
agent_id: Agent identifier for process association
config: Agent configuration with Claude Code parameters
working_directory: Validated working directory path
security_context: Security context for resource allocation
Returns:
Process ID (PID) of spawned Claude Code process
"""
...
@abstractmethod
async def send_message(
self, process_id: int, message: Message, security_context: SecurityContext
) -> MessageResult:
"""
Send message to Claude Code process with validation.
Args:
process_id: Target process identifier
message: Validated message content
security_context: Security context for authorization
Returns:
MessageResult with delivery status and response metadata
"""
...
@abstractmethod
async def terminate_process(
self, process_id: int, force: bool, security_context: SecurityContext
) -> bool:
"""
Terminate Claude Code process with cleanup.
Args:
process_id: Target process identifier
force: Skip graceful shutdown if True
security_context: Security context for authorization
Returns:
True if termination successful, False if process not found
"""
...
@abstractmethod
async def get_process_metrics(
self, process_id: int, security_context: SecurityContext
) -> Optional[ResourceMetrics]:
"""
Retrieve real-time metrics for Claude Code process.
Args:
process_id: Target process identifier
security_context: Security context for authorization
Returns:
ResourceMetrics if process exists and accessible
"""
...
class StateManagerProtocol(Protocol):
"""
Protocol for encrypted state persistence and recovery operations.
Contracts:
Preconditions:
- All state operations require valid encryption context
- State objects must be serializable and validated
- Recovery operations require authentication and authorization
Postconditions:
- All state persisted with encryption and integrity protection
- Recovery operations restore complete and consistent state
- Audit trails maintained for all state modifications
Invariants:
- State encryption keys never exposed in plaintext
- All state operations are atomic and consistent
- Recovery procedures maintain referential integrity
"""
@abstractmethod
async def save_state(
self, state_id: str, state_data: Dict, security_context: SecurityContext
) -> bool:
"""
Persist state data with encryption and integrity protection.
Args:
state_id: Unique identifier for state object
state_data: Validated state data dictionary
security_context: Security context with encryption keys
Returns:
True if save successful, False otherwise
"""
...
@abstractmethod
async def load_state(
self, state_id: str, security_context: SecurityContext
) -> Optional[Dict]:
"""
Load and decrypt state data with validation.
Args:
state_id: Unique identifier for state object
security_context: Security context with decryption keys
Returns:
Decrypted state data dictionary if successful, None otherwise
"""
...
@abstractmethod
async def delete_state(
self, state_id: str, security_context: SecurityContext
) -> bool:
"""
Securely delete state data with cryptographic wiping.
Args:
state_id: Unique identifier for state object
security_context: Security context for authorization
Returns:
True if deletion successful, False if state not found
"""
...
class OrchestrationCoordinatorProtocol(Protocol):
"""
Protocol for coordinating all managers in unified orchestration operations.
Contracts:
Preconditions:
- All component managers must be initialized and healthy
- Operations require appropriate cross-manager authorization
- Resource coordination maintains system-wide limits
Postconditions:
- All operations maintain consistency across managers
- Failure recovery coordinates across all components
- Resource allocation balanced across system components
Invariants:
- System-wide resource limits never exceeded
- Cross-manager operations are atomic
- Health status reflects all component managers
"""
@abstractmethod
async def initialize_system(self, security_context: SecurityContext) -> bool:
"""
Initialize all component managers and establish coordination.
Args:
security_context: System-level security context
Returns:
True if initialization successful, False otherwise
"""
...
@abstractmethod
async def shutdown_system(
self, graceful: bool, security_context: SecurityContext
) -> bool:
"""
Coordinate graceful shutdown of all system components.
Args:
graceful: Perform graceful shutdown if True
security_context: System-level security context
Returns:
True if shutdown successful, False otherwise
"""
...
@abstractmethod
async def get_system_status(
self, security_context: SecurityContext
) -> Dict[str, HealthStatus]:
"""
Retrieve comprehensive system health status.
Args:
security_context: Security context for authorization
Returns:
Dictionary mapping component names to health status
"""
...
@abstractmethod
async def get_performance_metrics(
self, security_context: SecurityContext
) -> PerformanceMetrics:
"""
Aggregate performance metrics across all system components.
Args:
security_context: Security context for authorization
Returns:
PerformanceMetrics with system-wide statistics
"""
...
# Context manager protocols for resource management
class AsyncManagerContext(Protocol):
"""Protocol for async context manager with resource lifecycle management."""
async def __aenter__(self) -> AsyncManagerContext:
"""Enter async context with resource allocation."""
...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit async context with guaranteed resource cleanup."""
...
# Event protocols for monitoring and notifications
class EventHandlerProtocol(Protocol):
"""Protocol for handling system events with security and performance constraints."""
@abstractmethod
async def handle_agent_created(
self, agent_id: AgentId, session_id: SessionId, metadata: Dict[str, str]
) -> None:
"""Handle agent creation event with audit logging."""
...
@abstractmethod
async def handle_agent_terminated(
self, agent_id: AgentId, exit_code: int, metadata: Dict[str, str]
) -> None:
"""Handle agent termination event with cleanup coordination."""
...
@abstractmethod
async def handle_session_created(
self, session_id: SessionId, root_path: Path, metadata: Dict[str, str]
) -> None:
"""Handle session creation event with resource allocation."""
...
@abstractmethod
async def handle_security_violation(
self, violation_type: str, context: SecurityContext, details: Dict[str, str]
) -> None:
"""Handle security violation with immediate response protocols."""
...