manager.py•21.7 kB
"""Core logic for managing project documentation updates."""
from __future__ import annotations
import asyncio
import difflib
import hashlib
import json
import logging
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
# Import with absolute paths from scribe_mcp root
from scribe_mcp.utils.files import async_atomic_write, ensure_parent
from scribe_mcp.utils.time import utcnow
from scribe_mcp.templates import template_root
import re
# Setup logging for doc management operations
doc_logger = logging.getLogger(__name__)
FRAGMENT_DIR = (template_root().parent / "fragments").resolve()
SECTION_MARKER = "<!-- ID: {section} -->"
def _log_operation(
    level: str,
    operation: str,
    doc: str,
    section: Optional[str],
    action: str,
    file_path: Path,
    duration_ms: float,
    file_size_before: int,
    file_size_after: int,
    success: bool,
    error_message: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> None:
    """Log document operations with structured data."""
    log_data = {
        "timestamp": time.time(),
        "operation": operation,
        "document": doc,
        "section": section,
        "action": action,
        "file_path": str(file_path),
        "duration_ms": duration_ms,
        "file_size_before": file_size_before,
        "file_size_after": file_size_after,
        "file_size_change": file_size_after - file_size_before,
        "success": success,
        "error_message": error_message,
        "metadata": metadata or {}
    }
    # Convert to JSON for structured logging
    log_message = json.dumps(log_data, separators=(',', ':'))
    if level == "error":
        doc_logger.error(f"Document operation failed: {operation} on {doc} - {error_message or 'Unknown error'}", extra={"structured_log": log_data})
    elif level == "warning":
        doc_logger.warning(f"Document operation warning: {operation} on {doc} - {error_message}", extra={"structured_log": log_data})
    elif level == "info":
        doc_logger.info(f"Document operation successful: {operation} on {doc}", extra={"structured_log": log_data})
    else:
        doc_logger.debug(f"Document operation: {operation} on {doc}", extra={"structured_log": log_data})
@dataclass
class DocChangeResult:
    doc: str
    section: Optional[str]
    action: str
    path: Path
    before_hash: str
    after_hash: str
    content_written: str
    diff_preview: str
    success: bool = True
    error_message: Optional[str] = None
    verification_passed: bool = False
    file_size_before: int = 0
    file_size_after: int = 0
class DocumentOperationError(Exception):
    """Raised when a document operation fails."""
    pass
class DocumentValidationError(Exception):
    """Raised when document validation fails."""
    pass
class DocumentVerificationError(Exception):
    """Raised when document write verification fails."""
    pass
async def apply_doc_change(
    project: Dict[str, Any],
    *,
    doc: str,
    action: str,
    section: Optional[str],
    content: Optional[str],
    template: Optional[str],
    metadata: Optional[Dict[str, Any]],
    dry_run: bool,
) -> DocChangeResult:
    """Apply a document change with comprehensive error handling and verification."""
    start_time = time.time()
    file_size_before = 0
    try:
        # Validate inputs
        _validate_inputs(doc, action, section, content, template)
        # Resolve and validate document path
        doc_path = _resolve_doc_path(project, doc)
        await ensure_parent(doc_path)
        # Get original content and metadata
        original_text = ""
        file_size_before = 0
        if doc_path.exists():
            try:
                original_text = await asyncio.to_thread(doc_path.read_text, encoding="utf-8")
                file_size_before = doc_path.stat().st_size
            except (OSError, UnicodeDecodeError) as e:
                raise DocumentOperationError(f"Failed to read existing document {doc_path}: {e}")
        before_hash = _hash_text(original_text)
        # Render content
        try:
            rendered_content = await _render_content(project, content, template, metadata)
        except Exception as e:
            raise DocumentOperationError(f"Failed to render content: {e}")
        # Apply the change based on action
        try:
            if action == "replace_section":
                updated_text = _replace_section(original_text, section, rendered_content)
            elif action == "append":
                updated_text = _append_block(original_text, rendered_content)
            elif action == "status_update":
                updated_text = _toggle_checklist_status(original_text, section, metadata or {})
            else:
                raise ValueError(f"Unsupported action '{action}'.")
        except Exception as e:
            raise DocumentOperationError(f"Failed to apply {action} operation: {e}")
        after_hash = _hash_text(updated_text)
        # Generate diff preview
        diff_preview = "\n".join(
            difflib.unified_diff(
                original_text.splitlines(),
                updated_text.splitlines(),
                fromfile="before",
                tofile="after",
                lineterm="",
            )
        )
        # Apply changes to file system
        file_size_after = 0
        verification_passed = False
        if not dry_run:
            try:
                # Write the file
                await async_atomic_write(doc_path, updated_text, mode="w")
                # Verify the write was successful
                verification_passed = await _verify_file_write(doc_path, updated_text, after_hash)
                if not verification_passed:
                    raise DocumentVerificationError(f"File write verification failed for {doc_path}")
                file_size_after = doc_path.stat().st_size
                # Log successful operation
                duration_ms = (time.time() - start_time) * 1000
                _log_operation(
                    level="info",
                    operation="apply_doc_change",
                    doc=doc,
                    section=section,
                    action=action,
                    file_path=doc_path,
                    duration_ms=duration_ms,
                    file_size_before=file_size_before,
                    file_size_after=file_size_after,
                    success=True,
                    metadata={
                        "before_hash": before_hash,
                        "after_hash": after_hash,
                        "dry_run": dry_run
                    }
                )
            except Exception as e:
                # Attempt rollback if write failed
                try:
                    if original_text and doc_path.exists():
                        await async_atomic_write(doc_path, original_text, mode="w")
                        duration_ms = (time.time() - start_time) * 1000
                        _log_operation(
                            level="warning",
                            operation="apply_doc_change",
                            doc=doc,
                            section=section,
                            action=action,
                            file_path=doc_path,
                            duration_ms=duration_ms,
                            file_size_before=file_size_before,
                            file_size_after=file_size_before,  # Back to original
                            success=False,
                            error_message="Write failed, rolled back successfully",
                            metadata={"rollback": True, "original_error": str(e)}
                        )
                except Exception as rollback_error:
                    duration_ms = (time.time() - start_time) * 1000
                    _log_operation(
                        level="error",
                        operation="apply_doc_change",
                        doc=doc,
                        section=section,
                        action=action,
                        file_path=doc_path,
                        duration_ms=duration_ms,
                        file_size_before=file_size_before,
                        file_size_after=file_size_before,
                        success=False,
                        error_message=f"Write failed and rollback failed: {rollback_error}",
                        metadata={"rollback_failed": True, "original_error": str(e)}
                    )
                raise DocumentOperationError(f"Failed to write document {doc_path}: {e}")
        return DocChangeResult(
            doc=doc,
            section=section,
            action=action,
            path=doc_path,
            before_hash=before_hash,
            after_hash=after_hash,
            content_written=rendered_content,
            diff_preview=diff_preview,
            success=True,
            verification_passed=verification_passed,
            file_size_before=file_size_before,
            file_size_after=file_size_after,
        )
    except (DocumentValidationError, DocumentOperationError, DocumentVerificationError) as e:
        duration_ms = (time.time() - start_time) * 1000
        _log_operation(
            level="error",
            operation="apply_doc_change",
            doc=doc,
            section=section,
            action=action,
            file_path=Path(""),
            duration_ms=duration_ms,
            file_size_before=file_size_before,
            file_size_after=0,
            success=False,
            error_message=str(e),
            metadata={"error_type": type(e).__name__}
        )
        return DocChangeResult(
            doc=doc,
            section=section,
            action=action,
            path=Path(""),
            before_hash="",
            after_hash="",
            content_written="",
            diff_preview="",
            success=False,
            error_message=str(e),
            verification_passed=False,
            file_size_before=file_size_before,
            file_size_after=0,
        )
    except Exception as e:
        # Catch any unexpected errors
        duration_ms = (time.time() - start_time) * 1000
        _log_operation(
            level="error",
            operation="apply_doc_change",
            doc=doc,
            section=section,
            action=action,
            file_path=Path(""),
            duration_ms=duration_ms,
            file_size_before=file_size_before,
            file_size_after=0,
            success=False,
            error_message=f"Unexpected error: {e}",
            metadata={"error_type": type(e).__name__, "unexpected": True}
        )
        return DocChangeResult(
            doc=doc,
            section=section,
            action=action,
            path=Path(""),
            before_hash="",
            after_hash="",
            content_written="",
            diff_preview="",
            success=False,
            error_message=f"Unexpected error: {e}",
            verification_passed=False,
            file_size_before=file_size_before,
            file_size_after=0,
        )
def _resolve_doc_path(project: Dict[str, Any], doc_key: str) -> Path:
    """
    Resolve documentation path with safety assertions.
    Ensures all resolved paths remain within the repository sandbox
    and provides sensible fallbacks when explicit paths are not defined.
    """
    # Validate inputs
    if not doc_key:
        raise ValueError("doc_key cannot be empty")
    project_name = project.get("name")
    if not project_name:
        raise ValueError("project must have a name")
    project_root = Path(project.get("root", ""))
    if not project_root.exists():
        raise ValueError(f"Project root does not exist: {project_root}")
    # Try explicit paths first
    docs = project.get("docs") or {}
    target = docs.get(doc_key)
    if not target and doc_key == "progress_log":
        target = project.get("progress_log")
    if target:
        resolved_path = Path(target).resolve()
        # CRITICAL: Ensure path is within project sandbox
        try:
            resolved_path.relative_to(project_root)
        except ValueError as e:
            raise SecurityError(f"Document path {resolved_path} is outside project root {project_root}") from e
        doc_logger.debug(f"Resolved doc path for {doc_key} using explicit target: {resolved_path}")
        return resolved_path
    # Fallback to conventional structure
    docs_dir = docs.get("architecture")
    if docs_dir:
        docs_dir = Path(docs_dir).parent
    else:
        # Use conventional docs structure
        docs_dir = project_root / "docs" / "dev_plans" / slugify_project_name(project_name)
    filename = {
        "architecture": "ARCHITECTURE_GUIDE.md",
        "phase_plan": "PHASE_PLAN.md",
        "checklist": "CHECKLIST.md",
        "progress_log": "PROGRESS_LOG.md",
        "doc_log": "DOC_LOG.md",
        "security_log": "SECURITY_LOG.md",
        "bug_log": "BUG_LOG.md",
    }.get(doc_key, f"{doc_key.upper()}.md")
    resolved_path = (docs_dir / filename).resolve()
    # CRITICAL: Ensure fallback path is within project sandbox
    try:
        resolved_path.relative_to(project_root)
    except ValueError as e:
        raise SecurityError(f"Fallback document path {resolved_path} is outside project root {project_root}") from e
    doc_logger.debug(f"Resolved doc path for {doc_key} using fallback: {resolved_path}")
    return resolved_path
class SecurityError(RuntimeError):
    """Security violation when document paths escape project sandbox."""
async def _render_content(
    project: Dict[str, Any],
    content: Optional[str],
    template_name: Optional[str],
    metadata: Optional[Dict[str, Any]],
) -> str:
    """Render content using Jinja2 template engine with fallback to direct content."""
    if template_name:
        try:
            # Import template engine dynamically to avoid circular imports
            from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
            # Initialize template engine
            engine = Jinja2TemplateEngine(
                project_root=Path(project.get("root", "")),
                project_name=project.get("name", ""),
                security_mode="sandbox"
            )
            # Add timestamp to metadata
            enhanced_metadata = metadata.copy() if metadata else {}
            enhanced_metadata.update({
                "timestamp": utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            })
            # Render template
            rendered = engine.render_template(
                template_name=f"{template_name}.md",
                metadata=enhanced_metadata
            )
            doc_logger.debug(f"Successfully rendered template '{template_name}' using Jinja2 engine")
            return rendered
        except TemplateEngineError as e:
            doc_logger.error(f"Template engine error for '{template_name}': {e}")
            raise DocumentOperationError(f"Failed to render template '{template_name}': {e}")
        except Exception as e:
            doc_logger.error(f"Unexpected error rendering template '{template_name}': {e}")
            raise DocumentOperationError(f"Unexpected template error: {e}")
    if isinstance(content, str) and content.strip():
        # For direct content, still allow Jinja2 processing for variables
        try:
            from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
            engine = Jinja2TemplateEngine(
                project_root=Path(project.get("root", "")),
                project_name=project.get("name", ""),
                security_mode="sandbox"
            )
            enhanced_metadata = metadata.copy() if metadata else {}
            enhanced_metadata.update({
                "timestamp": utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            })
            rendered = engine.render_string(content.strip(), metadata=enhanced_metadata)
            doc_logger.debug(f"Successfully rendered content string using Jinja2 engine")
            return rendered
        except (TemplateEngineError, ImportError):
            # Fallback to original content if Jinja2 processing fails
            doc_logger.warning(f"Jinja2 processing failed, using raw content")
            return content.strip()
    raise ValueError("Either content or template must be provided.")
async def _load_fragment(name: str) -> str:
    fragment = (FRAGMENT_DIR / f"{name}.md").resolve()
    if not fragment.exists():
        raise FileNotFoundError(f"Template fragment '{name}' not found under {FRAGMENT_DIR}")
    return await asyncio.to_thread(fragment.read_text, encoding="utf-8")
def _replace_section(text: str, section: Optional[str], content: str) -> str:
    marker = SECTION_MARKER.format(section=section)
    idx = text.find(marker)
    if idx == -1:
        raise ValueError(f"Section '{section}' anchor not found (expected '{marker}').")
    start = idx + len(marker)
    # Skip newline right after marker
    if start < len(text) and text[start] == "\r":
        start += 1
    if start < len(text) and text[start] == "\n":
        start += 1
    next_marker = text.find("<!-- ID:", start)
    if next_marker == -1:
        next_marker = len(text)
    new_block = marker + "\n" + content.strip() + "\n"
    replacement = text[:idx] + new_block + text[next_marker:]
    return replacement
def _append_block(text: str, content: str) -> str:
    if not text.endswith("\n"):
        text += "\n"
    return text + content.strip() + "\n"
def _toggle_checklist_status(text: str, section: Optional[str], metadata: Dict[str, Any]) -> str:
    desired = metadata.get("status", "").lower()
    proof = metadata.get("proof")
    replacement = None
    lines = text.splitlines()
    for idx, line in enumerate(lines):
        if section and section not in line:
            continue
        if "- [ ]" in line or "- [x]" in line:
            if desired in {"done", "completed", "complete"}:
                new_line = line.replace("- [ ]", "- [x]", 1)
            elif desired in {"pending", "todo"}:
                new_line = line.replace("- [x]", "- [ ]", 1)
            else:
                new_line = line
            if proof:
                new_line = new_line.split(" | ")[0] + f" | proof={proof}"
            lines[idx] = new_line
            replacement = True
            break
    if not replacement:
        raise ValueError(f"Could not find checklist line containing '{section}'.")
    return "\n".join(lines) + ("\n" if text.endswith("\n") else "")
def _validate_inputs(
    doc: str,
    action: str,
    section: Optional[str],
    content: Optional[str],
    template: Optional[str]
) -> None:
    """Validate input parameters for document operations."""
    if not doc or not isinstance(doc, str):
        raise DocumentValidationError("Document name must be a non-empty string")
    if not action or not isinstance(action, str):
        raise DocumentValidationError("Action must be a non-empty string")
    valid_actions = {"replace_section", "append", "status_update"}
    if action not in valid_actions:
        raise DocumentValidationError(f"Invalid action '{action}'. Must be one of: {valid_actions}")
    if action in {"replace_section", "status_update"} and (not section or not isinstance(section, str)):
        raise DocumentValidationError(f"Section parameter is required for {action} action")
    if not content and not template:
        raise DocumentValidationError("Either content or template must be provided")
    # Validate section ID format
    if section and not section.replace("_", "").replace("-", "").isalnum():
        raise DocumentValidationError(f"Invalid section ID '{section}'. Must contain only alphanumeric characters, hyphens, and underscores")
async def _verify_file_write(file_path: Path, expected_content: str, expected_hash: str) -> bool:
    """Verify that the file was written correctly."""
    try:
        # Check if file exists
        if not file_path.exists():
            return False
        # Read the actual content
        actual_content = await asyncio.to_thread(file_path.read_text, encoding="utf-8")
        # Verify content matches exactly
        if actual_content != expected_content:
            doc_logger.error(f"Content mismatch in {file_path}: expected {len(expected_content)} chars, got {len(actual_content)} chars")
            return False
        # Verify hash matches
        actual_hash = _hash_text(actual_content)
        if actual_hash != expected_hash:
            doc_logger.error(f"Hash mismatch in {file_path}: expected {expected_hash[:8]}..., got {actual_hash[:8]}...")
            return False
        return True
    except Exception as e:
        doc_logger.error(f"Failed to verify file write for {file_path}: {e}")
        return False
def _hash_text(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()
class DefaultDict(dict):
    """Helper to avoid KeyError when formatting template fragments."""
    def __init__(self, base: Dict[str, Any]):
        super().__init__(base)
    def __missing__(self, key: str) -> str:
        return ""
_SLUG_CLEANER = re.compile(r"[^0-9a-z_]+")
def slugify_project_name(name: str) -> str:
    """Return a filesystem-friendly slug; duplicated here to avoid circular imports during tests."""
    normalised = name.strip().lower().replace(" ", "_")
    return _SLUG_CLEANER.sub("_", normalised).strip("_") or "project"