"""Command validation and output limiting for SSH sessions."""
import re
from typing import Optional, Tuple
class CommandValidator:
    """Validate shell commands before they are executed.

    A command is rejected when it matches one of the regular expressions in
    the class-level pattern lists.  All patterns are applied with
    ``re.search`` and ``re.IGNORECASE`` against the command exactly as it was
    passed in (no stripping or normalisation).
    """

    # Maximum output size in bytes (10MB)
    MAX_OUTPUT_SIZE = 10 * 1024 * 1024

    # Patterns that indicate streaming/indefinite commands.
    # The list is empty, so the streaming check in validate_command()
    # currently never rejects anything.
    STREAMING_PATTERNS = []

    # Patterns for background processes
    BACKGROUND_PATTERNS = [
        r'&\s*$',  # Command ending with &
        r'\bnohup\b',
        r'\bdisown\b',
        r'\bscreen\b',
        r'\btmux\b',
    ]

    # Potentially dangerous commands (optional - can be enabled/disabled)
    DANGEROUS_PATTERNS = [
        # rm -rf / rm -fr on root paths other than /home* and /tmp*
        r'\brm\s+.*-(?:rf|fr)\s+/(?!home|tmp)',
        # dd writing to a device file
        r'\bdd\s+.*of=/dev/',
        # Fork bomb in its ':'-named form, e.g. ":(){ :|:& };:".
        # No leading \b here: ':' is not a word character, so a word boundary
        # in front of it would fail at the start of the command or after
        # whitespace, which is exactly where a fork bomb appears.
        r':\s*\(\s*\)\s*\{.*:\s*\|\s*:.*\}\s*;\s*:',
        r'\bmkfs\b',
        r'\bformat\b',
    ]

    @staticmethod
    def _first_match(patterns, command: str) -> Optional[str]:
        """Return the first pattern in *patterns* that matches *command*, else None."""
        return next(
            (p for p in patterns if re.search(p, command, re.IGNORECASE)),
            None,
        )

    @classmethod
    def validate_command(cls, command: str, check_dangerous: bool = False) -> Tuple[bool, Optional[str]]:
        """
        Validate a command for safety.

        Checks run in a fixed order -- streaming, then background, then
        (only if requested) dangerous -- and the first matching pattern
        determines the error message.

        Args:
            command: The command to validate
            check_dangerous: Whether to check for dangerous patterns

        Returns:
            Tuple of (is_valid: bool, error_message: Optional[str]).
            ``error_message`` is None when the command is accepted.
        """
        # Check for streaming patterns
        pattern = cls._first_match(cls.STREAMING_PATTERNS, command)
        if pattern is not None:
            return False, f"Streaming/interactive command blocked: Matches pattern '{pattern}'. Use finite operations (e.g., 'tail -n 100' instead of 'tail -f')."

        # Check for background processes
        pattern = cls._first_match(cls.BACKGROUND_PATTERNS, command)
        if pattern is not None:
            return False, f"Background process blocked: Matches pattern '{pattern}'. Background processes are not allowed."

        # Check for dangerous commands (optional)
        if check_dangerous:
            pattern = cls._first_match(cls.DANGEROUS_PATTERNS, command)
            if pattern is not None:
                return False, f"Dangerous command blocked: Matches pattern '{pattern}'. This operation is not allowed for safety."

        return True, None
class OutputLimiter:
    """Limits output size to prevent memory issues.

    Tracks the cumulative size of all accepted chunks, measured in UTF-8
    encoded bytes, and cuts the stream off once ``max_size`` is reached.

    Attributes set by ``__init__``:
        max_size: byte budget for the accepted output.
        current_size: number of bytes accepted so far.
        truncated: True once any output has been cut or dropped.
    """

    def __init__(self, max_size: int = CommandValidator.MAX_OUTPUT_SIZE):
        self.max_size = max_size
        self.current_size = 0
        self.truncated = False

    def add_chunk(self, chunk: str) -> Tuple[str, bool]:
        """
        Add a chunk of output, enforcing size limits.

        Args:
            chunk: The chunk of output to add

        Returns:
            Tuple of (chunk_to_add: str, should_continue: bool).
            While the budget is not exceeded the chunk is returned unchanged
            with True.  On the first chunk that does not fit, the part that
            still fits (possibly empty) is returned followed by a truncation
            notice, with False.  Every later overflowing call returns
            ("", False).
        """
        # Encode once; the same bytes are used for measuring and for cutting.
        encoded = chunk.encode('utf-8')
        chunk_size = len(encoded)

        if self.current_size + chunk_size <= self.max_size:
            self.current_size += chunk_size
            return chunk, True

        # Over budget. Record the truncation unconditionally so that
        # `truncated` is also set when earlier chunks filled the budget
        # exactly and this chunk is dropped in full.
        already_reported = self.truncated
        self.truncated = True
        if already_reported:
            return "", False

        remaining = self.max_size - self.current_size
        kept = ""
        if remaining > 0:
            # The byte cut may land inside a multi-byte character;
            # errors='ignore' drops the incomplete trailing bytes.
            kept = encoded[:remaining].decode('utf-8', errors='ignore')
            self.current_size = self.max_size

        # The notice itself is not counted against max_size.
        truncation_msg = f"\n\n[OUTPUT TRUNCATED: Maximum output size of {self.max_size} bytes exceeded]"
        return kept + truncation_msg, False