"""
Agent Creation Validation Module - Agent Orchestration Platform
This module implements comprehensive validation for agent creation requests with security-first design,
input sanitization, and defensive programming patterns for maximum protection against malicious inputs.
Architecture Integration:
- Design Patterns: Command pattern for validation operations, Chain of Responsibility for validation pipeline
- Security Model: Defense-in-depth with whitelist validation, input sanitization, and security contracts
- Performance Profile: O(1) validation operations with efficient regex and constraint checking
Technical Decisions:
- Whitelist Validation: Only known-safe inputs allowed with comprehensive pattern matching
- Security-First Design: All inputs treated as potentially malicious until proven safe
- Immutable Validation: Validation functions are pure with no side effects
- Comprehensive Logging: All validation failures logged with security context
Dependencies & Integration:
- External: re for pattern matching, typing for type safety
- Internal: Security contracts, input validation utilities, audit logging system
Quality Assurance:
- Test Coverage: Property-based testing for all validation scenarios
- Error Handling: Comprehensive error classification with security-preserving failure modes
- Contract Validation: All validation operations protected by security contracts
Author: Adder_5 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import functools
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Pattern, Set
# Import boundary enforcement
from src.boundaries.audit import AuditCategory, AuditLevel, get_audit_logger
from src.models.agent import (
    AgentCreationRequest,
    AgentSpecialization,
    ClaudeConfig,
)
# Import type system
from src.models.ids import SessionId, validate_agent_name
from src.models.security import SecurityLevel
from src.models.validation import ValidationError
from src.utils.contracts_shim import ensure, require
from src.validators.input import sanitize_user_input
class AgentCreationValidationError(ValidationError):
"""Specific validation errors for agent creation operations."""
pass
class SecurityValidationError(AgentCreationValidationError):
"""Security-specific validation failures."""
pass
class ResourceValidationError(AgentCreationValidationError):
"""Resource limit and capacity validation failures."""
pass
# Validation Patterns and Constants
AGENT_NAME_PATTERN: Pattern = re.compile(r"^Agent_\d+$")
AGENT_NAME_MIN_LENGTH = 7  # "Agent_1"
AGENT_NAME_MAX_LENGTH = 20  # "Agent_" plus up to 14 digits
SPECIALIZATION_MAX_LENGTH = 200
SYSTEM_PROMPT_SUFFIX_MAX_LENGTH = 5000
SESSION_ID_PATTERN: Pattern = re.compile(r"^[a-zA-Z0-9_-]+$")
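# A minimal sketch of how the name pattern above behaves (illustrative, not a test):
#     AGENT_NAME_PATTERN.match("Agent_42")  # matches
#     AGENT_NAME_PATTERN.match("agent_42")  # None: the pattern is case-sensitive
#     AGENT_NAME_PATTERN.match("Agent_4x")  # None: only digits may follow the underscore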
# Security whitelists
ALLOWED_SPECIALIZATIONS = {
"DEVELOPMENT",
"TESTING",
"ANALYSIS",
"DOCUMENTATION",
"SECURITY",
"PERFORMANCE",
"INTEGRATION",
"GENERAL",
}
ALLOWED_CLAUDE_MODELS = {"sonnet-3.5", "sonnet-4", "opus-3", "haiku-3"}
DANGEROUS_PATTERNS = [
# Command injection patterns
re.compile(r"[;&|`$(){}[\]<>]"),
# Path traversal patterns
re.compile(r"\.\.[\\/]"),
# Script injection patterns
re.compile(r"<script[^>]*>"),
    # SQL injection keywords (word-boundary anchored to avoid matching e.g. "deselect")
    re.compile(r"\b(union|select|insert|update|delete|drop)\s", re.IGNORECASE),
# Shell metacharacters
re.compile(r"[!#$&*;?~|^(){}[\]<>]"),
]
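# A minimal sketch of how these patterns are applied below: a single match on
# any pattern is enough to reject the input.
#     any(p.search("run; rm -rf /") for p in DANGEROUS_PATTERNS)  # True (";" is flagged)
#     any(p.search("Agent_7") for p in DANGEROUS_PATTERNS)        # False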
@dataclass(frozen=True)
class ValidationContext:
"""
Immutable validation context with security constraints.
Provides comprehensive context for validation operations with
security boundaries and resource limits for defensive validation.
"""
session_id: SessionId
security_level: SecurityLevel
max_agents_in_session: int
current_agent_count: int
available_memory_mb: int
available_cpu_percent: float
    allowed_specializations: Set[str] = field(
        default_factory=lambda: ALLOWED_SPECIALIZATIONS.copy()
    )
security_constraints: Dict[str, Any] = field(default_factory=dict)
def has_capacity_for_agent(self, requested_memory_mb: int = 512) -> bool:
"""Check if system has capacity for new agent."""
return (
self.current_agent_count < self.max_agents_in_session
and self.available_memory_mb >= requested_memory_mb
and self.available_cpu_percent >= 25.0 # Minimum 25% CPU available
)
def get_security_constraints_for_level(self) -> Dict[str, Any]:
"""Get security constraints based on security level."""
base_constraints = {
"require_sanitization": True,
"validate_all_inputs": True,
"log_all_operations": True,
}
if self.security_level == SecurityLevel.HIGH:
base_constraints.update(
{
"strict_pattern_matching": True,
"comprehensive_logging": True,
"enhanced_monitoring": True,
"resource_limits_enforced": True,
}
)
elif self.security_level == SecurityLevel.MAXIMUM:
base_constraints.update(
{
"strict_pattern_matching": True,
"comprehensive_logging": True,
"enhanced_monitoring": True,
"resource_limits_enforced": True,
"cryptographic_validation": True,
"audit_all_access": True,
}
)
return base_constraints
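# A minimal usage sketch (sid is an existing SessionId; values are illustrative,
# not defaults):
#     ctx = ValidationContext(
#         session_id=sid,
#         security_level=SecurityLevel.HIGH,
#         max_agents_in_session=8,
#         current_agent_count=3,
#         available_memory_mb=4096,
#         available_cpu_percent=60.0,
#     )
#     ctx.has_capacity_for_agent(requested_memory_mb=512)  # True: 3 < 8, 4096 >= 512, 60.0 >= 25.0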
@dataclass(frozen=True)
class AgentCreationValidationResult:
"""
Immutable result of agent creation validation with comprehensive status.
Provides detailed validation results with security context and
specific failure information for comprehensive error handling.
"""
is_valid: bool
validated_request: Optional[AgentCreationRequest] = None
security_clearance: bool = False
resource_clearance: bool = False
validation_warnings: List[str] = field(default_factory=list)
validation_errors: List[str] = field(default_factory=list)
security_violations: List[str] = field(default_factory=list)
sanitized_inputs: Dict[str, str] = field(default_factory=dict)
validation_metadata: Dict[str, Any] = field(default_factory=dict)
def get_failure_summary(self) -> str:
"""Get comprehensive failure summary for logging."""
if self.is_valid:
return "Validation successful"
failures = []
if self.validation_errors:
failures.append(f"Validation errors: {', '.join(self.validation_errors)}")
if self.security_violations:
failures.append(
f"Security violations: {', '.join(self.security_violations)}"
)
if not self.security_clearance:
failures.append("Security clearance failed")
if not self.resource_clearance:
failures.append("Resource clearance failed")
return "; ".join(failures)
class AgentCreationValidator:
"""
Comprehensive agent creation validator with security-first design.
Implements defense-in-depth validation with comprehensive input sanitization,
security boundary enforcement, and resource validation for maximum protection.
Contracts:
Preconditions:
- All validation methods receive sanitized inputs
- Security context is properly initialized
- Resource limits are current and accurate
Postconditions:
- All validation results include security status
- Failed validations log security events
- Resource validation reflects current system state
Invariants:
- Validation results are immutable
- Security violations always fail validation
- Resource limits are always enforced
"""
def __init__(self):
"""Initialize agent creation validator."""
        self._audit_logger: Optional[Any] = None
        # Short-lived cache of validation results, keyed by request fingerprint
        # (not consulted by the current validation path).
        self._validation_cache: Dict[str, AgentCreationValidationResult] = {}
        self._cache_ttl_seconds = 60  # Cache validated requests briefly
async def initialize(self) -> None:
"""Initialize validator with audit logging."""
try:
self._audit_logger = get_audit_logger()
except Exception:
# Continue without audit logging if unavailable
pass
    @require(lambda session_id: session_id is not None)
    @require(lambda agent_name: agent_name is not None and len(agent_name.strip()) > 0)
    @ensure(lambda result: isinstance(result, AgentCreationValidationResult))
async def validate_agent_creation_request(
self,
session_id: SessionId,
agent_name: str,
specialization: Optional[str] = None,
system_prompt_suffix: str = "",
claude_config: Optional[Dict[str, Any]] = None,
validation_context: Optional[ValidationContext] = None,
) -> AgentCreationValidationResult:
"""
Comprehensive validation of agent creation request with security enforcement.
Contracts:
Preconditions:
- session_id is valid and not None
- agent_name is not empty after sanitization
- validation_context contains current system state
Postconditions:
- Returns comprehensive validation result
- All security violations are logged
- Sanitized inputs are provided in result
Invariants:
- Validation is deterministic for same inputs
- Security violations always cause validation failure
- Resource limits are always checked
Security Implementation:
- Input Sanitization: All string inputs sanitized before validation
- Pattern Validation: Agent names validated against strict patterns
- Whitelist Validation: Specializations validated against allowed set
- Resource Validation: System capacity validated before approval
- Audit Logging: All validation attempts logged with security context
Args:
session_id: Target session for agent creation
agent_name: Proposed agent name (must follow Agent_# format)
specialization: Optional agent specialization
system_prompt_suffix: Additional system prompt content
claude_config: Claude Code configuration options
validation_context: Current system state and constraints
Returns:
AgentCreationValidationResult: Comprehensive validation result
Raises:
AgentCreationValidationError: Critical validation framework failure
"""
validation_start = datetime.utcnow()
validation_errors = []
security_violations = []
validation_warnings = []
sanitized_inputs = {}
try:
# Initialize validation context if not provided
if validation_context is None:
validation_context = ValidationContext(
session_id=session_id,
security_level=SecurityLevel.HIGH,
max_agents_in_session=8,
current_agent_count=0,
available_memory_mb=2048,
available_cpu_percent=75.0,
)
# Phase 1: Input Sanitization and Basic Validation
sanitization_result = await self._sanitize_and_validate_inputs(
agent_name, specialization, system_prompt_suffix, claude_config
)
if not sanitization_result["success"]:
security_violations.extend(sanitization_result["security_violations"])
validation_errors.extend(sanitization_result["validation_errors"])
else:
sanitized_inputs = sanitization_result["sanitized_inputs"]
# Phase 2: Agent Name Validation
name_validation_result = await self._validate_agent_name(
sanitized_inputs.get("agent_name", agent_name), validation_context
)
if not name_validation_result["is_valid"]:
validation_errors.extend(name_validation_result["errors"])
# Phase 3: Specialization Validation
if specialization:
spec_validation_result = await self._validate_specialization(
sanitized_inputs.get("specialization", specialization),
validation_context,
)
if not spec_validation_result["is_valid"]:
validation_errors.extend(spec_validation_result["errors"])
if spec_validation_result["warnings"]:
validation_warnings.extend(spec_validation_result["warnings"])
# Phase 4: Claude Configuration Validation
config_validation_result = await self._validate_claude_config(
claude_config, validation_context
)
if not config_validation_result["is_valid"]:
validation_errors.extend(config_validation_result["errors"])
if config_validation_result["warnings"]:
validation_warnings.extend(config_validation_result["warnings"])
# Phase 5: System Prompt Validation
if system_prompt_suffix:
prompt_validation_result = await self._validate_system_prompt_suffix(
sanitized_inputs.get("system_prompt_suffix", system_prompt_suffix),
validation_context,
)
if not prompt_validation_result["is_valid"]:
security_violations.extend(
prompt_validation_result["security_violations"]
)
# Phase 6: Resource and Capacity Validation
resource_validation_result = await self._validate_resource_availability(
validation_context
)
resource_clearance = resource_validation_result["has_capacity"]
if not resource_clearance:
validation_errors.extend(resource_validation_result["errors"])
            # Phase 7: Security Clearance Assessment
            # Security clearance reflects the absence of security violations;
            # resource and general validation failures are tracked separately.
            security_clearance = len(security_violations) == 0
            # Phase 8: Create Validated Request if Successful
            validated_request = None
            if security_clearance and resource_clearance and not validation_errors:
try:
validated_request = AgentCreationRequest(
session_id=session_id,
agent_name=sanitized_inputs.get("agent_name", agent_name),
specialization=(
AgentSpecialization(
sanitized_inputs.get("specialization", specialization)
)
if specialization
else None
),
system_prompt_suffix=sanitized_inputs.get(
"system_prompt_suffix", system_prompt_suffix
),
claude_config=(
ClaudeConfig(**config_validation_result["validated_config"])
if claude_config
else ClaudeConfig()
),
)
                except Exception as e:
                    validation_errors.append(f"Failed to create validated request: {e}")
# Create validation result
validation_result = AgentCreationValidationResult(
is_valid=security_clearance
and resource_clearance
and len(validation_errors) == 0,
validated_request=validated_request,
security_clearance=security_clearance,
resource_clearance=resource_clearance,
validation_warnings=validation_warnings,
validation_errors=validation_errors,
security_violations=security_violations,
sanitized_inputs=sanitized_inputs,
validation_metadata={
"validation_duration_ms": (
datetime.utcnow() - validation_start
).total_seconds()
* 1000,
"security_level": validation_context.security_level.value,
"phases_completed": 8,
"validation_timestamp": validation_start.isoformat(),
},
)
# Log validation result
await self._log_validation_result(validation_result, validation_context)
return validation_result
except Exception as e:
# Create failure result for unexpected errors
failure_result = AgentCreationValidationResult(
is_valid=False,
security_clearance=False,
resource_clearance=False,
validation_errors=[f"Validation framework error: {e}"],
validation_metadata={
"validation_duration_ms": (
datetime.utcnow() - validation_start
).total_seconds()
* 1000,
"phases_completed": 0,
"framework_error": str(e),
"validation_timestamp": validation_start.isoformat(),
},
)
# Log critical validation failure
if self._audit_logger:
await self._audit_logger.log_event(
level=AuditLevel.CRITICAL,
category=AuditCategory.SECURITY_VALIDATION,
operation="agent_creation_validation_failure",
resource_type="validation_framework",
resource_id="agent_creation_validator",
success=False,
error_message=str(e),
)
return failure_result
async def _sanitize_and_validate_inputs(
self,
agent_name: str,
specialization: Optional[str],
system_prompt_suffix: str,
claude_config: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
"""Sanitize and validate all input parameters."""
security_violations = []
validation_errors = []
sanitized_inputs = {}
try:
# Sanitize agent name
sanitized_agent_name = sanitize_user_input(agent_name.strip())
# Check for dangerous patterns in agent name
if any(
pattern.search(sanitized_agent_name) for pattern in DANGEROUS_PATTERNS
):
security_violations.append(
f"Agent name contains dangerous patterns: {agent_name}"
)
else:
sanitized_inputs["agent_name"] = sanitized_agent_name
# Sanitize specialization if provided
if specialization:
sanitized_specialization = sanitize_user_input(
specialization.strip().upper()
)
# Check for dangerous patterns
if any(
pattern.search(sanitized_specialization)
for pattern in DANGEROUS_PATTERNS
):
security_violations.append(
f"Specialization contains dangerous patterns: {specialization}"
)
else:
sanitized_inputs["specialization"] = sanitized_specialization
# Sanitize system prompt suffix
if system_prompt_suffix:
sanitized_prompt_suffix = sanitize_user_input(system_prompt_suffix)
# Check length limits
if len(sanitized_prompt_suffix) > SYSTEM_PROMPT_SUFFIX_MAX_LENGTH:
validation_errors.append(
f"System prompt suffix exceeds maximum length: {len(sanitized_prompt_suffix)} > {SYSTEM_PROMPT_SUFFIX_MAX_LENGTH}"
)
# Check for dangerous patterns
if any(
pattern.search(sanitized_prompt_suffix)
for pattern in DANGEROUS_PATTERNS
):
security_violations.append(
"System prompt suffix contains dangerous patterns"
)
else:
sanitized_inputs["system_prompt_suffix"] = sanitized_prompt_suffix
return {
"success": len(security_violations) == 0
and len(validation_errors) == 0,
"sanitized_inputs": sanitized_inputs,
"security_violations": security_violations,
"validation_errors": validation_errors,
}
except Exception as e:
return {
"success": False,
"sanitized_inputs": {},
"security_violations": [f"Input sanitization failed: {e}"],
"validation_errors": [],
}
async def _validate_agent_name(
self, agent_name: str, validation_context: ValidationContext
) -> Dict[str, Any]:
"""Validate agent name format and uniqueness."""
errors = []
try:
# Check length constraints
if (
len(agent_name) < AGENT_NAME_MIN_LENGTH
or len(agent_name) > AGENT_NAME_MAX_LENGTH
):
errors.append(
f"Agent name length {len(agent_name)} must be between {AGENT_NAME_MIN_LENGTH} and {AGENT_NAME_MAX_LENGTH}"
)
# Check pattern matching
if not AGENT_NAME_PATTERN.match(agent_name):
errors.append(
f"Agent name '{agent_name}' must follow 'Agent_#' format (e.g., 'Agent_1', 'Agent_42')"
)
# Validate agent name using type system
try:
validate_agent_name(agent_name)
except Exception as e:
errors.append(f"Agent name validation failed: {e}")
return {"is_valid": len(errors) == 0, "errors": errors}
except Exception as e:
return {"is_valid": False, "errors": [f"Agent name validation error: {e}"]}
async def _validate_specialization(
self, specialization: str, validation_context: ValidationContext
) -> Dict[str, Any]:
"""Validate agent specialization against whitelist."""
errors = []
warnings = []
try:
            # Check against allowed specializations, suggesting close matches on failure
            if specialization not in validation_context.allowed_specializations:
                errors.append(
                    f"Specialization '{specialization}' not in allowed set: {validation_context.allowed_specializations}"
                )
                # Provide helpful suggestions for common typos
                close_matches = [
                    spec
                    for spec in validation_context.allowed_specializations
                    if spec.lower().startswith(specialization.lower()[:3])
                ]
                if close_matches:
                    warnings.append(f"Did you mean one of: {close_matches}?")
            # Check length constraints
            if len(specialization) > SPECIALIZATION_MAX_LENGTH:
                errors.append(
                    f"Specialization too long: {len(specialization)} > {SPECIALIZATION_MAX_LENGTH}"
                )
return {
"is_valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
}
except Exception as e:
return {
"is_valid": False,
"errors": [f"Specialization validation error: {e}"],
"warnings": [],
}
async def _validate_claude_config(
self,
claude_config: Optional[Dict[str, Any]],
validation_context: ValidationContext,
) -> Dict[str, Any]:
"""Validate Claude Code configuration parameters."""
errors = []
warnings = []
validated_config = {}
try:
if claude_config is None:
# Use default configuration
validated_config = {
"model": "sonnet-3.5",
"no_color": True,
"skip_permissions": False,
"verbose": False,
"output_format": "text",
}
else:
# Validate provided configuration
model = claude_config.get("model", "sonnet-3.5")
if model not in ALLOWED_CLAUDE_MODELS:
errors.append(
f"Claude model '{model}' not in allowed set: {ALLOWED_CLAUDE_MODELS}"
)
else:
validated_config["model"] = model
# Validate boolean parameters
validated_config["no_color"] = bool(claude_config.get("no_color", True))
validated_config["skip_permissions"] = bool(
claude_config.get("skip_permissions", False)
)
validated_config["verbose"] = bool(claude_config.get("verbose", False))
# Validate output format
output_format = claude_config.get("output_format", "text")
if output_format not in ["text", "json", "markdown"]:
warnings.append(
f"Output format '{output_format}' not recommended, using 'text'"
)
validated_config["output_format"] = "text"
else:
validated_config["output_format"] = output_format
# Security check: skip_permissions should not be True in production
if validated_config[
"skip_permissions"
] and validation_context.security_level in [
SecurityLevel.HIGH,
SecurityLevel.MAXIMUM,
]:
errors.append(
"skip_permissions=True not allowed at HIGH or MAXIMUM security levels"
)
return {
"is_valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
"validated_config": validated_config,
}
except Exception as e:
return {
"is_valid": False,
"errors": [f"Claude config validation error: {e}"],
"warnings": [],
"validated_config": {},
}
async def _validate_system_prompt_suffix(
self, system_prompt_suffix: str, validation_context: ValidationContext
) -> Dict[str, Any]:
"""Validate system prompt suffix for security violations."""
security_violations = []
try:
# Check for dangerous patterns
for pattern in DANGEROUS_PATTERNS:
if pattern.search(system_prompt_suffix):
security_violations.append(
f"System prompt contains dangerous pattern: {pattern.pattern}"
)
# Check for potential prompt injection attempts
prompt_injection_patterns = [
re.compile(r"ignore\s+previous\s+instructions", re.IGNORECASE),
re.compile(r"system\s*:\s*you\s+are", re.IGNORECASE),
re.compile(r"forget\s+everything", re.IGNORECASE),
re.compile(r"new\s+instructions", re.IGNORECASE),
]
for pattern in prompt_injection_patterns:
if pattern.search(system_prompt_suffix):
security_violations.append(
f"Potential prompt injection detected: {pattern.pattern}"
)
# Check for executable content
executable_patterns = [
re.compile(r"<script", re.IGNORECASE),
re.compile(r"javascript:", re.IGNORECASE),
re.compile(r"eval\s*\(", re.IGNORECASE),
]
for pattern in executable_patterns:
if pattern.search(system_prompt_suffix):
security_violations.append(
f"Executable content detected: {pattern.pattern}"
)
return {
"is_valid": len(security_violations) == 0,
"security_violations": security_violations,
}
except Exception as e:
return {
"is_valid": False,
"security_violations": [f"System prompt validation error: {e}"],
}
async def _validate_resource_availability(
self, validation_context: ValidationContext
) -> Dict[str, Any]:
"""Validate system resource availability for new agent."""
errors = []
try:
# Check agent capacity
if not validation_context.has_capacity_for_agent():
errors.append(
f"Session at capacity: {validation_context.current_agent_count}/{validation_context.max_agents_in_session} agents"
)
# Check memory availability
if validation_context.available_memory_mb < 512:
errors.append(
f"Insufficient memory: {validation_context.available_memory_mb}MB < 512MB required"
)
# Check CPU availability
if validation_context.available_cpu_percent < 25.0:
errors.append(
f"Insufficient CPU: {validation_context.available_cpu_percent}% < 25% required"
)
return {"has_capacity": len(errors) == 0, "errors": errors}
except Exception as e:
return {
"has_capacity": False,
"errors": [f"Resource validation error: {e}"],
}
async def _log_validation_result(
self,
validation_result: AgentCreationValidationResult,
validation_context: ValidationContext,
) -> None:
"""Log validation result with comprehensive audit trail."""
if not self._audit_logger:
return
try:
audit_level = (
AuditLevel.INFO if validation_result.is_valid else AuditLevel.WARNING
)
if validation_result.security_violations:
audit_level = AuditLevel.ERROR
await self._audit_logger.log_event(
level=audit_level,
category=AuditCategory.SECURITY_VALIDATION,
operation="agent_creation_validation",
resource_type="agent_creation_request",
resource_id=str(validation_context.session_id),
success=validation_result.is_valid,
error_message=(
validation_result.get_failure_summary()
if not validation_result.is_valid
else None
),
metadata={
"security_clearance": validation_result.security_clearance,
"resource_clearance": validation_result.resource_clearance,
"validation_errors_count": len(validation_result.validation_errors),
"security_violations_count": len(
validation_result.security_violations
),
"warnings_count": len(validation_result.validation_warnings),
"security_level": validation_context.security_level.value,
"validation_duration_ms": validation_result.validation_metadata.get(
"validation_duration_ms", 0
),
},
)
except Exception:
# Best effort logging
pass
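# A minimal end-to-end sketch of direct validator use (sid is an existing
# SessionId; must run inside a coroutine; values are illustrative):
#     validator = AgentCreationValidator()
#     await validator.initialize()
#     result = await validator.validate_agent_creation_request(
#         session_id=sid, agent_name="Agent_12", specialization="ANALYSIS"
#     )
#     if not result.is_valid:
#         print(result.get_failure_summary())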
# Validation decorator for agent creation operations
def validate_agent_creation_request(func):
    """
    Decorator for comprehensive agent creation validation.
    Provides automatic validation of agent creation requests with
    security enforcement and comprehensive error handling.
    Note: the wrapped coroutine must receive session_id, agent_name, and the
    other validated parameters as keyword arguments; positional values are
    not inspected by this wrapper.
    """
@functools.wraps(func)
async def wrapper(*args, **kwargs):
validator = AgentCreationValidator()
await validator.initialize()
# Extract validation parameters
session_id = kwargs.get("session_id")
agent_name = kwargs.get("agent_name")
specialization = kwargs.get("specialization")
system_prompt_suffix = kwargs.get("system_prompt_suffix", "")
claude_config = kwargs.get("claude_config")
if not session_id or not agent_name:
raise AgentCreationValidationError(
"session_id and agent_name are required for validation"
)
# Perform validation
validation_result = await validator.validate_agent_creation_request(
session_id=session_id,
agent_name=agent_name,
specialization=specialization,
system_prompt_suffix=system_prompt_suffix,
claude_config=claude_config,
)
# Check validation result
if not validation_result.is_valid:
raise AgentCreationValidationError(
f"Agent creation validation failed: {validation_result.get_failure_summary()}"
)
# Update kwargs with validated request
if validation_result.validated_request:
kwargs.update(
{
"session_id": validation_result.validated_request.session_id,
"agent_name": validation_result.validated_request.agent_name,
"specialization": validation_result.validated_request.specialization,
"system_prompt_suffix": validation_result.validated_request.system_prompt_suffix,
"claude_config": validation_result.validated_request.claude_config,
}
)
return await func(*args, **kwargs)
return wrapper
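# A minimal sketch of applying the decorator (spawn_agent is a hypothetical
# coroutine; parameters must be passed as keywords, as noted above):
#     @validate_agent_creation_request
#     async def spawn_agent(*, session_id, agent_name, specialization=None,
#                           system_prompt_suffix="", claude_config=None):
#         ...
#     await spawn_agent(session_id=sid, agent_name="Agent_7", specialization="TESTING")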
# Export validation functionality
__all__ = [
"AgentCreationValidationError",
"SecurityValidationError",
"ResourceValidationError",
"ValidationContext",
"AgentCreationValidationResult",
"AgentCreationValidator",
"validate_agent_creation_request",
"AGENT_NAME_PATTERN",
"ALLOWED_SPECIALIZATIONS",
"ALLOWED_CLAUDE_MODELS",
]