Skip to main content
Glama
file_service.py18.1 kB
"""File Service Module. Provides secure file operations with access control, validation, and intelligent file handling. """ import os import shutil import hashlib import mimetypes from pathlib import Path from typing import Dict, Any, Optional, List, Union, Tuple from datetime import datetime import json try: from core.config import get_config from core.exceptions import ( FileAccessError, SecurityError, ValidationError, ResourceError ) from core.types import FileInfo CORE_AVAILABLE = True except ImportError: CORE_AVAILABLE = False # Fallback types class FileInfo: def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) class FileService: """Secure file service with access control.""" def __init__(self): """Initialize file service.""" self.config = get_config() if CORE_AVAILABLE else None self._setup_allowed_paths() self._setup_allowed_extensions() def _setup_allowed_paths(self): """Setup allowed file paths.""" # Default allowed paths self.allowed_paths = [ os.getcwd(), # Current working directory os.path.expanduser("~/Downloads"), os.path.expanduser("~/Documents"), "/tmp" ] # Add config-based paths if available if self.config and hasattr(self.config.security, 'allowed_paths'): self.allowed_paths.extend(self.config.security.allowed_paths) # Normalize paths self.allowed_paths = [os.path.abspath(path) for path in self.allowed_paths] def _setup_allowed_extensions(self): """Setup allowed file extensions.""" self.allowed_extensions = { '.xlsx', '.xls', '.csv', '.json', '.txt', '.html', '.xml', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.pdf' } # Add config-based extensions if available if self.config and hasattr(self.config.security, 'allowed_extensions'): self.allowed_extensions.update(self.config.security.allowed_extensions) def validate_file_access(self, file_path: str, operation: str = 'read') -> bool: """Validate file access permissions. Args: file_path: Path to the file operation: Type of operation ('read', 'write', 'delete') Returns: True if access is allowed Raises: SecurityError: If access is denied """ try: abs_path = os.path.abspath(file_path) # Check if path is within allowed directories path_allowed = False for allowed_path in self.allowed_paths: if abs_path.startswith(allowed_path): path_allowed = True break if not path_allowed: raise SecurityError(operation="文件访问", reason=f"文件路径不在允许的目录中: {abs_path}") # Check file extension file_ext = Path(file_path).suffix.lower() if file_ext and file_ext not in self.allowed_extensions: raise SecurityError(operation="文件访问", reason=f"不允许的文件扩展名: {file_ext}") # Check file size for existing files if os.path.exists(abs_path): file_size = os.path.getsize(abs_path) max_size = self.config.security.max_file_size if self.config else 100 * 1024 * 1024 # 100MB default if file_size > max_size: raise SecurityError(operation="文件大小检查", reason=f"文件大小 ({file_size} 字节) 超过限制 ({max_size} 字节)") # Check operation permissions if operation == 'read' and os.path.exists(abs_path): if not os.access(abs_path, os.R_OK): raise SecurityError(operation="读取文件", reason=f"没有读取权限: {abs_path}") elif operation == 'write': parent_dir = os.path.dirname(abs_path) if not os.access(parent_dir, os.W_OK): raise SecurityError(operation="写入文件", reason=f"没有写入权限: {parent_dir}") elif operation == 'delete' and os.path.exists(abs_path): if not os.access(abs_path, os.W_OK): raise SecurityError(operation="删除文件", reason=f"没有删除权限: {abs_path}") return True except SecurityError: raise except Exception as e: if CORE_AVAILABLE: raise SecurityError(operation="文件访问验证", reason=f"文件访问验证失败: {e}") else: raise Exception(f"文件访问验证失败: {e}") def get_file_info(self, file_path: str) -> FileInfo: """Get comprehensive file information. Args: file_path: Path to the file Returns: FileInfo object with file details """ self.validate_file_access(file_path, 'read') try: path = Path(file_path) if not path.exists(): raise FileAccessError(file_path=file_path, reason="文件不存在") stat = path.stat() # Get MIME type mime_type, _ = mimetypes.guess_type(str(path)) # Calculate file hash for integrity file_hash = self._calculate_file_hash(str(path)) return FileInfo( path=str(path.absolute()), name=path.name, size=stat.st_size, extension=path.suffix.lower(), mime_type=mime_type, created_time=datetime.fromtimestamp(stat.st_ctime), modified_time=datetime.fromtimestamp(stat.st_mtime), accessed_time=datetime.fromtimestamp(stat.st_atime), is_readable=os.access(path, os.R_OK), is_writable=os.access(path, os.W_OK), is_executable=os.access(path, os.X_OK), file_hash=file_hash ) except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=file_path, reason=f"获取文件信息失败: {e}") else: raise Exception(f"获取文件信息失败: {e}") def read_file_content(self, file_path: str, encoding: str = 'utf-8') -> str: """Read file content safely. Args: file_path: Path to the file encoding: File encoding Returns: File content as string """ self.validate_file_access(file_path, 'read') try: with open(file_path, 'r', encoding=encoding) as f: return f.read() except UnicodeDecodeError: # Try different encodings for enc in ['gbk', 'gb2312', 'latin1']: try: with open(file_path, 'r', encoding=enc) as f: return f.read() except UnicodeDecodeError: continue raise FileAccessError(file_path=file_path, reason="无法解码文件") except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=file_path, reason=f"读取文件失败: {e}") else: raise Exception(f"读取文件失败: {e}") def write_file_content(self, file_path: str, content: str, encoding: str = 'utf-8', backup: bool = True) -> bool: """Write content to file safely. Args: file_path: Path to the file content: Content to write encoding: File encoding backup: Whether to create backup of existing file Returns: True if successful """ self.validate_file_access(file_path, 'write') try: # Create backup if file exists and backup is requested if backup and os.path.exists(file_path): backup_path = f"{file_path}.backup.{int(datetime.now().timestamp())}" shutil.copy2(file_path, backup_path) # Ensure parent directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # Write content with open(file_path, 'w', encoding=encoding) as f: f.write(content) return True except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=file_path, reason=f"写入文件失败: {e}") else: raise Exception(f"写入文件失败: {e}") def copy_file(self, source_path: str, dest_path: str, overwrite: bool = False) -> bool: """Copy file safely. Args: source_path: Source file path dest_path: Destination file path overwrite: Whether to overwrite existing file Returns: True if successful """ self.validate_file_access(source_path, 'read') self.validate_file_access(dest_path, 'write') try: if os.path.exists(dest_path) and not overwrite: raise FileAccessError(file_path=dest_path, reason="目标文件已存在") # Ensure parent directory exists os.makedirs(os.path.dirname(dest_path), exist_ok=True) shutil.copy2(source_path, dest_path) return True except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=source_path, reason=f"复制文件失败: {e}") else: raise Exception(f"复制文件失败: {e}") def move_file(self, source_path: str, dest_path: str, overwrite: bool = False) -> bool: """Move file safely. Args: source_path: Source file path dest_path: Destination file path overwrite: Whether to overwrite existing file Returns: True if successful """ self.validate_file_access(source_path, 'write') # Need write to delete source self.validate_file_access(dest_path, 'write') try: if os.path.exists(dest_path) and not overwrite: raise FileAccessError(dest_path, "目标文件已存在") # Ensure parent directory exists os.makedirs(os.path.dirname(dest_path), exist_ok=True) shutil.move(source_path, dest_path) return True except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=source_path, reason=f"移动文件失败: {e}") else: raise Exception(f"移动文件失败: {e}") def delete_file(self, file_path: str, secure: bool = False) -> bool: """Delete file safely. Args: file_path: Path to the file secure: Whether to perform secure deletion Returns: True if successful """ self.validate_file_access(file_path, 'delete') try: if not os.path.exists(file_path): return True # Already deleted if secure: # Secure deletion by overwriting with random data file_size = os.path.getsize(file_path) with open(file_path, 'r+b') as f: f.write(os.urandom(file_size)) f.flush() os.fsync(f.fileno()) os.remove(file_path) return True except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=file_path, reason=f"删除文件失败: {e}") else: raise Exception(f"删除文件失败: {e}") def list_directory(self, dir_path: str, pattern: Optional[str] = None, recursive: bool = False) -> List[FileInfo]: """List directory contents safely. Args: dir_path: Directory path pattern: File pattern to match recursive: Whether to search recursively Returns: List of FileInfo objects """ self.validate_file_access(dir_path, 'read') try: if not os.path.isdir(dir_path): raise FileAccessError(file_path=dir_path, reason="不是有效的目录") files = [] if recursive: for root, dirs, filenames in os.walk(dir_path): for filename in filenames: file_path = os.path.join(root, filename) try: if pattern is None or self._match_pattern(filename, pattern): files.append(self.get_file_info(file_path)) except Exception: continue # Skip files that can't be accessed else: for item in os.listdir(dir_path): item_path = os.path.join(dir_path, item) try: if os.path.isfile(item_path): if pattern is None or self._match_pattern(item, pattern): files.append(self.get_file_info(item_path)) except Exception: continue # Skip files that can't be accessed return files except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=dir_path, reason=f"列出目录失败: {e}") else: raise Exception(f"列出目录失败: {e}") def create_directory(self, dir_path: str, parents: bool = True) -> bool: """Create directory safely. Args: dir_path: Directory path to create parents: Whether to create parent directories Returns: True if successful """ self.validate_file_access(dir_path, 'write') try: if parents: os.makedirs(dir_path, exist_ok=True) else: os.mkdir(dir_path) return True except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=dir_path, reason=f"创建目录失败: {e}") else: raise Exception(f"创建目录失败: {e}") def get_disk_usage(self, path: str) -> Dict[str, int]: """Get disk usage information. Args: path: Path to check Returns: Dictionary with total, used, and free space in bytes """ try: usage = shutil.disk_usage(path) return { 'total': usage.total, 'used': usage.total - usage.free, 'free': usage.free, 'usage_percentage': ((usage.total - usage.free) / usage.total) * 100 } except Exception as e: if CORE_AVAILABLE: raise ResourceError(resource_type="磁盘空间", limit="未知", current=f"获取失败: {e}") else: raise Exception(f"获取磁盘使用情况失败: {e}") def _calculate_file_hash(self, file_path: str, algorithm: str = 'md5') -> str: """Calculate file hash for integrity checking.""" hash_func = hashlib.new(algorithm) try: with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hash_func.update(chunk) return hash_func.hexdigest() except Exception: return "" def _match_pattern(self, filename: str, pattern: str) -> bool: """Simple pattern matching for filenames.""" import fnmatch return fnmatch.fnmatch(filename.lower(), pattern.lower()) def cleanup_temp_files(self, max_age_hours: int = 24) -> int: """Clean up temporary files. Args: max_age_hours: Maximum age of files to keep Returns: Number of files cleaned up """ import time temp_dirs = ['/tmp', os.path.expanduser('~/tmp')] cleaned_count = 0 current_time = time.time() max_age_seconds = max_age_hours * 3600 for temp_dir in temp_dirs: if not os.path.exists(temp_dir): continue try: for item in os.listdir(temp_dir): item_path = os.path.join(temp_dir, item) if os.path.isfile(item_path): file_age = current_time - os.path.getmtime(item_path) if file_age > max_age_seconds: try: self.delete_file(item_path) cleaned_count += 1 except Exception: continue # Skip files that can't be deleted except Exception: continue # Skip directories that can't be accessed return cleaned_count

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lillard01/chatExcel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server