Skip to main content
Glama
validation.py (10.1 kB)
"""Input validation and sanitization utilities."""

import ipaddress
import logging
import os
import re
from pathlib import Path
from typing import List, Optional

from ..exceptions import TrueNASValidationError

logger = logging.getLogger(__name__)

# Patterns are applied with ``fullmatch`` rather than ``match`` + ``$``:
# ``$`` also matches just before a trailing newline, which would let a value
# such as "tank\n" slip through validation.
_POOL_NAME_RE = re.compile(r"[a-zA-Z0-9_-]+")
_DATASET_NAME_RE = re.compile(r"[a-zA-Z0-9_/-]+")
_SAFE_STRING_RE = re.compile(r"[a-zA-Z0-9_\s-]+")
_USERNAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")
_EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")

# Characters that enable chaining, substitution or redirection in a shell.
_DANGEROUS_COMMAND_CHARS = (";", "&", "|", "`", "$", "(", ")", "<", ">", "\n", "\r")


class PathValidator:
    """
    Path validation to prevent path traversal attacks.

    Features:
    - Path traversal detection
    - Allowed path prefixes
    - Absolute path enforcement

    Validation is purely lexical: the filesystem is never consulted, so
    symlinks are not resolved or detected.
    """

    def __init__(self, allowed_prefixes: Optional[List[str]] = None):
        """
        Initialize path validator.

        Args:
            allowed_prefixes: List of allowed path prefixes (e.g., ["/mnt"]).
                ``None`` or an empty list falls back to ``["/mnt"]``.
        """
        self.allowed_prefixes = allowed_prefixes or ["/mnt"]
        logger.info(
            "Path validator initialized with prefixes: %s", self.allowed_prefixes
        )

    @staticmethod
    def _is_within_prefix(path: str, prefix: str) -> bool:
        """Return True if *path* equals *prefix* or lies underneath it.

        A plain ``startswith`` would accept "/mntevil/x" for the prefix
        "/mnt"; requiring a separator after the prefix closes that hole.
        """
        base = prefix.rstrip(os.sep)
        return path == base or path.startswith(base + os.sep)

    def validate(self, path: str, allow_relative: bool = False) -> str:
        """
        Validate and normalize path.

        Args:
            path: Path to validate
            allow_relative: Whether to allow relative paths

        Returns:
            Normalized safe path

        Raises:
            TrueNASValidationError: If path is empty, not a string, contains
                "..", contains a null byte, or falls outside every allowed
                prefix.
        """
        if not path:
            raise TrueNASValidationError("Path cannot be empty")

        # Reject non-strings up front so callers get a validation error
        # instead of a TypeError from the substring checks below.
        if not isinstance(path, str):
            raise TrueNASValidationError(
                "Path must be a string",
                details={"type": type(path).__name__}
            )

        # Check for path traversal attempts
        if ".." in path:
            raise TrueNASValidationError(
                "Path traversal detected",
                details={"path": path, "reason": "contains '..'"}
            )

        # Check for null bytes
        if "\x00" in path:
            raise TrueNASValidationError(
                "Invalid path characters",
                details={"path": path, "reason": "contains null byte"}
            )

        # Normalize path
        try:
            normalized = os.path.normpath(path)
        except Exception as exc:
            raise TrueNASValidationError(
                f"Invalid path format: {exc}",
                details={"path": path}
            ) from exc

        # Check if absolute path is required
        if not allow_relative and not os.path.isabs(normalized):
            # Try to make it absolute with the first allowed prefix
            if self.allowed_prefixes:
                normalized = os.path.join(self.allowed_prefixes[0], normalized)
            else:
                raise TrueNASValidationError(
                    "Absolute path required",
                    details={"path": path}
                )

        # Verify path lies under an allowed prefix (on a component boundary)
        if self.allowed_prefixes and not any(
            self._is_within_prefix(normalized, prefix)
            for prefix in self.allowed_prefixes
        ):
            raise TrueNASValidationError(
                f"Path must start with one of: {self.allowed_prefixes}",
                details={"path": normalized, "allowed_prefixes": self.allowed_prefixes}
            )

        logger.debug("Validated path: %s -> %s", path, normalized)
        return normalized

    def validate_dataset_path(self, pool: str, dataset: str) -> str:
        """
        Validate ZFS dataset path.

        Args:
            pool: Pool name
            dataset: Dataset name

        Returns:
            Full dataset path (pool/dataset)

        Raises:
            TrueNASValidationError: If dataset path is invalid
        """
        # Validate pool name
        if not _POOL_NAME_RE.fullmatch(pool):
            raise TrueNASValidationError(
                "Invalid pool name",
                details={"pool": pool, "reason": "contains invalid characters"}
            )

        # Validate dataset name (can contain slashes for nested datasets)
        if not _DATASET_NAME_RE.fullmatch(dataset):
            raise TrueNASValidationError(
                "Invalid dataset name",
                details={"dataset": dataset, "reason": "contains invalid characters"}
            )

        # Check for path traversal in dataset name
        if ".." in dataset or dataset.startswith("/"):
            raise TrueNASValidationError(
                "Invalid dataset name",
                details={"dataset": dataset, "reason": "path traversal attempt"}
            )

        full_path = f"{pool}/{dataset}"
        logger.debug("Validated dataset path: %s", full_path)
        return full_path


