"""Special document creation and index update helpers for manage_docs."""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, Optional
from scribe_mcp import server as server_module
from scribe_mcp.doc_management import healing as healing_shared
from scribe_mcp.doc_management import indexing as indexing_shared
from scribe_mcp.doc_management import special_indexes as special_indexes_shared
from scribe_mcp.doc_management import utils as utils_shared
from scribe_mcp.doc_management.manager import DocumentOperationError
from scribe_mcp.tools.append_entry import append_entry
from scribe_mcp.utils.slug import slugify_project_name
def _hash_text(content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
async def _get_or_create_storage_project(backend: Any, project: Dict[str, Any]) -> Any:
timeout = server_module.settings.storage_timeout_seconds
async with asyncio.timeout(timeout):
storage_record = await backend.fetch_project(project["name"])
if not storage_record:
async with asyncio.timeout(timeout):
storage_record = await backend.upsert_project(
name=project["name"],
repo_root=project["root"],
progress_log_path=project["progress_log"],
)
return storage_record
def _build_special_metadata(
project: Dict[str, Any],
metadata: Dict[str, Any],
agent_id: str,
extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
return utils_shared.build_special_metadata(project, metadata, agent_id, extra=extra)
async def _render_special_template(
project: Dict[str, Any],
agent_id: str,
template_name: str,
metadata: Dict[str, Any],
extra_metadata: Optional[Dict[str, Any]] = None,
prepared_metadata: Optional[Dict[str, Any]] = None,
) -> str:
try:
from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
engine = Jinja2TemplateEngine(
project_root=Path(project.get("root", "")),
project_name=project.get("name", ""),
security_mode="sandbox",
)
if prepared_metadata is None:
prepared_metadata = _build_special_metadata(
project,
metadata,
agent_id,
extra=extra_metadata,
)
return engine.render_template(
template_name=f"documents/{template_name}",
metadata=prepared_metadata,
)
except (ImportError, TemplateEngineError) as exc:
raise DocumentOperationError(f"Failed to render template '{template_name}': {exc}") from exc
async def _record_special_doc_change(
backend: Any,
project: Dict[str, Any],
agent_id: str,
doc_label: str,
target_path: Path,
metadata: Dict[str, Any],
before_hash: str,
after_hash: str,
logger: logging.Logger,
) -> None:
if not backend:
return
try:
storage_record = await _get_or_create_storage_project(backend, project)
except Exception as exc: # pragma: no cover
logger.warning("Failed to prepare storage record for %s: %s", doc_label, exc)
return
action = "create" if not before_hash else "update"
try:
await backend.record_doc_change(
storage_record,
doc=doc_label,
section=None,
action=action,
agent=agent_id,
metadata=metadata,
sha_before=before_hash,
sha_after=after_hash,
)
except Exception as exc:
logger.warning("Failed to record special doc change for %s: %s", doc_label, exc)
async def _record_agent_report_card_metadata(
backend: Any,
project: Dict[str, Any],
agent_id: str,
target_path: Path,
metadata: Dict[str, Any],
logger: logging.Logger,
) -> None:
if not backend:
return
try:
storage_record = await _get_or_create_storage_project(backend, project)
except Exception as exc:
logger.warning("Failed to prepare storage project for agent card: %s", exc)
return
try:
await backend.record_agent_report_card(
storage_record,
file_path=str(target_path),
agent_name=metadata.get("agent_name", agent_id),
stage=metadata.get("stage"),
overall_grade=utils_shared.parse_numeric_grade(metadata.get("overall_grade")),
performance_level=metadata.get("performance_level"),
metadata=metadata,
)
except Exception as exc:
logger.warning("Failed to record agent report card metadata: %s", exc)
def get_index_updater_for_path(
file_path: Path,
project_root: Path,
docs_dir: Path,
agent_id: str,
) -> Optional[Callable[[], Awaitable[None]]]:
return indexing_shared.get_index_updater_for_path(
file_path=file_path,
project_root=project_root,
docs_dir=docs_dir,
agent_id=agent_id,
update_research_index=_update_research_index,
update_bug_index=_update_bug_index,
update_review_index=_update_review_index,
update_agent_card_index=_update_agent_card_index,
)
async def handle_special_document_creation(
project: Dict[str, Any],
action: str,
doc_name: Optional[str],
target_dir: Optional[str],
content: Optional[str],
metadata: Optional[Dict[str, Any]],
dry_run: bool,
agent_id: str,
storage_backend: Any,
helper: Any,
context: Any,
project_registry: Any,
logger: logging.Logger,
) -> Dict[str, Any]:
healed_metadata, _, _ = healing_shared.normalize_metadata_with_healing(metadata)
metadata = healed_metadata
project_root = Path(project.get("root", ""))
docs_dir_str = project.get("docs_dir", "")
docs_dir = Path(docs_dir_str) if docs_dir_str else Path("")
if not docs_dir or str(docs_dir) in {"", "."}:
project_slug = slugify_project_name(project.get("name", ""))
docs_dir = project_root / ".scribe" / "docs" / "dev_plans" / project_slug
now = datetime.now(timezone.utc)
timestamp_str = now.strftime("%Y-%m-%d %H:%M:%S UTC")
template_name = ""
doc_label = ""
target_path: Optional[Path] = None
index_updater: Optional[Callable[[], Awaitable[None]]] = None
index_path: Optional[Path] = None
extra_metadata: Dict[str, Any] = {}
placement_warning: Optional[str] = None
if action == "create_research_doc":
if not doc_name:
return helper.apply_context_payload(
helper.error_response(
"doc_name is required for research document creation",
),
context,
)
safe_name = re.sub(r"[^\w\-_.]", "_", doc_name)
safe_name = re.sub(r"_+", "_", safe_name).strip("_")
if not safe_name:
safe_name = f"research_{int(datetime.now().timestamp())}"
override_dir = target_dir or metadata.get("target_dir")
if override_dir:
research_dir = Path(override_dir)
if not research_dir.is_absolute():
research_dir = project_root / research_dir
else:
research_dir = docs_dir / "research"
progress_log_path = project.get("progress_log", "")
if progress_log_path:
progress_log_dir = Path(progress_log_path).parent
try:
expected_resolved = progress_log_dir.resolve()
actual_resolved = docs_dir.resolve()
if expected_resolved != actual_resolved:
placement_warning = (
f"Research doc will be created in '{actual_resolved}/research/' "
f"but PROGRESS_LOG is in '{expected_resolved}'. "
f"Use target_dir parameter or metadata.target_dir to override placement."
)
except Exception:
pass
target_path = research_dir / f"{safe_name}.md"
template_name = "RESEARCH_REPORT_TEMPLATE.md"
doc_label = "research_report"
extra_metadata = {
"title": doc_name.replace("_", " ").title(),
"doc_name": safe_name,
"researcher": metadata.get("researcher", agent_id),
}
index_updater = lambda: _update_research_index(research_dir, agent_id)
index_path = research_dir / "INDEX.md"
elif action == "create_bug_report":
category = metadata.get("category")
if not category or not category.strip():
return helper.apply_context_payload(
helper.error_response(
"metadata with non-empty 'category' is required for bug report creation",
),
context,
)
category = re.sub(r"[^\w\-_.]", "_", category.strip())
slug = metadata.get("slug")
if slug:
slug = re.sub(r"[^\w\-_.]", "_", str(slug).strip())
if not slug:
slug = f"bug_{int(now.timestamp())}"
bug_dir = project_root / "docs" / "bugs" / category / f"{now.strftime('%Y-%m-%d')}_{slug}"
target_path = bug_dir / "report.md"
template_name = "BUG_REPORT_TEMPLATE.md"
doc_label = "bug_report"
extra_metadata = {
"slug": slug,
"category": category,
"reported_at": metadata.get("reported_at", timestamp_str),
}
index_updater = lambda: _update_bug_index(project_root / "docs" / "bugs", agent_id)
index_path = project_root / "docs" / "bugs" / "INDEX.md"
elif action == "create_review_report":
stage = metadata.get("stage", "unknown")
target_path = docs_dir / f"REVIEW_REPORT_{stage}_{now.strftime('%Y-%m-%d')}_{now.strftime('%H%M')}.md"
template_name = "REVIEW_REPORT_TEMPLATE.md"
doc_label = "review_report"
extra_metadata = {"stage": stage}
index_updater = lambda: _update_review_index(docs_dir, agent_id)
index_path = docs_dir / "REVIEW_INDEX.md"
elif action == "create_agent_report_card":
card_agent = metadata.get("agent_name", agent_id)
stage = metadata.get("stage", "unknown")
target_path = docs_dir / f"AGENT_REPORT_CARD_{card_agent}_{stage}_{now.strftime('%Y%m%d_%H%M')}.md"
template_name = "AGENT_REPORT_CARD_TEMPLATE.md"
doc_label = "agent_report_card"
extra_metadata = {
"agent_name": card_agent,
"stage": stage,
}
index_updater = lambda: _update_agent_card_index(docs_dir, agent_id)
index_path = docs_dir / "AGENT_CARDS_INDEX.md"
else:
return helper.apply_context_payload(
helper.error_response(f"Unsupported special document action: {action}"),
context,
)
prepared_metadata = _build_special_metadata(project, metadata, agent_id, extra_metadata)
rendered_content = content
if not rendered_content:
try:
if action == "create_review_report":
rendered_content = await _render_review_report_template(
project,
agent_id,
prepared_metadata,
logger=logger,
)
elif action == "create_agent_report_card":
rendered_content = await _render_agent_report_card_template(
project,
agent_id,
prepared_metadata,
logger=logger,
)
else:
rendered_content = await _render_special_template(
project,
agent_id,
template_name,
metadata,
extra_metadata=extra_metadata,
prepared_metadata=prepared_metadata,
)
except DocumentOperationError as exc:
return helper.apply_context_payload(
helper.error_response(str(exc)),
context,
)
if rendered_content is None:
return helper.apply_context_payload(
helper.error_response("Failed to render document content."),
context,
)
try:
target_path.resolve().relative_to(project_root.resolve())
except ValueError:
return helper.apply_context_payload(
helper.error_response(
f"Generated document path {target_path} is outside project root",
),
context,
)
if dry_run:
return helper.apply_context_payload(
{
"ok": True,
"dry_run": True,
"path": str(target_path),
"content": rendered_content,
},
context,
)
before_hash = ""
if target_path.exists():
try:
before_hash = _hash_text(target_path.read_text(encoding="utf-8"))
except (OSError, UnicodeDecodeError):
before_hash = ""
log_warning: Optional[str] = None
try:
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_text(rendered_content, encoding="utf-8")
after_hash = _hash_text(rendered_content)
await _record_special_doc_change(
storage_backend,
project,
agent_id,
doc_label,
target_path,
prepared_metadata,
before_hash,
after_hash,
logger=logger,
)
if doc_label == "agent_report_card":
await _record_agent_report_card_metadata(
storage_backend,
project,
agent_id,
target_path,
prepared_metadata,
logger=logger,
)
healed_metadata, _, _ = healing_shared.normalize_metadata_with_healing(prepared_metadata)
log_meta = healed_metadata
log_meta.update(
{
"doc": doc_label,
"section": "",
"action": "create",
"document_type": doc_label,
"file_path": str(target_path),
"file_size": target_path.stat().st_size,
}
)
for key, value in list(log_meta.items()):
if isinstance(value, (dict, list)):
try:
log_meta[key] = json.dumps(value, sort_keys=True)
except (TypeError, ValueError):
log_meta[key] = str(value)
try:
await append_entry(
message=f"Created {doc_label.replace('_', ' ')}: {target_path.name}",
status="success",
meta=log_meta,
agent=agent_id,
log_type="doc_updates",
format="structured",
)
except Exception as exc:
log_warning = str(exc)
if index_updater:
try:
await index_updater()
except Exception as exc:
logger.warning("Failed to update index for %s: %s", doc_label, exc)
registration_warning: Optional[str] = None
if storage_backend and project:
try:
project_name = project.get("name")
if project_name:
doc_key = f"{doc_label}_{target_path.stem}"
current_docs = project.get("docs", {})
current_docs[doc_key] = str(target_path)
docs_json = json.dumps(current_docs)
await storage_backend.update_project_docs(project_name, docs_json)
try:
project_registry.record_doc_update(
project_name=project_name,
doc=doc_key,
action="create",
before_hash=None,
after_hash=after_hash,
)
except Exception as reg_exc:
registration_warning = f"Registry update failed: {reg_exc}"
except Exception as exc:
registration_warning = f"Doc registration failed: {exc}"
if storage_backend and project and index_path and index_path.exists():
try:
project_name = project.get("name")
if project_name:
index_key = f"{doc_label}_index"
current_project = await storage_backend.fetch_project(project_name)
if current_project and current_project.docs_json:
current_docs = json.loads(current_project.docs_json)
else:
current_docs = project.get("docs", {})
current_docs[index_key] = str(index_path)
docs_json = json.dumps(current_docs)
await storage_backend.update_project_docs(project_name, docs_json)
except Exception as exc:
if registration_warning:
registration_warning += f"; Index registration failed: {exc}"
else:
registration_warning = f"Index registration failed: {exc}"
success_payload: Dict[str, Any] = {
"ok": True,
"path": str(target_path),
"document_type": doc_label,
"file_size": target_path.stat().st_size,
}
if log_warning:
success_payload["log_warning"] = log_warning
if registration_warning:
success_payload["registration_warning"] = registration_warning
if placement_warning:
success_payload["placement_warning"] = placement_warning
return helper.apply_context_payload(success_payload, context)
except Exception as exc:
return helper.apply_context_payload(
helper.error_response(f"Failed to create document: {exc}"),
context,
)
async def _update_research_index(research_dir: Path, agent_id: str) -> None:
await special_indexes_shared.update_research_index(research_dir, agent_id)
async def _update_bug_index(bugs_dir: Path, agent_id: str) -> None:
await special_indexes_shared.update_bug_index(bugs_dir, agent_id)
async def _update_review_index(docs_dir: Path, agent_id: str) -> None:
await special_indexes_shared.update_review_index(docs_dir, agent_id)
async def _update_agent_card_index(docs_dir: Path, agent_id: str) -> None:
await special_indexes_shared.update_agent_card_index(docs_dir, agent_id)
async def _render_review_report_template(
project: Dict[str, Any],
agent_id: str,
prepared_metadata: Dict[str, Any],
logger: logging.Logger,
) -> str:
return await special_indexes_shared.render_review_report_template(
project=project,
agent_id=agent_id,
prepared_metadata=prepared_metadata,
logger=logger,
)
async def _render_agent_report_card_template(
project: Dict[str, Any],
agent_id: str,
prepared_metadata: Dict[str, Any],
logger: logging.Logger,
) -> str:
return await special_indexes_shared.render_agent_report_card_template(
project=project,
agent_id=agent_id,
prepared_metadata=prepared_metadata,
logger=logger,
)