"""
Mock Claude Code Manager for isolated testing of Agent Orchestration Platform.
This module provides a comprehensive mock implementation of Claude Code integration that:
- Simulates Claude Code process spawning and management
- Mocks command execution and response generation
- Provides ADDER+ system prompt injection simulation
- Implements configuration and model selection
- Supports async operations matching the real Claude Code manager interface
- Enables testing without requiring actual Claude Code CLI installation
Designed to match the real Claude Code manager interface for seamless testing.
Author: Adder_1 | Created: 2025-06-26 | Testing Infrastructure Task
"""
import asyncio
import logging
import re
import secrets
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
# ============================================================================
# Mock Claude Code Data Structures
# ============================================================================
@dataclass
class MockClaudeProcess:
"""Mock representation of a Claude Code process."""
process_id: int
agent_id: str
session_id: str
working_directory: str
model: str = "sonnet-3.5"
status: str = "ACTIVE" # STARTING, ACTIVE, IDLE, ERROR, TERMINATED
created_at: datetime = field(default_factory=datetime.now)
last_activity: datetime = field(default_factory=datetime.now)
message_history: List[Dict[str, str]] = field(default_factory=list)
system_prompt: Optional[str] = None
configuration: Dict[str, Any] = field(default_factory=dict)
resource_usage: Dict[str, float] = field(default_factory=dict)
output_buffer: List[str] = field(default_factory=list)
error_buffer: List[str] = field(default_factory=list)
performance_metrics: Dict[str, float] = field(default_factory=dict)
@dataclass
class MockClaudeResponse:
"""Mock response from Claude Code."""
content: str
model: str
usage: Dict[str, int] = field(default_factory=lambda: {"input_tokens": 0, "output_tokens": 0})
processing_time: float = 0.0
status: str = "success"
error: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MockClaudeConfig:
"""Mock Claude Code configuration."""
model: str = "sonnet-3.5"
no_color: bool = True
skip_permissions: bool = False
verbose: bool = False
output_format: str = "text"
timeout: int = 300
max_tokens: Optional[int] = None
temperature: float = 0.7
working_directory: Optional[str] = None
custom_args: List[str] = field(default_factory=list)
# ============================================================================
# Mock Claude Code Manager Implementation
# ============================================================================
class MockClaudeCodeManager:
"""
Comprehensive mock Claude Code manager for testing agent orchestration.
Provides all the functionality of the real Claude Code manager interface
while running entirely in-memory for fast, isolated testing.
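
    Example (illustrative sketch; the identifiers and paths below are
    placeholders, and all public methods are coroutines):

        manager = MockClaudeCodeManager({"simulate_delays": False})
        await manager.initialize()
        spawn = await manager.spawn_process("agent_1", "session_1", "/tmp/work")
        reply = await manager.send_message(spawn["process_id"], "implement the parser")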
"""
    def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = config or {}
self.processes: Dict[int, MockClaudeProcess] = {}
self.next_process_id = 1000
self.logger = logging.getLogger(__name__)
self.is_initialized = False
self.operation_count = 0
# Response templates for different types of interactions
self.response_templates = self._initialize_response_templates()
# Configuration
self.simulate_delays = self.config.get('simulate_delays', True)
self.base_response_time = self.config.get('base_response_time', 1.0)
self.failure_rate = self.config.get('failure_rate', 0.0)
self.model_capabilities = self._initialize_model_capabilities()
def _initialize_response_templates(self) -> Dict[str, Dict[str, Any]]:
"""Initialize response templates for different interaction types."""
return {
"task_analysis": {
"patterns": [r"TODO\.md", r"task", r"analyze", r"assign"],
"response": "🚀 **INITIATED** - {agent_name}: Analyzing TODO.md for task assignment\n\nI'll examine the current task status and identify the next priority task for assignment.\n\n⚡ **ANALYSIS COMPLETE** - Found {task_count} tasks with {available_count} available for assignment."
},
"implementation": {
"patterns": [r"implement", r"create", r"build", r"develop"],
"response": "⚡ **PROGRESS** - {agent_name}: Beginning implementation of {task_name}\n\nI'll implement this using ADDER+ techniques including:\n- Design by Contract with comprehensive validation\n- Defensive programming with security boundaries\n- Type-safe implementation with branded types\n- Property-based testing for edge cases\n\n✅ **IMPLEMENTATION READY** - Core functionality completed with technique integration."
},
"security_focus": {
"patterns": [r"security", r"validation", r"contract", r"defensive"],
"response": "🔒 **SECURITY ANALYSIS** - {agent_name}: Implementing comprehensive security framework\n\n<thinking>\nSecurity Analysis:\n1. Input validation requires whitelist approach\n2. Authentication boundaries need cryptographic validation\n3. Resource limits must be enforced at OS level\n4. Audit trails require tamper-resistant logging\n</thinking>\n\n✅ **SECURITY FRAMEWORK COMPLETE** - All boundaries enforced with comprehensive validation."
},
"error_handling": {
"patterns": [r"error", r"exception", r"fail", r"problem"],
"response": "🚨 **ERROR ANALYSIS** - {agent_name}: Investigating error condition\n\nI'll perform systematic root cause analysis:\n1. Reproduce the error condition\n2. Analyze system state and dependencies\n3. Implement defensive error handling\n4. Add comprehensive logging and recovery\n\n🔄 **RESOLUTION IMPLEMENTED** - Error handled with graceful recovery and prevention."
},
"testing": {
"patterns": [r"test", r"property", r"hypothesis", r"benchmark"],
"response": "🧪 **TESTING FRAMEWORK** - {agent_name}: Implementing comprehensive test coverage\n\nApplying ADDER+ testing methodology:\n- Property-based testing with Hypothesis\n- Security validation with penetration testing\n- Performance benchmarking with concurrency\n- Integration testing with mock infrastructure\n\n✅ **TESTING COMPLETE** - 95%+ coverage with property-based validation."
},
"general": {
"patterns": [],
"response": "⚡ **PROCESSING** - {agent_name}: Analyzing request and implementing solution\n\nI'll approach this systematically using ADDER+ principles:\n1. Context analysis and requirement understanding\n2. Technique selection and implementation strategy\n3. Quality verification and comprehensive testing\n4. Documentation and integration validation\n\n✅ **COMPLETE** - Implementation delivered with technique compliance."
}
}
def _initialize_model_capabilities(self) -> Dict[str, Dict[str, Any]]:
"""Initialize model capability configurations."""
return {
"sonnet-3.5": {
"max_tokens": 8192,
"reasoning_quality": "high",
"coding_ability": "excellent",
"response_time": 2.0,
"cost_per_token": 0.003
},
"opus-3": {
"max_tokens": 4096,
"reasoning_quality": "excellent",
"coding_ability": "excellent",
"response_time": 4.0,
"cost_per_token": 0.015
},
"haiku-3": {
"max_tokens": 4096,
"reasoning_quality": "good",
"coding_ability": "good",
"response_time": 0.5,
"cost_per_token": 0.0005
}
}
async def initialize(self) -> bool:
"""
Initialize the mock Claude Code manager.
Returns:
True if initialization successful
"""
if self.failure_rate > 0 and secrets.randbelow(100) < (self.failure_rate * 100):
raise ConnectionError("Mock Claude Code initialization failed")
self.is_initialized = True
self.logger.info("Mock Claude Code manager initialized")
return True
async def cleanup(self) -> None:
"""Clean up mock Claude Code manager resources."""
# Terminate all processes
for process in self.processes.values():
process.status = "TERMINATED"
self.processes.clear()
self.is_initialized = False
self.logger.info("Mock Claude Code manager cleaned up")
    async def spawn_process(
        self,
        agent_id: str,
        session_id: str,
        working_directory: str,
        config: Optional[MockClaudeConfig] = None
    ) -> Dict[str, Any]:
        """
        Spawn a new Claude Code process for an agent.

        Args:
            agent_id: Agent identifier
            session_id: Session identifier
            working_directory: Working directory for the process
            config: Claude Code configuration

        Returns:
            Dictionary with process information
        """
await self._simulate_processing_delay(0.5)
await self._check_initialization()
await self._simulate_random_failure("spawn_process")
config = config or MockClaudeConfig()
process_id = self.next_process_id
self.next_process_id += 1
# Create mock process
process = MockClaudeProcess(
process_id=process_id,
agent_id=agent_id,
session_id=session_id,
working_directory=working_directory,
model=config.model,
status="STARTING",
configuration={
"model": config.model,
"no_color": config.no_color,
"verbose": config.verbose,
"timeout": config.timeout,
"working_directory": working_directory
},
resource_usage={
"memory_mb": 128.0,
"cpu_percent": 5.0,
"tokens_used": 0
}
)
# Simulate startup time
await asyncio.sleep(0.1)
process.status = "ACTIVE"
process.output_buffer.append(f"Claude Code process started for {agent_id}")
process.output_buffer.append(f"Model: {config.model}")
process.output_buffer.append(f"Working directory: {working_directory}")
self.processes[process_id] = process
self.logger.info(f"Spawned Claude Code process {process_id} for agent {agent_id}")
return {
"success": True,
"process_id": process_id,
"agent_id": agent_id,
"model": config.model,
"working_directory": working_directory,
"status": "ACTIVE"
}
    async def send_system_prompt(
        self,
        process_id: int,
        system_prompt: str,
        agent_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Send a system prompt to a Claude Code process.

        Args:
            process_id: Target process ID
            system_prompt: System prompt content
            agent_name: Agent name for prompt injection

        Returns:
            Dictionary with injection results
        """
await self._simulate_processing_delay(0.2)
await self._check_initialization()
process = self.processes.get(process_id)
if not process:
return {"success": False, "error": "Process not found"}
# Inject agent name into system prompt if provided
if agent_name:
system_prompt = system_prompt.replace("[Agent_Name]", agent_name)
system_prompt = system_prompt.replace("{agent_name}", agent_name)
# Store system prompt
process.system_prompt = system_prompt
process.message_history.append({
"role": "system",
"content": system_prompt,
"timestamp": datetime.now().isoformat()
})
process.output_buffer.append(f"System prompt injected ({len(system_prompt)} characters)")
process.last_activity = datetime.now()
self.logger.debug(f"Injected system prompt into process {process_id}")
return {
"success": True,
"process_id": process_id,
"prompt_length": len(system_prompt),
"agent_name": agent_name
}
async def send_message(
self,
process_id: int,
message: str,
wait_for_response: bool = True
) -> Dict[str, Any]:
"""
Send message to Claude Code process and optionally wait for response.
Args:
process_id: Target process ID
message: Message content
wait_for_response: Whether to wait for response
Returns:
Dictionary with message results and optional response
"""
await self._simulate_processing_delay(0.1)
await self._check_initialization()
await self._simulate_random_failure("send_message")
process = self.processes.get(process_id)
if not process:
return {"success": False, "error": "Process not found"}
if process.status != "ACTIVE":
return {"success": False, "error": f"Process status is {process.status}"}
# Store user message
process.message_history.append({
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
})
process.output_buffer.append(f"USER: {message[:100]}...")
process.last_activity = datetime.now()
response_data = {
"success": True,
"process_id": process_id,
"message_sent": True,
"message_length": len(message)
}
if wait_for_response:
# Generate mock response
response = await self._generate_response(process, message)
# Store assistant response
process.message_history.append({
"role": "assistant",
"content": response.content,
"timestamp": datetime.now().isoformat()
})
process.output_buffer.append(f"ASSISTANT: {response.content[:100]}...")
# Update resource usage
process.resource_usage["tokens_used"] += response.usage["input_tokens"] + response.usage["output_tokens"]
process.resource_usage["memory_mb"] += 5.0 # Simulate memory usage increase
            # Update performance metrics
            total_messages = len([m for m in process.message_history if m["role"] == "user"])
            total_responses = len([m for m in process.message_history if m["role"] == "assistant"])
            prev_avg = process.performance_metrics.get("average_response_time", 0.0)
            process.performance_metrics.update({
                "total_messages": total_messages,
                "total_responses": total_responses,
                # Incremental running average over all responses, not just the latest sample
                "average_response_time": prev_avg + (response.processing_time - prev_avg) / total_responses,
                "total_tokens": process.resource_usage["tokens_used"]
            })
response_data.update({
"response_received": True,
"response": {
"content": response.content,
"model": response.model,
"usage": response.usage,
"processing_time": response.processing_time,
"status": response.status
}
})
self.logger.debug(f"Sent message to process {process_id}, wait_for_response={wait_for_response}")
return response_data
async def get_process_status(self, process_id: int) -> Dict[str, Any]:
"""
Get detailed status of a Claude Code process.
Args:
process_id: Process to check
Returns:
Dictionary with process status information
"""
await self._simulate_processing_delay(0.05)
process = self.processes.get(process_id)
if not process:
return {
"exists": False,
"error": "Process not found"
}
uptime = datetime.now() - process.created_at
last_activity_age = datetime.now() - process.last_activity
return {
"exists": True,
"process_id": process_id,
"agent_id": process.agent_id,
"session_id": process.session_id,
"status": process.status,
"model": process.model,
"working_directory": process.working_directory,
"uptime_seconds": uptime.total_seconds(),
"last_activity_seconds_ago": last_activity_age.total_seconds(),
"message_count": len(process.message_history),
"resource_usage": process.resource_usage.copy(),
"performance_metrics": process.performance_metrics.copy(),
"responsive": last_activity_age.total_seconds() < 600, # 10 minutes
"configuration": process.configuration.copy()
}
async def terminate_process(self, process_id: int) -> bool:
"""
Terminate a Claude Code process.
Args:
process_id: Process to terminate
Returns:
True if termination successful
"""
await self._simulate_processing_delay(0.2)
await self._check_initialization()
process = self.processes.get(process_id)
if not process:
return False
process.status = "TERMINATED"
process.output_buffer.append(f"Process {process_id} terminated at {datetime.now().isoformat()}")
# Remove from active processes
del self.processes[process_id]
self.logger.info(f"Terminated Claude Code process {process_id}")
return True
async def list_processes(self) -> List[Dict[str, Any]]:
"""
List all active Claude Code processes.
Returns:
List of process information dictionaries
"""
await self._simulate_processing_delay(0.05)
processes = []
for process in self.processes.values():
processes.append({
"process_id": process.process_id,
"agent_id": process.agent_id,
"session_id": process.session_id,
"status": process.status,
"model": process.model,
"uptime_seconds": (datetime.now() - process.created_at).total_seconds(),
"message_count": len(process.message_history),
"resource_usage": process.resource_usage.copy()
})
return processes
async def get_conversation_history(
self,
process_id: int,
limit: int = 50
) -> List[Dict[str, str]]:
"""
Get conversation history for a process.
Args:
process_id: Process to get history for
limit: Maximum number of messages to return
Returns:
List of message dictionaries
"""
await self._simulate_processing_delay(0.05)
process = self.processes.get(process_id)
if not process:
return []
return process.message_history[-limit:] if process.message_history else []
async def check_health(self) -> Dict[str, Any]:
"""
Check overall health of mock Claude Code integration.
Returns:
Health status dictionary
"""
if not self.is_initialized:
return {
"status": "NOT_INITIALIZED",
"message": "Mock Claude Code manager not initialized"
}
active_processes = len([p for p in self.processes.values() if p.status == "ACTIVE"])
total_processes = len(self.processes)
# Calculate average resource usage
if self.processes:
avg_memory = sum(p.resource_usage.get("memory_mb", 0) for p in self.processes.values()) / len(self.processes)
avg_cpu = sum(p.resource_usage.get("cpu_percent", 0) for p in self.processes.values()) / len(self.processes)
total_tokens = sum(p.resource_usage.get("tokens_used", 0) for p in self.processes.values())
else:
avg_memory = avg_cpu = total_tokens = 0
health_score = 100
if avg_memory > 512: # High memory usage
health_score -= 20
if avg_cpu > 50: # High CPU usage
health_score -= 15
if self.failure_rate > 0.1:
health_score -= int(self.failure_rate * 100)
return {
"status": "HEALTHY" if health_score > 70 else "DEGRADED" if health_score > 30 else "UNHEALTHY",
"health_score": health_score,
"active_processes": active_processes,
"total_processes": total_processes,
"average_memory_mb": avg_memory,
"average_cpu_percent": avg_cpu,
"total_tokens_used": total_tokens,
"operations_count": self.operation_count,
"failure_rate": self.failure_rate
}
# ========================================================================
# Helper Methods
# ========================================================================
async def _generate_response(
self,
process: MockClaudeProcess,
message: str
) -> MockClaudeResponse:
"""Generate a realistic mock response from Claude."""
# Determine response type based on message content
response_type = self._classify_message(message)
template = self.response_templates.get(response_type, self.response_templates["general"])
# Get model capabilities
model_caps = self.model_capabilities.get(process.model, self.model_capabilities["sonnet-3.5"])
# Simulate processing time based on model and message complexity
base_time = model_caps["response_time"]
complexity_factor = min(len(message) / 1000, 2.0) # Up to 2x for long messages
processing_time = base_time * (0.5 + complexity_factor)
if self.simulate_delays:
await asyncio.sleep(min(processing_time, 0.5)) # Cap at 0.5s for testing
# Generate response content
        agent_name = process.agent_id.replace("agent_", "Agent_")
response_content = template["response"].format(
agent_name=agent_name,
task_name="Current Task",
task_count=secrets.randbelow(10) + 5,
available_count=secrets.randbelow(3) + 1
)
        # Add realistic variation
        if response_type == "implementation":
            response_content += f"\n\n📁 **FILES CREATED**: {secrets.randbelow(5) + 2} modules"
        elif response_type == "testing":
            response_content += f"\n\n🧪 **TESTS ADDED**: {secrets.randbelow(20) + 10} test cases"
# Calculate token usage (rough simulation)
input_tokens = len(message.split()) * 1.3 # Rough token estimation
output_tokens = len(response_content.split()) * 1.3
return MockClaudeResponse(
content=response_content,
model=process.model,
usage={
"input_tokens": int(input_tokens),
"output_tokens": int(output_tokens)
},
processing_time=processing_time,
status="success",
metadata={
"response_type": response_type,
"model_capabilities": model_caps,
"complexity_factor": complexity_factor
}
)
    def _classify_message(self, message: str) -> str:
        """Classify message type for appropriate response generation."""
        for response_type, template in self.response_templates.items():
            if response_type == "general":
                continue
            for pattern in template["patterns"]:
                # Case-insensitive search so patterns like r"TODO\.md" match
                # regardless of message casing.
                if re.search(pattern, message, re.IGNORECASE):
                    return response_type
        return "general"
async def _simulate_processing_delay(self, base_delay: float) -> None:
"""Simulate processing delay if configured."""
if self.simulate_delays:
actual_delay = base_delay * self.base_response_time
await asyncio.sleep(min(actual_delay, 1.0)) # Cap at 1 second for testing
self.operation_count += 1
async def _check_initialization(self) -> None:
"""Check if manager is initialized."""
if not self.is_initialized:
raise RuntimeError("Mock Claude Code manager not initialized")
async def _simulate_random_failure(self, operation: str) -> None:
"""Simulate random failures for testing error handling."""
if self.failure_rate > 0:
if secrets.randbelow(100) < (self.failure_rate * 100):
raise ConnectionError(f"Mock Claude Code failure during {operation}")
def configure_behavior(self, **kwargs) -> None:
"""Configure mock behavior for testing scenarios."""
if 'simulate_delays' in kwargs:
self.simulate_delays = kwargs['simulate_delays']
if 'base_response_time' in kwargs:
self.base_response_time = kwargs['base_response_time']
if 'failure_rate' in kwargs:
self.failure_rate = kwargs['failure_rate']
def get_mock_state(self) -> Dict[str, Any]:
"""Get current mock state for testing inspection."""
return {
"processes": {pid: {
"process_id": p.process_id,
"agent_id": p.agent_id,
"status": p.status,
"model": p.model,
"message_count": len(p.message_history),
"resource_usage": p.resource_usage.copy()
} for pid, p in self.processes.items()},
"operation_count": self.operation_count,
"simulate_delays": self.simulate_delays,
"failure_rate": self.failure_rate
}
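    # Example (sketch): tests can assert against the snapshot directly,
    # e.g. after spawning a process with id `pid`:
    #     state = manager.get_mock_state()
    #     assert state["processes"][pid]["status"] == "ACTIVE"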
# ============================================================================
# Mock Claude Code Manager Factory and Utilities
# ============================================================================
def create_mock_claude_manager(
simulate_delays: bool = True,
base_response_time: float = 1.0,
failure_rate: float = 0.0
) -> MockClaudeCodeManager:
"""
Factory function to create configured mock Claude Code manager.
Args:
simulate_delays: Enable processing delays
base_response_time: Base response time multiplier
failure_rate: Rate of random failures (0.0 to 1.0)
Returns:
Configured MockClaudeCodeManager instance
"""
config = {
'simulate_delays': simulate_delays,
'base_response_time': base_response_time,
'failure_rate': failure_rate
}
return MockClaudeCodeManager(config)
def create_performance_claude_mock() -> MockClaudeCodeManager:
"""Create a high-performance mock for benchmarking."""
return create_mock_claude_manager(
simulate_delays=False,
base_response_time=0.1,
failure_rate=0.0
)
def create_realistic_claude_mock() -> MockClaudeCodeManager:
"""Create a realistic mock for development testing."""
return create_mock_claude_manager(
simulate_delays=True,
base_response_time=1.0,
failure_rate=0.01 # 1% failure rate
)
def create_stress_test_claude_mock() -> MockClaudeCodeManager:
"""Create a mock configured for stress testing."""
return create_mock_claude_manager(
simulate_delays=True,
base_response_time=2.0,
failure_rate=0.05 # 5% failure rate
)
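# Example (sketch): with the 5% failure rate above, send_message() will
# occasionally raise ConnectionError, so a resilience test built on this
# factory exercises the orchestrator's retry/recovery path (process_id is
# obtained from a prior spawn_process() call):
#
#     manager = create_stress_test_claude_mock()
#     await manager.initialize()
#     try:
#         await manager.send_message(process_id, "implement feature X")
#     except ConnectionError:
#         ...  # retry or fail over, depending on the scenario under test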
# Export main components
__all__ = [
'MockClaudeCodeManager', 'MockClaudeProcess', 'MockClaudeResponse', 'MockClaudeConfig',
'create_mock_claude_manager', 'create_performance_claude_mock',
'create_realistic_claude_mock', 'create_stress_test_claude_mock'
]
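# ============================================================================
# Usage Sketch
# ============================================================================
# A minimal end-to-end sketch of the mock lifecycle, runnable as a script.
# The agent/session identifiers and working directory are illustrative
# placeholders, not requirements of the interface.
if __name__ == "__main__":
    async def _demo() -> None:
        manager = create_performance_claude_mock()
        await manager.initialize()
        spawn = await manager.spawn_process(
            agent_id="agent_demo",
            session_id="session_demo",
            working_directory="/tmp/mock_demo",
        )
        pid = spawn["process_id"]
        await manager.send_system_prompt(pid, "You are [Agent_Name].", agent_name="Agent_Demo")
        reply = await manager.send_message(pid, "implement the task queue")
        print(reply["response"]["content"])
        print(await manager.check_health())
        await manager.terminate_process(pid)
        await manager.cleanup()
    asyncio.run(_demo())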