"""
Filesystem Boundary Enforcement and Path Validation
Architecture Integration:
- Design Patterns: Chain of Responsibility for validation layers, Guard for access control
- Security Model: Path traversal prevention, directory jailing, resource limits
- Performance Profile: O(1) path validation, O(log n) directory tree traversal
Technical Decisions:
- Path Normalization: Resolve all symbolic links and relative references
- Directory Jailing: Strict boundary enforcement preventing escape
- Resource Monitoring: Track file descriptor usage and disk space
Dependencies & Integration:
- External: pathlib for secure path operations, psutil for resource monitoring
- Internal: Types system for path validation and security contexts
Quality Assurance:
- Test Coverage: Property-based testing for path manipulation attempts
- Error Handling: Secure failure modes preventing information disclosure
Author: Adder_2 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import fnmatch
import os
import stat
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Set, Union
import psutil
from typing_extensions import NewType
from src.utils.contracts_shim import ensure, invariant, require
# Branded types for filesystem security
SecurePath = NewType("SecurePath", Path)
TrustedDirectory = NewType("TrustedDirectory", Path)
ValidatedPath = NewType("ValidatedPath", str)
class FilesystemSecurityError(Exception):
"""Base exception for filesystem security violations."""
def __init__(self, operation: str, path: str, reason: str):
self.operation = operation
self.path = path
self.reason = reason
def validate_directory_access(
directory: Union[str, Path], operation: str = "access"
) -> bool:
"""
Validate directory access permissions.
Args:
directory: Directory path to validate
operation: Operation being performed (read, write, etc.)
Returns:
bool: True if access is allowed
"""
try:
path = Path(directory).resolve()
# Check if path exists
if not path.exists():
return False
# Check if it's a directory
if not path.is_dir():
return False
# Check basic permissions
if operation == "read":
return os.access(path, os.R_OK)
elif operation == "write":
return os.access(path, os.W_OK)
elif operation == "execute":
return os.access(path, os.X_OK)
else:
# Default to read access check
return os.access(path, os.R_OK)
except (OSError, ValueError):
return False
class PathTraversalError(FilesystemSecurityError):
"""Exception for path traversal attempts."""
pass
class AccessViolationError(FilesystemSecurityError):
"""Exception for access boundary violations."""
pass
class ResourceLimitError(FilesystemSecurityError):
"""Exception for resource limit violations."""
pass
class PermissionLevel(Enum):
"""Filesystem permission levels for security boundaries."""
READ_ONLY = auto()
READ_WRITE = auto()
EXECUTE = auto()
FULL_ACCESS = auto()
@dataclass(frozen=True)
class ResourceLimits:
"""Resource usage limits for filesystem operations."""
max_file_size: int = 100 * 1024 * 1024 # 100MB default
max_total_files: int = 10000
max_directory_depth: int = 50
max_open_files: int = 1000
max_disk_usage: int = 1024 * 1024 * 1024 # 1GB default
def __post_init__(self):
"""Validate resource limits are reasonable."""
assert self.max_file_size > 0, "File size limit must be positive"
assert self.max_total_files > 0, "File count limit must be positive"
assert self.max_directory_depth > 0, "Directory depth must be positive"
@dataclass(frozen=True)
class AccessRule:
"""Filesystem access rule with pattern matching."""
pattern: str # Glob pattern for paths
permission: PermissionLevel
description: str
def matches(self, path: Path) -> bool:
"""Check if this rule applies to the given path."""
return fnmatch.fnmatch(str(path), self.pattern)
@dataclass
class SecurityContext:
"""Security context for filesystem operations."""
session_root: TrustedDirectory
allowed_directories: Set[TrustedDirectory] = field(default_factory=set)
blocked_patterns: Set[str] = field(default_factory=set)
access_rules: List[AccessRule] = field(default_factory=list)
resource_limits: ResourceLimits = field(default_factory=ResourceLimits)
audit_enabled: bool = True
def __post_init__(self):
"""Initialize security context with defaults."""
# Add session root to allowed directories
self.allowed_directories.add(self.session_root)
# Default blocked patterns for security
default_blocked = {
"/etc/*",
"/proc/*",
"/sys/*",
"/dev/*",
"/root/*",
"*/.*", # Hidden files
"*.key", # Key files
"*.pem", # Certificate files
"*.p12", # Certificate stores
}
self.blocked_patterns.update(default_blocked)
class FilesystemBoundaryEnforcer:
"""
Comprehensive filesystem boundary enforcement with security validation.
Contracts:
Preconditions:
- Security context must be properly initialized
- All paths must be validated before filesystem access
Postconditions:
- No access outside defined boundaries
- All operations respect resource limits
- Security violations are logged and blocked
Invariants:
- Session root directory cannot be escaped
- Resource limits are never exceeded
- All access attempts are audited
"""
def __init__(self, security_context: SecurityContext):
"""Initialize boundary enforcer with security context."""
self._context = security_context
self._current_usage: Dict[str, Any] = {
"file_count": 0,
"total_size": 0,
"open_files": 0,
}
self._audit_log: List[Dict[str, Any]] = []
# Ensure session root exists and is accessible
self._validate_session_root()
@require(lambda self: self._context.session_root.exists())
@ensure(lambda self: self._context.session_root.is_dir())
def _validate_session_root(self) -> None:
"""Validate session root directory is secure and accessible."""
root = self._context.session_root
if not root.exists():
raise AccessViolationError(
"initialization", str(root), "Session root does not exist"
)
if not root.is_dir():
raise AccessViolationError(
"initialization", str(root), "Session root is not a directory"
)
# Check permissions
try:
# Test read access
list(root.iterdir())
except PermissionError:
raise AccessViolationError(
"initialization", str(root), "Insufficient permissions on session root"
)
@require(lambda path: isinstance(path, (str, Path)))
@ensure(lambda result: isinstance(result, Path))
def validate_path(
self, path: Union[str, Path], operation: str = "access"
) -> SecurePath:
"""
Validate path for security compliance and boundary enforcement.
Args:
path: Path to validate
operation: Operation type for audit logging
Returns:
Validated secure path
Raises:
PathTraversalError: If path attempts traversal outside boundaries
AccessViolationError: If path violates access rules
"""
if isinstance(path, str):
path = Path(path)
# Convert to absolute path and resolve all symbolic links
try:
resolved_path = path.resolve()
except (OSError, RuntimeError) as e:
raise PathTraversalError(
operation, str(path), f"Path resolution failed: {e}"
)
# Check for path traversal attempts
self._check_path_traversal(resolved_path, operation)
# Check against blocked patterns
self._check_blocked_patterns(resolved_path, operation)
# Validate against access rules
self._validate_access_rules(resolved_path, operation)
# Log access attempt
self._audit_access(resolved_path, operation, "ALLOWED")
return SecurePath(resolved_path)
@require(lambda self, path: isinstance(path, Path))
def _check_path_traversal(self, path: Path, operation: str) -> None:
"""Check for path traversal outside session boundaries."""
# Ensure path is within session root
try:
path.relative_to(self._context.session_root)
except ValueError:
# Check if path is in any allowed directory
for allowed_dir in self._context.allowed_directories:
try:
path.relative_to(allowed_dir)
return # Path is within an allowed directory
except ValueError:
continue
# Path is not within any allowed boundary
raise PathTraversalError(
operation,
str(path),
f"Path outside session boundaries: {self._context.session_root}",
)
@require(lambda self, path: isinstance(path, Path))
def _check_blocked_patterns(self, path: Path, operation: str) -> None:
"""Check path against blocked patterns."""
path_str = str(path)
for pattern in self._context.blocked_patterns:
if fnmatch.fnmatch(path_str, pattern):
raise AccessViolationError(
operation, path_str, f"Path matches blocked pattern: {pattern}"
)
@require(lambda self, path: isinstance(path, Path))
def _validate_access_rules(self, path: Path, operation: str) -> None:
"""Validate path against access rules."""
# If no rules defined, allow access within boundaries
if not self._context.access_rules:
return
# Check each rule for matches
for rule in self._context.access_rules:
if rule.matches(path):
# TODO: Implement operation-specific permission checking
# For now, we just log the rule match
self._audit_access(path, operation, f"RULE_MATCH:{rule.description}")
return
# No matching rules - use default behavior (allow within boundaries)
return
def _audit_access(self, path: Path, operation: str, status: str) -> None:
"""Audit filesystem access attempt."""
if not self._context.audit_enabled:
return
audit_entry = {
"timestamp": datetime.utcnow().isoformat(),
"operation": operation,
"path": str(path),
"status": status,
"session_root": str(self._context.session_root),
}
self._audit_log.append(audit_entry)
# Keep audit log size manageable
if len(self._audit_log) > 10000:
self._audit_log = self._audit_log[-5000:] # Keep most recent 5000 entries
@require(lambda self, path: isinstance(path, (str, Path)))
def check_file_creation(
self, path: Union[str, Path], file_size: int = 0
) -> SecurePath:
"""
Validate file creation against resource limits and security rules.
Args:
path: Path for new file
file_size: Expected file size in bytes
Returns:
Validated secure path for file creation
Raises:
ResourceLimitError: If creation would exceed resource limits
"""
validated_path = self.validate_path(path, "file_creation")
# Check resource limits
limits = self._context.resource_limits
# Check file count limit
if self._current_usage["file_count"] >= limits.max_total_files:
raise ResourceLimitError(
"file_creation",
str(validated_path),
f"File count limit exceeded: {limits.max_total_files}",
)
# Check file size limit
if file_size > limits.max_file_size:
raise ResourceLimitError(
"file_creation",
str(validated_path),
f"File size limit exceeded: {limits.max_file_size} bytes",
)
# Check total disk usage
if self._current_usage["total_size"] + file_size > limits.max_disk_usage:
raise ResourceLimitError(
"file_creation",
str(validated_path),
f"Disk usage limit exceeded: {limits.max_disk_usage} bytes",
)
# Check directory depth
relative_path = validated_path.relative_to(self._context.session_root)
depth = len(relative_path.parts)
if depth > limits.max_directory_depth:
raise ResourceLimitError(
"file_creation",
str(validated_path),
f"Directory depth limit exceeded: {limits.max_directory_depth}",
)
return validated_path
@require(lambda self, directory: isinstance(directory, (str, Path)))
def check_directory_access(
self,
directory: Union[str, Path],
permission: PermissionLevel = PermissionLevel.READ_ONLY,
) -> SecurePath:
"""
Validate directory access with permission checking.
Args:
directory: Directory path to validate
permission: Required permission level
Returns:
Validated secure directory path
"""
validated_path = self.validate_path(
directory, f"directory_access:{permission.name}"
)
if not validated_path.is_dir():
raise AccessViolationError(
"directory_access", str(validated_path), "Path is not a directory"
)
# Check actual filesystem permissions
self._check_filesystem_permissions(validated_path, permission)
return validated_path
def _check_filesystem_permissions(
self, path: Path, required_permission: PermissionLevel
) -> None:
"""Check actual filesystem permissions."""
try:
path_stat = path.stat()
mode = path_stat.st_mode
# Check owner permissions (assuming we're the owner)
can_read = bool(mode & stat.S_IRUSR)
can_write = bool(mode & stat.S_IWUSR)
can_execute = bool(mode & stat.S_IXUSR)
if required_permission == PermissionLevel.READ_ONLY and not can_read:
raise AccessViolationError(
"permission_check", str(path), "Read permission denied"
)
elif required_permission == PermissionLevel.READ_WRITE and not (
can_read and can_write
):
raise AccessViolationError(
"permission_check", str(path), "Read/write permission denied"
)
elif required_permission == PermissionLevel.EXECUTE and not can_execute:
raise AccessViolationError(
"permission_check", str(path), "Execute permission denied"
)
elif required_permission == PermissionLevel.FULL_ACCESS and not (
can_read and can_write and can_execute
):
raise AccessViolationError(
"permission_check", str(path), "Full access permission denied"
)
except OSError as e:
raise AccessViolationError(
"permission_check", str(path), f"Permission check failed: {e}"
)
def update_resource_usage(self, file_path: Path, operation: str) -> None:
"""Update resource usage tracking."""
if operation == "file_created":
self._current_usage["file_count"] += 1
if file_path.exists():
self._current_usage["total_size"] += file_path.stat().st_size
elif operation == "file_deleted":
self._current_usage["file_count"] = max(
0, self._current_usage["file_count"] - 1
)
# Note: We don't decrease total_size as it's tracked separately
def get_resource_usage(self) -> Dict[str, Any]:
"""Get current resource usage statistics."""
# Update with actual filesystem stats
try:
total_size = 0
file_count = 0
for root, dirs, files in os.walk(self._context.session_root):
file_count += len(files)
for file in files:
file_path = Path(root) / file
try:
total_size += file_path.stat().st_size
except OSError:
continue # Skip files we can't stat
self._current_usage.update(
{
"file_count": file_count,
"total_size": total_size,
"open_files": len(psutil.Process().open_files()),
}
)
except Exception:
# If we can't get stats, keep current values
pass
return self._current_usage.copy()
def get_audit_log(self, limit: int = 1000) -> List[Dict[str, Any]]:
"""Get recent audit log entries."""
return self._audit_log[-limit:] if self._audit_log else []
def create_subdirectory_context(
self, subdirectory: Union[str, Path]
) -> "FilesystemBoundaryEnforcer":
"""
Create a new boundary enforcer for a subdirectory with inherited rules.
Args:
subdirectory: Subdirectory within current session root
Returns:
New FilesystemBoundaryEnforcer for the subdirectory
"""
validated_subdir = self.check_directory_access(
subdirectory, PermissionLevel.READ_WRITE
)
# Create new security context inheriting from current
new_context = SecurityContext(
session_root=TrustedDirectory(validated_subdir),
allowed_directories=set(), # Only the new subdirectory
blocked_patterns=self._context.blocked_patterns.copy(),
access_rules=self._context.access_rules.copy(),
resource_limits=self._context.resource_limits,
audit_enabled=self._context.audit_enabled,
)
return FilesystemBoundaryEnforcer(new_context)
# Utility functions for common filesystem security operations
@require(lambda path: isinstance(path, (str, Path)))
@ensure(lambda result: isinstance(result, bool))
def is_safe_filename(path: Union[str, Path]) -> bool:
"""
Check if filename is safe (no dangerous characters).
Args:
path: Filename or path to check
Returns:
True if filename is safe
"""
if isinstance(path, Path):
filename = path.name
else:
filename = Path(path).name
# Dangerous characters and patterns
dangerous = {"..", ".", "\x00", "/", "\\", ":", "*", "?", '"', "<", ">", "|"}
# Check for dangerous characters
for char in dangerous:
if char in filename:
return False
# Check for reserved names (Windows)
reserved_names = {
"CON",
"PRN",
"AUX",
"NUL",
"COM1",
"COM2",
"COM3",
"COM4",
"COM5",
"COM6",
"COM7",
"COM8",
"COM9",
"LPT1",
"LPT2",
"LPT3",
"LPT4",
"LPT5",
"LPT6",
"LPT7",
"LPT8",
"LPT9",
}
if filename.upper() in reserved_names:
return False
# Check length
if len(filename) > 255: # Common filesystem limit
return False
return True
@require(lambda path: isinstance(path, (str, Path)))
def sanitize_filename(path: Union[str, Path]) -> str:
"""
Sanitize filename by removing or replacing dangerous characters.
Args:
path: Filename or path to sanitize
Returns:
Sanitized filename
"""
if isinstance(path, Path):
filename = path.name
else:
filename = Path(path).name
# Replace dangerous characters with underscores
dangerous_chars = '/\\:*?"<>|\x00'
sanitized = filename
for char in dangerous_chars:
sanitized = sanitized.replace(char, "_")
# Remove leading/trailing dots and spaces
sanitized = sanitized.strip(". ")
# Ensure not empty
if not sanitized:
sanitized = "file"
# Truncate if too long
if len(sanitized) > 255:
sanitized = sanitized[:255]
return sanitized
# Export main classes and functions
__all__ = [
"FilesystemBoundaryEnforcer",
"SecurityContext",
"ResourceLimits",
"AccessRule",
"PermissionLevel",
"FilesystemSecurityError",
"PathTraversalError",
"AccessViolationError",
"ResourceLimitError",
"SecurePath",
"TrustedDirectory",
"ValidatedPath",
"is_safe_filename",
"sanitize_filename",
]