Skip to main content
Glama
files.pyโ€ข7.45 kB
""" Smart file handling with conversation memory integration Supports directory expansion, deduplication, and multiple handling modes """ import os import logging from pathlib import Path from typing import Dict, List, Optional, Tuple, Union from config import Config from utils.security import validate_paths, is_safe_path from utils.tokens import estimate_tokens logger = logging.getLogger(__name__) def expand_paths(paths: List[str]) -> List[str]: """ Expand directory paths to individual file paths Args: paths: List of file or directory paths Returns: List of individual file paths """ expanded = [] config = Config() for path in paths: abs_path = Path(path).resolve() if abs_path.is_file(): expanded.append(str(abs_path)) elif abs_path.is_dir(): # Recursively find files in directory for root, dirs, files in os.walk(abs_path): # Filter excluded directories dirs[:] = [d for d in dirs if d not in config.EXCLUDED_DIRS] for file in files: file_path = Path(root) / file if file_path.suffix in config.ALLOWED_FILE_EXTENSIONS: expanded.append(str(file_path)) else: logger.warning(f"Path not found: {path}") return expanded def read_files( file_paths: List[str], mode: str = "embedded", max_tokens: Optional[int] = None ) -> Union[Dict[str, str], Dict[str, dict]]: """ Read multiple files with smart handling modes Args: file_paths: List of absolute file paths mode: How to handle files - "embedded", "summary", or "reference" max_tokens: Maximum tokens to read (for embedded mode) Returns: Dict mapping file path to content (or metadata for summary/reference modes) """ if mode == "embedded": return read_files_embedded(file_paths, max_tokens) elif mode == "summary": return read_files_summary(file_paths) elif mode == "reference": return read_files_reference(file_paths) else: raise ValueError(f"Unknown file handling mode: {mode}") def read_files_embedded(file_paths: List[str], max_tokens: Optional[int] = None) -> Dict[str, str]: """ Read files and return full content (traditional mode) Args: file_paths: List of absolute file paths max_tokens: Maximum tokens to read across all files Returns: Dict mapping file path to full content """ contents = {} config = Config() total_tokens = 0 # Prioritize files by modification time (newest first) file_info = [] for path in file_paths: abs_path = Path(path).resolve() if abs_path.exists() and abs_path.is_file(): mtime = abs_path.stat().st_mtime file_info.append((str(abs_path), mtime)) # Sort by modification time (newest first) file_info.sort(key=lambda x: x[1], reverse=True) for path, _ in file_info: try: abs_path = Path(path) # Security checks if not is_safe_path(abs_path): logger.warning(f"Unsafe path skipped: {abs_path}") continue # Size check size = abs_path.stat().st_size if size > config.MAX_FILE_SIZE: logger.warning(f"File too large ({size} bytes): {abs_path}") contents[str(abs_path)] = f"[File too large: {size:,} bytes]" continue # Extension check if abs_path.suffix not in config.ALLOWED_FILE_EXTENSIONS: logger.warning(f"Unsupported file type: {abs_path}") continue # Read file content with open(abs_path, "r", encoding="utf-8", errors="ignore") as f: content = f.read() # Token budget check if max_tokens: content_tokens = estimate_tokens(content) if total_tokens + content_tokens > max_tokens: logger.warning(f"Token budget exceeded, skipping remaining files") break total_tokens += content_tokens contents[str(abs_path)] = content logger.debug(f"Read file: {abs_path} ({len(content):,} chars)") except Exception as e: logger.error(f"Error reading {path}: {e}") contents[path] = f"[Error reading file: {e}]" return contents def read_files_summary(file_paths: List[str]) -> Dict[str, dict]: """ Read files and return summaries instead of full content Args: file_paths: List of absolute file paths Returns: Dict mapping file path to summary metadata """ summaries = {} for path in file_paths: try: abs_path = Path(path).resolve() if not abs_path.exists() or not abs_path.is_file(): continue # Get file info stat_info = abs_path.stat() size = stat_info.st_size # Read first few lines for summary with open(abs_path, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines() # Generate summary summary = { "size_bytes": size, "line_count": len(lines), "file_type": abs_path.suffix, "preview": "".join(lines[:10]).strip(), # First 10 lines "last_modified": stat_info.st_mtime, } # Add language-specific info if abs_path.suffix == ".py": import_lines = [line.strip() for line in lines if line.strip().startswith(("import ", "from "))] class_lines = [line.strip() for line in lines if line.strip().startswith("class ")] func_lines = [line.strip() for line in lines if line.strip().startswith("def ")] summary.update( {"imports": len(import_lines), "classes": len(class_lines), "functions": len(func_lines)} ) summaries[str(abs_path)] = summary except Exception as e: logger.error(f"Error summarizing {path}: {e}") summaries[path] = {"error": str(e)} return summaries def read_files_reference(file_paths: List[str]) -> Dict[str, dict]: """ Store files in reference system and return reference IDs Args: file_paths: List of absolute file paths Returns: Dict mapping file path to reference metadata """ references = {} # This would integrate with a file storage system # For now, return metadata that indicates files are "stored" for path in file_paths: try: abs_path = Path(path).resolve() if abs_path.exists() and abs_path.is_file(): ref_id = f"ref_{hash(str(abs_path)) % 10000:04d}" references[str(abs_path)] = { "reference_id": ref_id, "stored": True, "size": abs_path.stat().st_size, "type": abs_path.suffix, } else: references[path] = {"error": "File not found"} except Exception as e: logger.error(f"Error referencing {path}: {e}") references[path] = {"error": str(e)} return references

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/david-strejc/sage-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server