file_utils.py•32.7 kB
"""
File reading utilities with directory support and token management
This module provides secure file access functionality for the MCP server.
It implements critical security measures to prevent unauthorized file access
and manages token limits to ensure efficient API usage.
Key Features:
- Path validation and sandboxing to prevent directory traversal attacks
- Support for both individual files and recursive directory reading
- Token counting and management to stay within API limits
- Automatic file type detection and filtering
- Comprehensive error handling with informative messages
Security Model:
- All file access is restricted to PROJECT_ROOT and its subdirectories
- Absolute paths are required to prevent ambiguity
- Symbolic links are resolved to ensure they stay within bounds
CONVERSATION MEMORY INTEGRATION:
This module works with the conversation memory system to support efficient
multi-turn file handling:
1. DEDUPLICATION SUPPORT:
- File reading functions are called by conversation-aware tools
- Supports newest-first file prioritization by providing accurate token estimation
- Enables efficient file content caching and token budget management
2. TOKEN BUDGET OPTIMIZATION:
- Provides accurate token estimation for file content before reading
- Supports the dual prioritization strategy by enabling precise budget calculations
- Enables tools to make informed decisions about which files to include
3. CROSS-TOOL FILE PERSISTENCE:
- File reading results are used across different tools in conversation chains
- Consistent file access patterns support conversation continuation scenarios
- Error handling preserves conversation flow when files become unavailable
"""
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from .file_types import BINARY_EXTENSIONS, CODE_EXTENSIONS, IMAGE_EXTENSIONS, TEXT_EXTENSIONS
from .security_config import EXCLUDED_DIRS, is_dangerous_path
from .token_utils import DEFAULT_CONTEXT_WINDOW, estimate_tokens
def _is_builtin_custom_models_config(path_str: str) -> bool:
"""
Check if path points to the server's built-in custom_models.json config file.
This only matches the server's internal config, not user-specified CUSTOM_MODELS_CONFIG_PATH.
We identify the built-in config by checking if it resolves to the server's conf directory.
Args:
path_str: Path to check
Returns:
True if this is the server's built-in custom_models.json config file
"""
try:
path = Path(path_str)
# Get the server root by going up from this file: utils/file_utils.py -> server_root
server_root = Path(__file__).parent.parent
builtin_config = server_root / "conf" / "custom_models.json"
# Check if the path resolves to the same file as our built-in config
# This handles both relative and absolute paths to the same file
return path.resolve() == builtin_config.resolve()
except Exception:
# If path resolution fails, it's not our built-in config
return False
logger = logging.getLogger(__name__)
def is_mcp_directory(path: Path) -> bool:
"""
Check if a directory is the MCP server's own directory.
This prevents the MCP from including its own code when scanning projects
where the MCP has been cloned as a subdirectory.
Args:
path: Directory path to check
Returns:
True if this is the MCP server directory or a subdirectory
"""
if not path.is_dir():
return False
# Get the directory where the MCP server is running from
# __file__ is utils/file_utils.py, so parent.parent is the MCP root
mcp_server_dir = Path(__file__).parent.parent.resolve()
# Check if the given path is the MCP server directory or a subdirectory
try:
path.resolve().relative_to(mcp_server_dir)
logger.info(f"Detected MCP server directory at {path}, will exclude from scanning")
return True
except ValueError:
# Not a subdirectory of MCP server
return False
def get_user_home_directory() -> Optional[Path]:
"""
Get the user's home directory.
Returns:
User's home directory path
"""
return Path.home()
def is_home_directory_root(path: Path) -> bool:
"""
Check if the given path is the user's home directory root.
This prevents scanning the entire home directory which could include
sensitive data and non-project files.
Args:
path: Directory path to check
Returns:
True if this is the home directory root
"""
user_home = get_user_home_directory()
if not user_home:
return False
try:
resolved_path = path.resolve()
resolved_home = user_home.resolve()
# Check if this is exactly the home directory
if resolved_path == resolved_home:
logger.warning(
f"Attempted to scan user home directory root: {path}. Please specify a subdirectory instead."
)
return True
# Also check common home directory patterns
path_str = str(resolved_path).lower()
home_patterns = [
"/users/", # macOS
"/home/", # Linux
"c:\\users\\", # Windows
"c:/users/", # Windows with forward slashes
]
for pattern in home_patterns:
if pattern in path_str:
# Extract the user directory path
# e.g., /Users/fahad or /home/username
parts = path_str.split(pattern)
if len(parts) > 1:
# Get the part after the pattern
after_pattern = parts[1]
# Check if we're at the user's root (no subdirectories)
if "/" not in after_pattern and "\\" not in after_pattern:
logger.warning(
f"Attempted to scan user home directory root: {path}. "
f"Please specify a subdirectory instead."
)
return True
except Exception as e:
logger.debug(f"Error checking if path is home directory: {e}")
return False
def detect_file_type(file_path: str) -> str:
"""
Detect file type for appropriate processing strategy.
This function is intended for specific file type handling (e.g., image processing,
binary file analysis, or enhanced file filtering).
Args:
file_path: Path to the file to analyze
Returns:
str: "text", "binary", or "image"
"""
path = Path(file_path)
# Check extension first (fast)
extension = path.suffix.lower()
if extension in TEXT_EXTENSIONS:
return "text"
elif extension in IMAGE_EXTENSIONS:
return "image"
elif extension in BINARY_EXTENSIONS:
return "binary"
# Fallback: check magic bytes for text vs binary
# This is helpful for files without extensions or unknown extensions
try:
with open(path, "rb") as f:
chunk = f.read(1024)
# Simple heuristic: if we can decode as UTF-8, likely text
chunk.decode("utf-8")
return "text"
except UnicodeDecodeError:
return "binary"
except (FileNotFoundError, PermissionError) as e:
logger.warning(f"Could not access file {file_path} for type detection: {e}")
return "unknown"
def should_add_line_numbers(file_path: str, include_line_numbers: Optional[bool] = None) -> bool:
"""
Determine if line numbers should be added to a file.
Args:
file_path: Path to the file
include_line_numbers: Explicit preference, or None for auto-detection
Returns:
bool: True if line numbers should be added
"""
if include_line_numbers is not None:
return include_line_numbers
# Default: DO NOT add line numbers
# Tools that want line numbers must explicitly request them
return False
def _normalize_line_endings(content: str) -> str:
"""
Normalize line endings for consistent line numbering.
Args:
content: File content with potentially mixed line endings
Returns:
str: Content with normalized LF line endings
"""
# Normalize all line endings to LF for consistent counting
return content.replace("\r\n", "\n").replace("\r", "\n")
def _add_line_numbers(content: str) -> str:
"""
Add line numbers to text content for precise referencing.
Args:
content: Text content to number
Returns:
str: Content with line numbers in format " 45│ actual code line"
Supports files up to 99,999 lines with dynamic width allocation
"""
# Normalize line endings first
normalized_content = _normalize_line_endings(content)
lines = normalized_content.split("\n")
# Dynamic width allocation based on total line count
# This supports files of any size by computing required width
total_lines = len(lines)
width = len(str(total_lines))
width = max(width, 4) # Minimum padding for readability
# Format with dynamic width and clear separator
numbered_lines = [f"{i + 1:{width}d}│ {line}" for i, line in enumerate(lines)]
return "\n".join(numbered_lines)
def resolve_and_validate_path(path_str: str) -> Path:
"""
Resolves and validates a path against security policies.
This function ensures safe file access by:
1. Requiring absolute paths (no ambiguity)
2. Resolving symlinks to prevent deception
3. Blocking access to dangerous system directories
Args:
path_str: Path string (must be absolute)
Returns:
Resolved Path object that is safe to access
Raises:
ValueError: If path is not absolute or otherwise invalid
PermissionError: If path is in a dangerous location
"""
# Step 1: Create a Path object
user_path = Path(path_str)
# Step 2: Security Policy - Require absolute paths
# Relative paths could be interpreted differently depending on working directory
if not user_path.is_absolute():
raise ValueError(f"Relative paths are not supported. Please provide an absolute path.\nReceived: {path_str}")
# Step 3: Resolve the absolute path (follows symlinks, removes .. and .)
# This is critical for security as it reveals the true destination of symlinks
resolved_path = user_path.resolve()
# Step 4: Check against dangerous paths
if is_dangerous_path(resolved_path):
logger.warning(f"Access denied - dangerous path: {resolved_path}")
raise PermissionError(f"Access to system directory denied: {path_str}")
# Step 5: Check if it's the home directory root
if is_home_directory_root(resolved_path):
raise PermissionError(
f"Cannot scan entire home directory: {path_str}\n" f"Please specify a subdirectory within your home folder."
)
return resolved_path
def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> list[str]:
"""
Expand paths to individual files, handling both files and directories.
This function recursively walks directories to find all matching files.
It automatically filters out hidden files and common non-code directories
like __pycache__ to avoid including generated or system files.
Args:
paths: List of file or directory paths (must be absolute)
extensions: Optional set of file extensions to include (defaults to CODE_EXTENSIONS)
Returns:
List of individual file paths, sorted for consistent ordering
"""
if extensions is None:
extensions = CODE_EXTENSIONS
expanded_files = []
seen = set()
for path in paths:
try:
# Validate each path for security before processing
path_obj = resolve_and_validate_path(path)
except (ValueError, PermissionError):
# Skip invalid paths silently to allow partial success
continue
if not path_obj.exists():
continue
# Safety checks for directory scanning
if path_obj.is_dir():
# Check 1: Prevent scanning user's home directory root
if is_home_directory_root(path_obj):
logger.warning(f"Skipping home directory root: {path}. Please specify a project subdirectory instead.")
continue
# Check 2: Skip if this is the MCP's own directory
if is_mcp_directory(path_obj):
logger.info(
f"Skipping MCP server directory: {path}. The MCP server code is excluded from project scans."
)
continue
if path_obj.is_file():
# Add file directly
if str(path_obj) not in seen:
expanded_files.append(str(path_obj))
seen.add(str(path_obj))
elif path_obj.is_dir():
# Walk directory recursively to find all files
for root, dirs, files in os.walk(path_obj):
# Filter directories in-place to skip hidden and excluded directories
# This prevents descending into .git, .venv, __pycache__, node_modules, etc.
original_dirs = dirs[:]
dirs[:] = []
for d in original_dirs:
# Skip hidden directories
if d.startswith("."):
continue
# Skip excluded directories
if d in EXCLUDED_DIRS:
continue
# Skip MCP directories found during traversal
dir_path = Path(root) / d
if is_mcp_directory(dir_path):
logger.debug(f"Skipping MCP directory during traversal: {dir_path}")
continue
dirs.append(d)
for file in files:
# Skip hidden files (e.g., .DS_Store, .gitignore)
if file.startswith("."):
continue
file_path = Path(root) / file
# Filter by extension if specified
if not extensions or file_path.suffix.lower() in extensions:
full_path = str(file_path)
# Use set to prevent duplicates
if full_path not in seen:
expanded_files.append(full_path)
seen.add(full_path)
# Sort for consistent ordering across different runs
# This makes output predictable and easier to debug
expanded_files.sort()
return expanded_files
def read_file_content(
file_path: str, max_size: int = 1_000_000, *, include_line_numbers: Optional[bool] = None
) -> tuple[str, int]:
"""
Read a single file and format it for inclusion in AI prompts.
This function handles various error conditions gracefully and always
returns formatted content, even for errors. This ensures the AI model
gets context about what files were attempted but couldn't be read.
Args:
file_path: Path to file (must be absolute)
max_size: Maximum file size to read (default 1MB to prevent memory issues)
include_line_numbers: Whether to add line numbers. If None, auto-detects based on file type
Returns:
Tuple of (formatted_content, estimated_tokens)
Content is wrapped with clear delimiters for AI parsing
"""
logger.debug(f"[FILES] read_file_content called for: {file_path}")
try:
# Validate path security before any file operations
path = resolve_and_validate_path(file_path)
logger.debug(f"[FILES] Path validated and resolved: {path}")
except (ValueError, PermissionError) as e:
# Return error in a format that provides context to the AI
logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}")
error_msg = str(e)
content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n"
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
try:
# Validate file existence and type
if not path.exists():
logger.debug(f"[FILES] File does not exist: {file_path}")
content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
return content, estimate_tokens(content)
if not path.is_file():
logger.debug(f"[FILES] Path is not a file: {file_path}")
content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Check file size to prevent memory exhaustion
stat_result = path.stat()
file_size = stat_result.st_size
logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes")
if file_size > max_size:
logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)")
modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
content = (
f"\n--- FILE TOO LARGE: {file_path} (Last modified: {modified_at}) ---\n"
f"File size: {file_size:,} bytes (max: {max_size:,})\n"
"--- END FILE ---\n"
)
return content, estimate_tokens(content)
# Determine if we should add line numbers
add_line_numbers = should_add_line_numbers(file_path, include_line_numbers)
logger.debug(f"[FILES] Line numbers for {file_path}: {'enabled' if add_line_numbers else 'disabled'}")
# Read the file with UTF-8 encoding, replacing invalid characters
# This ensures we can handle files with mixed encodings
logger.debug(f"[FILES] Reading file content for {file_path}")
with open(path, encoding="utf-8", errors="replace") as f:
file_content = f.read()
logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")
# Add line numbers if requested or auto-detected
if add_line_numbers:
file_content = _add_line_numbers(file_content)
logger.debug(f"[FILES] Added line numbers to {file_path}")
else:
# Still normalize line endings for consistency
file_content = _normalize_line_endings(file_content)
# Format with clear delimiters that help the AI understand file boundaries
# Using consistent markers makes it easier for the model to parse
# NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
# ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content
# vs. partial diff content when files appear in both sections
modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
formatted = (
f"\n--- BEGIN FILE: {file_path} (Last modified: {modified_at}) ---\n"
f"{file_content}\n"
f"--- END FILE: {file_path} ---\n"
)
tokens = estimate_tokens(formatted)
logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens")
return formatted, tokens
except Exception as e:
logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}")
content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
def read_files(
file_paths: list[str],
code: Optional[str] = None,
max_tokens: Optional[int] = None,
reserve_tokens: int = 50_000,
*,
include_line_numbers: bool = False,
) -> str:
"""
Read multiple files and optional direct code with smart token management.
This function implements intelligent token budgeting to maximize the amount
of relevant content that can be included in an AI prompt while staying
within token limits. It prioritizes direct code and reads files until
the token budget is exhausted.
Args:
file_paths: List of file or directory paths (absolute paths required)
code: Optional direct code to include (prioritized over files)
max_tokens: Maximum tokens to use (defaults to DEFAULT_CONTEXT_WINDOW)
reserve_tokens: Tokens to reserve for prompt and response (default 50K)
include_line_numbers: Whether to add line numbers to file content
Returns:
str: All file contents formatted for AI consumption
"""
if max_tokens is None:
max_tokens = DEFAULT_CONTEXT_WINDOW
logger.debug(f"[FILES] read_files called with {len(file_paths)} paths")
logger.debug(
f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}"
)
content_parts = []
total_tokens = 0
available_tokens = max_tokens - reserve_tokens
files_skipped = []
# Priority 1: Handle direct code if provided
# Direct code is prioritized because it's explicitly provided by the user
if code:
formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
code_tokens = estimate_tokens(formatted_code)
if code_tokens <= available_tokens:
content_parts.append(formatted_code)
total_tokens += code_tokens
available_tokens -= code_tokens
# Priority 2: Process file paths
if file_paths:
# Expand directories to get all individual files
logger.debug(f"[FILES] Expanding {len(file_paths)} file paths")
all_files = expand_paths(file_paths)
logger.debug(f"[FILES] After expansion: {len(all_files)} individual files")
if not all_files and file_paths:
# No files found but paths were provided
logger.debug("[FILES] No files found from provided paths")
content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
else:
# Read files sequentially until token limit is reached
logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}")
for i, file_path in enumerate(all_files):
if total_tokens >= available_tokens:
logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files")
files_skipped.extend(all_files[i:])
break
file_content, file_tokens = read_file_content(file_path, include_line_numbers=include_line_numbers)
logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")
# Check if adding this file would exceed limit
if total_tokens + file_tokens <= available_tokens:
content_parts.append(file_content)
total_tokens += file_tokens
logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}")
else:
# File too large for remaining budget
logger.debug(
f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)"
)
files_skipped.append(file_path)
# Add informative note about skipped files to help users understand
# what was omitted and why
if files_skipped:
logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits")
skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
skip_note += f"Total skipped: {len(files_skipped)}\n"
# Show first 10 skipped files as examples
for _i, file_path in enumerate(files_skipped[:10]):
skip_note += f" - {file_path}\n"
if len(files_skipped) > 10:
skip_note += f" ... and {len(files_skipped) - 10} more\n"
skip_note += "--- END SKIPPED FILES ---\n"
content_parts.append(skip_note)
result = "\n\n".join(content_parts) if content_parts else ""
logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used")
return result
def estimate_file_tokens(file_path: str) -> int:
"""
Estimate tokens for a file using file-type aware ratios.
Args:
file_path: Path to the file
Returns:
Estimated token count for the file
"""
try:
if not os.path.exists(file_path) or not os.path.isfile(file_path):
return 0
file_size = os.path.getsize(file_path)
# Get the appropriate ratio for this file type
from .file_types import get_token_estimation_ratio
ratio = get_token_estimation_ratio(file_path)
return int(file_size / ratio)
except Exception:
return 0
def check_files_size_limit(files: list[str], max_tokens: int, threshold_percent: float = 1.0) -> tuple[bool, int, int]:
"""
Check if a list of files would exceed token limits.
Args:
files: List of file paths to check
max_tokens: Maximum allowed tokens
threshold_percent: Percentage of max_tokens to use as threshold (0.0-1.0)
Returns:
Tuple of (within_limit, total_estimated_tokens, file_count)
"""
if not files:
return True, 0, 0
total_estimated_tokens = 0
file_count = 0
threshold = int(max_tokens * threshold_percent)
for file_path in files:
try:
estimated_tokens = estimate_file_tokens(file_path)
total_estimated_tokens += estimated_tokens
if estimated_tokens > 0: # Only count accessible files
file_count += 1
except Exception:
# Skip files that can't be accessed for size check
continue
within_limit = total_estimated_tokens <= threshold
return within_limit, total_estimated_tokens, file_count
def read_json_file(file_path: str) -> Optional[dict]:
"""
Read and parse a JSON file with proper error handling.
Args:
file_path: Path to the JSON file
Returns:
Parsed JSON data as dict, or None if file doesn't exist or invalid
"""
try:
if not os.path.exists(file_path):
return None
with open(file_path, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return None
def write_json_file(file_path: str, data: dict, indent: int = 2) -> bool:
"""
Write data to a JSON file with proper formatting.
Args:
file_path: Path to write the JSON file
data: Dictionary data to serialize
indent: JSON indentation level
Returns:
True if successful, False otherwise
"""
try:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=indent, ensure_ascii=False)
return True
except (OSError, TypeError):
return False
def get_file_size(file_path: str) -> int:
"""
Get file size in bytes with proper error handling.
Args:
file_path: Path to the file
Returns:
File size in bytes, or 0 if file doesn't exist or error
"""
try:
if os.path.exists(file_path) and os.path.isfile(file_path):
return os.path.getsize(file_path)
return 0
except OSError:
return 0
def ensure_directory_exists(file_path: str) -> bool:
"""
Ensure the parent directory of a file path exists.
Args:
file_path: Path to file (directory will be created for parent)
Returns:
True if directory exists or was created, False on error
"""
try:
directory = os.path.dirname(file_path)
if directory:
os.makedirs(directory, exist_ok=True)
return True
except OSError:
return False
def is_text_file(file_path: str) -> bool:
"""
Check if a file is likely a text file based on extension and content.
Args:
file_path: Path to the file
Returns:
True if file appears to be text, False otherwise
"""
from .file_types import is_text_file as check_text_type
return check_text_type(file_path)
def read_file_safely(file_path: str, max_size: int = 10 * 1024 * 1024) -> Optional[str]:
"""
Read a file with size limits and encoding handling.
Args:
file_path: Path to the file
max_size: Maximum file size in bytes (default 10MB)
Returns:
File content as string, or None if file too large or unreadable
"""
try:
if not os.path.exists(file_path) or not os.path.isfile(file_path):
return None
file_size = os.path.getsize(file_path)
if file_size > max_size:
return None
with open(file_path, encoding="utf-8", errors="ignore") as f:
return f.read()
except OSError:
return None
def check_total_file_size(files: list[str], model_name: str) -> Optional[dict]:
"""
Check if total file sizes would exceed token threshold before embedding.
IMPORTANT: This performs STRICT REJECTION at MCP boundary.
No partial inclusion - either all files fit or request is rejected.
This forces the CLI to make better file selection decisions.
This function MUST be called with the effective model name (after resolution).
It should never receive 'auto' or None - model resolution happens earlier.
Args:
files: List of file paths to check
model_name: The resolved model name for context-aware thresholds (required)
Returns:
Dict with `code_too_large` response if too large, None if acceptable
"""
if not files:
return None
# Validate we have a proper model name (not auto or None)
if not model_name or model_name.lower() == "auto":
raise ValueError(
f"check_total_file_size called with unresolved model: '{model_name}'. "
"Model must be resolved before file size checking."
)
logger.info(f"File size check: Using model '{model_name}' for token limit calculation")
from utils.model_context import ModelContext
model_context = ModelContext(model_name)
token_allocation = model_context.calculate_token_allocation()
# Dynamic threshold based on model capacity
context_window = token_allocation.total_tokens
if context_window >= 1_000_000: # Gemini-class models
threshold_percent = 0.8 # Can be more generous
elif context_window >= 500_000: # Mid-range models
threshold_percent = 0.7 # Moderate
else: # OpenAI-class models (200K)
threshold_percent = 0.6 # Conservative
max_file_tokens = int(token_allocation.file_tokens * threshold_percent)
# Use centralized file size checking (threshold already applied to max_file_tokens)
within_limit, total_estimated_tokens, file_count = check_files_size_limit(files, max_file_tokens)
if not within_limit:
return {
"status": "code_too_large",
"content": (
f"The selected files are too large for analysis "
f"(estimated {total_estimated_tokens:,} tokens, limit {max_file_tokens:,}). "
f"Please select fewer, more specific files that are most relevant "
f"to your question, then invoke the tool again."
),
"content_type": "text",
"metadata": {
"total_estimated_tokens": total_estimated_tokens,
"limit": max_file_tokens,
"file_count": file_count,
"threshold_percent": threshold_percent,
"model_context_window": context_window,
"model_name": model_name,
"instructions": "Reduce file selection and try again - all files must fit within budget. If this persists, please use a model with a larger context window where available.",
},
}
return None # Proceed with ALL files