Skip to main content
Glama

Gemini MCP Server

file_utils.py32.7 kB
""" File reading utilities with directory support and token management This module provides secure file access functionality for the MCP server. It implements critical security measures to prevent unauthorized file access and manages token limits to ensure efficient API usage. Key Features: - Path validation and sandboxing to prevent directory traversal attacks - Support for both individual files and recursive directory reading - Token counting and management to stay within API limits - Automatic file type detection and filtering - Comprehensive error handling with informative messages Security Model: - All file access is restricted to PROJECT_ROOT and its subdirectories - Absolute paths are required to prevent ambiguity - Symbolic links are resolved to ensure they stay within bounds CONVERSATION MEMORY INTEGRATION: This module works with the conversation memory system to support efficient multi-turn file handling: 1. DEDUPLICATION SUPPORT: - File reading functions are called by conversation-aware tools - Supports newest-first file prioritization by providing accurate token estimation - Enables efficient file content caching and token budget management 2. TOKEN BUDGET OPTIMIZATION: - Provides accurate token estimation for file content before reading - Supports the dual prioritization strategy by enabling precise budget calculations - Enables tools to make informed decisions about which files to include 3. CROSS-TOOL FILE PERSISTENCE: - File reading results are used across different tools in conversation chains - Consistent file access patterns support conversation continuation scenarios - Error handling preserves conversation flow when files become unavailable """ import json import logging import os from datetime import datetime, timezone from pathlib import Path from typing import Optional from .file_types import BINARY_EXTENSIONS, CODE_EXTENSIONS, IMAGE_EXTENSIONS, TEXT_EXTENSIONS from .security_config import EXCLUDED_DIRS, is_dangerous_path from .token_utils import DEFAULT_CONTEXT_WINDOW, estimate_tokens def _is_builtin_custom_models_config(path_str: str) -> bool: """ Check if path points to the server's built-in custom_models.json config file. This only matches the server's internal config, not user-specified CUSTOM_MODELS_CONFIG_PATH. We identify the built-in config by checking if it resolves to the server's conf directory. Args: path_str: Path to check Returns: True if this is the server's built-in custom_models.json config file """ try: path = Path(path_str) # Get the server root by going up from this file: utils/file_utils.py -> server_root server_root = Path(__file__).parent.parent builtin_config = server_root / "conf" / "custom_models.json" # Check if the path resolves to the same file as our built-in config # This handles both relative and absolute paths to the same file return path.resolve() == builtin_config.resolve() except Exception: # If path resolution fails, it's not our built-in config return False logger = logging.getLogger(__name__) def is_mcp_directory(path: Path) -> bool: """ Check if a directory is the MCP server's own directory. This prevents the MCP from including its own code when scanning projects where the MCP has been cloned as a subdirectory. Args: path: Directory path to check Returns: True if this is the MCP server directory or a subdirectory """ if not path.is_dir(): return False # Get the directory where the MCP server is running from # __file__ is utils/file_utils.py, so parent.parent is the MCP root mcp_server_dir = Path(__file__).parent.parent.resolve() # Check if the given path is the MCP server directory or a subdirectory try: path.resolve().relative_to(mcp_server_dir) logger.info(f"Detected MCP server directory at {path}, will exclude from scanning") return True except ValueError: # Not a subdirectory of MCP server return False def get_user_home_directory() -> Optional[Path]: """ Get the user's home directory. Returns: User's home directory path """ return Path.home() def is_home_directory_root(path: Path) -> bool: """ Check if the given path is the user's home directory root. This prevents scanning the entire home directory which could include sensitive data and non-project files. Args: path: Directory path to check Returns: True if this is the home directory root """ user_home = get_user_home_directory() if not user_home: return False try: resolved_path = path.resolve() resolved_home = user_home.resolve() # Check if this is exactly the home directory if resolved_path == resolved_home: logger.warning( f"Attempted to scan user home directory root: {path}. Please specify a subdirectory instead." ) return True # Also check common home directory patterns path_str = str(resolved_path).lower() home_patterns = [ "/users/", # macOS "/home/", # Linux "c:\\users\\", # Windows "c:/users/", # Windows with forward slashes ] for pattern in home_patterns: if pattern in path_str: # Extract the user directory path # e.g., /Users/fahad or /home/username parts = path_str.split(pattern) if len(parts) > 1: # Get the part after the pattern after_pattern = parts[1] # Check if we're at the user's root (no subdirectories) if "/" not in after_pattern and "\\" not in after_pattern: logger.warning( f"Attempted to scan user home directory root: {path}. " f"Please specify a subdirectory instead." ) return True except Exception as e: logger.debug(f"Error checking if path is home directory: {e}") return False def detect_file_type(file_path: str) -> str: """ Detect file type for appropriate processing strategy. This function is intended for specific file type handling (e.g., image processing, binary file analysis, or enhanced file filtering). Args: file_path: Path to the file to analyze Returns: str: "text", "binary", or "image" """ path = Path(file_path) # Check extension first (fast) extension = path.suffix.lower() if extension in TEXT_EXTENSIONS: return "text" elif extension in IMAGE_EXTENSIONS: return "image" elif extension in BINARY_EXTENSIONS: return "binary" # Fallback: check magic bytes for text vs binary # This is helpful for files without extensions or unknown extensions try: with open(path, "rb") as f: chunk = f.read(1024) # Simple heuristic: if we can decode as UTF-8, likely text chunk.decode("utf-8") return "text" except UnicodeDecodeError: return "binary" except (FileNotFoundError, PermissionError) as e: logger.warning(f"Could not access file {file_path} for type detection: {e}") return "unknown" def should_add_line_numbers(file_path: str, include_line_numbers: Optional[bool] = None) -> bool: """ Determine if line numbers should be added to a file. Args: file_path: Path to the file include_line_numbers: Explicit preference, or None for auto-detection Returns: bool: True if line numbers should be added """ if include_line_numbers is not None: return include_line_numbers # Default: DO NOT add line numbers # Tools that want line numbers must explicitly request them return False def _normalize_line_endings(content: str) -> str: """ Normalize line endings for consistent line numbering. Args: content: File content with potentially mixed line endings Returns: str: Content with normalized LF line endings """ # Normalize all line endings to LF for consistent counting return content.replace("\r\n", "\n").replace("\r", "\n") def _add_line_numbers(content: str) -> str: """ Add line numbers to text content for precise referencing. Args: content: Text content to number Returns: str: Content with line numbers in format " 45│ actual code line" Supports files up to 99,999 lines with dynamic width allocation """ # Normalize line endings first normalized_content = _normalize_line_endings(content) lines = normalized_content.split("\n") # Dynamic width allocation based on total line count # This supports files of any size by computing required width total_lines = len(lines) width = len(str(total_lines)) width = max(width, 4) # Minimum padding for readability # Format with dynamic width and clear separator numbered_lines = [f"{i + 1:{width}d}│ {line}" for i, line in enumerate(lines)] return "\n".join(numbered_lines) def resolve_and_validate_path(path_str: str) -> Path: """ Resolves and validates a path against security policies. This function ensures safe file access by: 1. Requiring absolute paths (no ambiguity) 2. Resolving symlinks to prevent deception 3. Blocking access to dangerous system directories Args: path_str: Path string (must be absolute) Returns: Resolved Path object that is safe to access Raises: ValueError: If path is not absolute or otherwise invalid PermissionError: If path is in a dangerous location """ # Step 1: Create a Path object user_path = Path(path_str) # Step 2: Security Policy - Require absolute paths # Relative paths could be interpreted differently depending on working directory if not user_path.is_absolute(): raise ValueError(f"Relative paths are not supported. Please provide an absolute path.\nReceived: {path_str}") # Step 3: Resolve the absolute path (follows symlinks, removes .. and .) # This is critical for security as it reveals the true destination of symlinks resolved_path = user_path.resolve() # Step 4: Check against dangerous paths if is_dangerous_path(resolved_path): logger.warning(f"Access denied - dangerous path: {resolved_path}") raise PermissionError(f"Access to system directory denied: {path_str}") # Step 5: Check if it's the home directory root if is_home_directory_root(resolved_path): raise PermissionError( f"Cannot scan entire home directory: {path_str}\n" f"Please specify a subdirectory within your home folder." ) return resolved_path def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> list[str]: """ Expand paths to individual files, handling both files and directories. This function recursively walks directories to find all matching files. It automatically filters out hidden files and common non-code directories like __pycache__ to avoid including generated or system files. Args: paths: List of file or directory paths (must be absolute) extensions: Optional set of file extensions to include (defaults to CODE_EXTENSIONS) Returns: List of individual file paths, sorted for consistent ordering """ if extensions is None: extensions = CODE_EXTENSIONS expanded_files = [] seen = set() for path in paths: try: # Validate each path for security before processing path_obj = resolve_and_validate_path(path) except (ValueError, PermissionError): # Skip invalid paths silently to allow partial success continue if not path_obj.exists(): continue # Safety checks for directory scanning if path_obj.is_dir(): # Check 1: Prevent scanning user's home directory root if is_home_directory_root(path_obj): logger.warning(f"Skipping home directory root: {path}. Please specify a project subdirectory instead.") continue # Check 2: Skip if this is the MCP's own directory if is_mcp_directory(path_obj): logger.info( f"Skipping MCP server directory: {path}. The MCP server code is excluded from project scans." ) continue if path_obj.is_file(): # Add file directly if str(path_obj) not in seen: expanded_files.append(str(path_obj)) seen.add(str(path_obj)) elif path_obj.is_dir(): # Walk directory recursively to find all files for root, dirs, files in os.walk(path_obj): # Filter directories in-place to skip hidden and excluded directories # This prevents descending into .git, .venv, __pycache__, node_modules, etc. original_dirs = dirs[:] dirs[:] = [] for d in original_dirs: # Skip hidden directories if d.startswith("."): continue # Skip excluded directories if d in EXCLUDED_DIRS: continue # Skip MCP directories found during traversal dir_path = Path(root) / d if is_mcp_directory(dir_path): logger.debug(f"Skipping MCP directory during traversal: {dir_path}") continue dirs.append(d) for file in files: # Skip hidden files (e.g., .DS_Store, .gitignore) if file.startswith("."): continue file_path = Path(root) / file # Filter by extension if specified if not extensions or file_path.suffix.lower() in extensions: full_path = str(file_path) # Use set to prevent duplicates if full_path not in seen: expanded_files.append(full_path) seen.add(full_path) # Sort for consistent ordering across different runs # This makes output predictable and easier to debug expanded_files.sort() return expanded_files def read_file_content( file_path: str, max_size: int = 1_000_000, *, include_line_numbers: Optional[bool] = None ) -> tuple[str, int]: """ Read a single file and format it for inclusion in AI prompts. This function handles various error conditions gracefully and always returns formatted content, even for errors. This ensures the AI model gets context about what files were attempted but couldn't be read. Args: file_path: Path to file (must be absolute) max_size: Maximum file size to read (default 1MB to prevent memory issues) include_line_numbers: Whether to add line numbers. If None, auto-detects based on file type Returns: Tuple of (formatted_content, estimated_tokens) Content is wrapped with clear delimiters for AI parsing """ logger.debug(f"[FILES] read_file_content called for: {file_path}") try: # Validate path security before any file operations path = resolve_and_validate_path(file_path) logger.debug(f"[FILES] Path validated and resolved: {path}") except (ValueError, PermissionError) as e: # Return error in a format that provides context to the AI logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}") error_msg = str(e) content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n" tokens = estimate_tokens(content) logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens") return content, tokens try: # Validate file existence and type if not path.exists(): logger.debug(f"[FILES] File does not exist: {file_path}") content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n" return content, estimate_tokens(content) if not path.is_file(): logger.debug(f"[FILES] Path is not a file: {file_path}") content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n" return content, estimate_tokens(content) # Check file size to prevent memory exhaustion stat_result = path.stat() file_size = stat_result.st_size logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes") if file_size > max_size: logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)") modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") content = ( f"\n--- FILE TOO LARGE: {file_path} (Last modified: {modified_at}) ---\n" f"File size: {file_size:,} bytes (max: {max_size:,})\n" "--- END FILE ---\n" ) return content, estimate_tokens(content) # Determine if we should add line numbers add_line_numbers = should_add_line_numbers(file_path, include_line_numbers) logger.debug(f"[FILES] Line numbers for {file_path}: {'enabled' if add_line_numbers else 'disabled'}") # Read the file with UTF-8 encoding, replacing invalid characters # This ensures we can handle files with mixed encodings logger.debug(f"[FILES] Reading file content for {file_path}") with open(path, encoding="utf-8", errors="replace") as f: file_content = f.read() logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}") # Add line numbers if requested or auto-detected if add_line_numbers: file_content = _add_line_numbers(file_content) logger.debug(f"[FILES] Added line numbers to {file_path}") else: # Still normalize line endings for consistency file_content = _normalize_line_endings(file_content) # Format with clear delimiters that help the AI understand file boundaries # Using consistent markers makes it easier for the model to parse # NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers # ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content # vs. partial diff content when files appear in both sections modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") formatted = ( f"\n--- BEGIN FILE: {file_path} (Last modified: {modified_at}) ---\n" f"{file_content}\n" f"--- END FILE: {file_path} ---\n" ) tokens = estimate_tokens(formatted) logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens") return formatted, tokens except Exception as e: logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}") content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n" tokens = estimate_tokens(content) logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens") return content, tokens def read_files( file_paths: list[str], code: Optional[str] = None, max_tokens: Optional[int] = None, reserve_tokens: int = 50_000, *, include_line_numbers: bool = False, ) -> str: """ Read multiple files and optional direct code with smart token management. This function implements intelligent token budgeting to maximize the amount of relevant content that can be included in an AI prompt while staying within token limits. It prioritizes direct code and reads files until the token budget is exhausted. Args: file_paths: List of file or directory paths (absolute paths required) code: Optional direct code to include (prioritized over files) max_tokens: Maximum tokens to use (defaults to DEFAULT_CONTEXT_WINDOW) reserve_tokens: Tokens to reserve for prompt and response (default 50K) include_line_numbers: Whether to add line numbers to file content Returns: str: All file contents formatted for AI consumption """ if max_tokens is None: max_tokens = DEFAULT_CONTEXT_WINDOW logger.debug(f"[FILES] read_files called with {len(file_paths)} paths") logger.debug( f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}" ) content_parts = [] total_tokens = 0 available_tokens = max_tokens - reserve_tokens files_skipped = [] # Priority 1: Handle direct code if provided # Direct code is prioritized because it's explicitly provided by the user if code: formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n" code_tokens = estimate_tokens(formatted_code) if code_tokens <= available_tokens: content_parts.append(formatted_code) total_tokens += code_tokens available_tokens -= code_tokens # Priority 2: Process file paths if file_paths: # Expand directories to get all individual files logger.debug(f"[FILES] Expanding {len(file_paths)} file paths") all_files = expand_paths(file_paths) logger.debug(f"[FILES] After expansion: {len(all_files)} individual files") if not all_files and file_paths: # No files found but paths were provided logger.debug("[FILES] No files found from provided paths") content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n") else: # Read files sequentially until token limit is reached logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}") for i, file_path in enumerate(all_files): if total_tokens >= available_tokens: logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files") files_skipped.extend(all_files[i:]) break file_content, file_tokens = read_file_content(file_path, include_line_numbers=include_line_numbers) logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens") # Check if adding this file would exceed limit if total_tokens + file_tokens <= available_tokens: content_parts.append(file_content) total_tokens += file_tokens logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}") else: # File too large for remaining budget logger.debug( f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)" ) files_skipped.append(file_path) # Add informative note about skipped files to help users understand # what was omitted and why if files_skipped: logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits") skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n" skip_note += f"Total skipped: {len(files_skipped)}\n" # Show first 10 skipped files as examples for _i, file_path in enumerate(files_skipped[:10]): skip_note += f" - {file_path}\n" if len(files_skipped) > 10: skip_note += f" ... and {len(files_skipped) - 10} more\n" skip_note += "--- END SKIPPED FILES ---\n" content_parts.append(skip_note) result = "\n\n".join(content_parts) if content_parts else "" logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used") return result def estimate_file_tokens(file_path: str) -> int: """ Estimate tokens for a file using file-type aware ratios. Args: file_path: Path to the file Returns: Estimated token count for the file """ try: if not os.path.exists(file_path) or not os.path.isfile(file_path): return 0 file_size = os.path.getsize(file_path) # Get the appropriate ratio for this file type from .file_types import get_token_estimation_ratio ratio = get_token_estimation_ratio(file_path) return int(file_size / ratio) except Exception: return 0 def check_files_size_limit(files: list[str], max_tokens: int, threshold_percent: float = 1.0) -> tuple[bool, int, int]: """ Check if a list of files would exceed token limits. Args: files: List of file paths to check max_tokens: Maximum allowed tokens threshold_percent: Percentage of max_tokens to use as threshold (0.0-1.0) Returns: Tuple of (within_limit, total_estimated_tokens, file_count) """ if not files: return True, 0, 0 total_estimated_tokens = 0 file_count = 0 threshold = int(max_tokens * threshold_percent) for file_path in files: try: estimated_tokens = estimate_file_tokens(file_path) total_estimated_tokens += estimated_tokens if estimated_tokens > 0: # Only count accessible files file_count += 1 except Exception: # Skip files that can't be accessed for size check continue within_limit = total_estimated_tokens <= threshold return within_limit, total_estimated_tokens, file_count def read_json_file(file_path: str) -> Optional[dict]: """ Read and parse a JSON file with proper error handling. Args: file_path: Path to the JSON file Returns: Parsed JSON data as dict, or None if file doesn't exist or invalid """ try: if not os.path.exists(file_path): return None with open(file_path, encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): return None def write_json_file(file_path: str, data: dict, indent: int = 2) -> bool: """ Write data to a JSON file with proper formatting. Args: file_path: Path to write the JSON file data: Dictionary data to serialize indent: JSON indentation level Returns: True if successful, False otherwise """ try: os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=indent, ensure_ascii=False) return True except (OSError, TypeError): return False def get_file_size(file_path: str) -> int: """ Get file size in bytes with proper error handling. Args: file_path: Path to the file Returns: File size in bytes, or 0 if file doesn't exist or error """ try: if os.path.exists(file_path) and os.path.isfile(file_path): return os.path.getsize(file_path) return 0 except OSError: return 0 def ensure_directory_exists(file_path: str) -> bool: """ Ensure the parent directory of a file path exists. Args: file_path: Path to file (directory will be created for parent) Returns: True if directory exists or was created, False on error """ try: directory = os.path.dirname(file_path) if directory: os.makedirs(directory, exist_ok=True) return True except OSError: return False def is_text_file(file_path: str) -> bool: """ Check if a file is likely a text file based on extension and content. Args: file_path: Path to the file Returns: True if file appears to be text, False otherwise """ from .file_types import is_text_file as check_text_type return check_text_type(file_path) def read_file_safely(file_path: str, max_size: int = 10 * 1024 * 1024) -> Optional[str]: """ Read a file with size limits and encoding handling. Args: file_path: Path to the file max_size: Maximum file size in bytes (default 10MB) Returns: File content as string, or None if file too large or unreadable """ try: if not os.path.exists(file_path) or not os.path.isfile(file_path): return None file_size = os.path.getsize(file_path) if file_size > max_size: return None with open(file_path, encoding="utf-8", errors="ignore") as f: return f.read() except OSError: return None def check_total_file_size(files: list[str], model_name: str) -> Optional[dict]: """ Check if total file sizes would exceed token threshold before embedding. IMPORTANT: This performs STRICT REJECTION at MCP boundary. No partial inclusion - either all files fit or request is rejected. This forces the CLI to make better file selection decisions. This function MUST be called with the effective model name (after resolution). It should never receive 'auto' or None - model resolution happens earlier. Args: files: List of file paths to check model_name: The resolved model name for context-aware thresholds (required) Returns: Dict with `code_too_large` response if too large, None if acceptable """ if not files: return None # Validate we have a proper model name (not auto or None) if not model_name or model_name.lower() == "auto": raise ValueError( f"check_total_file_size called with unresolved model: '{model_name}'. " "Model must be resolved before file size checking." ) logger.info(f"File size check: Using model '{model_name}' for token limit calculation") from utils.model_context import ModelContext model_context = ModelContext(model_name) token_allocation = model_context.calculate_token_allocation() # Dynamic threshold based on model capacity context_window = token_allocation.total_tokens if context_window >= 1_000_000: # Gemini-class models threshold_percent = 0.8 # Can be more generous elif context_window >= 500_000: # Mid-range models threshold_percent = 0.7 # Moderate else: # OpenAI-class models (200K) threshold_percent = 0.6 # Conservative max_file_tokens = int(token_allocation.file_tokens * threshold_percent) # Use centralized file size checking (threshold already applied to max_file_tokens) within_limit, total_estimated_tokens, file_count = check_files_size_limit(files, max_file_tokens) if not within_limit: return { "status": "code_too_large", "content": ( f"The selected files are too large for analysis " f"(estimated {total_estimated_tokens:,} tokens, limit {max_file_tokens:,}). " f"Please select fewer, more specific files that are most relevant " f"to your question, then invoke the tool again." ), "content_type": "text", "metadata": { "total_estimated_tokens": total_estimated_tokens, "limit": max_file_tokens, "file_count": file_count, "threshold_percent": threshold_percent, "model_context_window": context_window, "model_name": model_name, "instructions": "Reduce file selection and try again - all files must fit within budget. If this persists, please use a model with a larger context window where available.", }, } return None # Proceed with ALL files

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BeehiveInnovations/gemini-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server