Skip to main content
Glama

MCP Database Filesystem

by ppengit
MIT License
  • Apple
  • Linux
filesystem.py18 kB
"""Filesystem operations for MCP server.""" import logging import os import shutil from pathlib import Path from typing import Any, Dict, List, Optional, Union import aiofiles import asyncio from datetime import datetime from .config import get_config logger = logging.getLogger(__name__) class FilesystemSecurityError(Exception): """Raised when a filesystem operation fails security checks.""" pass class FilesystemOperationError(Exception): """Raised when a filesystem operation fails.""" pass class FilesystemManager: """Manages filesystem operations with security controls.""" def __init__(self): self._validate_configuration() self._log_configuration() def _validate_configuration(self) -> None: """Validate filesystem configuration.""" config = get_config() # Log current access mode if config.filesystem.is_full_access_mode: logger.warning("Filesystem in FULL ACCESS mode - can access all disk files!") logger.warning("Consider restricting access by setting FS_ALLOWED_PATHS for production use") else: logger.info(f"Filesystem access restricted to {len(config.filesystem.allowed_paths)} allowed paths") # Validate allowed paths exist and are accessible for path_str in config.filesystem.allowed_paths: if path_str == '*': # Skip wildcard continue path = Path(path_str) if not path.exists(): logger.warning(f"Allowed path does not exist: {path}") elif not path.is_dir(): logger.warning(f"Allowed path is not a directory: {path}") else: logger.debug(f"Validated allowed path: {path}") def _log_configuration(self) -> None: """Log current filesystem configuration for debugging.""" config = get_config() logger.info("Filesystem Configuration Summary:") logger.info(f" Access Mode: {'FULL ACCESS' if config.filesystem.is_full_access_mode else 'RESTRICTED'}") logger.info(f" Allowed Paths: {config.filesystem.allowed_paths if config.filesystem.allowed_paths else 'ALL (full access)'}") logger.info(f" Max File Size: {config.filesystem.max_file_size / (1024*1024):.1f} MB") logger.info(f" Allowed Extensions: {config.filesystem.allowed_extensions if config.filesystem.allowed_extensions else 'ALL'}") logger.info(f" Write Enabled: {config.filesystem.enable_write}") logger.info(f" Delete Enabled: {config.filesystem.enable_delete}") logger.info(f" Ignore File Locks: {config.filesystem.ignore_file_locks}") def _is_path_allowed(self, file_path: Union[str, Path]) -> bool: """Check if a path is allowed based on configuration.""" config = get_config() file_path = Path(file_path).resolve() # Check if we're in full access mode if config.filesystem.is_full_access_mode: logger.debug(f"Path allowed: {file_path} (full access mode - no restrictions)") return True # Check if path is under any allowed path for allowed_path_str in config.filesystem.allowed_paths: # Skip wildcard entries (already handled above) if allowed_path_str == '*': continue allowed_path = Path(allowed_path_str).resolve() try: file_path.relative_to(allowed_path) logger.debug(f"Path allowed: {file_path} is under {allowed_path}") return True # Path is under an allowed directory except ValueError: continue # Path is not under this allowed directory logger.debug(f"Path not allowed: {file_path} is not under any allowed directory") return False # Path is not under any allowed directory def _is_extension_allowed(self, file_path: Union[str, Path]) -> bool: """Check if file extension is allowed.""" config = get_config() file_path = Path(file_path) extension = file_path.suffix.lower() # If allowed extensions specified, check if extension is in the list if config.filesystem.allowed_extensions: # Check for wildcard (allow all) if '*.*' in config.filesystem.allowed_extensions or '*' in config.filesystem.allowed_extensions: logger.debug(f"Extension allowed: {extension} (wildcard mode - all extensions allowed)") return True # Check specific extensions allowed = extension in config.filesystem.allowed_extensions logger.debug(f"Extension {'allowed' if allowed else 'not allowed'}: {extension}") return allowed # No restrictions configured - allow all extensions (full access mode) logger.debug(f"Extension allowed: {extension} (no restrictions configured)") return True def _validate_file_operation(self, file_path: Union[str, Path], operation: str) -> None: """Validate file operation for security.""" config = get_config() file_path = Path(file_path) # Check if path is allowed if not self._is_path_allowed(file_path): raise FilesystemSecurityError(f"Path not allowed: {file_path}") # Check file extension if not self._is_extension_allowed(file_path): raise FilesystemSecurityError(f"File extension not allowed: {file_path.suffix}") # Check write operations if operation in ['write', 'create', 'move', 'copy'] and not config.filesystem.enable_write: raise FilesystemSecurityError("Write operations are disabled") # Check delete operations if operation == 'delete' and not config.filesystem.enable_delete: raise FilesystemSecurityError("Delete operations are disabled") logger.debug(f"File operation validated: {operation} on {file_path}") def read_file(self, file_path: Union[str, Path], encoding: str = 'utf-8') -> str: """Read file content.""" file_path = Path(file_path) self._validate_file_operation(file_path, 'read') try: if not file_path.exists(): raise FilesystemOperationError(f"File does not exist: {file_path}") if not file_path.is_file(): raise FilesystemOperationError(f"Path is not a file: {file_path}") # Check file size config = get_config() file_size = file_path.stat().st_size if file_size > config.filesystem.max_file_size: raise FilesystemOperationError( f"File too large: {file_size} bytes (max: {config.filesystem.max_file_size})" ) # Try to read file, with option to ignore locks content = None last_error = None # First try normal read try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() except (PermissionError, OSError) as e: last_error = e if config.filesystem.ignore_file_locks: logger.warning(f"File may be locked, trying alternative read methods: {file_path}") # Try reading with different sharing modes on Windows try: import msvcrt import os # Try opening with shared read/write access fd = os.open(file_path, os.O_RDONLY | os.O_BINARY) try: with os.fdopen(fd, 'rb') as f: raw_content = f.read() content = raw_content.decode(encoding) finally: # fd is closed by fdopen context manager pass except (ImportError, OSError, UnicodeDecodeError): # Fallback: try reading in binary mode and decode try: with open(file_path, 'rb') as f: raw_content = f.read() content = raw_content.decode(encoding, errors='replace') except Exception: # If all methods fail, raise the original error raise last_error else: raise last_error if content is None: raise FilesystemOperationError(f"Failed to read file: {last_error}") logger.info(f"File read successfully: {file_path} ({file_size} bytes)") return content except Exception as e: logger.error(f"Failed to read file {file_path}: {e}") raise FilesystemOperationError(f"Failed to read file: {e}") def write_file(self, file_path: Union[str, Path], content: str, encoding: str = 'utf-8', create_dirs: bool = True) -> None: """Write content to file.""" file_path = Path(file_path) self._validate_file_operation(file_path, 'write') try: # Check content size config = get_config() content_size = len(content.encode(encoding)) if content_size > config.filesystem.max_file_size: raise FilesystemOperationError( f"Content too large: {content_size} bytes (max: {config.filesystem.max_file_size})" ) # Create parent directories if needed if create_dirs: file_path.parent.mkdir(parents=True, exist_ok=True) # Try to write file, with option to ignore locks last_error = None # First try normal write try: with open(file_path, 'w', encoding=encoding) as f: f.write(content) except (PermissionError, OSError) as e: last_error = e if config.filesystem.ignore_file_locks: logger.warning(f"File may be locked, trying alternative write methods: {file_path}") # Try writing with different sharing modes on Windows try: import tempfile import shutil # Write to temporary file first, then replace temp_dir = file_path.parent with tempfile.NamedTemporaryFile(mode='w', encoding=encoding, dir=temp_dir, delete=False, suffix='.tmp') as temp_file: temp_file.write(content) temp_path = temp_file.name # Try to replace the original file try: if file_path.exists(): # On Windows, we might need to remove the target first file_path.unlink() shutil.move(temp_path, file_path) except Exception: # Clean up temp file if replacement failed Path(temp_path).unlink(missing_ok=True) raise except Exception: # If all methods fail, raise the original error raise last_error else: raise last_error logger.info(f"File written successfully: {file_path} ({content_size} bytes)") except Exception as e: logger.error(f"Failed to write file {file_path}: {e}") raise FilesystemOperationError(f"Failed to write file: {e}") def list_directory(self, dir_path: Union[str, Path], recursive: bool = False) -> List[Dict[str, Any]]: """List directory contents.""" dir_path = Path(dir_path) self._validate_file_operation(dir_path, 'read') try: if not dir_path.exists(): raise FilesystemOperationError(f"Directory does not exist: {dir_path}") if not dir_path.is_dir(): raise FilesystemOperationError(f"Path is not a directory: {dir_path}") items = [] if recursive: for item in dir_path.rglob('*'): if self._is_path_allowed(item): items.append(self._get_file_info(item)) else: for item in dir_path.iterdir(): if self._is_path_allowed(item): items.append(self._get_file_info(item)) logger.info(f"Directory listed successfully: {dir_path} ({len(items)} items)") return items except Exception as e: logger.error(f"Failed to list directory {dir_path}: {e}") raise FilesystemOperationError(f"Failed to list directory: {e}") def _get_file_info(self, file_path: Path) -> Dict[str, Any]: """Get file information.""" try: stat = file_path.stat() return { 'name': file_path.name, 'path': str(file_path), 'type': 'directory' if file_path.is_dir() else 'file', 'size': stat.st_size, 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'created': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'extension': file_path.suffix.lower() if file_path.is_file() else None, } except Exception as e: logger.warning(f"Failed to get file info for {file_path}: {e}") return { 'name': file_path.name, 'path': str(file_path), 'type': 'unknown', 'error': str(e) } def create_directory(self, dir_path: Union[str, Path], parents: bool = True) -> None: """Create directory.""" dir_path = Path(dir_path) self._validate_file_operation(dir_path, 'create') try: dir_path.mkdir(parents=parents, exist_ok=True) logger.info(f"Directory created successfully: {dir_path}") except Exception as e: logger.error(f"Failed to create directory {dir_path}: {e}") raise FilesystemOperationError(f"Failed to create directory: {e}") def delete_file(self, file_path: Union[str, Path]) -> None: """Delete file with enhanced error handling.""" import time import stat file_path = Path(file_path) self._validate_file_operation(file_path, 'delete') try: if not file_path.exists(): raise FilesystemOperationError(f"File does not exist: {file_path}") if not file_path.is_file(): raise FilesystemOperationError(f"Path is not a file: {file_path}") # 尝试删除文件,如果失败则尝试修改权限后重试 max_retries = 3 for attempt in range(max_retries): try: # 尝试删除文件 file_path.unlink() logger.info(f"File deleted successfully: {file_path}") return except PermissionError as pe: if attempt < max_retries - 1: logger.warning(f"Permission denied on attempt {attempt + 1}, trying to fix permissions: {pe}") try: # 尝试修改文件权限 file_path.chmod(stat.S_IWRITE | stat.S_IREAD) time.sleep(0.1) # 短暂等待 continue except Exception as chmod_error: logger.warning(f"Failed to change file permissions: {chmod_error}") else: raise FilesystemOperationError(f"Permission denied: Cannot delete file {file_path}. File may be in use or you may not have sufficient permissions.") except OSError as ose: if attempt < max_retries - 1: logger.warning(f"OS error on attempt {attempt + 1}: {ose}") time.sleep(0.1) # 短暂等待后重试 continue else: # 提供更详细的错误信息 if "being used by another process" in str(ose).lower(): raise FilesystemOperationError(f"File is being used by another process: {file_path}") elif "access is denied" in str(ose).lower(): raise FilesystemOperationError(f"Access denied: Cannot delete file {file_path}. Check file permissions.") else: raise FilesystemOperationError(f"Cannot delete file {file_path}: {ose}") except FilesystemOperationError: # 重新抛出我们自己的错误 raise except Exception as e: logger.error(f"Unexpected error deleting file {file_path}: {e}") raise FilesystemOperationError(f"Unexpected error deleting file: {e}") # Global filesystem manager instance fs_manager = FilesystemManager()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ppengit/mcp-db-filesystem'

If you have feedback or need assistance with the MCP directory API, please join our Discord server