Scribe MCP Server

by paxocial
manager.py (21.7 kB)
"""Core logic for managing project documentation updates.""" from __future__ import annotations import asyncio import difflib import hashlib import json import logging import time from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Optional, Tuple # Import with absolute paths from scribe_mcp root from scribe_mcp.utils.files import async_atomic_write, ensure_parent from scribe_mcp.utils.time import utcnow from scribe_mcp.templates import template_root import re # Setup logging for doc management operations doc_logger = logging.getLogger(__name__) FRAGMENT_DIR = (template_root().parent / "fragments").resolve() SECTION_MARKER = "<!-- ID: {section} -->" def _log_operation( level: str, operation: str, doc: str, section: Optional[str], action: str, file_path: Path, duration_ms: float, file_size_before: int, file_size_after: int, success: bool, error_message: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> None: """Log document operations with structured data.""" log_data = { "timestamp": time.time(), "operation": operation, "document": doc, "section": section, "action": action, "file_path": str(file_path), "duration_ms": duration_ms, "file_size_before": file_size_before, "file_size_after": file_size_after, "file_size_change": file_size_after - file_size_before, "success": success, "error_message": error_message, "metadata": metadata or {} } # Convert to JSON for structured logging log_message = json.dumps(log_data, separators=(',', ':')) if level == "error": doc_logger.error(f"Document operation failed: {operation} on {doc} - {error_message or 'Unknown error'}", extra={"structured_log": log_data}) elif level == "warning": doc_logger.warning(f"Document operation warning: {operation} on {doc} - {error_message}", extra={"structured_log": log_data}) elif level == "info": doc_logger.info(f"Document operation successful: {operation} on {doc}", extra={"structured_log": log_data}) else: doc_logger.debug(f"Document operation: {operation} on {doc}", extra={"structured_log": log_data}) @dataclass class DocChangeResult: doc: str section: Optional[str] action: str path: Path before_hash: str after_hash: str content_written: str diff_preview: str success: bool = True error_message: Optional[str] = None verification_passed: bool = False file_size_before: int = 0 file_size_after: int = 0 class DocumentOperationError(Exception): """Raised when a document operation fails.""" pass class DocumentValidationError(Exception): """Raised when document validation fails.""" pass class DocumentVerificationError(Exception): """Raised when document write verification fails.""" pass async def apply_doc_change( project: Dict[str, Any], *, doc: str, action: str, section: Optional[str], content: Optional[str], template: Optional[str], metadata: Optional[Dict[str, Any]], dry_run: bool, ) -> DocChangeResult: """Apply a document change with comprehensive error handling and verification.""" start_time = time.time() file_size_before = 0 try: # Validate inputs _validate_inputs(doc, action, section, content, template) # Resolve and validate document path doc_path = _resolve_doc_path(project, doc) await ensure_parent(doc_path) # Get original content and metadata original_text = "" file_size_before = 0 if doc_path.exists(): try: original_text = await asyncio.to_thread(doc_path.read_text, encoding="utf-8") file_size_before = doc_path.stat().st_size except (OSError, UnicodeDecodeError) as e: raise DocumentOperationError(f"Failed to read existing document {doc_path}: {e}") 
async def apply_doc_change(
    project: Dict[str, Any],
    *,
    doc: str,
    action: str,
    section: Optional[str],
    content: Optional[str],
    template: Optional[str],
    metadata: Optional[Dict[str, Any]],
    dry_run: bool,
) -> DocChangeResult:
    """Apply a document change with comprehensive error handling and verification."""
    start_time = time.time()
    file_size_before = 0

    try:
        # Validate inputs
        _validate_inputs(doc, action, section, content, template)

        # Resolve and validate document path
        doc_path = _resolve_doc_path(project, doc)
        await ensure_parent(doc_path)

        # Get original content and metadata
        original_text = ""
        file_size_before = 0
        if doc_path.exists():
            try:
                original_text = await asyncio.to_thread(doc_path.read_text, encoding="utf-8")
                file_size_before = doc_path.stat().st_size
            except (OSError, UnicodeDecodeError) as e:
                raise DocumentOperationError(f"Failed to read existing document {doc_path}: {e}")

        before_hash = _hash_text(original_text)

        # Render content
        try:
            rendered_content = await _render_content(project, content, template, metadata)
        except Exception as e:
            raise DocumentOperationError(f"Failed to render content: {e}")

        # Apply the change based on action
        try:
            if action == "replace_section":
                updated_text = _replace_section(original_text, section, rendered_content)
            elif action == "append":
                updated_text = _append_block(original_text, rendered_content)
            elif action == "status_update":
                updated_text = _toggle_checklist_status(original_text, section, metadata or {})
            else:
                raise ValueError(f"Unsupported action '{action}'.")
        except Exception as e:
            raise DocumentOperationError(f"Failed to apply {action} operation: {e}")

        after_hash = _hash_text(updated_text)

        # Generate diff preview
        diff_preview = "\n".join(
            difflib.unified_diff(
                original_text.splitlines(),
                updated_text.splitlines(),
                fromfile="before",
                tofile="after",
                lineterm="",
            )
        )

        # Apply changes to file system
        file_size_after = 0
        verification_passed = False
        if not dry_run:
            try:
                # Write the file
                await async_atomic_write(doc_path, updated_text, mode="w")

                # Verify the write was successful
                verification_passed = await _verify_file_write(doc_path, updated_text, after_hash)
                if not verification_passed:
                    raise DocumentVerificationError(f"File write verification failed for {doc_path}")

                file_size_after = doc_path.stat().st_size

                # Log successful operation
                duration_ms = (time.time() - start_time) * 1000
                _log_operation(
                    level="info",
                    operation="apply_doc_change",
                    doc=doc,
                    section=section,
                    action=action,
                    file_path=doc_path,
                    duration_ms=duration_ms,
                    file_size_before=file_size_before,
                    file_size_after=file_size_after,
                    success=True,
                    metadata={
                        "before_hash": before_hash,
                        "after_hash": after_hash,
                        "dry_run": dry_run,
                    },
                )
            except Exception as e:
                # Attempt rollback if write failed
                try:
                    if original_text and doc_path.exists():
                        await async_atomic_write(doc_path, original_text, mode="w")
                    duration_ms = (time.time() - start_time) * 1000
                    _log_operation(
                        level="warning",
                        operation="apply_doc_change",
                        doc=doc,
                        section=section,
                        action=action,
                        file_path=doc_path,
                        duration_ms=duration_ms,
                        file_size_before=file_size_before,
                        file_size_after=file_size_before,  # Back to original
                        success=False,
                        error_message="Write failed, rolled back successfully",
                        metadata={"rollback": True, "original_error": str(e)},
                    )
                except Exception as rollback_error:
                    duration_ms = (time.time() - start_time) * 1000
                    _log_operation(
                        level="error",
                        operation="apply_doc_change",
                        doc=doc,
                        section=section,
                        action=action,
                        file_path=doc_path,
                        duration_ms=duration_ms,
                        file_size_before=file_size_before,
                        file_size_after=file_size_before,
                        success=False,
                        error_message=f"Write failed and rollback failed: {rollback_error}",
                        metadata={"rollback_failed": True, "original_error": str(e)},
                    )
                raise DocumentOperationError(f"Failed to write document {doc_path}: {e}")

        return DocChangeResult(
            doc=doc,
            section=section,
            action=action,
            path=doc_path,
            before_hash=before_hash,
            after_hash=after_hash,
            content_written=rendered_content,
            diff_preview=diff_preview,
            success=True,
            verification_passed=verification_passed,
            file_size_before=file_size_before,
            file_size_after=file_size_after,
        )

    except (DocumentValidationError, DocumentOperationError, DocumentVerificationError) as e:
        duration_ms = (time.time() - start_time) * 1000
        _log_operation(
            level="error",
            operation="apply_doc_change",
            doc=doc,
            section=section,
            action=action,
            file_path=Path(""),
            duration_ms=duration_ms,
            file_size_before=file_size_before,
            file_size_after=0,
            success=False,
            error_message=str(e),
            metadata={"error_type": type(e).__name__},
        )
        return DocChangeResult(
            doc=doc,
            section=section,
            action=action,
            path=Path(""),
            before_hash="",
            after_hash="",
            content_written="",
            diff_preview="",
            success=False,
            error_message=str(e),
            verification_passed=False,
            file_size_before=file_size_before,
            file_size_after=0,
        )
    except Exception as e:
        # Catch any unexpected errors
        duration_ms = (time.time() - start_time) * 1000
        _log_operation(
            level="error",
            operation="apply_doc_change",
            doc=doc,
            section=section,
            action=action,
            file_path=Path(""),
            duration_ms=duration_ms,
            file_size_before=file_size_before,
            file_size_after=0,
            success=False,
            error_message=f"Unexpected error: {e}",
            metadata={"error_type": type(e).__name__, "unexpected": True},
        )
        return DocChangeResult(
            doc=doc,
            section=section,
            action=action,
            path=Path(""),
            before_hash="",
            after_hash="",
            content_written="",
            diff_preview="",
            success=False,
            error_message=f"Unexpected error: {e}",
            verification_passed=False,
            file_size_before=file_size_before,
            file_size_after=0,
        )
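
# Usage sketch (hypothetical project mapping; the keys mirror the lookups in
# _resolve_doc_path below):
#
#   result = await apply_doc_change(
#       {"name": "demo", "root": "/tmp/demo", "docs": {}},
#       doc="progress_log",
#       action="append",
#       section=None,
#       content="- Initial entry",
#       template=None,
#       metadata=None,
#       dry_run=True,
#   )
#   print(result.diff_preview)
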
def _resolve_doc_path(project: Dict[str, Any], doc_key: str) -> Path:
    """
    Resolve documentation path with safety assertions.

    Ensures all resolved paths remain within the repository sandbox and
    provides sensible fallbacks when explicit paths are not defined.
    """
    # Validate inputs
    if not doc_key:
        raise ValueError("doc_key cannot be empty")

    project_name = project.get("name")
    if not project_name:
        raise ValueError("project must have a name")

    project_root = Path(project.get("root", ""))
    if not project_root.exists():
        raise ValueError(f"Project root does not exist: {project_root}")

    # Try explicit paths first
    docs = project.get("docs") or {}
    target = docs.get(doc_key)
    if not target and doc_key == "progress_log":
        target = project.get("progress_log")

    if target:
        resolved_path = Path(target).resolve()
        # CRITICAL: Ensure path is within project sandbox
        try:
            resolved_path.relative_to(project_root)
        except ValueError as e:
            raise SecurityError(f"Document path {resolved_path} is outside project root {project_root}") from e
        doc_logger.debug(f"Resolved doc path for {doc_key} using explicit target: {resolved_path}")
        return resolved_path

    # Fallback to conventional structure
    docs_dir = docs.get("architecture")
    if docs_dir:
        docs_dir = Path(docs_dir).parent
    else:
        # Use conventional docs structure
        docs_dir = project_root / "docs" / "dev_plans" / slugify_project_name(project_name)

    filename = {
        "architecture": "ARCHITECTURE_GUIDE.md",
        "phase_plan": "PHASE_PLAN.md",
        "checklist": "CHECKLIST.md",
        "progress_log": "PROGRESS_LOG.md",
        "doc_log": "DOC_LOG.md",
        "security_log": "SECURITY_LOG.md",
        "bug_log": "BUG_LOG.md",
    }.get(doc_key, f"{doc_key.upper()}.md")

    resolved_path = (docs_dir / filename).resolve()

    # CRITICAL: Ensure fallback path is within project sandbox
    try:
        resolved_path.relative_to(project_root)
    except ValueError as e:
        raise SecurityError(f"Fallback document path {resolved_path} is outside project root {project_root}") from e

    doc_logger.debug(f"Resolved doc path for {doc_key} using fallback: {resolved_path}")
    return resolved_path


class SecurityError(RuntimeError):
    """Security violation when document paths escape project sandbox."""
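
# Illustrative note: with no explicit "docs" mapping, a doc_key of "checklist"
# for a project named "My Project" falls back to
#   <root>/docs/dev_plans/my_project/CHECKLIST.md
# and any resolved path that escapes <root> raises SecurityError.
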
async def _render_content(
    project: Dict[str, Any],
    content: Optional[str],
    template_name: Optional[str],
    metadata: Optional[Dict[str, Any]],
) -> str:
    """Render content using Jinja2 template engine with fallback to direct content."""
    if template_name:
        try:
            # Import template engine dynamically to avoid circular imports
            from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
        except ImportError as e:
            doc_logger.error(f"Template engine unavailable for '{template_name}': {e}")
            raise DocumentOperationError(f"Failed to render template '{template_name}': {e}")

        try:
            # Initialize template engine
            engine = Jinja2TemplateEngine(
                project_root=Path(project.get("root", "")),
                project_name=project.get("name", ""),
                security_mode="sandbox",
            )

            # Add timestamp to metadata
            enhanced_metadata = metadata.copy() if metadata else {}
            enhanced_metadata.update({
                "timestamp": utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            })

            # Render template
            rendered = engine.render_template(
                template_name=f"{template_name}.md",
                metadata=enhanced_metadata,
            )
            doc_logger.debug(f"Successfully rendered template '{template_name}' using Jinja2 engine")
            return rendered
        except TemplateEngineError as e:
            doc_logger.error(f"Template engine error for '{template_name}': {e}")
            raise DocumentOperationError(f"Failed to render template '{template_name}': {e}")
        except Exception as e:
            doc_logger.error(f"Unexpected error rendering template '{template_name}': {e}")
            raise DocumentOperationError(f"Unexpected template error: {e}")

    if isinstance(content, str) and content.strip():
        # For direct content, still allow Jinja2 processing for variables.
        # Import before the try block so a missing engine falls back cleanly
        # instead of raising NameError while evaluating the except clause.
        try:
            from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
        except ImportError:
            doc_logger.warning("Jinja2 engine unavailable, using raw content")
            return content.strip()

        try:
            engine = Jinja2TemplateEngine(
                project_root=Path(project.get("root", "")),
                project_name=project.get("name", ""),
                security_mode="sandbox",
            )
            enhanced_metadata = metadata.copy() if metadata else {}
            enhanced_metadata.update({
                "timestamp": utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            })
            rendered = engine.render_string(content.strip(), metadata=enhanced_metadata)
            doc_logger.debug("Successfully rendered content string using Jinja2 engine")
            return rendered
        except TemplateEngineError:
            # Fallback to original content if Jinja2 processing fails
            doc_logger.warning("Jinja2 processing failed, using raw content")
            return content.strip()

    raise ValueError("Either content or template must be provided.")


async def _load_fragment(name: str) -> str:
    fragment = (FRAGMENT_DIR / f"{name}.md").resolve()
    if not fragment.exists():
        raise FileNotFoundError(f"Template fragment '{name}' not found under {FRAGMENT_DIR}")
    return await asyncio.to_thread(fragment.read_text, encoding="utf-8")


def _replace_section(text: str, section: Optional[str], content: str) -> str:
    marker = SECTION_MARKER.format(section=section)
    idx = text.find(marker)
    if idx == -1:
        raise ValueError(f"Section '{section}' anchor not found (expected '{marker}').")

    start = idx + len(marker)
    # Skip newline right after marker
    if start < len(text) and text[start] == "\r":
        start += 1
    if start < len(text) and text[start] == "\n":
        start += 1

    next_marker = text.find("<!-- ID:", start)
    if next_marker == -1:
        next_marker = len(text)

    new_block = marker + "\n" + content.strip() + "\n"
    replacement = text[:idx] + new_block + text[next_marker:]
    return replacement


def _append_block(text: str, content: str) -> str:
    if not text.endswith("\n"):
        text += "\n"
    return text + content.strip() + "\n"
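
# Illustrative note: the two helpers above are pure string transforms, so they
# can be exercised directly. For example:
#
#   text = "<!-- ID: status -->\nold\n"
#   _replace_section(text, "status", "new")  # -> "<!-- ID: status -->\nnew\n"
#   _append_block("body", "footer")          # -> "body\nfooter\n"
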
def _toggle_checklist_status(text: str, section: Optional[str], metadata: Dict[str, Any]) -> str:
    desired = metadata.get("status", "").lower()
    proof = metadata.get("proof")
    replacement = None
    lines = text.splitlines()
    for idx, line in enumerate(lines):
        if section and section not in line:
            continue
        if "- [ ]" in line or "- [x]" in line:
            if desired in {"done", "completed", "complete"}:
                new_line = line.replace("- [ ]", "- [x]", 1)
            elif desired in {"pending", "todo"}:
                new_line = line.replace("- [x]", "- [ ]", 1)
            else:
                new_line = line
            if proof:
                new_line = new_line.split(" | ")[0] + f" | proof={proof}"
            lines[idx] = new_line
            replacement = True
            break
    if not replacement:
        raise ValueError(f"Could not find checklist line containing '{section}'.")
    return "\n".join(lines) + ("\n" if text.endswith("\n") else "")


def _validate_inputs(
    doc: str,
    action: str,
    section: Optional[str],
    content: Optional[str],
    template: Optional[str],
) -> None:
    """Validate input parameters for document operations."""
    if not doc or not isinstance(doc, str):
        raise DocumentValidationError("Document name must be a non-empty string")

    if not action or not isinstance(action, str):
        raise DocumentValidationError("Action must be a non-empty string")

    valid_actions = {"replace_section", "append", "status_update"}
    if action not in valid_actions:
        raise DocumentValidationError(f"Invalid action '{action}'. Must be one of: {valid_actions}")

    if action in {"replace_section", "status_update"} and (not section or not isinstance(section, str)):
        raise DocumentValidationError(f"Section parameter is required for {action} action")

    if not content and not template:
        raise DocumentValidationError("Either content or template must be provided")

    # Validate section ID format
    if section and not section.replace("_", "").replace("-", "").isalnum():
        raise DocumentValidationError(
            f"Invalid section ID '{section}'. Must contain only alphanumeric characters, hyphens, and underscores"
        )


async def _verify_file_write(file_path: Path, expected_content: str, expected_hash: str) -> bool:
    """Verify that the file was written correctly."""
    try:
        # Check if file exists
        if not file_path.exists():
            return False

        # Read the actual content
        actual_content = await asyncio.to_thread(file_path.read_text, encoding="utf-8")

        # Verify content matches exactly
        if actual_content != expected_content:
            doc_logger.error(
                f"Content mismatch in {file_path}: expected {len(expected_content)} chars, got {len(actual_content)} chars"
            )
            return False

        # Verify hash matches
        actual_hash = _hash_text(actual_content)
        if actual_hash != expected_hash:
            doc_logger.error(
                f"Hash mismatch in {file_path}: expected {expected_hash[:8]}..., got {actual_hash[:8]}..."
            )
            return False

        return True
    except Exception as e:
        doc_logger.error(f"Failed to verify file write for {file_path}: {e}")
        return False


def _hash_text(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


class DefaultDict(dict):
    """Helper to avoid KeyError when formatting template fragments."""

    def __init__(self, base: Dict[str, Any]):
        super().__init__(base)

    def __missing__(self, key: str) -> str:
        return ""


_SLUG_CLEANER = re.compile(r"[^0-9a-z_]+")


def slugify_project_name(name: str) -> str:
    """Return a filesystem-friendly slug; duplicated here to avoid circular imports during tests."""
    normalised = name.strip().lower().replace(" ", "_")
    return _SLUG_CLEANER.sub("_", normalised).strip("_") or "project"
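
# Illustrative demo (an addition, not part of the module's original surface):
# exercises only the pure string helpers above, so it has no filesystem side
# effects. Section IDs, checklist text, and the proof value are hypothetical.
if __name__ == "__main__":
    sample = (
        "<!-- ID: overview -->\n"
        "old overview\n"
        "<!-- ID: tasks -->\n"
        "- [ ] ship docs tooling\n"
    )
    updated = _replace_section(sample, "overview", "new overview")
    updated = _toggle_checklist_status(updated, "ship docs", {"status": "done", "proof": "demo"})
    print(updated)  # checklist line becomes "- [x] ship docs tooling | proof=demo"
    print(slugify_project_name("My Project!"))  # -> my_project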