class InputSanitizer:
    """
    Input sanitization utilities.

    Provides validators for generic strings, usernames, email addresses,
    shell commands, port numbers and IP addresses. Every validator raises
    TrueNASValidationError on rejection.
    """

    @staticmethod
    def sanitize_string(value: str, max_length: int = 255, allow_special: bool = False) -> str:
        """
        Sanitize string input.

        Args:
            value: String to sanitize
            max_length: Maximum allowed length
            allow_special: Whether to allow special characters

        Returns:
            Sanitized string (surrounding whitespace stripped)

        Raises:
            TrueNASValidationError: If string is invalid
        """
        if not isinstance(value, str):
            raise TrueNASValidationError(
                "Value must be a string",
                details={"type": type(value).__name__}
            )

        # Trim whitespace
        sanitized = value.strip()

        # Check length
        if not sanitized:
            raise TrueNASValidationError("String cannot be empty")

        if len(sanitized) > max_length:
            raise TrueNASValidationError(
                f"String exceeds maximum length of {max_length}",
                details={"length": len(sanitized), "max_length": max_length}
            )

        # Check for null bytes
        if "\x00" in sanitized:
            raise TrueNASValidationError("String contains null bytes")

        # Restrict to safe characters if needed
        if not allow_special and not _SAFE_STRING_RE.fullmatch(sanitized):
            raise TrueNASValidationError(
                "String contains invalid characters",
                details={"allowed": "alphanumeric, underscore, hyphen, space"}
            )

        return sanitized

    @staticmethod
    def sanitize_username(username: str) -> str:
        """
        Sanitize username.

        Args:
            username: Username to sanitize

        Returns:
            Sanitized username, stripped and lower-cased

        Raises:
            TrueNASValidationError: If the username is empty, longer than 32
                characters, does not start with a letter, or contains
                characters other than letters, digits, underscore and hyphen.
        """
        username = username.strip()

        # Username validation rules
        if not username:
            raise TrueNASValidationError("Username cannot be empty")

        if len(username) > 32:
            raise TrueNASValidationError("Username too long (max 32 characters)")

        # Must start with letter
        if not username[0].isalpha():
            raise TrueNASValidationError("Username must start with a letter")

        # Only ASCII letters, digits, underscore and hyphen
        if not _USERNAME_RE.fullmatch(username):
            raise TrueNASValidationError(
                "Username can only contain letters, numbers, underscore, and hyphen"
            )

        return username.lower()

    @staticmethod
    def sanitize_email(email: str) -> str:
        """
        Sanitize and validate email address.

        Args:
            email: Email address to sanitize

        Returns:
            Sanitized email address, stripped and lower-cased

        Raises:
            TrueNASValidationError: If the address does not match the basic
                pattern or is longer than 254 characters.
        """
        email = email.strip().lower()

        # Basic email validation
        if not _EMAIL_RE.fullmatch(email):
            raise TrueNASValidationError(
                "Invalid email address format",
                details={"email": email}
            )

        if len(email) > 254:  # RFC 5321
            raise TrueNASValidationError("Email address too long")

        return email

    @staticmethod
    def sanitize_command(command: str, allowed_commands: Optional[List[str]] = None) -> str:
        """
        Sanitize shell command to prevent command injection.

        Args:
            command: Command to sanitize
            allowed_commands: List of allowed command names. ``None`` or an
                empty list disables the allow-list check.

        Returns:
            Sanitized command (surrounding whitespace stripped)

        Raises:
            TrueNASValidationError: If command is unsafe
        """
        command = command.strip()

        # Check for command injection patterns
        if any(char in command for char in _DANGEROUS_COMMAND_CHARS):
            raise TrueNASValidationError(
                "Command contains dangerous characters",
                details={"command": command}
            )

        # Check against allowed commands
        if allowed_commands:
            # An empty command has no first token; "" then fails the lookup.
            cmd_name = command.split()[0] if command else ""
            if cmd_name not in allowed_commands:
                raise TrueNASValidationError(
                    f"Command not allowed. Allowed commands: {allowed_commands}",
                    details={"command": cmd_name}
                )

        return command

    @staticmethod
    def validate_port(port: int) -> int:
        """
        Validate port number.

        Args:
            port: Port number to validate

        Returns:
            Validated port number

        Raises:
            TrueNASValidationError: If port is not an integer or is outside
                the range 1-65535.
        """
        # bool is a subclass of int; True/False must not pass as ports 1/0.
        if isinstance(port, bool) or not isinstance(port, int):
            raise TrueNASValidationError("Port must be an integer")

        if port < 1 or port > 65535:
            raise TrueNASValidationError(
                "Port must be between 1 and 65535",
                details={"port": port}
            )

        # Warn about privileged ports
        if port < 1024:
            logger.warning("Using privileged port: %s", port)

        return port

    @staticmethod
    def validate_ip_address(ip: str) -> str:
        """
        Validate IP address.

        Args:
            ip: IP address to validate (IPv4 or IPv6, textual form)

        Returns:
            Validated IP address, unchanged

        Raises:
            TrueNASValidationError: If ip is not a string or is not a valid
                IPv4/IPv6 address.
        """
        # ipaddress.ip_address also accepts ints and packed bytes; this
        # function's contract is textual addresses only.
        if not isinstance(ip, str):
            raise TrueNASValidationError(
                "Invalid IP address",
                details={"ip": ip}
            )

        try:
            ipaddress.ip_address(ip)
        except ValueError:
            raise TrueNASValidationError(
                "Invalid IP address",
                details={"ip": ip}
            ) from None

        return ip

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vespo92/TrueNasCoreMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.