"""Core logic for managing project documentation updates."""
from __future__ import annotations
import asyncio
import ast
import difflib
import hashlib
import json
import logging
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
# Import with absolute paths from scribe_mcp root
from scribe_mcp.utils.files import async_atomic_write, ensure_parent, preflight_backup
from scribe_mcp.config.repo_config import RepoDiscovery
from scribe_mcp.utils.frontmatter import (
apply_frontmatter_updates,
build_frontmatter,
parse_frontmatter,
)
from scribe_mcp.utils.diff_compiler import compile_unified_diff
from scribe_mcp.utils.time import utcnow
from scribe_mcp.templates import template_root
from scribe_mcp.utils.parameter_validator import (
ToolValidator,
BulletproofParameterCorrector,
)
# Setup logging for doc management operations
doc_logger = logging.getLogger(__name__)
FRAGMENT_DIR = (template_root().parent / "fragments").resolve()
SECTION_MARKER = "<!-- ID: {section} -->"
_HUNK_HEADER = re.compile(r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@")
_HEADER_LINE_PATTERN = re.compile(r"^(#{1,6})\s+(.*\S.*)$")
PATCH_MODE_STRUCTURED = "structured"
PATCH_MODE_UNIFIED = "unified"
PATCH_MODE_ALIASES = {"diff": PATCH_MODE_UNIFIED}
PATCH_MODE_ALLOWED = {PATCH_MODE_STRUCTURED, PATCH_MODE_UNIFIED}
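# Illustrative payloads (values hypothetical): patch_mode="structured" expects an
# `edit` dict such as {"type": "replace_section", "section": "overview",
# "content": "..."}, while patch_mode="unified" expects raw unified-diff text;
# "diff" is accepted as an alias for "unified" via PATCH_MODE_ALIASES.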
def _log_operation(
level: str,
operation: str,
doc: str,
section: Optional[str],
action: str,
file_path: Path,
duration_ms: float,
file_size_before: int,
file_size_after: int,
success: bool,
error_message: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Log document operations with structured data."""
log_data = {
"timestamp": time.time(),
"operation": operation,
"document": doc,
"section": section,
"action": action,
"file_path": str(file_path),
"duration_ms": duration_ms,
"file_size_before": file_size_before,
"file_size_after": file_size_after,
"file_size_change": file_size_after - file_size_before,
"success": success,
"error_message": error_message,
"metadata": metadata or {}
}
# Convert to JSON for structured logging
log_message = json.dumps(log_data, separators=(',', ':'))
if level == "error":
doc_logger.error(f"Document operation failed: {operation} on {doc} - {error_message or 'Unknown error'}", extra={"structured_log": log_data})
elif level == "warning":
doc_logger.warning(f"Document operation warning: {operation} on {doc} - {error_message}", extra={"structured_log": log_data})
elif level == "info":
doc_logger.info(f"Document operation successful: {operation} on {doc}", extra={"structured_log": log_data})
else:
doc_logger.debug(f"Document operation: {operation} on {doc}", extra={"structured_log": log_data})
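# Example (illustrative values): a successful append logs a JSON payload like
# {"timestamp": 1700000000.0, "operation": "apply_doc_change", "document": "checklist",
#  "section": "phase_1", "action": "append", "file_path": "/repos/demo/docs/CHECKLIST.md",
#  "duration_ms": 4.2, "file_size_before": 1024, "file_size_after": 1100,
#  "file_size_change": 76, "success": true, "error_message": null, "metadata": {}}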
@dataclass
class DocChangeResult:
doc: str
section: Optional[str]
action: str
path: Path
before_hash: str
after_hash: str
content_written: str
diff_preview: str
extra: Dict[str, Any] = field(default_factory=dict)
success: bool = True
error_message: Optional[str] = None
verification_passed: bool = False
file_size_before: int = 0
file_size_after: int = 0
class DocumentOperationError(Exception):
"""Raised when a document operation fails."""
def __init__(self, message: str, extra: Optional[Dict[str, Any]] = None) -> None:
super().__init__(message)
self.extra = extra or {}
class DocumentVerificationError(Exception):
"""Raised when document write verification fails."""
pass
class DocumentValidationError(Exception):
"""Raised when document validation fails."""
pass
async def apply_doc_change(
project: Dict[str, Any],
*,
doc: str,
action: str,
section: Optional[str],
content: Optional[str],
patch: Optional[str] = None,
patch_source_hash: Optional[str] = None,
edit: Optional[Dict[str, Any]] = None,
patch_mode: Optional[str] = None,
start_line: Optional[int] = None,
end_line: Optional[int] = None,
template: Optional[str],
metadata: Optional[Dict[str, Any]],
dry_run: bool,
) -> DocChangeResult:
"""Apply a document change with comprehensive error handling and verification."""
start_time = time.time()
file_size_before = 0
try:
# Validate and correct inputs (bulletproof - never fails)
doc, action, section, content, template, metadata = _validate_and_correct_inputs(
doc, action, section, content, patch, patch_source_hash, edit,
start_line, end_line, template, metadata
)
# Resolve and validate document path
if action != "create_doc":
docs_mapping = project.get("docs") or {}
if doc not in docs_mapping:
raise DocumentOperationError(f"DOC_NOT_FOUND: doc '{doc}' is not registered")
if action == "create_doc":
doc_path = _resolve_create_doc_path(project, metadata, doc)
else:
doc_path = _resolve_doc_path(project, doc)
repo_root = Path(project["root"]).resolve()
await ensure_parent(doc_path, repo_root=repo_root)
# Get original content and metadata
original_text = ""
file_size_before = 0
if doc_path.exists():
try:
original_text = await asyncio.to_thread(doc_path.read_text, encoding="utf-8")
file_size_before = doc_path.stat().st_size
except (OSError, UnicodeDecodeError) as e:
raise DocumentOperationError(f"Failed to read existing document {doc_path}: {e}")
before_hash = _hash_text(original_text)
try:
original_parsed = parse_frontmatter(original_text)
except ValueError as exc:
raise DocumentOperationError(str(exc))
original_body = original_parsed.body
frontmatter_line_count = len(original_parsed.frontmatter_raw.splitlines()) if original_parsed.has_frontmatter else 0
# Render content when required
rendered_content: Optional[str] = None
if action in {"replace_section", "append"}:
try:
rendered_content = await _render_content(project, content, template, metadata)
except Exception as e:
raise DocumentOperationError(f"Failed to render content: {e}")
# Apply the change based on action
extra: Dict[str, Any] = {}
try:
if action == "replace_section":
assert rendered_content is not None
allow_append = False
if isinstance(metadata, dict):
allow_append = bool(metadata.get("allow_append") or metadata.get("scaffold"))
updated_body = _replace_section(
original_body,
section,
rendered_content,
allow_append=allow_append,
)
elif action == "append":
assert rendered_content is not None
meta_payload = metadata if isinstance(metadata, dict) else {}
position_value = meta_payload.get("position", "after")
updated_body = _append_block(
original_body,
rendered_content,
section=section,
position=str(position_value),
)
elif action == "status_update":
updated_body = _toggle_checklist_status(original_body, section, metadata or {})
elif action == "apply_patch":
patch_text = patch or content
current_hash = _hash_text(original_body)
patch_hash_source = "provided" if patch_source_hash else "auto"
patch_used = patch_text or ""
meta_mode = None
if isinstance(metadata, dict):
meta_mode = metadata.get("patch_mode") or metadata.get("mode")
def _normalize_mode(value: Any) -> str:
if not value:
return ""
normalized = str(value).strip().lower()
return PATCH_MODE_ALIASES.get(normalized, normalized)
if patch_mode and meta_mode:
normalized_arg = _normalize_mode(patch_mode)
normalized_meta = _normalize_mode(meta_mode)
if normalized_arg and normalized_meta and normalized_arg != normalized_meta:
raise DocumentOperationError(
"PATCH_MODE_CONFLICT: patch_mode argument conflicts with metadata"
)
effective_mode = _normalize_mode(patch_mode) or _normalize_mode(meta_mode)
if not effective_mode:
effective_mode = PATCH_MODE_STRUCTURED
if patch_used and edit:
raise DocumentOperationError(
"PATCH_MODE_CONFLICT: provide either patch or edit, not both"
)
if effective_mode not in PATCH_MODE_ALLOWED:
raise DocumentOperationError(
"PATCH_MODE_INVALID: use patch_mode=structured or patch_mode=unified"
)
if effective_mode == PATCH_MODE_STRUCTURED:
if not isinstance(edit, dict):
raise DocumentOperationError(
"PATCH_MODE_STRUCTURED_REQUIRES_EDIT: provide edit payload"
)
allow_append = False
if isinstance(metadata, dict):
allow_append = bool(metadata.get("allow_append") or metadata.get("scaffold"))
updated_candidate = _apply_structured_edit(
original_body,
edit,
allow_append=allow_append,
)
patch_used = compile_unified_diff(
original_body,
updated_candidate,
fromfile="before",
tofile="after",
)
elif not patch_used:
raise DocumentOperationError(
"PATCH_MODE_UNIFIED_REQUIRES_PATCH: provide patch content"
)
if patch_source_hash and current_hash != patch_source_hash:
raise DocumentOperationError(
"PATCH_STALE_SOURCE: patch_source_hash does not match current file content",
extra={"precondition_failed": "SOURCE_HASH_MISMATCH"},
)
try:
updated_body, hunks_applied = _apply_unified_patch(original_body, patch_used)
extra = {
"hunks_applied": hunks_applied,
"patch_source_hash": current_hash,
"patch_source_hash_source": patch_hash_source,
}
except DocumentOperationError as exc:
error_message = str(exc)
if _is_patch_context_error(error_message) and not patch_source_hash:
latest_text = original_text
if doc_path.exists():
latest_text = await asyncio.to_thread(doc_path.read_text, encoding="utf-8")
latest_parsed = parse_frontmatter(latest_text)
if latest_parsed.body != original_body:
original_body = latest_parsed.body
original_parsed = latest_parsed
current_hash = _hash_text(original_body)
before_hash = current_hash
file_size_before = doc_path.stat().st_size if doc_path.exists() else 0
try:
rebased_patch, rebase_info = _rebase_patch_to_current_context(
original_body, patch_used
)
patch_used = rebased_patch
updated_body, hunks_applied = _apply_unified_patch(
original_body, patch_used
)
extra = {
"hunks_applied": hunks_applied,
"patch_source_hash": current_hash,
"patch_source_hash_source": "auto",
"rebase_attempted": True,
"rebase_applied": True,
"rebase_info": rebase_info,
}
except DocumentOperationError as rebase_error:
diagnostics = _build_patch_failure_diagnostics(
original_body, patch_text or "", str(rebase_error)
)
diagnostics.setdefault("rebase_attempted", True)
if getattr(rebase_error, "extra", None):
diagnostics.setdefault("rebase_details", rebase_error.extra)
diagnostics.setdefault("rebase_failed_reason", str(rebase_error))
raise DocumentOperationError(
error_message,
extra={"patch_diagnostics": diagnostics},
)
else:
diagnostics = _build_patch_failure_diagnostics(
original_body, patch_used, error_message
)
raise DocumentOperationError(
error_message,
extra={"patch_diagnostics": diagnostics},
)
patch_ranges = _compute_patch_ranges(patch_used)
if patch_ranges:
extra["affected_ranges"] = patch_ranges
if dry_run:
previews = _build_bounded_previews(original_body, updated_body, patch_ranges)
if previews:
extra["preview_window"] = previews[0]
extra["preview_windows"] = previews
elif action == "normalize_headers":
updated_body = _normalize_headers_text(original_body)
elif action == "generate_toc":
updated_body = _generate_toc_text(original_body)
elif action == "replace_range":
resolved_start = start_line
resolved_end = end_line
if isinstance(metadata, dict):
if resolved_start is None and "start_line" in metadata:
resolved_start = metadata["start_line"]
if resolved_end is None and "end_line" in metadata:
resolved_end = metadata["end_line"]
if resolved_start is not None:
resolved_start = int(resolved_start)
if resolved_end is not None:
resolved_end = int(resolved_end)
updated_body = _replace_range_text(
original_body,
resolved_start,
resolved_end,
content or "",
)
elif action == "replace_text":
if not isinstance(metadata, dict):
raise DocumentOperationError(
"REPLACE_TEXT_MISSING_METADATA: provide metadata with find/replace values"
)
find_text = metadata.get("find")
if not isinstance(find_text, str) or not find_text:
raise DocumentOperationError("REPLACE_TEXT_MISSING_FIND: metadata.find is required")
replace_text = metadata.get("replace")
if replace_text is None:
replace_text = ""
match_mode = str(metadata.get("match_mode") or "literal").strip().lower()
if match_mode not in {"literal", "regex"}:
raise DocumentOperationError(
"REPLACE_TEXT_MATCH_MODE_INVALID: use literal or regex"
)
replace_all = bool(metadata.get("replace_all", True))
scope = metadata.get("scope")
allow_no_match = bool(metadata.get("allow_no_match", False))
updated_body, hits = _replace_text_with_scope(
original_body,
find_text=find_text,
replace_text=str(replace_text),
match_mode=match_mode,
replace_all=replace_all,
scope=str(scope) if scope else None,
allow_no_match=allow_no_match,
)
extra["replace_text"] = {
"find": find_text,
"replace_all": replace_all,
"match_mode": match_mode,
"scope": scope,
"matches": hits,
}
elif action == "create_doc":
overwrite = bool(metadata.get("overwrite")) if isinstance(metadata, dict) else False
if doc_path.exists() and not overwrite:
raise DocumentOperationError(
"CREATE_DOC_EXISTS: target path already exists (use metadata.overwrite to replace or "
"metadata.register_existing to register without overwrite)"
)
updated_body = _build_create_doc_body(content, metadata)
                if isinstance(metadata, dict):
                    doc_type = metadata.get("doc_type") or doc
                    frontmatter_meta = metadata.setdefault("frontmatter", {})
                    if isinstance(frontmatter_meta, dict):
                        frontmatter_meta.setdefault("doc_type", doc_type)
elif action == "validate_crosslinks":
extra = {"crosslinks": _validate_crosslinks(original_text, project, metadata)}
updated_body = original_body
else:
raise ValueError(f"Unsupported action '{action}'.")
except DocumentOperationError:
raise
except Exception as e:
raise DocumentOperationError(f"Failed to apply {action} operation: {e}")
after_hash = _hash_text(updated_body)
body_diff_preview = compile_unified_diff(
original_body,
updated_body,
fromfile="before",
tofile="after",
)
# Short-circuit for read-only actions
if action == "validate_crosslinks":
return DocChangeResult(
doc=doc,
section=section,
action=action,
path=doc_path,
before_hash=before_hash,
after_hash=before_hash,
content_written=original_text,
diff_preview="",
                extra=extra,
success=True,
verification_passed=True,
file_size_before=file_size_before,
file_size_after=file_size_before,
)
# Apply changes to file system
file_size_after = 0
verification_passed = False
date_str = utcnow().strftime("%Y-%m-%d")
frontmatter_extra: Dict[str, Any] = {}
try:
updated_text, frontmatter_extra, frontmatter_line_count = _apply_frontmatter_pipeline(
original_parsed,
updated_body,
doc=doc,
project=project,
metadata=metadata,
date_str=date_str,
)
after_hash = _hash_text(updated_text)
frontmatter_only = updated_body == original_body and bool(frontmatter_extra.get("frontmatter_updated"))
frontmatter_extra.update(
{
"frontmatter_line_count": frontmatter_line_count,
"frontmatter_only": frontmatter_only,
}
)
if frontmatter_only:
diff_preview = ""
else:
diff_preview = body_diff_preview
except ValueError as exc:
raise DocumentOperationError(str(exc))
if not dry_run:
try:
doc_snapshots_enabled = True
try:
repo_config = RepoDiscovery.load_config(repo_root)
doc_snapshots_enabled = bool(getattr(repo_config, "doc_snapshots", True))
except Exception:
doc_snapshots_enabled = True
if doc_path.exists() and doc_snapshots_enabled:
try:
backup_path = await asyncio.to_thread(
preflight_backup,
doc_path,
repo_root=repo_root,
context={
"component": "files",
"op": "backup",
"project_name": project.get("name"),
},
)
frontmatter_extra.setdefault("preflight_backup", str(backup_path))
except Exception as exc:
raise DocumentOperationError(
f"DOC_SNAPSHOT_FAILED: {exc}"
) from exc
# Write the file
await async_atomic_write(doc_path, updated_text, mode="w", repo_root=repo_root)
# Verify the write was successful
verification_passed = await _verify_file_write(doc_path, updated_text, after_hash)
if not verification_passed:
raise DocumentVerificationError(f"File write verification failed for {doc_path}")
file_size_after = doc_path.stat().st_size
# Log successful operation
duration_ms = (time.time() - start_time) * 1000
_log_operation(
level="info",
operation="apply_doc_change",
doc=doc,
section=section,
action=action,
file_path=doc_path,
duration_ms=duration_ms,
file_size_before=file_size_before,
file_size_after=file_size_after,
success=True,
metadata={
"before_hash": before_hash,
"after_hash": after_hash,
"dry_run": dry_run
}
)
except Exception as e:
# Attempt rollback if write failed
try:
if original_text and doc_path.exists():
await async_atomic_write(doc_path, original_text, mode="w", repo_root=repo_root)
duration_ms = (time.time() - start_time) * 1000
_log_operation(
level="warning",
operation="apply_doc_change",
doc=doc,
section=section,
action=action,
file_path=doc_path,
duration_ms=duration_ms,
file_size_before=file_size_before,
file_size_after=file_size_before, # Back to original
success=False,
error_message="Write failed, rolled back successfully",
metadata={"rollback": True, "original_error": str(e)}
)
except Exception as rollback_error:
duration_ms = (time.time() - start_time) * 1000
_log_operation(
level="error",
operation="apply_doc_change",
doc=doc,
section=section,
action=action,
file_path=doc_path,
duration_ms=duration_ms,
file_size_before=file_size_before,
file_size_after=file_size_before,
success=False,
error_message=f"Write failed and rollback failed: {rollback_error}",
metadata={"rollback_failed": True, "original_error": str(e)}
)
raise DocumentOperationError(f"Failed to write document {doc_path}: {e}")
include_frontmatter_extra = bool(
isinstance(metadata, dict) and metadata.get("include_frontmatter_extra")
)
if not include_frontmatter_extra and frontmatter_extra:
frontmatter_extra = {
key: value
for key, value in frontmatter_extra.items()
if not key.startswith("frontmatter")
}
return DocChangeResult(
doc=doc,
section=section,
action=action,
path=doc_path,
before_hash=before_hash,
after_hash=after_hash,
content_written=updated_text,
diff_preview=diff_preview,
        extra={
            **extra,
            **frontmatter_extra,
        },
success=True,
verification_passed=verification_passed,
file_size_before=file_size_before,
file_size_after=file_size_after,
)
except (DocumentValidationError, DocumentOperationError, DocumentVerificationError) as e:
duration_ms = (time.time() - start_time) * 1000
_log_operation(
level="error",
operation="apply_doc_change",
doc=doc,
section=section,
action=action,
file_path=Path(""),
duration_ms=duration_ms,
file_size_before=file_size_before,
file_size_after=0,
success=False,
error_message=str(e),
metadata={"error_type": type(e).__name__}
)
extra: Dict[str, Any] = {}
if isinstance(e, DocumentOperationError) and getattr(e, "extra", None):
extra.update(e.extra)
if "PATCH_STALE_SOURCE" in str(e):
extra.setdefault("precondition_failed", "SOURCE_HASH_MISMATCH")
return DocChangeResult(
doc=doc,
section=section,
action=action,
path=Path(""),
before_hash="",
after_hash="",
content_written="",
diff_preview="",
extra=extra,
success=False,
error_message=str(e),
verification_passed=False,
file_size_before=file_size_before,
file_size_after=0,
)
except Exception as e:
# Catch any unexpected errors
duration_ms = (time.time() - start_time) * 1000
_log_operation(
level="error",
operation="apply_doc_change",
doc=doc,
section=section,
action=action,
file_path=Path(""),
duration_ms=duration_ms,
file_size_before=file_size_before,
file_size_after=0,
success=False,
error_message=f"Unexpected error: {e}",
metadata={"error_type": type(e).__name__, "unexpected": True}
)
return DocChangeResult(
doc=doc,
section=section,
action=action,
path=Path(""),
before_hash="",
after_hash="",
content_written="",
diff_preview="",
extra={},
success=False,
error_message=f"Unexpected error: {e}",
verification_passed=False,
file_size_before=file_size_before,
file_size_after=0,
)
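# Minimal usage sketch (project mapping and paths are hypothetical; dry_run=True
# previews the edit without writing the document):
#
#     project = {
#         "name": "demo",
#         "root": "/repos/demo",
#         "docs": {"checklist": "/repos/demo/docs/CHECKLIST.md"},
#     }
#     result = asyncio.run(
#         apply_doc_change(
#             project,
#             doc="checklist",
#             action="append",
#             section="phase_1",
#             content="- [ ] New task",
#             template=None,
#             metadata=None,
#             dry_run=True,
#         )
#     )
#     # result.success and result.diff_preview describe the pending change.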
def _resolve_doc_path(project: Dict[str, Any], doc_key: str) -> Path:
"""
Resolve documentation path with safety assertions.
Ensures all resolved paths remain within the repository sandbox
and provides sensible fallbacks when explicit paths are not defined.
"""
# Validate inputs
if not doc_key:
raise ValueError("doc_key cannot be empty")
project_name = project.get("name")
if not project_name:
raise ValueError("project must have a name")
    project_root = Path(project.get("root", "")).resolve()
    if not project_root.exists():
        raise ValueError(f"Project root does not exist: {project_root}")
# Try explicit paths first
docs = project.get("docs") or {}
target = docs.get(doc_key)
if not target and doc_key == "progress_log":
target = project.get("progress_log")
if target:
resolved_path = Path(target).resolve()
# CRITICAL: Ensure path is within project sandbox
try:
resolved_path.relative_to(project_root)
except ValueError as e:
raise SecurityError(f"Document path {resolved_path} is outside project root {project_root}") from e
doc_logger.debug(f"Resolved doc path for {doc_key} using explicit target: {resolved_path}")
return resolved_path
# Fallback to conventional structure
docs_dir = docs.get("architecture")
if docs_dir:
docs_dir = Path(docs_dir).parent
else:
        # Use conventional docs structure
        from scribe_mcp.tools.project_utils import slugify_project_name
        docs_dir = project_root / "docs" / "dev_plans" / slugify_project_name(project_name)
filename = {
"architecture": "ARCHITECTURE_GUIDE.md",
"phase_plan": "PHASE_PLAN.md",
"checklist": "CHECKLIST.md",
"progress_log": "PROGRESS_LOG.md",
"doc_log": "DOC_LOG.md",
"security_log": "SECURITY_LOG.md",
"bug_log": "BUG_LOG.md",
}.get(doc_key, f"{doc_key.upper()}.md")
resolved_path = (docs_dir / filename).resolve()
# CRITICAL: Ensure fallback path is within project sandbox
try:
resolved_path.relative_to(project_root)
except ValueError as e:
raise SecurityError(f"Fallback document path {resolved_path} is outside project root {project_root}") from e
doc_logger.debug(f"Resolved doc path for {doc_key} using fallback: {resolved_path}")
return resolved_path
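# Example (illustrative): with no explicit docs entry, doc_key="progress_log"
# falls back to <root>/docs/dev_plans/<project-slug>/PROGRESS_LOG.md via the
# filename table above.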
def _resolve_create_doc_path(
project: Dict[str, Any],
metadata: Optional[Dict[str, Any]],
doc_key: str,
) -> Path:
    project_root = Path(project.get("root", "")).resolve()
    if not project_root.exists():
        raise ValueError(f"Project root does not exist: {project_root}")
metadata = metadata or {}
doc_name = metadata.get("doc_name") or metadata.get("register_as") or metadata.get("doc_type") or doc_key
doc_name = str(doc_name or "").strip()
if not doc_name:
raise DocumentOperationError("CREATE_DOC_MISSING_NAME: doc_name or register_as is required")
safe_name = _slugify_filename(doc_name)
docs_dir = project.get("docs_dir")
if docs_dir:
docs_dir = Path(docs_dir)
else:
docs_dir = _resolve_doc_path(project, "architecture").parent
target_dir = metadata.get("target_dir")
if target_dir:
target_path = Path(target_dir)
if not target_path.is_absolute():
target_path = project_root / target_path
try:
target_path.resolve().relative_to(project_root.resolve())
except ValueError as exc:
raise SecurityError(
f"Target directory {target_path} is outside project root {project_root}"
) from exc
docs_dir = target_path
resolved_path = (docs_dir / f"{safe_name}.md").resolve()
try:
resolved_path.relative_to(project_root)
except ValueError as exc:
raise SecurityError(
f"Document path {resolved_path} is outside project root {project_root}"
) from exc
return resolved_path
class SecurityError(RuntimeError):
"""Security violation when document paths escape project sandbox."""
async def _render_content(
project: Dict[str, Any],
content: Optional[str],
template_name: Optional[str],
metadata: Optional[Dict[str, Any]],
) -> str:
"""Render content using Jinja2 template engine with fallback to direct content."""
# Handle case where metadata comes as JSON string from MCP framework
if metadata is None:
metadata = {}
    elif isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except (json.JSONDecodeError, TypeError):
            metadata = {}
        if not isinstance(metadata, dict):
            metadata = {}
# Check if template_name is actually a fallback content string from parameter validator
# Common fallback strings that should be treated as content, not template names
fallback_strings = {"No message provided", "Invalid message format", "Empty message"}
if template_name and template_name in fallback_strings:
# Treat fallback strings as content, not template names
if content is None:
content = template_name
template_name = None
if template_name:
try:
# Import template engine dynamically to avoid circular imports
from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
# Initialize template engine
engine = Jinja2TemplateEngine(
project_root=Path(project.get("root", "")),
project_name=project.get("name", ""),
security_mode="sandbox"
)
# Add timestamp to metadata
enhanced_metadata = metadata.copy() if metadata else {}
enhanced_metadata.update({
"timestamp": utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
})
# Render template
rendered = engine.render_template(
template_name=f"{template_name}.md",
metadata=enhanced_metadata
)
doc_logger.debug(f"Successfully rendered template '{template_name}' using Jinja2 engine")
return rendered
except TemplateEngineError as e:
doc_logger.error(f"Template engine error for '{template_name}': {e}")
raise DocumentOperationError(f"Failed to render template '{template_name}': {e}")
except Exception as e:
doc_logger.error(f"Unexpected error rendering template '{template_name}': {e}")
raise DocumentOperationError(f"Unexpected template error: {e}")
if isinstance(content, str) and content:
# Preserve author-provided formatting, but allow inline Jinja rendering when present.
normalized = content.replace("\r\n", "\n")
if "{{" in normalized or "{%" in normalized or "{#" in normalized:
try:
from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
engine = Jinja2TemplateEngine(
project_root=Path(project.get("root", "")),
project_name=project.get("name", ""),
security_mode="sandbox",
)
enhanced_metadata = metadata.copy() if metadata else {}
enhanced_metadata.setdefault(
"timestamp", utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
)
rendered_inline = engine.render_string(
normalized,
metadata=enhanced_metadata,
strict=False,
fallback=True,
)
return rendered_inline
except Exception as e: # pragma: no cover - defensive
doc_logger.error(f"Inline Jinja rendering failed, returning raw content: {e}")
return normalized
return normalized
raise ValueError("Either content or template must be provided.")
async def _load_fragment(name: str) -> str:
fragment = (FRAGMENT_DIR / f"{name}.md").resolve()
if not fragment.exists():
raise FileNotFoundError(f"Template fragment '{name}' not found under {FRAGMENT_DIR}")
return await asyncio.to_thread(fragment.read_text, encoding="utf-8")
def _replace_section(text: str, section: Optional[str], content: str, *, allow_append: bool = False) -> str:
marker = SECTION_MARKER.format(section=section)
idx = text.find(marker)
if idx == -1:
if not allow_append:
raise DocumentOperationError(
f"SECTION_ANCHOR_MISSING: '{section}' not found (set metadata.allow_append=true to append)"
)
doc_logger.warning(
"Section anchor missing; auto-appending '%s' to document.",
section,
extra={"section": section, "action": "replace_section"},
)
prefix = text.rstrip()
if prefix:
prefix = prefix + "\n\n"
return prefix + marker + "\n" + content.strip() + "\n"
if text.find(marker, idx + 1) != -1:
raise DocumentOperationError(
f"SECTION_ANCHOR_AMBIGUOUS: '{section}' appears multiple times; resolve duplicates first"
)
start = idx + len(marker)
# Skip newline right after marker
if start < len(text) and text[start] == "\r":
start += 1
if start < len(text) and text[start] == "\n":
start += 1
next_marker = text.find("<!-- ID:", start)
if next_marker == -1:
next_marker = len(text)
new_block = marker + "\n" + content.strip() + "\n"
replacement = text[:idx] + new_block + text[next_marker:]
return replacement
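# Example (illustrative): in a body containing "<!-- ID: overview -->",
# _replace_section(body, "overview", "New text") rewrites everything between
# that marker and the next "<!-- ID:" marker (or EOF); a missing anchor either
# raises or, with allow_append=True, appends a fresh block.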
def _replace_text_literal(
text: str,
find_text: str,
replace_text: str,
*,
replace_all: bool,
) -> tuple[str, int]:
if replace_all:
return text.replace(find_text, replace_text), text.count(find_text)
return text.replace(find_text, replace_text, 1), (1 if find_text in text else 0)
def _replace_text_regex(
text: str,
pattern: str,
replace_text: str,
*,
replace_all: bool,
) -> tuple[str, int]:
compiled = re.compile(pattern)
count = 0 if replace_all else 1
updated, hits = compiled.subn(replace_text, text, count=count)
return updated, hits
def _replace_text_with_scope(
text: str,
*,
find_text: str,
replace_text: str,
match_mode: str,
replace_all: bool,
scope: Optional[str],
allow_no_match: bool,
) -> tuple[str, int]:
target_text = text
prefix = ""
suffix = ""
marker = None
if scope:
normalized = scope.strip()
if normalized.startswith("section:"):
section_id = normalized.split(":", 1)[1].strip()
if not section_id:
raise DocumentOperationError("REPLACE_TEXT_SCOPE_INVALID: section id missing")
marker = SECTION_MARKER.format(section=section_id)
idx = text.find(marker)
if idx == -1:
raise DocumentOperationError(
f"SECTION_ANCHOR_MISSING: '{section_id}' not found in replace_text scope"
)
if text.find(marker, idx + 1) != -1:
raise DocumentOperationError(
f"SECTION_ANCHOR_AMBIGUOUS: '{section_id}' appears multiple times"
)
start = idx + len(marker)
if start < len(text) and text[start] == "\r":
start += 1
if start < len(text) and text[start] == "\n":
start += 1
next_marker = text.find("<!-- ID:", start)
if next_marker == -1:
next_marker = len(text)
prefix = text[:start]
target_text = text[start:next_marker]
suffix = text[next_marker:]
if match_mode == "regex":
updated, hits = _replace_text_regex(
target_text,
find_text,
replace_text,
replace_all=replace_all,
)
else:
updated, hits = _replace_text_literal(
target_text,
find_text,
replace_text,
replace_all=replace_all,
)
if hits == 0 and not allow_no_match:
raise DocumentOperationError("REPLACE_TEXT_NO_MATCH: no matches found")
return prefix + updated + suffix, hits
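# Example (illustrative): scope="section:overview" restricts matching to the
# text between "<!-- ID: overview -->" and the next marker, so
#     _replace_text_with_scope(body, find_text="TODO", replace_text="DONE",
#                              match_mode="literal", replace_all=True,
#                              scope="section:overview", allow_no_match=False)
# rewrites only that section and returns (updated_body, hit_count).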
def _append_block(text: str, content: str, section: Optional[str] = None, position: str = "after") -> str:
normalized = content.strip()
if not normalized:
return text
    if not section:
        if text and not text.endswith("\n"):
            text += "\n"
        return text + normalized + "\n"
marker = SECTION_MARKER.format(section=section)
idx = text.find(marker)
if idx == -1:
doc_logger.warning(
"Append anchor '%s' missing; creating new block.",
section,
extra={"section": section, "action": "append"},
)
prefix = text.rstrip()
if prefix:
prefix = prefix + "\n\n"
return prefix + marker + "\n" + normalized + "\n"
after_marker = idx + len(marker)
while after_marker < len(text) and text[after_marker] in "\r\n":
after_marker += 1
next_marker = text.find("<!-- ID:", after_marker)
if next_marker == -1:
next_marker = len(text)
position = position.lower()
if position == "before":
insertion_point = idx
elif position in {"inside", "within", "start"}:
insertion_point = after_marker
else:
insertion_point = next_marker
prefix = text[:insertion_point]
suffix = text[insertion_point:]
insertion = normalized
if insertion and not insertion.endswith("\n"):
insertion += "\n"
if prefix and not prefix.endswith("\n"):
insertion = "\n" + insertion
if suffix and not suffix.startswith("\n"):
insertion = insertion + "\n"
return prefix + insertion + suffix
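# Example (illustrative): position controls placement relative to the anchor;
# _append_block(body, "note", section="overview", position="inside") inserts
# right after the "<!-- ID: overview -->" marker, while the default "after"
# lands at the end of the section, just before the next marker.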
def _line_matches(expected: str, actual: str) -> bool:
if actual == expected:
return True
if expected.endswith("\n") and actual == expected[:-1]:
return True
if actual.endswith("\n") and expected == actual[:-1]:
return True
return False
def _format_hunk_header(old_start: int, old_count: int, new_start: int, new_count: int) -> str:
return f"@@ -{old_start},{old_count} +{new_start},{new_count} @@"
def _parse_patch_hunks(patch_text: str) -> tuple[list[str], list[Dict[str, Any]]]:
"""Parse unified diff into header lines and hunk definitions."""
lines = patch_text.splitlines(keepends=False)
header_lines: list[str] = []
hunks: list[Dict[str, Any]] = []
i = 0
while i < len(lines) and (lines[i].startswith("--- ") or lines[i].startswith("+++ ")):
header_lines.append(lines[i])
i += 1
while i < len(lines):
line = lines[i]
match = _HUNK_HEADER.match(line)
if not match:
if line.strip() == "":
i += 1
continue
raise DocumentOperationError(f"PATCH_INVALID_FORMAT: expected hunk header: {line!r}")
hunk_lines: list[str] = []
header = line
i += 1
while i < len(lines) and not lines[i].startswith("@@ "):
hunk_lines.append(lines[i])
i += 1
hunks.append(
{
"header": header,
"lines": hunk_lines,
}
)
return header_lines, hunks
def _collect_hunk_original_lines(hunk_lines: list[str]) -> list[str]:
"""Return original file lines implied by a hunk (context + deletions)."""
original_lines: list[str] = []
for line in hunk_lines:
if not line or line.startswith("\\"):
continue
prefix = line[0]
if prefix in {" ", "-"}:
original_lines.append(f"{line[1:]}\n")
return original_lines
def _find_sequence_indices(haystack: list[str], needle: list[str]) -> list[int]:
if not needle:
return []
matches: list[int] = []
limit = len(haystack) - len(needle)
for idx in range(limit + 1):
if haystack[idx: idx + len(needle)] == needle:
matches.append(idx)
return matches
def _find_alignment_with_one_line_gaps(
original_lines: list[str], target_lines: list[str]
) -> Optional[list[int]]:
"""Find a unique alignment allowing a single skipped line between matches."""
if not target_lines:
return None
alignments: list[list[int]] = []
def dfs(target_index: int, next_start: int, positions: list[int]) -> None:
if len(alignments) > 1:
return
if target_index >= len(target_lines):
alignments.append(positions)
return
for offset in (0, 1):
candidate_index = next_start + offset
if candidate_index >= len(original_lines):
continue
if _line_matches(target_lines[target_index], original_lines[candidate_index]):
dfs(target_index + 1, candidate_index + 1, positions + [candidate_index])
return
for index, line in enumerate(original_lines):
if _line_matches(target_lines[0], line):
dfs(1, index + 1, [index])
if len(alignments) > 1:
break
if len(alignments) == 1:
return alignments[0]
return None
def _expand_hunk_with_one_line_gaps(
original_lines: list[str], hunk_lines: list[str]
) -> Optional[tuple[list[str], Dict[str, Any]]]:
"""Insert missing single-gap context lines into hunk when alignment is unique."""
effective_line_indices = [
idx for idx, line in enumerate(hunk_lines) if line and line[0] in {" ", "-"}
]
target_lines = [
f"{hunk_lines[idx][1:]}\n" for idx in effective_line_indices
]
if not target_lines:
return None
alignment = _find_alignment_with_one_line_gaps(original_lines, target_lines)
if alignment is None:
return None
mapping = {hunk_idx: position for hunk_idx, position in zip(effective_line_indices, alignment)}
expanded: list[str] = []
gap_lines_added: list[int] = []
for hunk_index, line in enumerate(hunk_lines):
expanded.append(line)
if hunk_index not in mapping:
continue
current_pos = mapping[hunk_index]
next_effective_indices = [idx for idx in effective_line_indices if idx > hunk_index]
if not next_effective_indices:
continue
next_hunk_index = next_effective_indices[0]
next_pos = mapping.get(next_hunk_index)
if next_pos is None:
continue
gap = next_pos - current_pos
if gap == 2:
missing_line = original_lines[current_pos + 1].rstrip("\n")
missing_line = missing_line.rstrip("\r")
expanded.append(f" {missing_line}")
gap_lines_added.append(current_pos + 2)
elif gap > 2:
return None
if not gap_lines_added:
return None
return expanded, {"gap_lines_added": gap_lines_added}
def _rebase_patch_to_current_context(
original_text: str, patch_text: str
) -> tuple[str, Dict[str, Any]]:
"""Rebase patch hunk headers to current file context using exact matches."""
header_lines, hunks = _parse_patch_hunks(patch_text)
original_lines = original_text.splitlines(keepends=True)
rebased_lines: list[str] = []
if header_lines:
rebased_lines.extend(header_lines)
rebase_info: Dict[str, Any] = {"hunks": []}
for hunk in hunks:
hunk_lines = hunk["lines"]
original_hunk_lines = _collect_hunk_original_lines(hunk_lines)
matches = _find_sequence_indices(original_lines, original_hunk_lines)
context_expanded = False
expansion_info: Dict[str, Any] = {}
if len(matches) != 1:
expanded = _expand_hunk_with_one_line_gaps(original_lines, hunk_lines)
if expanded:
hunk_lines, expansion_info = expanded
context_expanded = True
original_hunk_lines = _collect_hunk_original_lines(hunk_lines)
matches = _find_sequence_indices(original_lines, original_hunk_lines)
if len(matches) != 1:
reason = "ambiguous" if matches else "missing"
extra = {
"rebase_reason": reason,
"match_candidates": [m + 1 for m in matches],
}
if context_expanded:
extra["context_expanded"] = True
extra.update(expansion_info)
raise DocumentOperationError(
f"PATCH_CONTEXT_REBASE_FAILED: {reason} context match",
extra=extra,
)
start_index = matches[0]
old_count = sum(1 for line in hunk_lines if line and line[0] in {" ", "-"})
new_count = sum(1 for line in hunk_lines if line and line[0] in {" ", "+"})
old_start = start_index + 1
new_start = old_start
rebased_lines.append(_format_hunk_header(old_start, old_count, new_start, new_count))
rebased_lines.extend(hunk_lines)
rebase_info["hunks"].append(
{
"start_line": old_start,
"end_line": old_start + max(old_count - 1, 0),
"match_start_line": old_start,
"match_end_line": old_start + max(old_count - 1, 0),
"context_expanded": context_expanded,
**expansion_info,
}
)
return ("\n".join(rebased_lines) + "\n", rebase_info)
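# Example (illustrative): if a hunk's context and deletion lines match the
# current file uniquely starting at line 42, a stale "@@ -10,3 +10,3 @@" header
# is rewritten as "@@ -42,3 +42,3 @@" and rebase_info records the matched range.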
def _extract_expected_line(error_message: str) -> Optional[str]:
if "expected=" not in error_message:
return None
prefix = "expected="
try:
expected_part = error_message.split(prefix, 1)[1]
expected_repr = expected_part.split(" got=", 1)[0]
return ast.literal_eval(expected_repr)
except (ValueError, SyntaxError):
return None
def _build_patch_failure_diagnostics(
original_text: str,
patch_text: str,
error_message: str,
) -> Dict[str, Any]:
diagnostics: Dict[str, Any] = {}
if error_message.startswith("PATCH_"):
diagnostics["error_code"] = error_message.split(":", 1)[0]
if diagnostics["error_code"] == "PATCH_RANGE_ERROR":
diagnostics["failure_stage"] = "range_validation"
else:
diagnostics["failure_stage"] = "apply_patch"
expected_line = _extract_expected_line(error_message)
if expected_line:
diagnostics["expected_line"] = expected_line
line_numbers: list[int] = []
for idx, line in enumerate(original_text.splitlines(keepends=True), start=1):
if _line_matches(expected_line, line):
line_numbers.append(idx)
if len(line_numbers) >= 5:
break
if line_numbers:
diagnostics["matching_line_numbers"] = line_numbers
try:
_, hunks = _parse_patch_hunks(patch_text)
original_lines = original_text.splitlines(keepends=True)
ranges: list[Dict[str, Any]] = []
for hunk in hunks:
original_hunk_lines = _collect_hunk_original_lines(hunk["lines"])
matches = _find_sequence_indices(original_lines, original_hunk_lines)
if len(matches) == 1:
start = matches[0] + 1
end = start + max(len(original_hunk_lines) - 1, 0)
ranges.append({"start_line": start, "end_line": end})
if ranges:
diagnostics["current_line_ranges"] = ranges
except DocumentOperationError:
pass
diagnostics.setdefault(
"hint",
"PATCH_CONTEXT_MISMATCH: file changed or hunk header out of date; re-extract context from current file.",
)
return diagnostics
def _compute_patch_ranges(patch_text: str) -> list[Dict[str, int]]:
ranges: list[Dict[str, int]] = []
_, hunks = _parse_patch_hunks(patch_text)
for hunk in hunks:
header = hunk["header"]
match = _HUNK_HEADER.match(header)
if not match:
continue
old_start = int(match.group(1))
old_count = int(match.group(2) or "1")
end_line = old_start + max(old_count - 1, 0)
ranges.append({"start_line": old_start, "end_line": end_line})
return ranges
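# Example (verifiable sketch): a patch whose first hunk header is "@@ -3,4 +3,5 @@"
# yields [{"start_line": 3, "end_line": 6}], the four original lines it touches.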
def _build_bounded_preview(
original_text: str,
updated_text: str,
ranges: list[Dict[str, int]],
context: int = 3,
) -> Dict[str, Any]:
if not ranges:
return {}
first_range = ranges[0]
start_line = max(1, first_range["start_line"] - context)
end_line = first_range["end_line"] + context
original_lines = original_text.splitlines()
updated_lines = updated_text.splitlines()
bounded_end = min(len(updated_lines), end_line)
bounded_start = min(start_line, bounded_end if bounded_end else start_line)
return {
"start_line": bounded_start,
"end_line": bounded_end,
"before": "\n".join(original_lines[bounded_start - 1: bounded_end]),
"after": "\n".join(updated_lines[bounded_start - 1: bounded_end]),
}
def _build_bounded_previews(
original_text: str,
updated_text: str,
ranges: list[Dict[str, int]],
context: int = 3,
) -> list[Dict[str, Any]]:
previews: list[Dict[str, Any]] = []
for entry in ranges:
previews.append(
_build_bounded_preview(
original_text,
updated_text,
[entry],
context=context,
)
)
return previews
def _apply_unified_patch(original_text: str, patch_text: str) -> tuple[str, int]:
"""Apply a unified diff patch to original_text with strict context matching."""
if not patch_text or not patch_text.strip():
raise DocumentOperationError("apply_patch requires non-empty patch text")
original_lines = original_text.splitlines(keepends=True)
lines = patch_text.splitlines(keepends=False)
i = 0
while i < len(lines) and (lines[i].startswith("--- ") or lines[i].startswith("+++ ")):
i += 1
output: list[str] = []
original_index = 0
hunks_applied = 0
while i < len(lines):
header = lines[i]
match = _HUNK_HEADER.match(header)
if not match:
if header.strip() == "":
i += 1
continue
raise DocumentOperationError(f"PATCH_INVALID_FORMAT: expected hunk header: {header!r}")
old_start = int(match.group(1))
target_index = old_start - 1
if target_index < original_index or target_index > len(original_lines):
raise DocumentOperationError("PATCH_RANGE_ERROR: hunk target out of range for original content")
output.extend(original_lines[original_index:target_index])
original_index = target_index
i += 1
while i < len(lines) and not lines[i].startswith("@@ "):
line = lines[i]
if line.startswith("\\"):
i += 1
continue
if not line:
raise DocumentOperationError("PATCH_INVALID_FORMAT: empty line without prefix")
prefix = line[0]
text = line[1:]
patched_line = f"{text}\n"
if prefix == " ":
if original_index >= len(original_lines) or not _line_matches(
patched_line, original_lines[original_index]
):
got = original_lines[original_index] if original_index < len(original_lines) else "<EOF>"
raise DocumentOperationError(
f"PATCH_CONTEXT_MISMATCH: expected={patched_line!r} got={got!r}"
)
output.append(original_lines[original_index])
original_index += 1
elif prefix == "-":
if original_index >= len(original_lines) or not _line_matches(
patched_line, original_lines[original_index]
):
got = original_lines[original_index] if original_index < len(original_lines) else "<EOF>"
raise DocumentOperationError(
f"PATCH_DELETE_MISMATCH: expected={patched_line!r} got={got!r}"
)
original_index += 1
elif prefix == "+":
output.append(patched_line)
else:
raise DocumentOperationError(f"PATCH_INVALID_FORMAT: invalid line prefix {prefix!r}")
i += 1
hunks_applied += 1
output.extend(original_lines[original_index:])
return ("".join(output), hunks_applied)
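# Example (verifiable sketch):
#     _apply_unified_patch("a\nb\n", "@@ -1,2 +1,2 @@\n a\n-b\n+c\n")
# returns ("a\nc\n", 1): the lone hunk keeps "a", deletes "b", and inserts "c".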
def _is_patch_context_error(error_message: str) -> bool:
return any(
code in error_message
for code in (
"PATCH_CONTEXT_MISMATCH",
"PATCH_DELETE_MISMATCH",
"PATCH_RANGE_ERROR",
)
)
def _replace_range_text(
original_text: str,
start_line: Optional[int],
end_line: Optional[int],
replacement: str,
) -> str:
"""Replace inclusive line range [start_line, end_line] (1-based) or homologous section."""
allow_header_fallback = start_line is not None and end_line is not None
header_replacement = _replace_section_by_header(
original_text, replacement, allow_missing_header_fallback=allow_header_fallback
)
if header_replacement is not None:
return header_replacement
if start_line is None or end_line is None:
raise DocumentOperationError("replace_range requires start_line and end_line")
if start_line < 1 or end_line < start_line:
raise DocumentOperationError(f"Invalid range: start_line={start_line} end_line={end_line}")
lines = original_text.splitlines(keepends=True)
if start_line > len(lines) + 1:
raise DocumentOperationError("start_line out of range")
if end_line > len(lines):
raise DocumentOperationError("end_line out of range")
repl = replacement.replace("\r\n", "\n")
if repl and not repl.endswith("\n"):
repl += "\n"
new_lines = lines[: start_line - 1] + ([repl] if repl else []) + lines[end_line:]
return "".join(new_lines)
def _replace_section_by_header(
original_text: str,
replacement: str,
*,
allow_missing_header_fallback: bool = False,
) -> Optional[str]:
"""Replace a Markdown header section when the replacement starts with a header.
Section replacement is attempted before falling back to numeric ranges. When a header
is present, it must match exactly (level and title) and only level ≥ 2 is considered.
Headers inside fenced code blocks are ignored, so replacements remain deterministic.
If the header is missing or ambiguous and fallback is allowed (both start/end provided),
no error is raised so the caller can continue with the provided numeric range.
"""
header_info = _extract_replacement_header(replacement)
if header_info is None:
return None
level, title = header_info
header_repr = f"{('#' * level)} {title}"
headers = _collect_markdown_headers(original_text)
matching_sections = [
header for header in headers if header["level"] == level and header["title"] == title
]
if not matching_sections:
if allow_missing_header_fallback:
return None
raise DocumentOperationError(f"SECTION_HEADER_NOT_FOUND: {header_repr} is missing")
if len(matching_sections) > 1:
line_numbers = ", ".join(str(section["line_number"]) for section in matching_sections)
raise DocumentOperationError(
f"SECTION_HEADER_AMBIGUOUS: {header_repr} matches multiple sections (lines {line_numbers})"
)
target = matching_sections[0]
section_end = len(original_text)
for header in headers:
if header["start"] <= target["start"]:
continue
if header["level"] <= level:
section_end = header["start"]
break
insertion = replacement
if insertion and not insertion.endswith("\n"):
insertion += "\n"
return original_text[: target["start"]] + insertion + original_text[section_end:]
def _collect_markdown_headers(text: str) -> list[dict[str, Any]]:
"""Return metadata for every Markdown header in `text`, skipping fenced code blocks."""
headers: list[dict[str, Any]] = []
lines = text.splitlines(keepends=True)
in_fence = False
position = 0
for idx, line in enumerate(lines):
line_start = position
stripped = line.lstrip()
if stripped.startswith("```") or stripped.startswith("~~~"):
in_fence = not in_fence
position += len(line)
continue
if in_fence:
position += len(line)
continue
match = _HEADER_LINE_PATTERN.match(stripped)
if match:
level = len(match.group(1))
title = match.group(2).strip()
if title:
headers.append(
{
"level": level,
"title": title,
"start": line_start,
"line_number": idx + 1,
}
)
position += len(line)
return headers
def _extract_replacement_header(content: str) -> Optional[tuple[int, str]]:
"""Return (level, title) if the content starts with a Markdown header (## or deeper)."""
for line in content.splitlines():
trimmed_line = line.strip()
if not trimmed_line:
continue
match = _HEADER_LINE_PATTERN.match(line.lstrip())
if not match:
return None
level = len(match.group(1))
if level < 2:
return None
title = match.group(2).strip()
if not title:
return None
return level, title
return None
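# Example (verifiable sketch): _extract_replacement_header("## Setup\nbody")
# returns (2, "Setup"); a top-level "# Title" or a non-header first line
# returns None.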
def _replace_block_text(original_text: str, anchor: str, replacement: str) -> str:
"""Replace a block starting at the anchor line through the next blank line."""
if not anchor:
raise DocumentOperationError("STRUCTURED_EDIT_ANCHOR_REQUIRED: anchor is required")
lines = original_text.splitlines()
matches: list[int] = []
in_fence = False
for idx, line in enumerate(lines):
stripped = line.lstrip()
if stripped.startswith("```") or stripped.startswith("~~~"):
in_fence = not in_fence
continue
if in_fence:
continue
if anchor in line:
matches.append(idx)
if not matches:
raise DocumentOperationError(
f"STRUCTURED_EDIT_ANCHOR_NOT_FOUND: anchor '{anchor}' not found"
)
if len(matches) > 1:
match_lines = ", ".join(f"line {idx + 1}" for idx in matches)
raise DocumentOperationError(
"STRUCTURED_EDIT_ANCHOR_AMBIGUOUS\n"
f"anchor: \"{anchor}\"\n"
f"matches: [{match_lines}]"
)
start_index = matches[0]
end_index = start_index
while end_index < len(lines) and lines[end_index].strip() != "":
end_index += 1
replacement_lines = replacement.splitlines()
    new_lines = lines[:start_index] + replacement_lines + lines[end_index:]
    updated = "\n".join(new_lines)
    if original_text.endswith("\n") and not updated.endswith("\n"):
        updated += "\n"
    return updated
def _normalize_headers_text(original_text: str) -> str:
"""Normalize markdown header numbering, skipping fenced code blocks."""
lines = original_text.splitlines()
counters = [0] * 6
in_fence = False
output: list[str] = []
index = 0
def _next_prefix(level: int) -> str:
for idx in range(level - 1):
if counters[idx] == 0:
counters[idx] = 1
counters[level - 1] += 1
for idx in range(level, 6):
counters[idx] = 0
return ".".join(str(value) for value in counters[:level])
while index < len(lines):
line = lines[index]
stripped = line.lstrip()
if stripped.startswith("```") or stripped.startswith("~~~"):
in_fence = not in_fence
output.append(line)
index += 1
continue
if in_fence:
output.append(line)
index += 1
continue
if line.strip() and index + 1 < len(lines):
underline = lines[index + 1].strip()
if re.match(r"^={3,}$", underline) or re.match(r"^-{3,}$", underline):
level = 1 if underline.startswith("=") else 2
title = line.strip()
title = re.sub(r"^\d+(?:\.\d+)*[.)]?\s+", "", title)
if title:
prefix = _next_prefix(level)
output.append(f"{'#' * level} {prefix} {title}")
index += 2
continue
match = re.match(r"^(#{1,6})(\s*)(.*)$", line)
if match:
hashes, _, title = match.groups()
title = title.strip()
title = re.sub(r"^\d+(?:\.\d+)*[.)]?\s+", "", title)
if title:
level = len(hashes)
prefix = _next_prefix(level)
output.append(f"{hashes} {prefix} {title}")
index += 1
continue
output.append(line)
index += 1
normalized = "\n".join(output)
if original_text.endswith("\n"):
normalized += "\n"
return normalized
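# Example (verifiable sketch): "# Intro\n## Setup\n" normalizes to
# "# 1 Intro\n## 1.1 Setup\n"; pre-existing "1.2)"-style numbering is stripped
# before renumbering, and fenced code blocks pass through untouched.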
def _build_github_anchor(text: str, counts: Dict[str, int]) -> str:
"""Generate a GitHub-style anchor slug with de-duplication."""
    import unicodedata
normalized = unicodedata.normalize("NFKD", text)
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
slug = ascii_text.strip().lower()
slug = slug.replace("_", "-")
slug = re.sub(r"[^\w\s-]", "", slug)
slug = re.sub(r"\s+", "-", slug)
slug = re.sub(r"-{2,}", "-", slug).strip("-")
if not slug:
slug = "section"
count = counts.get(slug, 0)
counts[slug] = count + 1
if count:
return f"{slug}-{count}"
return slug
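# Example (verifiable sketch): with counts={}, "My Heading!" slugs to
# "my-heading"; a second identical title then yields "my-heading-1".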
def _generate_toc_text(original_text: str) -> str:
"""Generate or update a TOC block based on headers, skipping fenced code."""
lines = original_text.splitlines()
toc_start = "<!-- TOC:start -->"
toc_end = "<!-- TOC:end -->"
    in_fence = False
    in_toc = False
    filtered_lines: list[str] = []
    for line in lines:
        stripped = line.lstrip()
        if stripped.startswith("```") or stripped.startswith("~~~"):
            in_fence = not in_fence
        if not in_fence:
            if line.strip() == toc_start:
                in_toc = True
                continue
            if line.strip() == toc_end:
                in_toc = False
                continue
        if in_toc:
            continue
        filtered_lines.append(line)
toc_lines: list[str] = []
anchor_counts: Dict[str, int] = {}
in_fence = False
index = 0
while index < len(filtered_lines):
line = filtered_lines[index]
stripped = line.lstrip()
if stripped.startswith("```") or stripped.startswith("~~~"):
in_fence = not in_fence
index += 1
continue
if in_fence:
index += 1
continue
if line.strip() and index + 1 < len(filtered_lines):
underline = filtered_lines[index + 1].strip()
if re.match(r"^={3,}$", underline) or re.match(r"^-{3,}$", underline):
level = 1 if underline.startswith("=") else 2
title = line.strip()
if title:
anchor = _build_github_anchor(title, anchor_counts)
indent = " " * (level - 1)
toc_lines.append(f"{indent}- [{title}](#{anchor})")
index += 2
continue
match = re.match(r"^(#{1,6})(\s*)(.*)$", line)
if match:
hashes, _, title = match.groups()
title = title.strip()
if title:
level = len(hashes)
anchor = _build_github_anchor(title, anchor_counts)
indent = " " * (level - 1)
toc_lines.append(f"{indent}- [{title}](#{anchor})")
index += 1
toc_block = [toc_start]
toc_block.extend(toc_lines)
toc_block.append(toc_end)
body_lines = lines
start_idx = None
end_idx = None
for idx, line in enumerate(lines):
if line.strip() == toc_start and start_idx is None:
start_idx = idx
if line.strip() == toc_end and start_idx is not None:
end_idx = idx
break
if start_idx is not None and end_idx is not None and end_idx >= start_idx:
body_lines = lines[:start_idx] + toc_block + lines[end_idx + 1 :]
else:
body_lines = toc_block + [""] + lines
rendered = "\n".join(body_lines)
if original_text.endswith("\n"):
rendered += "\n"
return rendered
def _apply_structured_edit(
original_text: str,
edit: Dict[str, Any],
*,
allow_append: bool = False,
) -> str:
"""Apply a structured edit specification to text and return updated content."""
edit_type = str(edit.get("type") or "").strip().lower()
if not edit_type:
raise DocumentOperationError("STRUCTURED_EDIT_TYPE_REQUIRED: edit.type is required")
if edit_type == "replace_range":
start_line = edit.get("start_line")
end_line = edit.get("end_line")
content = edit.get("content", "")
if start_line is None or end_line is None:
raise DocumentOperationError(
"STRUCTURED_EDIT_RANGE_REQUIRED: start_line and end_line are required"
)
return _replace_range_text(original_text, int(start_line), int(end_line), str(content))
if edit_type == "replace_block":
anchor = str(edit.get("anchor") or "").strip()
content = edit.get("new_content", edit.get("content", ""))
return _replace_block_text(original_text, anchor, str(content))
if edit_type == "replace_section":
section = str(edit.get("section") or "").strip()
content = edit.get("content", "")
if not section:
raise DocumentOperationError("STRUCTURED_EDIT_SECTION_REQUIRED: section is required")
return _replace_section(original_text, section, str(content), allow_append=allow_append)
raise DocumentOperationError(f"STRUCTURED_EDIT_UNSUPPORTED: {edit_type}")
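# Example (illustrative): edit payloads mirror the branches above, e.g.
#     {"type": "replace_section", "section": "overview", "content": "..."}
#     {"type": "replace_range", "start_line": 3, "end_line": 5, "content": "..."}
#     {"type": "replace_block", "anchor": "unique line text", "new_content": "..."}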
def _toggle_checklist_status(text: str, section: Optional[str], metadata: Dict[str, Any]) -> str:
desired_raw = metadata.get("status")
desired = desired_raw.lower().strip() if isinstance(desired_raw, str) else None
proof = metadata.get("proof")
label = metadata.get("label") or metadata.get("item") or metadata.get("text") or metadata.get("title")
done_states = {"done", "completed", "complete", "true", "yes", "checked", "finished"}
pending_states = {"pending", "todo", "not done", "open", "incomplete", "undone"}
def resolve_token(existing_line: Optional[str]) -> str:
if desired in done_states:
return "[x]"
if desired in pending_states:
return "[ ]"
if existing_line:
return "[x]" if "- [x]" in existing_line else "[ ]"
return "[x]"
lines = text.splitlines()
section_marker = SECTION_MARKER.format(section=section) if section else None
section_start_idx: Optional[int] = None
section_end_idx: int = len(lines)
if section_marker:
for idx, line in enumerate(lines):
if line.strip() == section_marker:
section_start_idx = idx
break
if section_start_idx is not None:
for idx in range(section_start_idx + 1, len(lines)):
stripped = lines[idx].strip()
if stripped.startswith("<!-- ID:") and stripped != section_marker:
section_end_idx = idx
break
else:
# Auto-heal missing anchor: append new section with checklist entry.
doc_logger.warning(
"Checklist section anchor '%s' missing; creating new block.",
section,
extra={"section": section, "action": "status_update"},
)
token = resolve_token(None)
entry_label = label or (section.replace("_", " ").title() if section else "Checklist item")
new_line = f"- {token} {entry_label}"
if proof:
new_line += f" | proof={proof}"
prefix = text.rstrip()
if prefix:
prefix = prefix + "\n\n"
return prefix + section_marker + "\n" + new_line + "\n"
replacement = False
search_start = (section_start_idx + 1) if section_start_idx is not None else 0
search_end = section_end_idx
for idx in range(search_start, search_end):
line = lines[idx]
if "- [ ]" not in line and "- [x]" not in line:
continue
token = resolve_token(line)
if "- [x]" in line:
new_line = line.replace("- [x]", f"- {token}", 1)
else:
new_line = line.replace("- [ ]", f"- {token}", 1)
parts = new_line.split(" | ")
prefix = parts[0].rstrip()
other_tokens = [part for part in parts[1:] if not part.startswith("proof=")]
if proof:
other_tokens.append(f"proof={proof}")
new_line = " | ".join([prefix] + other_tokens) if other_tokens else prefix
lines[idx] = new_line
replacement = True
break
if not replacement:
token = resolve_token(None)
entry_label = label or (section.replace("_", " ").title() if section else "Checklist item")
new_line = f"- {token} {entry_label}"
if proof:
new_line += f" | proof={proof}"
insertion_index = search_end
lines.insert(insertion_index, new_line)
replacement = True
updated_text = "\n".join(lines)
if text.endswith("\n"):
return updated_text + "\n"
return updated_text
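# Example (illustrative): with metadata={"status": "done", "proof": "PR-12"},
# the first checklist line in the section flips from "- [ ] Task" to
# "- [x] Task | proof=PR-12"; a missing section anchor is auto-created rather
# than raising.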
def _extract_title(text: str, fallback: str) -> str:
for line in text.splitlines():
stripped = line.strip()
if stripped.startswith("#"):
return stripped.lstrip("#").strip()
return fallback
def _default_frontmatter(
    doc: str,
    project_name: str,
    body_text: str,
    date_str: str,
) -> Dict[str, Any]:
    """Build the default frontmatter mapping for a newly created document."""
    # Imported lazily to avoid a circular import at module load time.
    from scribe_mcp.tools.project_utils import slugify_project_name

    project_slug = slugify_project_name(project_name or "project")
    title = _extract_title(body_text, doc.replace("_", " ").title())
    return {
        "id": f"{project_slug}-{doc}",
        "title": title,
        "doc_type": doc,
        "category": "engineering",
        "status": "draft",
        "version": "0.1",
        "last_updated": date_str,
        "maintained_by": "Corta Labs",
        "created_by": "Corta Labs",
        "owners": [],
        "related_docs": [],
        "tags": [],
        "summary": "",
    }
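
# Example (sketch, assuming slugify_project_name lowercases and underscores
# names the way the local duplicate at the bottom of this module does):
#   >>> _default_frontmatter("prd", "My Project", "# My PRD\n", "2024-01-01")["id"]
#   'my_project-prd'
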
def _apply_frontmatter_pipeline(
    parsed: "FrontmatterResult",
    updated_body: str,
    *,
    doc: str,
    project: Dict[str, Any],
    metadata: Optional[Dict[str, Any]],
    date_str: str,
) -> tuple[str, Dict[str, Any], int]:
    """Merge, create, or skip frontmatter for ``updated_body``.

    Returns ``(new_text, info, frontmatter_line_count)`` where ``info`` reports
    whether frontmatter was updated and/or created.
    """
    if metadata and metadata.get("frontmatter_disable") is True:
        # Caller opted out: preserve whatever frontmatter already exists.
        line_count = len(parsed.frontmatter_raw.splitlines()) if parsed.has_frontmatter else 0
        if parsed.has_frontmatter:
            return (
                parsed.frontmatter_raw + updated_body,
                {"frontmatter_updated": False},
                line_count,
            )
        updated_parsed = parse_frontmatter(updated_body)
        if updated_parsed.has_frontmatter:
            line_count = len(updated_parsed.frontmatter_raw.splitlines())
            return (
                updated_body,
                {"frontmatter_updated": False},
                line_count,
            )
        return updated_body, {"frontmatter_updated": False}, 0
    updates: Dict[str, Any] = {}
    if isinstance(metadata, dict) and isinstance(metadata.get("frontmatter"), dict):
        updates.update(metadata["frontmatter"])
    updates["last_updated"] = date_str
    if not parsed.has_frontmatter:
        updated_parsed = parse_frontmatter(updated_body)
        if updated_parsed.has_frontmatter:
            # The updated body already carries its own frontmatter block.
            if updates:
                frontmatter_raw, merged = apply_frontmatter_updates(
                    updated_parsed.frontmatter_raw,
                    updated_parsed.frontmatter_data,
                    updates,
                )
                line_count = len(frontmatter_raw.splitlines())
                new_text = frontmatter_raw + updated_parsed.body
                return (
                    new_text,
                    {
                        "frontmatter_updated": True,
                        "frontmatter_created": False,
                        "frontmatter": merged,
                    },
                    line_count,
                )
            line_count = len(updated_parsed.frontmatter_raw.splitlines())
            return (
                updated_body,
                {"frontmatter_updated": False, "frontmatter_created": False},
                line_count,
            )
        # No frontmatter anywhere: create a default block for this doc type.
        defaults = _default_frontmatter(doc, project.get("name", ""), updated_body, date_str)
        defaults.update(updates)
        frontmatter_block = build_frontmatter(defaults)
        line_count = len(frontmatter_block.splitlines())
        new_text = frontmatter_block + updated_body
        return new_text, {"frontmatter_updated": True, "frontmatter_created": True}, line_count
    frontmatter_raw, merged = apply_frontmatter_updates(
        parsed.frontmatter_raw, parsed.frontmatter_data, updates
    )
    line_count = len(frontmatter_raw.splitlines())
    new_text = frontmatter_raw + updated_body
    return (
        new_text,
        {"frontmatter_updated": True, "frontmatter_created": False, "frontmatter": merged},
        line_count,
    )
def _validate_comparison_symbols(value: Any) -> bool:
    """Return False for strings that look like bare numeric comparisons.

    Strings such as "5 > 3" or "10.5 <= 20" typically cause
    "'<' not supported between instances of 'str' and 'float'" errors when
    metadata values are later compared numerically, so they are flagged for
    handling elsewhere. All other values are allowed.
    """
    if not isinstance(value, str):
        return True
    # Pattern: numeric value, comparison operator, numeric value
    # (e.g. "5 > 3", "10.5 <= 20").
    numeric_comparison_pattern = r'^\s*\d+\.?\d*\s*[><=]+\s*\d+\.?\d*\s*$'
    if re.match(numeric_comparison_pattern, value.strip()):
        return False
    # Purely numeric strings (e.g. "42", "3.14") are allowed as-is; callers
    # that mix string and numeric comparisons must coerce them first.
    return True
def _slugify_filename(value: str) -> str:
    """Return a filesystem-friendly filename slug, defaulting to "document"."""
    slug = re.sub(r"[^\w\-.]+", "_", value.strip())
    slug = re.sub(r"_+", "_", slug).strip("_")
    return slug or "document"
def _build_create_doc_body(
    content: Optional[str],
    metadata: Optional[Dict[str, Any]],
) -> str:
    """Assemble the body for a create_doc action from content or metadata."""
    if content:
        body = str(content)
    else:
        metadata = metadata or {}
        body = ""
        raw_body = metadata.get("body") or metadata.get("snippet")
        if isinstance(raw_body, str):
            body = raw_body
        elif isinstance(metadata.get("sections"), list):
            # Render each {"title", "content"} section as a markdown block.
            sections = []
            for section in metadata["sections"]:
                if not isinstance(section, dict):
                    continue
                title = str(section.get("title") or "").strip()
                text = str(section.get("content") or "").strip()
                if not title and not text:
                    continue
                if title:
                    sections.append(f"## {title}")
                if text:
                    sections.append(text)
                sections.append("")
            body = "\n".join(sections).rstrip()
    if not body:
        raise DocumentOperationError(
            "CREATE_DOC_MISSING_CONTENT: provide content, body, snippet, or sections"
        )
    if not body.endswith("\n"):
        body += "\n"
    return body
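
# Example (sketch; the section title and content are made up):
#   >>> _build_create_doc_body(None, {"sections": [{"title": "Goals", "content": "Ship it."}]})
#   '## Goals\nShip it.\n'
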
def _extract_header_anchors(text: str) -> set[str]:
    """Collect GitHub-style anchor slugs for every markdown header in ``text``."""
    parsed = parse_frontmatter(text)
    lines = parsed.body.splitlines()
    anchors: set[str] = set()
    counts: Dict[str, int] = {}
    in_fence = False
    for line in lines:
        stripped = line.lstrip()
        # Track fenced code blocks so headers inside them are ignored.
        if stripped.startswith("```") or stripped.startswith("~~~"):
            in_fence = not in_fence
            continue
        if in_fence:
            continue
        match = _HEADER_LINE_PATTERN.match(line)
        if not match:
            continue
        title = match.group(2).strip()
        if not title:
            continue
        anchors.add(_build_github_anchor(title, counts))
    return anchors
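
# Example (sketch; the exact slug depends on _build_github_anchor, assumed
# here to follow GitHub's lowercase-and-hyphenate convention):
#   >>> _extract_header_anchors("# Getting Started\n")
#   {'getting-started'}
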
def _validate_crosslinks(
    text: str,
    project: Dict[str, Any],
    metadata: Optional[Dict[str, Any]],
) -> list[Dict[str, Any]]:
    """Check that each related_docs entry resolves to an existing file (and anchor)."""
    parsed = parse_frontmatter(text)
    if isinstance(metadata, dict) and isinstance(metadata.get("related_docs"), list):
        related_docs = metadata.get("related_docs") or []
    else:
        related_docs = parsed.frontmatter_data.get("related_docs") or []
    if not isinstance(related_docs, list):
        related_docs = []
    check_anchors = bool(metadata.get("check_anchors")) if isinstance(metadata, dict) else False
    docs_dir = project.get("docs_dir")
    base_dir = Path(docs_dir) if docs_dir else _resolve_doc_path(project, "architecture").parent
    diagnostics: list[Dict[str, Any]] = []
    for entry in related_docs:
        target = str(entry)
        # Split "path#anchor" into its parts; anchor_part is None when absent.
        path_part, anchor_part = (target.split("#", 1) + [None])[:2]
        candidate = Path(path_part)
        if not candidate.is_absolute():
            candidate = base_dir / candidate
        resolved = candidate.resolve()
        exists = resolved.exists()
        anchor_found = None
        if exists and check_anchors and anchor_part:
            anchors = _extract_header_anchors(resolved.read_text(encoding="utf-8"))
            anchor_found = anchor_part in anchors
        diagnostics.append(
            {
                "target": target,
                "path": str(resolved),
                "exists": exists,
                "anchor": anchor_part,
                "anchor_found": anchor_found,
            }
        )
    return diagnostics
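
# Each diagnostic entry looks like (values illustrative):
#   {"target": "guide.md#setup", "path": "/abs/docs/guide.md",
#    "exists": True, "anchor": "setup", "anchor_found": True}
# anchor_found stays None when the file is missing, anchor checking is off,
# or the target has no "#anchor" suffix.
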
def _validate_and_correct_inputs(
    doc: str,
    action: str,
    section: Optional[str],
    content: Optional[str],
    patch: Optional[str] = None,
    patch_source_hash: Optional[str] = None,
    edit: Optional[Dict[str, Any]] = None,
    start_line: Optional[int] = None,
    end_line: Optional[int] = None,
    template: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> tuple[str, str, Optional[str], Optional[str], Optional[str], Dict[str, Any]]:
    """Bulletproof parameter validation and correction that never raises.

    Returns the corrected tuple ``(doc, action, section, content, template,
    metadata)``. ``patch``, ``start_line``, and ``end_line`` are accepted for
    signature compatibility but are not corrected here.
    """
    # Define the validation schema.
    validation_schema = {
        "doc": {
            "type": str,
            "required": True,
            "default": ""
        },
        "action": {
            "type": str,
            "required": True,
            "allowed_values": {
                "replace_section", "append", "status_update", "list_sections", "batch",
                "apply_patch", "replace_range", "replace_text", "normalize_headers",
                "generate_toc", "create_doc", "validate_crosslinks", "search",
                "create_research_doc", "create_bug_report", "create_review_report",
                "create_agent_report_card",
            },
            "default": "append"
        },
        "section": {
            "type": str,
            "required": False,
            "default": None
        },
        "content": {
            "type": str,
            "required": False,
            "default": None
        },
        "template": {
            "type": str,
            "required": False,
            "default": None
        },
        "metadata": {
            "type": dict,
            "required": False,
            "default": {}
        }
    }
    # Apply bulletproof parameter correction.
    input_params = {
        "doc": doc,
        "action": action,
        "section": section,
        "content": content,
        "patch_source_hash": patch_source_hash,
        "edit": edit,
        "template": template,
        "metadata": metadata
    }
    corrected_params = BulletproofParameterCorrector.ensure_parameter_validity(input_params, validation_schema)
    # Apply business-logic corrections.
    corrected_doc = corrected_params["doc"]
    corrected_action = corrected_params["action"]
    corrected_section = corrected_params["section"]
    corrected_content = corrected_params["content"]
    corrected_template = corrected_params["template"]
    corrected_metadata = corrected_params["metadata"]
    # Actions whose doc argument must be preserved exactly as the caller gave it.
    strict_doc_actions = {
        "apply_patch",
        "replace_range",
        "replace_text",
        "normalize_headers",
        "generate_toc",
        "validate_crosslinks",
        "create_doc",
    }
    # Checks the raw inputs so corrector fallbacks never trigger this path.
    if action in strict_doc_actions and doc is not None:
        corrected_doc = str(doc).strip()
    if corrected_action == "list_sections":
        return corrected_doc, corrected_action, None, None, None, {}
    if corrected_action == "batch":
        # Ensure metadata carries an operations list.
        if not isinstance(corrected_metadata, dict):
            corrected_metadata = {}
        if "operations" not in corrected_metadata or not corrected_metadata["operations"]:
            corrected_metadata["operations"] = []
        if not isinstance(corrected_metadata["operations"], list):
            corrected_metadata["operations"] = [corrected_metadata["operations"]]
        return corrected_doc, corrected_action, None, None, None, corrected_metadata
    # Section validation for actions that need it.
    if corrected_action in {"replace_section", "status_update"}:
        if not corrected_section:
            corrected_section = "main_content"
        # Sanitize the section ID: keep only alphanumerics, hyphens, and underscores.
        sanitized_section = re.sub(r'[^a-zA-Z0-9_-]', '', corrected_section)
        if not sanitized_section:
            sanitized_section = "main_content"
        corrected_section = sanitized_section
    # For actions that do NOT require an explicit section (e.g. append, batch,
    # list_sections), treat missing/None sections as truly absent rather than
    # letting the generic corrector turn them into fallback strings like
    # "No message provided", which would create bogus anchors.
    if corrected_action not in {"replace_section", "status_update"}:
        if section is None:
            corrected_section = None
    # Content handling: preserve the caller's formatting, normalising only line endings.
    if content is not None:
        corrected_content = str(content).replace("\r\n", "\n")
    if corrected_action == "status_update":
        # Status updates need status metadata; content and template are ignored.
        if not isinstance(corrected_metadata, dict):
            corrected_metadata = {}
        if not any(key in corrected_metadata for key in ["status", "proof", "completed"]):
            corrected_metadata["status"] = "in_progress"
        corrected_content = None
        corrected_template = None
    else:
        # For other actions, ensure we have either content or a template.
        if corrected_action in {"apply_patch", "replace_range", "replace_text", "normalize_headers", "generate_toc", "create_doc", "validate_crosslinks"}:
            if corrected_action == "apply_patch" and edit is not None:
                if corrected_content in {"No message provided", "Empty message"}:
                    corrected_content = None
            if corrected_action in {"normalize_headers", "generate_toc"}:
                corrected_content = None
                corrected_template = None
            if corrected_action in {"create_doc", "validate_crosslinks"}:
                if corrected_content in {"No message provided", "Empty message"}:
                    corrected_content = None
                corrected_template = None
        elif not corrected_content and not corrected_template:
            corrected_content = f"## {corrected_action.replace('_', ' ').title()}\n\nContent placeholder."
    return corrected_doc, corrected_action, corrected_section, corrected_content, corrected_template, corrected_metadata
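
# Example of the section fallback (sketch; the corrector's generic behaviour
# is assumed benign here): a replace_section call with no section is steered
# to the "main_content" anchor, and disallowed characters are stripped, so a
# caller passing section="My Section!" ends up targeting "MySection".
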
async def _verify_file_write(file_path: Path, expected_content: str, expected_hash: str) -> bool:
    """Verify that the file was written correctly."""
    try:
        # Check that the file exists.
        if not file_path.exists():
            return False
        # Read the actual content.
        actual_content = await asyncio.to_thread(file_path.read_text, encoding="utf-8")
        # Verify the content matches exactly.
        if actual_content != expected_content:
            doc_logger.error(f"Content mismatch in {file_path}: expected {len(expected_content)} chars, got {len(actual_content)} chars")
            return False
        # Verify the hash matches.
        actual_hash = _hash_text(actual_content)
        if actual_hash != expected_hash:
            doc_logger.error(f"Hash mismatch in {file_path}: expected {expected_hash[:8]}..., got {actual_hash[:8]}...")
            return False
        return True
    except Exception as e:
        doc_logger.error(f"Failed to verify file write for {file_path}: {e}")
        return False
def _hash_text(value: str) -> str:
    """Return the SHA-256 hex digest of ``value``."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()
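
# Example: the well-known SHA-256 digest of the empty string.
#   >>> _hash_text("")
#   'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
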
class DefaultDict(dict):
    """Helper to avoid KeyError when formatting template fragments."""

    def __init__(self, base: Dict[str, Any]):
        super().__init__(base)

    def __missing__(self, key: str) -> str:
        return ""
_SLUG_CLEANER = re.compile(r"[^0-9a-z_]+")


def slugify_project_name(name: str) -> str:
    """Return a filesystem-friendly slug; duplicated here to avoid circular imports during tests."""
    normalised = name.strip().lower().replace(" ", "_")
    return _SLUG_CLEANER.sub("_", normalised).strip("_") or "project"
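
# Examples (sketch):
#   >>> slugify_project_name("Scribe MCP!")
#   'scribe_mcp'
#   >>> slugify_project_name("   ")
#   'project'
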