# TASK_8: get_session_status MCP Tool Implementation
**Created By**: ADDER_5 | **Priority**: MEDIUM | **Duration**: 3 hours
**Technique Focus**: Contracts + Defensive Programming + Testing
**Size Constraint**: Target <250 lines/module, Max 400 if splitting awkward
## Status & Assignment
**Status**: COMPLETE
**Assigned**: ADDER_5 (Completed comprehensive implementation)
**Dependencies**: TASK_4 (Agent & Session Management Core)
**Blocking**: Other MCP tool implementations
## Required Reading (Complete before starting)
- [x] **Architecture**: `ARCHITECTURE.md` - get_session_status tool specification
- [x] **Core Management**: Results from TASK_4 - Agent and Session managers
- [x] **FastMCP Integration**: Results from TASK_3 - Server foundation
- [x] **Security Contracts**: Results from TASK_2 - Security validation framework
- [x] **Previous Tool**: Review TASK_5 create_agent implementation pattern
## Objective & Context
**Goal**: Implement get_session_status MCP tool for monitoring all agents within a session
**Context**: Core monitoring tool enabling Claude Desktop to track agent status, resource usage, and health
<thinking>
get_session_status Implementation Analysis:
1. Must validate session ID and access permissions
2. Requires aggregation of all agents within session
3. Needs real-time status from iTerm2 and Claude Code processes
4. Must include resource metrics (CPU, memory, file descriptors)
5. Requires comprehensive error handling for offline agents
6. Must provide actionable status information for orchestration
</thinking>
## Implementation Subtasks (Sequential completion)
### Phase 1: Input Validation & Security
- [x] **Subtask 1.1**: Implement session ID validation with security checks
- [x] **Subtask 1.2**: Add access control validation for session monitoring
- [x] **Subtask 1.3**: Create status aggregation with error handling
- [x] **Subtask 1.4**: Implement performance optimization for large sessions
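
Subtask 1.1 could be approached with a small, strict validator along the following lines. This is a minimal sketch only: the allowed character set, the 64-character limit, and the `SessionIdError` and `validate_session_id` names are assumptions made for illustration, since the real session ID format comes from TASK_4 and the security contracts from TASK_2.

```python
import re

# Hypothetical format: 1-64 characters of letters, digits, underscore, or hyphen.
# The project's actual session ID format is defined elsewhere.
_SESSION_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,64}")


class SessionIdError(ValueError):
    """Raised when a session ID fails validation (illustrative name)."""


def validate_session_id(raw: object) -> str:
    """Return the session ID unchanged if it is well formed, else raise."""
    if not isinstance(raw, str):
        raise SessionIdError("session_id must be a string")
    # fullmatch rejects path separators, whitespace, and trailing newlines.
    if _SESSION_ID_PATTERN.fullmatch(raw) is None:
        # The raw value is deliberately not echoed back into the message.
        raise SessionIdError("session_id has an invalid format")
    return raw
```

Validating against an allow-list before the ID reaches the session manager keeps malformed input such as `../etc/passwd` out of lookups and log lines.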
### Phase 2: Core Implementation
- [x] **Subtask 2.1**: Integrate with OrchestrationCoordinator for status retrieval
- [x] **Subtask 2.2**: Add real-time status aggregation from all managers
- [x] **Subtask 2.3**: Implement resource usage monitoring and reporting
- [x] **Subtask 2.4**: Create comprehensive health status evaluation
### Phase 3: Error Handling & Recovery
- [x] **Subtask 3.1**: Add graceful handling of offline/unreachable agents
- [x] **Subtask 3.2**: Implement partial result returns for degraded sessions
- [x] **Subtask 3.3**: Create audit logging for all status queries
- [x] **Subtask 3.4**: Add performance monitoring for status operations
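
Subtasks 3.1 and 3.2 can be sketched as a wrapper that turns per-agent failures into placeholder entries instead of failing the whole query. The `collect` callable, the `UNREACHABLE` label, and the `partial` / `unreachable_count` fields are assumptions for illustration, not names taken from the project.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical collector signature: takes an agent ID, returns a status dict.
Collector = Callable[[str], Awaitable[Dict[str, Any]]]


async def collect_with_partial_results(
    agent_ids: List[str],
    collect: Collector,
    timeout_seconds: float = 2.0,
) -> Dict[str, Any]:
    """Query every agent; replace failures with an UNREACHABLE placeholder."""

    async def guarded(agent_id: str) -> Dict[str, Any]:
        try:
            return await asyncio.wait_for(collect(agent_id), timeout_seconds)
        except Exception as exc:  # includes the timeout raised by wait_for
            return {
                "agent_id": agent_id,
                "status": "UNREACHABLE",  # illustrative status label
                "error": type(exc).__name__,
            }

    # gather preserves the order of agent_ids in its result list.
    statuses = await asyncio.gather(*(guarded(a) for a in agent_ids))
    unreachable = sum(1 for s in statuses if s["status"] == "UNREACHABLE")
    return {
        "agents": list(statuses),
        "partial": unreachable > 0,
        "unreachable_count": unreachable,
    }
```

Because each agent query is guarded individually, one offline agent costs at most `timeout_seconds` and the caller still receives the statuses of the healthy agents.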
### Phase 4: Testing & Validation
- [ ] **Subtask 4.1**: Property-based testing for status aggregation
- [ ] **Subtask 4.2**: Integration testing with active agent sessions
- [ ] **Subtask 4.3**: Performance testing with multiple agents
- [ ] **Subtask 4.4**: Error scenario testing for degraded states
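
One way to start on Subtask 4.1 is a randomized property check written with the standard library only, shown below as a stand-in for a dedicated property-testing library. The `aggregate` function is a simplified, hypothetical version of the aggregation under test, and the generator and property choices are assumptions for illustration.

```python
import random
from typing import Dict, List

STATUSES = ["ACTIVE", "DEGRADED", "FAILED"]  # status labels used in the template


def aggregate(agent_statuses: List[dict]) -> Dict[str, int]:
    """Simplified stand-in for the aggregation under test."""
    return {
        "total_memory_mb": sum(
            a.get("metrics", {}).get("memory_mb", 0) for a in agent_statuses
        ),
        "active_agents": sum(1 for a in agent_statuses if a["status"] == "ACTIVE"),
        "degraded_agents": sum(1 for a in agent_statuses if a["status"] == "DEGRADED"),
        "failed_agents": sum(1 for a in agent_statuses if a["status"] == "FAILED"),
    }


def random_agents(rng: random.Random, max_agents: int = 20) -> List[dict]:
    """Generate a random session, including agents that report no metrics."""
    agents = []
    for index in range(rng.randint(0, max_agents)):
        agent = {"agent_id": f"agent-{index}", "status": rng.choice(STATUSES)}
        if rng.random() < 0.8:
            agent["metrics"] = {"memory_mb": rng.randint(0, 4096)}
        agents.append(agent)
    return agents


def check_properties(trials: int = 500, seed: int = 0) -> None:
    rng = random.Random(seed)  # fixed seed keeps failures reproducible
    for _ in range(trials):
        agents = random_agents(rng)
        result = aggregate(agents)
        # Property 1: the three status counts partition the agent list.
        counted = (
            result["active_agents"]
            + result["degraded_agents"]
            + result["failed_agents"]
        )
        assert counted == len(agents)
        # Property 2: aggregation does not depend on agent order.
        shuffled = agents[:]
        rng.shuffle(shuffled)
        assert aggregate(shuffled) == result
        # Property 3: totals are never negative.
        assert result["total_memory_mb"] >= 0
```

Integer memory values are used so that the order-independence property is exact; with floating-point metrics the comparison would need a tolerance.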
## Implementation Files & Specifications
**Files to Create/Modify**:
- `src/interfaces/mcp_tools.py` - Add get_session_status tool (Target: <150 lines)
- `src/utils/status_aggregation.py` - Status aggregation utilities (Target: <200 lines)
- `src/utils/resource_monitoring.py` - Resource monitoring utilities (Target: <150 lines)
- `tests/interfaces/test_get_session_status.py` - Comprehensive tool testing
- `tests/properties/test_status_aggregation.py` - Property-based testing scenarios
**Key Requirements**:
- FastMCP tool decorator with automatic schema generation
- Complete input validation with security-focused approach
- Integration with OrchestrationCoordinator for status retrieval
- Real-time status aggregation with performance optimization
- Comprehensive error handling for partial results
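
For the performance requirement, one option is to query agents concurrently while capping the number of in-flight requests. The sketch below assumes a hypothetical `collect` coroutine per agent and an arbitrary default limit of 8; neither comes from the project.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def collect_bounded(
    agent_ids: List[str],
    collect: Callable[[str], Awaitable[Dict[str, Any]]],
    max_concurrency: int = 8,  # arbitrary illustrative default
) -> List[Dict[str, Any]]:
    """Run `collect` for every agent with at most `max_concurrency` in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def limited(agent_id: str) -> Dict[str, Any]:
        async with semaphore:
            return await collect(agent_id)

    # gather returns results in the same order as agent_ids.
    return list(await asyncio.gather(*(limited(a) for a in agent_ids)))
```

Compared with awaiting each agent in turn, this keeps total latency close to that of the slowest batch, while the semaphore prevents a large session from opening an unbounded number of simultaneous queries.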
## Modularity Strategy
**Size Management**:
- Separate status aggregation logic into utility modules
- Use composition for complex monitoring workflows
- Break resource monitoring into reusable components
- Keep MCP tool implementation minimal and declarative
**Organization Principles**:
- Single responsibility for status aggregation workflow
- Clear separation between monitoring and reporting
- Minimal coupling with other MCP tools
- Maximum reusability of monitoring components
## Success Criteria & Verification
**Completion Requirements**:
- [ ] MCP tool properly registered and accessible from Claude Desktop
- [ ] Comprehensive status aggregation for all session agents
- [ ] Real-time resource usage monitoring working
- [ ] Complete error handling for degraded sessions
- [ ] Audit logging for all status queries
- [ ] Property-based testing covering edge cases
- [ ] Performance optimization for large sessions
**Quality Gates**:
- Functionality: Provides accurate real-time session status
- Security: All queries validated with security contracts
- Reliability: Graceful handling of offline/degraded agents
- Performance: Efficient aggregation for multiple agents
- Auditability: Complete logging of all status queries
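
The auditability and performance-monitoring gates could be met with a decorator that records every query, its outcome, and its duration. This is a sketch under assumptions: the `audited` name, the logger name, and the log line format are hypothetical, and it expects the session ID as the first positional argument.

```python
import functools
import logging
import time

audit_logger = logging.getLogger("audit.session_status")  # hypothetical logger name


def audited(func):
    """Log each call's session ID, outcome, and duration; errors still propagate."""

    @functools.wraps(func)
    async def wrapper(session_id: str, *args, **kwargs):
        started = time.perf_counter()
        outcome = "error"  # overwritten only if the call returns normally
        try:
            result = await func(session_id, *args, **kwargs)
            outcome = "ok"
            return result
        finally:
            elapsed_ms = (time.perf_counter() - started) * 1000
            audit_logger.info(
                "get_session_status session=%s outcome=%s duration_ms=%.1f",
                session_id,
                outcome,
                elapsed_ms,
            )

    return wrapper
```

Logging in the `finally` branch means failed queries are audited too. The session ID should already have passed validation before it is written to the log.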
## Handoff Information
**Next Task Dependencies**: Other MCP tools can use status information
**Integration Points**: Uses OrchestrationCoordinator for status aggregation
**Future Considerations**: Extensible for additional monitoring metrics
## Implementation Template
```python
# src/interfaces/mcp_tools.py (addition to existing file)
@mcp.tool()
@validate_session_query
@require_session_access
async def get_session_status(
    self,
    session_id: str,
    include_metrics: bool = True,
    include_health: bool = True,
    ctx: Context = None
) -> dict:
    """
    Get comprehensive status of all agents in a session.

    Args:
        session_id: Session identifier to query
        include_metrics: Include resource usage metrics
        include_health: Include health check results

    Returns:
        SessionStatusResult with agent details and metrics

    Raises:
        ValidationError: Invalid session ID
        SecurityError: Insufficient access permissions
        OperationError: Status retrieval failed
    """
    try:
        await ctx.info(f"Getting status for session {session_id}")

        # Phase 1: Validation
        session_state = await self.coordinator.session_manager.get_session(session_id)
        if not session_state:
            raise ValidationError(f"Session {session_id} not found")

        # Phase 2: Agent Status Collection
        await ctx.report_progress(25, 100, "Collecting agent status")
        agent_statuses = []
        agents = await self.coordinator.agent_manager.get_agents_by_session(session_id)
        for agent in agents:
            await ctx.report_progress(50, 100, f"Querying agent {agent.name}")
            agent_status = await self._collect_agent_status(
                agent,
                include_metrics=include_metrics,
                include_health=include_health
            )
            agent_statuses.append(agent_status)

        # Phase 3: Session Aggregation
        await ctx.report_progress(75, 100, "Aggregating session metrics")
        session_metrics = await self._aggregate_session_metrics(agent_statuses)

        await ctx.report_progress(100, 100, "Status collection complete")
        await ctx.info(f"Retrieved status for {len(agent_statuses)} agents")

        return {
            "success": True,
            "session_id": session_id,
            "session_name": session_state.name,
            "root_path": str(session_state.root_path),
            "created_at": session_state.created_at.isoformat(),
            "agent_count": len(agent_statuses),
            "agents": agent_statuses,
            "session_metrics": session_metrics if include_metrics else None,
            "session_health": self._evaluate_session_health(agent_statuses) if include_health else None
        }
    except Exception as e:
        await ctx.error(f"Status retrieval failed: {str(e)}")
        raise


async def _collect_agent_status(
    self,
    agent: AgentState,
    include_metrics: bool,
    include_health: bool
) -> dict:
    """Collect comprehensive status for a single agent."""
    base_status = {
        "agent_id": str(agent.agent_id),
        "agent_name": agent.name,
        "specialization": agent.specialization,
        "status": agent.status.value,
        "created_at": agent.created_at.isoformat(),
        "iterm_tab_id": agent.iterm_tab_id,
        "process_id": agent.process_id
    }

    if include_metrics:
        metrics = await self.coordinator.monitoring.get_agent_metrics(agent.agent_id)
        base_status["metrics"] = {
            "cpu_percent": metrics.cpu_percent,
            "memory_mb": metrics.memory_mb,
            "file_descriptors": metrics.file_descriptors,
            "thread_count": metrics.thread_count
        }

    if include_health:
        health = await self.coordinator.health_monitor.check_agent_health(agent.agent_id)
        base_status["health"] = {
            "status": health.status.value,
            "last_check": health.last_check.isoformat(),
            "check_results": health.check_results
        }

    return base_status


async def _aggregate_session_metrics(self, agent_statuses: List[dict]) -> dict:
    """Aggregate metrics across all agents in session."""
    if not agent_statuses:
        return {
            "total_cpu_percent": 0.0,
            "total_memory_mb": 0.0,
            "total_file_descriptors": 0,
            "active_agents": 0,
            "degraded_agents": 0,
            "failed_agents": 0
        }

    metrics = {
        "total_cpu_percent": sum(
            a.get("metrics", {}).get("cpu_percent", 0)
            for a in agent_statuses
        ),
        "total_memory_mb": sum(
            a.get("metrics", {}).get("memory_mb", 0)
            for a in agent_statuses
        ),
        "total_file_descriptors": sum(
            a.get("metrics", {}).get("file_descriptors", 0)
            for a in agent_statuses
        ),
        "active_agents": sum(1 for a in agent_statuses if a["status"] == "ACTIVE"),
        "degraded_agents": sum(1 for a in agent_statuses if a["status"] == "DEGRADED"),
        "failed_agents": sum(1 for a in agent_statuses if a["status"] == "FAILED")
    }
    return metrics


def _evaluate_session_health(self, agent_statuses: List[dict]) -> str:
    """Evaluate overall session health based on agent statuses."""
    if not agent_statuses:
        return "EMPTY"

    failed_count = sum(1 for a in agent_statuses if a["status"] == "FAILED")
    degraded_count = sum(1 for a in agent_statuses if a["status"] == "DEGRADED")

    if failed_count > len(agent_statuses) * 0.5:
        return "CRITICAL"
    elif failed_count > 0 or degraded_count > len(agent_statuses) * 0.3:
        return "DEGRADED"
    else:
        return "HEALTHY"
```
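
The health thresholds in `_evaluate_session_health` are easier to reason about with concrete sessions. The standalone function below copies the template's comparison logic but takes plain status strings; the `evaluate_session_health` name and the example sessions are illustrative only.

```python
from typing import List


def evaluate_session_health(statuses: List[str]) -> str:
    """Standalone copy of the template's threshold logic for plain status strings."""
    if not statuses:
        return "EMPTY"
    total = len(statuses)
    failed = statuses.count("FAILED")
    degraded = statuses.count("DEGRADED")
    if failed > total * 0.5:
        return "CRITICAL"
    if failed > 0 or degraded > total * 0.3:
        return "DEGRADED"
    return "HEALTHY"


examples = {
    "no agents": [],
    "all active": ["ACTIVE"] * 4,
    "one of four degraded": ["ACTIVE"] * 3 + ["DEGRADED"],      # 1 > 1.2 is false
    "two of four degraded": ["ACTIVE"] * 2 + ["DEGRADED"] * 2,  # 2 > 1.2 is true
    "exactly half failed": ["ACTIVE"] * 2 + ["FAILED"] * 2,     # 2 > 2.0 is false
    "three of four failed": ["ACTIVE"] + ["FAILED"] * 3,        # 3 > 2.0 is true
}
for label, statuses in examples.items():
    print(f"{label}: {evaluate_session_health(statuses)}")
```

Both comparisons are strict, so a session with exactly half of its agents failed is reported as `DEGRADED` rather than `CRITICAL`, and a single degraded agent out of four still counts as `HEALTHY`.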
### **Status Aggregation Module**
```python
# src/utils/status_aggregation.py
from typing import Any, Dict, List, Optional
from dataclasses import dataclass

from src.types.agent import AgentStatus
from src.types.monitoring import ResourceMetrics


@dataclass
class SessionStatusAggregator:
    """Aggregates status information across session agents."""

    def aggregate_agent_statuses(
        self,
        agent_statuses: List[AgentStatus]
    ) -> Dict[str, Any]:
        """Aggregate multiple agent statuses into session view."""
        # Implementation with efficient aggregation
        raise NotImplementedError

    # SessionHealth is not imported in this snippet, so the annotation is a
    # string (forward reference) to keep the module importable.
    def calculate_session_health(
        self,
        agent_statuses: List[AgentStatus],
        resource_metrics: List[ResourceMetrics]
    ) -> "SessionHealth":
        """Calculate overall session health score."""
        # Implementation with health evaluation logic
        raise NotImplementedError
```
This get_session_status template covers per-agent status collection, aggregated resource metrics (CPU, memory, file descriptors), and a threshold-based session health evaluation.