Skip to main content
Glama
security.py (31.9 kB)
"""Security module for MCP filesystem server. This module handles path validation, normalization, and security checks to ensure that all file operations are restricted to allowed directories. """ import platform import re import logging from pathlib import Path from typing import Any, List, Optional, Set, Tuple, Union import anyio try: # Prefer MCP's structured logger when available from mcp.server.fastmcp.utilities.logging import get_logger # type: ignore except ModuleNotFoundError: # Fall back to stdlib logging when MCP runtime isn't installed (e.g., unit tests) def get_logger(name: str) -> logging.Logger: # type: ignore return logging.getLogger(name) logger = get_logger(__name__) # Force INFO level logging for debugging logger.setLevel(logging.INFO) class PathValidator: """Security class for validating and normalizing file paths. Supports virtual root mode where a single allowed directory is presented as "/" to the client, hiding the real physical path. """ def __init__( self, allowed_dirs: List[Union[str, Path]], virtual_root: Optional[Path] = None, ): """Initialize with a list of allowed directories. Args: allowed_dirs: List of directories that are allowed for file operations. Paths are normalized to absolute paths. virtual_root: If set, this directory is presented as "/" to clients. All paths are translated relative to this root. Typically used for session isolation. 
""" self.allowed_dirs: Set[str] = set() self.virtual_root: Optional[Path] = None # Set up virtual root if provided if virtual_root is not None: self.virtual_root = Path(virtual_root).expanduser().resolve() logger.info(f"Virtual root enabled: {self.virtual_root}") # Normalize and validate allowed directories for directory in allowed_dirs: try: # Convert to Path object and resolve to absolute path abs_path = Path(directory).expanduser().resolve() # Check if it's actually a directory if not abs_path.is_dir(): logger.warning( f"Allowed path is not a directory: {abs_path}", extra={"path": str(abs_path)}, ) continue # Add to allowed set in normalized form self.allowed_dirs.add(self._normalize_case(str(abs_path))) logger.debug(f"Added allowed directory: {abs_path}") except (PermissionError, FileNotFoundError) as e: logger.error( f"Error accessing allowed directory {directory}: {e}", extra={"error": str(e), "path": str(directory)}, ) if not self.allowed_dirs: logger.warning("No valid allowed directories provided!") def _normalize_case(self, path: str) -> str: """Normalize path case based on platform. On Windows, convert to lowercase for case-insensitive comparison. On other platforms, keep the original case. Args: path: Path to normalize Returns: Normalized path """ if platform.system() == "Windows": return path.lower() return path async def validate_path( self, requested_path: str ) -> Tuple[Path, bool]: """Validate if a path is within allowed directories. If virtual_root is set, the requested_path is treated as a virtual path and translated to a real path before validation. IMPORTANT: This method ONLY accepts virtual path strings (str). Do NOT pass Path objects - this will cause path duplication errors. Always use virtual path strings like "/file.txt" or "folder/file.txt". 
Args: requested_path: Virtual path string to validate (NOT a Path object) Returns: Tuple of (resolved_real_path, is_allowed) Raises: TypeError: If requested_path is not a string ValueError: If path is invalid or outside allowed directories """ # Type check: enforce str only if not isinstance(requested_path, str): raise TypeError( f"validate_path expects str (virtual path string), got {type(requested_path).__name__}. " f"Do not pass Path objects - use virtual path strings like '/file.txt'. " f"Got value: {requested_path}" ) # URL decode the path to handle spaces and special characters. # # NOTE: We must be careful here: # - Some clients incorrectly *store* encoded names on disk (e.g. "new%205.json") # - Some clients correctly send encoded paths to refer to a real-space name (e.g. "new%205.json" -> "new 5.json") # # To be compatible with both, we compute both raw and decoded candidates. # If one exists on disk, we prefer the existing one; otherwise default to decoded. from urllib.parse import unquote, quote requested_path_raw = requested_path requested_path_decoded = unquote(requested_path) # Legacy compatibility: if a file was mistakenly persisted with URL-encoded characters # (e.g. "new%205.json"), a user may still pass the decoded form ("new 5.json"). # We generate an encoded candidate and prefer it only if it exists. 
requested_path_encoded: Optional[str] = None if requested_path_raw == requested_path_decoded: encoded = quote(requested_path_raw, safe="/") if encoded != requested_path_raw: requested_path_encoded = encoded try: logger.info( "validate_path: INPUT requested_path_raw=%s, requested_path_decoded=%s, virtual_root=%s", requested_path_raw, requested_path_decoded, self.virtual_root, ) def _to_abs_path(candidate_virtual_path: str) -> Path: """Convert a virtual/real candidate path string into an absolute Path following existing rules.""" if self.virtual_root is not None: real_path = self.virtual_to_real(candidate_virtual_path) try: return real_path.resolve() except (OSError, ValueError): abs_path_local = real_path.absolute() if ".." in str(abs_path_local): try: parent_resolved = abs_path_local.parent.resolve() abs_path_local = parent_resolved / abs_path_local.name except (OSError, ValueError): logger.warning(f"Path contains .. and cannot be resolved: {abs_path_local}") raise ValueError( "Path traversal not allowed: path contains '..' and cannot be safely resolved" ) return abs_path_local # No virtual root try: return Path(candidate_virtual_path).resolve() except (OSError, ValueError): abs_path_local = Path(candidate_virtual_path).absolute() if ".." in str(abs_path_local): try: parent_resolved = abs_path_local.parent.resolve() abs_path_local = parent_resolved / abs_path_local.name except (OSError, ValueError): logger.warning(f"Path contains .. and cannot be resolved: {abs_path_local}") raise ValueError( "Path traversal not allowed: path contains '..' and cannot be safely resolved" ) return abs_path_local chosen_virtual_path = requested_path_decoded chosen_abs_path: Optional[Path] = None raw_abs_path: Optional[Path] = None decoded_abs_path: Optional[Path] = None encoded_abs_path: Optional[Path] = None # Try compute both; if decoding introduces traversal, it will raise and we fail fast (safer). 
decoded_abs_path = _to_abs_path(requested_path_decoded) if requested_path_raw != requested_path_decoded: raw_abs_path = _to_abs_path(requested_path_raw) if requested_path_encoded: encoded_abs_path = _to_abs_path(requested_path_encoded) raw_exists = raw_abs_path.exists() if raw_abs_path is not None else False decoded_exists = decoded_abs_path.exists() encoded_exists = encoded_abs_path.exists() if encoded_abs_path is not None else False # Prefer the path that actually exists. # - decoded wins when present (the "correct" representation) # - raw wins for truly-encoded-on-disk legacy names when caller passes encoded # - encoded wins for legacy names when caller passes decoded if decoded_exists: chosen_virtual_path = requested_path_decoded chosen_abs_path = decoded_abs_path elif raw_exists: chosen_virtual_path = requested_path_raw chosen_abs_path = raw_abs_path elif encoded_exists and requested_path_encoded: chosen_virtual_path = requested_path_encoded chosen_abs_path = encoded_abs_path else: # For creation/nonexistent targets, default to decoded so new files are created with real characters. 
chosen_virtual_path = requested_path_decoded chosen_abs_path = decoded_abs_path abs_path = chosen_abs_path requested_path = chosen_virtual_path logger.info( "validate_path: Chosen requested_path=%s, abs_path=%s (raw_exists=%s, decoded_exists=%s, encoded_exists=%s)", requested_path, abs_path, (raw_abs_path.exists() if raw_abs_path is not None else None), (decoded_abs_path.exists() if decoded_abs_path is not None else None), (encoded_abs_path.exists() if encoded_abs_path is not None else None), ) normalized = self._normalize_case(str(abs_path)) # Check if path is within allowed directories for allowed_dir in self.allowed_dirs: if normalized.startswith(allowed_dir): logger.info(f"validate_path: Path ALLOWED, returning {abs_path}") return abs_path, True # Handle case where path doesn't exist yet but parent directory does if not abs_path.exists(): parent_path = abs_path.parent try: parent_abs = parent_path.resolve() parent_normalized = self._normalize_case(str(parent_abs)) for allowed_dir in self.allowed_dirs: if parent_normalized.startswith(allowed_dir): return abs_path, True except (FileNotFoundError, PermissionError): pass logger.warning( f"Access denied - path outside allowed directories: {abs_path}", extra={"path": str(abs_path)}, ) return abs_path, False except (FileNotFoundError, PermissionError) as e: logger.error( f"Error validating path: {e}", extra={"error": str(e), "path": str(requested_path)}, ) return Path(requested_path), False async def resolve_symlinks(self, path: Path) -> Tuple[Path, bool]: """Safely resolve symlinks to ensure target is within allowed directories. 
Args: path: Path that might contain symlinks Returns: Tuple of (resolved_path, is_allowed) """ try: # Try to resolve symlinks real_path = await anyio.to_thread.run_sync(Path.resolve, path) normalized = self._normalize_case(str(real_path)) # Check if resolved path is within allowed directories for allowed_dir in self.allowed_dirs: if normalized.startswith(allowed_dir): return real_path, True logger.warning( f"Access denied - symlink target outside allowed directories: {real_path}", extra={"path": str(real_path), "original": str(path)}, ) return real_path, False except (FileNotFoundError, PermissionError) as e: logger.error( f"Error resolving symlinks: {e}", extra={"error": str(e), "path": str(path)}, ) return path, False def get_allowed_dirs(self) -> List[str]: """Get the list of allowed directories. If virtual_root is enabled, returns ["/"] instead of actual paths. Returns: List of allowed directory paths (virtual if virtual_root is set) """ if self.virtual_root is not None: return ["/"] return sorted(list(self.allowed_dirs)) def virtual_to_real(self, virtual_path: Union[str, Path]) -> Path: """Convert a virtual path to a real filesystem path. If virtual_root is not set, returns the path as-is. 
Args: virtual_path: Path as seen by the client (e.g., "/todo.txt" or "todo.txt") Returns: Real filesystem path """ if self.virtual_root is None: return Path(virtual_path) # Normalize the virtual path vpath = str(virtual_path) logger.info(f"virtual_to_real: INPUT virtual_path={virtual_path}, virtual_root={self.virtual_root}") # Handle absolute virtual paths (starting with /) if vpath.startswith("/"): vpath = vpath[1:] # Remove leading / # Handle empty path (root) if not vpath or vpath == ".": result = self.virtual_root logger.info(f"virtual_to_real: OUTPUT (root) = {result}") return result # Join with virtual root result = self.virtual_root / vpath logger.info(f"virtual_to_real: OUTPUT = {result}") return result def real_to_virtual(self, real_path: Union[str, Path], strict: bool = True) -> str: """Convert a real filesystem path to a virtual path. If virtual_root is not set, returns only the filename to avoid leaking paths. Args: real_path: Real filesystem path strict: If True, raises ValueError when path is not under virtual_root. If False, returns a sanitized placeholder path. 
Returns: Virtual path as seen by the client Raises: ValueError: If strict=True and path is not under virtual_root """ if self.virtual_root is None: # WARNING: virtual_root is None - return only filename to avoid path leakage logger.warning( f"real_to_virtual called with virtual_root=None, returning filename only: {real_path}" ) # Return only the filename as a relative path return "/" + Path(real_path).name # Use absolute() instead of resolve() to avoid issues with non-existent files # resolve() may fail or behave unexpectedly if the file doesn't exist yet real = Path(real_path).absolute() virtual_root_abs = self.virtual_root.absolute() logger.info(f"real_to_virtual: real_path={real_path}, real_abs={real}, virtual_root_abs={virtual_root_abs}") try: # Get relative path from virtual root relative = real.relative_to(virtual_root_abs) # Return as absolute virtual path result = "/" + str(relative) logger.info(f"real_to_virtual: SUCCESS - result={result}") return result except ValueError as e: # Path is not under virtual root - this is a security issue! # NEVER expose the real path to the client logger.error( f"SECURITY: Attempted to expose path outside virtual root: {real_path}, error: {e}", extra={"path": str(real_path), "virtual_root": str(self.virtual_root)}, ) if strict: raise ValueError( "Path conversion failed: path is outside the allowed directory" ) # Return a safe placeholder that doesn't expose any real path info return "/[path_not_available]" def is_path_allowed(self, path: Union[str, Path]) -> bool: """Quick check if a path is within allowed directories. 
Args: path: Path to check Returns: True if path is allowed, False otherwise """ try: abs_path = Path(path).expanduser().resolve() normalized = self._normalize_case(str(abs_path)) for allowed_dir in self.allowed_dirs: if normalized.startswith(allowed_dir): return True return False except (FileNotFoundError, PermissionError): return False async def find_matching_files( self, root_path: Union[str, Path], pattern: str, recursive: bool = True, exclude_patterns: Optional[List[str]] = None, ) -> List[Path]: """Find files matching a pattern within allowed directories. Args: root_path: Starting directory for search pattern: Glob pattern to match against filenames recursive: Whether to search subdirectories exclude_patterns: Optional patterns to exclude Returns: List of matching file paths Raises: ValueError: If root_path is outside allowed directories """ abs_path, allowed = await self.validate_path(root_path) if not allowed: raise ValueError(f"Search path outside allowed directories: {root_path}") if not abs_path.is_dir(): raise ValueError(f"Search path is not a directory: {abs_path}") results = [] exclude_regexes = [] # Compile exclude patterns if provided if exclude_patterns: for exclude in exclude_patterns: try: exclude_regexes.append(re.compile(exclude)) except re.error: logger.warning(f"Invalid exclude pattern: {exclude}") # Use glob for pattern matching glob_pattern = "**/" + pattern if recursive else pattern for matched_path in abs_path.glob(glob_pattern): # Skip if matched by exclude pattern path_str = str(matched_path) excluded = False for exclude_re in exclude_regexes: if exclude_re.search(path_str): excluded = True break if not excluded: # Verify path is still allowed (e.g., in case of symlinks) if self.is_path_allowed(matched_path): results.append(matched_path) return results def sanitize_error(self, error: Exception, user_path: Optional[str] = None) -> str: """Sanitize error message to avoid exposing real filesystem paths. 
Args: error: The exception to sanitize user_path: The user-provided path (virtual path) to use in error message Returns: A sanitized error message that doesn't expose real paths """ error_msg = str(error) # Remove real path references if virtual root is enabled if self.virtual_root: virtual_root_str = str(self.virtual_root) # Replace any occurrence of the real path with the virtual representation error_msg = error_msg.replace(virtual_root_str, "") # Also handle normalized variations error_msg = error_msg.replace(virtual_root_str.rstrip("/"), "") error_msg = error_msg.replace(virtual_root_str.rstrip("\\"), "") # Remove any allowed directory paths from error messages for allowed_dir in self.allowed_dirs: error_msg = error_msg.replace(allowed_dir, "") error_msg = error_msg.replace(allowed_dir.rstrip("/"), "") error_msg = error_msg.replace(allowed_dir.rstrip("\\"), "") # Clean up double slashes that might result from path removal error_msg = re.sub(r"/{2,}", "/", error_msg) error_msg = re.sub(r"\\{2,}", "\\", error_msg) # If user provided a path, use it for clearer messaging if user_path and "No such file or directory" in error_msg: return f"File not found: {user_path}" if user_path and "Permission denied" in error_msg: return f"Permission denied: {user_path}" if user_path and "Is a directory" in error_msg: return f"Is a directory: {user_path}" if user_path and "Not a directory" in error_msg: return f"Not a directory: {user_path}" return error_msg.strip() def sanitize_error_simple(error: Exception, user_path: Optional[str] = None) -> str: """Simplified error sanitization without PathValidator context. Use this when you don't have access to the PathValidator instance. 
Args: error: The exception to sanitize user_path: The user-provided path to use in error message Returns: A sanitized error message """ error_type = type(error).__name__ if isinstance(error, FileNotFoundError): if user_path: return f"File not found: {user_path}" return "File not found" if isinstance(error, PermissionError): if user_path: return f"Permission denied: {user_path}" return "Permission denied" if isinstance(error, IsADirectoryError): if user_path: return f"Is a directory: {user_path}" return "Is a directory" if isinstance(error, NotADirectoryError): if user_path: return f"Not a directory: {user_path}" return "Not a directory" if isinstance(error, FileExistsError): if user_path: return f"File already exists: {user_path}" return "File already exists" # For other errors, return the error type without the message # to avoid leaking sensitive information return f"{error_type}: Operation failed" class ResponseSanitizer: """Security tool to sanitize tool responses and prevent path leakage. This class provides methods to check and sanitize all tool responses, ensuring that no physical paths are leaked to clients. """ # Default sensitive path patterns - these should NEVER appear in responses DEFAULT_SENSITIVE_PATTERNS: List[str] = [ "user_data", # Common user data directory pattern "/home/", # Linux home directories "/root/", # Root home directory "/var/", # System directories "/tmp/", # Temp directories "/etc/", # Config directories "/app/", # Application directories "C:\\Users\\", # Windows user directories "C:\\Windows\\", # Windows system directories ] def __init__( self, sensitive_patterns: Optional[List[str]] = None, additional_patterns: Optional[List[str]] = None, ): """Initialize the response sanitizer. 
Args: sensitive_patterns: Custom list of patterns to check (replaces defaults) additional_patterns: Additional patterns to add to defaults """ if sensitive_patterns is not None: self._patterns = list(sensitive_patterns) else: self._patterns = list(self.DEFAULT_SENSITIVE_PATTERNS) if additional_patterns: self._patterns.extend(additional_patterns) # Compile regex patterns for efficient matching self._compiled_patterns: List[re.Pattern] = [] for pattern in self._patterns: # Escape special regex characters and make case-insensitive escaped = re.escape(pattern) try: self._compiled_patterns.append( re.compile(escaped, re.IGNORECASE) ) except re.error as e: logger.warning(f"Invalid pattern '{pattern}': {e}") def check_for_sensitive_paths(self, value: Any, path: str = "") -> List[str]: """Check a value for sensitive path patterns. Args: value: The value to check (can be str, dict, list, or any type) path: The path to this value in the response structure (for logging) Returns: List of found sensitive patterns with their locations """ violations: List[str] = [] if isinstance(value, str): for i, pattern in enumerate(self._compiled_patterns): if pattern.search(value): violations.append( f"Pattern '{self._patterns[i]}' found at '{path}': {value[:100]}..." if len(value) > 100 else f"Pattern '{self._patterns[i]}' found at '{path}': {value}" ) elif isinstance(value, dict): for key, val in value.items(): child_path = f"{path}.{key}" if path else key violations.extend(self.check_for_sensitive_paths(val, child_path)) elif isinstance(value, (list, tuple)): for i, item in enumerate(value): child_path = f"{path}[{i}]" violations.extend(self.check_for_sensitive_paths(item, child_path)) return violations def contains_sensitive_path(self, value: Any) -> bool: """Quick check if a value contains any sensitive path patterns. 
Args: value: The value to check Returns: True if any sensitive pattern is found """ return len(self.check_for_sensitive_paths(value)) > 0 def sanitize_response( self, response: Any, replacement: str = "[REDACTED]", raise_on_violation: bool = True, ) -> Any: """Sanitize a response by removing or replacing sensitive paths. Args: response: The response to sanitize replacement: String to replace sensitive values with raise_on_violation: If True, raise SecurityError on violation Returns: Sanitized response Raises: PathLeakageError: If raise_on_violation=True and sensitive path found """ violations = self.check_for_sensitive_paths(response) if violations: logger.error( f"SECURITY: Path leakage detected in response: {violations}" ) if raise_on_violation: raise PathLeakageError( "Security violation: response contains sensitive path information", violations=violations, ) # If not raising, sanitize the response return self._sanitize_value(response, replacement) return response def _sanitize_value(self, value: Any, replacement: str) -> Any: """Recursively sanitize a value by replacing sensitive strings. Args: value: The value to sanitize replacement: String to replace sensitive values with Returns: Sanitized value """ if isinstance(value, str): result = value for pattern in self._compiled_patterns: if pattern.search(result): # Replace the entire string if it contains sensitive data return replacement return result elif isinstance(value, dict): return { k: self._sanitize_value(v, replacement) for k, v in value.items() } elif isinstance(value, list): return [self._sanitize_value(item, replacement) for item in value] elif isinstance(value, tuple): return tuple(self._sanitize_value(item, replacement) for item in value) return value def add_pattern(self, pattern: str) -> None: """Add a new sensitive pattern to check. 
Args: pattern: The pattern string to add """ if pattern not in self._patterns: self._patterns.append(pattern) try: self._compiled_patterns.append( re.compile(re.escape(pattern), re.IGNORECASE) ) except re.error as e: logger.warning(f"Invalid pattern '{pattern}': {e}") class PathLeakageError(Exception): """Exception raised when a response contains sensitive path information.""" def __init__(self, message: str, violations: Optional[List[str]] = None): super().__init__(message) self.violations = violations or [] # Global sanitizer instance - can be configured at startup _global_sanitizer: Optional[ResponseSanitizer] = None def get_response_sanitizer() -> ResponseSanitizer: """Get the global response sanitizer instance. Returns: The global ResponseSanitizer instance """ global _global_sanitizer if _global_sanitizer is None: _global_sanitizer = ResponseSanitizer() return _global_sanitizer def configure_response_sanitizer( sensitive_patterns: Optional[List[str]] = None, additional_patterns: Optional[List[str]] = None, ) -> ResponseSanitizer: """Configure the global response sanitizer. Args: sensitive_patterns: Custom list of patterns (replaces defaults) additional_patterns: Additional patterns to add to defaults Returns: The configured ResponseSanitizer instance """ global _global_sanitizer _global_sanitizer = ResponseSanitizer( sensitive_patterns=sensitive_patterns, additional_patterns=additional_patterns, ) return _global_sanitizer def sanitize_tool_response(response: Any, raise_on_violation: bool = True) -> Any: """Convenience function to sanitize a tool response. This should be called before returning any tool response to ensure no sensitive paths are leaked. Args: response: The tool response to sanitize raise_on_violation: If True, raise error on violation Returns: Sanitized response Raises: PathLeakageError: If raise_on_violation=True and sensitive path found """ return get_response_sanitizer().sanitize_response( response, raise_on_violation=raise_on_violation )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/answerlink/MCP-Workspace-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.