Skip to main content
Glama

Scribe MCP Server

by paxocial
files.py20.5 kB
"""File utilities for Scribe MCP with bulletproof atomic operations.""" from __future__ import annotations import asyncio import hashlib import json import os import shutil import tempfile import time from collections import deque from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Union from scribe_mcp.config.settings import settings from scribe_mcp.security.sandbox import safe_file_operation # Cross-platform file locking try: import msvcrt HAS_WINDOWS_LOCK = True except ImportError: HAS_WINDOWS_LOCK = False # Try to import portalocker as fallback try: import portalocker HAS_PORTALOCKER = True except ImportError: HAS_PORTALOCKER = False # POSIX fcntl - conditional import to prevent Windows crashes try: import fcntl HAS_FCNTL = True except ImportError: HAS_FCNTL = False def _ensure_safe_path(path: Union[str, Path], operation: str = "read", context: Optional[Dict[str, Any]] = None) -> Path: """ Validate path access against the repo sandbox before performing operations. """ repo_root = settings.project_root return safe_file_operation(repo_root, Path(path), operation=operation, context=context or {"component": "files"}) class FileLockError(Exception): """Raised when file locking fails.""" pass class AtomicFileError(Exception): """Raised when atomic file operation fails.""" pass @contextmanager def file_lock(file_path: Union[str, Path], mode: str = 'r+', timeout: float = 30.0): """ Cross-platform file locking context manager. Note: This locks a sibling '.lock' file, not the target file itself. This design prevents accidental "optimization" that would compromise the cross-platform locking mechanism. The lock file coordinates access while the actual file handle is yielded for operations. Args: file_path: Path to the file to lock (creates sibling .lock file) mode: File mode for opening the target file timeout: Lock timeout in seconds Yields: File handle with exclusive lock held on sibling .lock file Raises: FileLockError: If lock cannot be acquired within timeout """ file_path = _ensure_safe_path(file_path, operation="lock") lock_file = file_path.with_suffix(file_path.suffix + '.lock') # Create lock file lock_file.touch(exist_ok=True) lock_fd = None file_handle = None lock_acquired = False portalocker_lock = None try: # Try Windows locking first if available if HAS_WINDOWS_LOCK and os.name == 'nt': lock_fd = os.open(lock_file, os.O_RDWR | os.O_CREAT) try: msvcrt.locking(lock_fd, msvcrt.LK_NBLCK, 1) lock_acquired = True except OSError: pass # Fallback to portalocker if Windows lock failed or not available if not lock_acquired and HAS_PORTALOCKER: try: portalocker_lock = portalocker.Lock(lock_file, 'r+', timeout=timeout) portalocker_lock.acquire() # Acquire and hold across yield lock_acquired = True except portalocker.LockException: if portalocker_lock: try: portalocker_lock.release() except Exception: pass portalocker_lock = None # Final fallback to POSIX fcntl if not lock_acquired and HAS_FCNTL: lock_fd = os.open(lock_file, os.O_RDWR | os.O_CREAT) start_time = time.time() while True: try: fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) lock_acquired = True break except (IOError, OSError): if time.time() - start_time > timeout: raise FileLockError(f"Could not acquire lock on {file_path} within {timeout}s") time.sleep(0.1) if not lock_acquired: raise FileLockError(f"Could not acquire lock on {file_path} within {timeout}s - no locking mechanism available") # Open the actual file file_handle = open(file_path, mode, encoding='utf-8') yield file_handle finally: # Release lock based on what we acquired if file_handle: file_handle.close() # Release portalocker lock last if portalocker_lock: try: portalocker_lock.release() except Exception: pass if lock_fd is not None: try: if HAS_WINDOWS_LOCK and os.name == 'nt': msvcrt.locking(lock_fd, msvcrt.LK_UNLCK, 1) os.close(lock_fd) except OSError: pass class WriteAheadLog: """ Write-Ahead Log for crash recovery. Every operation is first written to the journal before being applied to the main file. On startup, any uncommitted operations are replayed. """ def __init__(self, log_path: Union[str, Path]): safe_log = _ensure_safe_path(log_path, operation="append", context={"component": "wal"}) self.log_path = safe_log self.journal_path = _ensure_safe_path( safe_log.with_suffix(safe_log.suffix + '.journal'), operation="append", context={"component": "wal", "type": "journal"} ) def write_entry(self, entry: Dict[str, Any]) -> str: """ Write an entry to the journal. Args: entry: Dictionary containing operation details Returns: Entry ID (timestamp + hash) """ entry_id = f"{datetime.now(timezone.utc).isoformat()}_{hashlib.sha256(json.dumps(entry, sort_keys=True).encode()).hexdigest()[:8]}" entry['id'] = entry_id entry['timestamp'] = datetime.now(timezone.utc).isoformat() journal_line = json.dumps(entry) + '\n' with file_lock(self.journal_path, 'a') as f: f.write(journal_line) f.flush() os.fsync(f.fileno()) return entry_id def commit_entry(self, entry_id: str): """Mark an entry as committed in the journal.""" commit_entry = { 'op': 'commit', 'ref_id': entry_id, 'timestamp': datetime.now(timezone.utc).isoformat() } with file_lock(self.journal_path, 'a') as f: f.write(json.dumps(commit_entry) + '\n') f.flush() os.fsync(f.fileno()) def replay_uncommitted(self) -> int: """ Replay any uncommitted entries from the journal. Returns: Number of entries replayed """ if not self.journal_path.exists(): return 0 replayed = 0 uncommitted = [] with file_lock(self.journal_path, 'r') as f: for line in f: try: entry = json.loads(line.strip()) if entry.get('op') == 'append' and 'ref_id' not in entry: uncommitted.append(entry) elif entry.get('op') == 'commit': # Remove committed entries from uncommitted list ref_id = entry.get('ref_id') uncommitted = [e for e in uncommitted if e.get('id') != ref_id] except json.JSONDecodeError: continue # Replay uncommitted entries for entry in uncommitted: if entry.get('op') == 'append': content = entry.get('content', '') try: self._apply_append(content) self.commit_entry(entry['id']) replayed += 1 except Exception as e: print(f"Warning: Failed to replay journal entry {entry['id']}: {e}") return replayed def _apply_append(self, content: str): """Apply an append operation to the main log.""" with file_lock(self.log_path, 'a') as f: f.write(content) f.flush() os.fsync(f.fileno()) def atomic_write(file_path: Union[str, Path], content: str, mode: str = 'w') -> None: """ Atomically write content to a file. Write to temporary file first, then atomic rename. Note: This function only supports overwrite mode ('w'). For atomic append operations, use WriteAheadLog which provides proper WAL-based appending. Args: file_path: Target file path content: Content to write mode: Write mode - must be 'w' (overwrite) for atomic operations Raises: AtomicFileError: If atomic operation fails ValueError: If mode is not 'w' (only overwrite is atomic) """ file_path = _ensure_safe_path(file_path, operation="write", context={"component": "files", "op": "atomic_write"}) # Validate mode - only overwrite is atomic if mode != 'w': raise ValueError(f"atomic_write only supports mode='w' (overwrite), got mode='{mode}'. Use WriteAheadLog for atomic append operations.") # Ensure parent directory exists file_path.parent.mkdir(parents=True, exist_ok=True) # Create temporary file in same directory temp_path = file_path.with_suffix(file_path.suffix + '.tmp') try: with open(temp_path, 'w', encoding='utf-8') as f: f.write(content) f.flush() os.fsync(f.fileno()) # Atomic rename temp_path.replace(file_path) # Sync parent directory dir_fd = os.open(file_path.parent, os.O_RDONLY) try: os.fsync(dir_fd) finally: os.close(dir_fd) except Exception as e: # Clean up temp file on failure if temp_path.exists(): temp_path.unlink() raise AtomicFileError(f"Atomic write failed: {e}") async def async_atomic_write(file_path: Union[str, Path], content: str, mode: str = 'w') -> None: """ Asynchronously atomically write content to a file. This is the async version of atomic_write that maintains all bulletproof reliability while providing async compatibility for the codebase. Uses asyncio.to_thread() to execute the proven synchronous atomic_write implementation without blocking the event loop. Args: file_path: Target file path content: Content to write mode: Write mode - must be 'w' (overwrite) for atomic operations Raises: AtomicFileError: If atomic operation fails ValueError: If mode is not 'w' (only overwrite is atomic) """ await asyncio.to_thread(atomic_write, file_path, content, mode) def preflight_backup(file_path: Union[str, Path]) -> Path: """ Create a preflight backup of the file. Args: file_path: File to backup Returns: Path to the backup file """ file_path = _ensure_safe_path(file_path, operation="read", context={"component": "files", "op": "backup"}) if not file_path.exists(): raise AtomicFileError(f"Cannot backup non-existent file: {file_path}") timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")[:-3] backup_path = file_path.with_suffix(f".preflight-{timestamp}.bak") shutil.copy2(file_path, backup_path) return backup_path def verify_file_integrity(file_path: Union[str, Path]) -> Dict[str, Any]: """ Verify file integrity and return metadata. Args: file_path: File to verify Returns: Dictionary with integrity information """ file_path = _ensure_safe_path(file_path, operation="read", context={"component": "files", "op": "verify"}) if not file_path.exists(): return { 'exists': False, 'error': 'File does not exist' } try: stat = file_path.stat() # Calculate SHA-256 hash sha256_hash = hashlib.sha256() with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): sha256_hash.update(chunk) # Count lines if it's a text file line_count = None try: with open(file_path, 'r', encoding='utf-8') as f: line_count = sum(1 for _ in f) except UnicodeDecodeError: # Binary file pass return { 'exists': True, 'size_bytes': stat.st_size, 'size_mb': round(stat.st_size / (1024 * 1024), 3), 'line_count': line_count, 'sha256': sha256_hash.hexdigest(), 'modified_time': datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), 'created_time': datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc).isoformat() } except Exception as e: return { 'exists': True, 'error': str(e) } async def ensure_parent(path: Path) -> None: """Create parent directories for the given path if missing.""" safe_path = _ensure_safe_path(path, operation="mkdir", context={"component": "files"}) await asyncio.to_thread(safe_path.parent.mkdir, parents=True, exist_ok=True) async def append_line(path: Path, line: str, use_wal: bool = True) -> None: """ Bulletproof append a single line to the provided file path. Args: path: File path to append to line: Line content to append use_wal: Whether to use Write-Ahead Log for crash safety """ path = _ensure_safe_path(path, operation="append", context={"component": "logs"}) await ensure_parent(path) if use_wal: await asyncio.to_thread(_write_line_with_wal, path, line) else: await asyncio.to_thread(_write_line, path, line) def _write_line_with_wal(path: Path, line: str) -> None: """Write line with Write-Ahead Log for crash safety.""" path = _ensure_safe_path(path, operation="append", context={"component": "wal"}) wal = WriteAheadLog(path) # Journal the operation first entry_id = wal.write_entry({ 'op': 'append', 'content': line if line.endswith('\n') else line + '\n', 'file_path': str(path) }) try: # Apply the operation with file locking with file_lock(path, 'a') as f: f.write(line) if not line.endswith("\n"): f.write("\n") f.flush() os.fsync(f.fileno()) # Commit the operation wal.commit_entry(entry_id) except Exception as e: print(f"Warning: Failed to append line {entry_id}: {e}") raise def _write_line(path: Path, line: str, use_lock: bool = True) -> None: """ Write line with optional file locking for safety. Args: path: File path to write to line: Line content to write use_lock: Whether to use file locking """ path = _ensure_safe_path(path, operation="append", context={"component": "logs"}) if use_lock: with file_lock(path, 'a') as handle: handle.write(line) if not line.endswith("\n"): handle.write("\n") handle.flush() os.fsync(handle.fileno()) else: with path.open("a", encoding="utf-8") as handle: handle.write(line) if not line.endswith("\n"): handle.write("\n") async def read_tail(path: Path, count: int) -> List[str]: """Return the last `count` lines from `path`.""" safe_path = _ensure_safe_path(path, operation="read", context={"component": "logs", "op": "tail"}) return await asyncio.to_thread(_read_tail_sync, safe_path, count) def _read_tail_sync(path: Path, count: int) -> List[str]: path = _ensure_safe_path(path, operation="read", context={"component": "logs", "op": "tail"}) if count <= 0: return [] try: with path.open("r", encoding="utf-8") as handle: tail = deque(handle, maxlen=count) except FileNotFoundError: return [] return [line.rstrip("\n") for line in tail] async def rotate_file(path: Path, suffix: str | None, confirm: bool = False, dry_run: bool = False, template_content: Optional[str] = None) -> Path: """ Bulletproof rotate `path` to a timestamped copy and return the archive path. Args: path: File path to rotate suffix: Suffix for the archive name confirm: Whether to actually perform the rotation dry_run: If True, simulate rotation without making changes template_content: Optional template content to use for new log Returns: Archive path Raises: AtomicFileError: If rotation fails """ path = _ensure_safe_path(path, operation="rotate", context={"component": "logs"}) suffix_part = suffix or "archive" archive = path.with_name(f"{path.name}.{suffix_part}.md") if not path.exists(): raise AtomicFileError(f"Cannot rotate non-existent file: {path}") # Verify file has content (prevent rotating empty files) file_info = verify_file_integrity(path) if file_info.get('line_count', 0) == 0: raise AtomicFileError(f"Cannot rotate empty file: {path}") if dry_run or not confirm: # Return what would happen without actually doing it return archive # Pre-flight backup backup_path = await asyncio.to_thread(preflight_backup, path) # Lock order: journal → log log_lock_acquired = False try: # Generate new log content in temp file first (avoid template race) new_log_content = template_content or "# Progress Log\n\n" # Create temp file for new log content temp_path = path.with_suffix(path.suffix + '.new') await asyncio.to_thread(_write_temp_file, temp_path, new_log_content) try: # Acquire log lock BEFORE any file operations with file_lock(path, 'r+', timeout=30.0) as f: log_lock_acquired = True # Create archive with unique name to avoid overwrites if archive.exists(): timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")[:-3] archive = archive.with_name(f"{archive.stem}_{timestamp}{archive.suffix}") await ensure_parent(archive) # Atomic rotation: rename original to archive await asyncio.to_thread(path.rename, archive) # Atomic rename: temp → new log temp_path.replace(path) # Sync parent directory dir_fd = os.open(path.parent, os.O_RDONLY) try: os.fsync(dir_fd) finally: os.close(dir_fd) return archive finally: # Ensure temp file is cleaned up if temp_path.exists(): temp_path.unlink() except Exception as e: # Rollback: restore from backup if rotation failed try: if backup_path.exists(): await asyncio.to_thread(backup_path.rename, path) except Exception as rollback_error: print(f"Critical: Failed to rollback rotation: {rollback_error}") raise AtomicFileError(f"Rotation failed and was rolled back: {e}") finally: # Lock will be automatically released by file_lock context manager if log_lock_acquired: # Log lock is released by exiting the context manager pass def _write_temp_file(temp_path: Path, content: str) -> None: """Write content to temporary file.""" temp_path = _ensure_safe_path(temp_path, operation="write", context={"component": "files", "op": "temp"}) with open(temp_path, 'w', encoding='utf-8') as f: f.write(content) f.flush() os.fsync(f.fileno()) def _create_new_log(path: Path) -> None: """ Create a new progress log file with proper header. DEPRECATED: Use template_content parameter in rotate_file instead. Args: path: Path where to create the new log """ path = _ensure_safe_path(path, operation="write", context={"component": "logs", "op": "new"}) with file_lock(path, 'w') as f: f.write("# Progress Log\n\n") f.flush() os.fsync(f.fileno())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paxocial/scribe_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server