from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import json
import sqlite3
from scribe_mcp.config.settings import settings
from scribe_mcp.storage.models import ProjectRecord
@dataclass
class ProjectInfo:
"""High-level view of a project's registry state.
This is a logical view, not a 1:1 mapping to any single table.
Fields are computed from `scribe_projects`, `scribe_metrics`,
and dev plan tables where available.
"""
project_slug: str
project_name: str
description: Optional[str]
status: str
created_at: Optional[datetime]
last_entry_at: Optional[datetime]
last_access_at: Optional[datetime]
last_status_change: Optional[datetime]
total_entries: int
total_files: int
total_phases: int
tags: List[str]
meta: Dict[str, Any]
class ProjectRegistry:
"""SQLite-first Project Registry helper.
For v1 this helper focuses on the SQLite backend defined by
`settings.sqlite_path`. The SQL is written to remain portable
so that a future Postgres implementation can mirror behaviour.
"""
def __init__(self, db_path: Optional[Path] = None) -> None:
self._db_path = Path(db_path or settings.sqlite_path).expanduser()
self._db_path.parent.mkdir(parents=True, exist_ok=True)
self._ensure_schema()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def ensure_project(
self,
project: ProjectRecord,
*,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
meta: Optional[Dict[str, Any]] = None,
) -> None:
"""Ensure registry row exists for this project.
SQLiteStorage already guarantees a scribe_projects row; here we
opportunistically backfill new registry-focused columns.
"""
tags_str = ",".join(sorted(set(tags))) if tags else None
        meta_str = None
        if meta:
            meta_str = json.dumps(meta, separators=(",", ":"), sort_keys=True)
now = self._now_iso()
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"""
UPDATE scribe_projects
SET
description = COALESCE(description, ?),
tags = COALESCE(tags, ?),
meta = COALESCE(meta, ?),
last_access_at = COALESCE(last_access_at, ?)
WHERE name = ?
""",
(description, tags_str, meta_str, now, project.name),
)
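    # Illustrative usage (a sketch; the record and field values are
    # hypothetical):
    #
    #     registry = ProjectRegistry()
    #     registry.ensure_project(
    #         record,  # a ProjectRecord whose .name matches the registry row
    #         description="Payment service rewrite",
    #         tags=["backend", "q3"],
    #         meta={"priority": "high"},
    #     )
    #
    # Because every SET uses COALESCE, existing values win: calling this again
    # with a different description leaves the original in place.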
def touch_access(self, project_name: str) -> None:
"""Update last_access_at when a project is (re)selected."""
now = self._now_iso()
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"UPDATE scribe_projects SET last_access_at = ? WHERE name = ?",
(now, project_name),
)
def touch_entry(self, project_name: str, log_type: Optional[str] = None) -> None:
"""Update last_entry_at when we write logs/docs.
Also applies soft lifecycle rules:
- If status == 'planning'
- AND core dev_plan docs exist (architecture, phase_plan, checklist)
- AND at least one *progress* log entry has been written
→ auto-promote to 'in_progress' and set last_status_change.
"""
now = self._now_iso()
        # A missing log_type counts as a progress write for lifecycle purposes.
        normalized_log_type = (log_type or "progress").lower()
with sqlite3.connect(self._db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE scribe_projects SET last_entry_at = ? WHERE name = ?",
(now, project_name),
)
# Fetch project id + current status for lifecycle checks
cursor.execute(
"SELECT id, COALESCE(status, 'planning') FROM scribe_projects WHERE name = ?",
(project_name,),
)
row = cursor.fetchone()
if not row:
conn.commit()
return
project_id, status = row
if status != "planning":
conn.commit()
return
# Check for core dev_plan docs: architecture, phase_plan, checklist
try:
cursor.execute(
"""
SELECT COUNT(*) FROM dev_plans
WHERE project_id = ?
AND plan_type IN ('architecture', 'phase_plan', 'checklist')
""",
(project_id,),
)
docs_count = cursor.fetchone()[0] or 0
except sqlite3.Error:
docs_count = 0
if docs_count < 3:
conn.commit()
return
# Only auto-promote on progress log writes; other log types
# (e.g., doc_updates) still update last_entry_at but do not
# change lifecycle state.
if normalized_log_type != "progress":
conn.commit()
return
# All conditions met; promote planning -> in_progress
cursor.execute(
"""
UPDATE scribe_projects
SET status = 'in_progress',
last_status_change = ?
WHERE id = ?
""",
(now, project_id),
)
conn.commit()
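    # Promotion walkthrough (a sketch; assumes dev_plans rows exist for
    # 'architecture', 'phase_plan', and 'checklist'):
    #
    #     registry.touch_entry("my-project", log_type="doc_update")
    #     # last_entry_at bumped; status unchanged (not a progress write)
    #     registry.touch_entry("my-project", log_type="progress")
    #     # status promoted 'planning' -> 'in_progress'; last_status_change set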
def set_status(
self,
project_name: str,
status: str,
) -> None:
"""Set lifecycle status and bump last_status_change."""
now = self._now_iso()
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"""
UPDATE scribe_projects
SET status = ?, last_status_change = ?
WHERE name = ?
""",
(status, now, project_name),
)
def record_doc_update(
self,
project_name: str,
*,
doc: str,
action: str,
before_hash: Optional[str] = None,
after_hash: Optional[str] = None,
) -> None:
"""Record manage_docs-specific metrics in the registry meta blob.
We treat these as lightweight counters/timestamps so agents can
distinguish doc updates (usually at phase boundaries) from the
more frequent progress log traffic.
"""
now = self._now_iso()
with sqlite3.connect(self._db_path) as conn:
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT meta FROM scribe_projects WHERE name = ?",
(project_name,),
).fetchone()
meta: Dict[str, Any] = {}
if row and row["meta"]:
                try:
                    meta = json.loads(row["meta"])
                except (ValueError, TypeError):
                    meta = {"raw": row["meta"]}
docs_meta = meta.get("docs") or {}
# Simple counters + timestamps; easy to extend later.
docs_meta["last_update_at"] = now
docs_meta["last_doc_type"] = doc
docs_meta["last_action"] = action
docs_meta["update_count"] = int(docs_meta.get("update_count") or 0) + 1
# Baseline and current hashes per doc type
baseline_map = docs_meta.get("baseline_hashes") or {}
current_map = docs_meta.get("current_hashes") or {}
if doc not in baseline_map and before_hash:
baseline_map[doc] = before_hash
if after_hash:
current_map[doc] = after_hash
docs_meta["baseline_hashes"] = baseline_map
docs_meta["current_hashes"] = current_map
# Derive simple doc-hygiene flags from hashes so agents
# don't need to compare them manually.
flags = docs_meta.get("flags") or {}
core_docs = {"architecture", "phase_plan", "checklist"}
seen_docs = set(baseline_map.keys()) | set(current_map.keys())
for doc_name in seen_docs:
baseline_val = baseline_map.get(doc_name)
current_val = current_map.get(doc_name)
touched = bool(baseline_val or current_val)
modified = (
bool(baseline_val)
and bool(current_val)
and baseline_val != current_val
)
flags[f"{doc_name}_touched"] = touched
flags[f"{doc_name}_modified"] = modified
# Aggregate readiness hints for core dev_plan docs.
if core_docs & seen_docs:
flags["docs_started"] = any(
flags.get(f"{name}_touched") for name in core_docs
)
flags["docs_ready_for_work"] = all(
flags.get(f"{name}_touched") for name in core_docs
)
docs_meta["flags"] = flags
meta["docs"] = docs_meta
            try:
                meta_str = json.dumps(meta, separators=(",", ":"), sort_keys=True)
            except (TypeError, ValueError):
                # Non-serializable values fall back to repr; the loader above
                # wraps unparseable blobs as {"raw": ...} rather than crashing.
                meta_str = str(meta)
conn.execute(
"""
UPDATE scribe_projects
SET meta = ?, last_entry_at = COALESCE(last_entry_at, ?)
WHERE name = ?
""",
(meta_str, now, project_name),
)
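    # Resulting meta["docs"] shape after a couple of updates (hashes and the
    # action value are illustrative; the keys mirror those written above):
    #
    #     {
    #         "last_update_at": "2025-01-02T03:04:05+00:00",
    #         "last_doc_type": "architecture",
    #         "last_action": "update",
    #         "update_count": 2,
    #         "baseline_hashes": {"architecture": "ab12..."},
    #         "current_hashes": {"architecture": "cd34..."},
    #         "flags": {
    #             "architecture_touched": true,
    #             "architecture_modified": true,
    #             "docs_started": true,
    #             "docs_ready_for_work": false
    #         }
    #     }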
    # Shared SELECT used by the three read helpers below; keeping it in one
    # place keeps the query paths from drifting apart.
    _PROJECT_SELECT = """
        SELECT
            p.name AS project_slug,
            p.name AS project_name,
            p.description,
            COALESCE(p.status, 'planning') AS status,
            p.created_at,
            p.last_entry_at,
            p.last_access_at,
            p.last_status_change,
            COALESCE(m.total_entries, 0) AS total_entries,
            COALESCE(df.total_files, 0) AS total_files,
            COALESCE(ph.total_phases, 0) AS total_phases,
            p.tags,
            p.meta
        FROM scribe_projects p
        LEFT JOIN scribe_metrics m
            ON m.project_id = p.id
        LEFT JOIN (
            SELECT project_id, COUNT(*) AS total_files
            FROM dev_plans
            GROUP BY project_id
        ) AS df
            ON df.project_id = p.id
        LEFT JOIN (
            SELECT project_id, COUNT(*) AS total_phases
            FROM phases
            GROUP BY project_id
        ) AS ph
            ON ph.project_id = p.id
    """

    def get_project(self, project_name: str) -> Optional[ProjectInfo]:
        """Fetch registry view for a single project."""
        query = f"{self._PROJECT_SELECT} WHERE p.name = ?"
        with sqlite3.connect(self._db_path) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(query, (project_name,)).fetchone()
        if not row:
            return None
        return self._row_to_project_info(row)
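    # Example (sketch):
    #
    #     info = registry.get_project("my-project")
    #     if info:
    #         print(info.status, info.meta["activity"]["staleness_level"])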
def list_projects(
self,
*,
status: Optional[List[str]] = None,
limit: int = 50,
) -> List[ProjectInfo]:
"""List projects with basic filtering.
This is a minimal v1 implementation; richer filters will be
layered on when we add the dedicated MCP tools.
"""
clauses = ["1=1"]
params: List[Any] = []
if status:
placeholders = ",".join("?" for _ in status)
clauses.append(f"p.status IN ({placeholders})")
params.extend(status)
where_clause = " AND ".join(clauses)
query = f"""
SELECT
p.name AS project_slug,
p.name AS project_name,
p.description,
COALESCE(p.status, 'planning') AS status,
p.created_at,
p.last_entry_at,
p.last_access_at,
p.last_status_change,
COALESCE(m.total_entries, 0) AS total_entries,
COALESCE(df.total_files, 0) AS total_files,
COALESCE(ph.total_phases, 0) AS total_phases,
p.tags,
p.meta
FROM scribe_projects p
LEFT JOIN scribe_metrics m
ON m.project_id = p.id
LEFT JOIN (
SELECT project_id, COUNT(*) AS total_files
FROM dev_plans
GROUP BY project_id
) AS df
ON df.project_id = p.id
LEFT JOIN (
SELECT project_id, COUNT(*) AS total_phases
FROM phases
GROUP BY project_id
) AS ph
ON ph.project_id = p.id
WHERE {where_clause}
ORDER BY COALESCE(p.last_entry_at, p.created_at) DESC
LIMIT ?
"""
params.append(limit)
with sqlite3.connect(self._db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(query, params).fetchall()
return [self._row_to_project_info(r) for r in rows]
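    # Example: the ten most recently active in-flight projects (a sketch;
    # status strings are whatever callers have stored via set_status):
    #
    #     active = registry.list_projects(
    #         status=["planning", "in_progress"], limit=10
    #     )
    #     for info in active:
    #         print(info.project_name, info.total_entries)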
def get_last_known_project(
self,
*,
candidates: Optional[List[str]] = None,
) -> Optional[ProjectInfo]:
"""Return the most recently accessed project (optionally among candidates)."""
clauses = ["1=1"]
params: List[Any] = []
if candidates:
placeholders = ",".join("?" for _ in candidates)
clauses.append(f"p.name IN ({placeholders})")
params.extend(candidates)
where_clause = " AND ".join(clauses)
query = f"""
SELECT
p.name AS project_slug,
p.name AS project_name,
p.description,
COALESCE(p.status, 'planning') AS status,
p.created_at,
p.last_entry_at,
p.last_access_at,
p.last_status_change,
COALESCE(m.total_entries, 0) AS total_entries,
COALESCE(df.total_files, 0) AS total_files,
COALESCE(ph.total_phases, 0) AS total_phases,
p.tags,
p.meta
FROM scribe_projects p
LEFT JOIN scribe_metrics m
ON m.project_id = p.id
LEFT JOIN (
SELECT project_id, COUNT(*) AS total_files
FROM dev_plans
GROUP BY project_id
) AS df
ON df.project_id = p.id
LEFT JOIN (
SELECT project_id, COUNT(*) AS total_phases
FROM phases
GROUP BY project_id
) AS ph
ON ph.project_id = p.id
WHERE {where_clause}
ORDER BY COALESCE(p.last_access_at, p.last_entry_at, p.created_at) DESC
LIMIT 1
"""
with sqlite3.connect(self._db_path) as conn:
conn.row_factory = sqlite3.Row
row = conn.execute(query, params).fetchone()
if not row:
return None
return self._row_to_project_info(row)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _ensure_schema(self) -> None:
"""Ensure registry-specific columns exist on scribe_projects.
We only add columns that are safe no-ops on existing installs.
Older fields like status/phase/last_activity are preserved.
"""
with sqlite3.connect(self._db_path) as conn:
cursor = conn.cursor()
            # Create a minimal table if it does not exist. This avoids
            # first-run failures when running Scribe against a fresh repo
            # before the main storage backend has created the schema.
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS scribe_projects (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE,
                    repo_root TEXT,
                    progress_log_path TEXT,
                    description TEXT,
                    status TEXT,
                    progress_log TEXT,
                    root TEXT,
                    created_at TEXT,
                    last_entry_at TEXT,
                    last_access_at TEXT,
                    last_status_change TEXT,
                    tags TEXT,
                    meta TEXT
                )
                """
            )
            # The read helpers LEFT JOIN these tables, so a fresh database
            # would otherwise fail with "no such table". Create minimal shells
            # covering only the columns this module queries; the real storage
            # backend is assumed to own (and normally create) the full schema.
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS scribe_metrics (
                    project_id INTEGER,
                    total_entries INTEGER
                )
                """
            )
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS dev_plans (project_id INTEGER, plan_type TEXT)"
            )
            cursor.execute("CREATE TABLE IF NOT EXISTS phases (project_id INTEGER)")
cursor.execute("PRAGMA table_info(scribe_projects)")
existing = {row[1] for row in cursor.fetchall()}
def add_column(name: str, ddl: str) -> None:
if name in existing:
return
cursor.execute(f"ALTER TABLE scribe_projects ADD COLUMN {ddl}")
# New registry-focused fields for v1
add_column("description", "description TEXT")
add_column("last_entry_at", "last_entry_at TEXT")
add_column("last_access_at", "last_access_at TEXT")
add_column("last_status_change", "last_status_change TEXT")
add_column("tags", "tags TEXT")
add_column("meta", "meta TEXT")
conn.commit()
@staticmethod
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
@staticmethod
def _parse_ts(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
dt = datetime.fromisoformat(value)
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except ValueError:
return None
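    # Normalization examples: naive timestamps are assumed to be UTC.
    #   _parse_ts("2025-01-02T03:04:05")        -> 2025-01-02 03:04:05+00:00
    #   _parse_ts("2025-01-02T03:04:05+05:00")  -> 2025-01-01 22:04:05+00:00
    #   _parse_ts("not-a-date")                 -> None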
def _row_to_project_info(self, row: sqlite3.Row) -> ProjectInfo:
tags_raw = row["tags"]
tags: List[str] = []
if tags_raw:
tags = [t for t in tags_raw.split(",") if t]
meta_raw = row["meta"]
meta: Dict[str, Any] = {}
if meta_raw:
            try:
                meta = json.loads(meta_raw)
            except (ValueError, TypeError):
                meta = {"raw": meta_raw}
# Ensure project-scoped metadata container exists and has a work_blockers list.
project_meta = (meta.get("project") or {}).copy()
project_meta.setdefault("work_blockers", [])
meta["project"] = project_meta
# Derive activity metrics for this project (non-persistent).
now = datetime.now(timezone.utc)
created_ts = self._parse_ts(row["created_at"]) or now
last_entry_ts = self._parse_ts(row["last_entry_at"]) or created_ts
last_access_ts = self._parse_ts(row["last_access_at"]) or created_ts
age_days = max(0.0, (now - created_ts).total_seconds() / 86400.0)
since_entry = max(0.0, (now - last_entry_ts).total_seconds() / 86400.0)
since_access = max(0.0, (now - last_access_ts).total_seconds() / 86400.0)
        # Staleness tracks inactivity (days since the last log entry), not
        # project age: an old but busy project should still read as fresh.
        if since_entry <= 2:
            staleness_level = "fresh"
        elif since_entry <= 7:
            staleness_level = "warming"
        elif since_entry <= 30:
            staleness_level = "stale"
        else:
            staleness_level = "frozen"
# Basic activity metrics.
activity_meta: Dict[str, Any] = {
"project_age_days": age_days,
"days_since_last_entry": since_entry,
"days_since_last_access": since_access,
"staleness_level": staleness_level,
}
# Activity score: higher means "more active / higher priority".
entries = int(row["total_entries"])
entry_rate = entries / age_days if age_days > 0 else float(entries)
priority_raw = project_meta.get("priority")
priority_score = 0.0
if isinstance(priority_raw, (int, float)):
priority_score = float(priority_raw)
elif isinstance(priority_raw, str):
_prio_map = {
"low": 0.0,
"medium": 1.0,
"high": 2.0,
"critical": 3.0,
}
priority_score = _prio_map.get(priority_raw.lower(), 0.0)
# Doc flags may influence activity score (e.g., docs ready for work).
docs_meta = (meta.get("docs") or {}).copy()
flags = (docs_meta.get("flags") or {}).copy()
docs_ready = bool(flags.get("docs_ready_for_work"))
activity_score = (
-since_entry
- 0.5 * since_access
+ 1.5 * entry_rate
+ (2.0 if docs_ready else 0.0)
+ 0.5 * priority_score
)
activity_meta["activity_score"] = activity_score
# Doc drift hints based on docs meta + lifecycle.
status = row["status"]
last_docs_update_ts = None
last_docs_update_raw = docs_meta.get("last_update_at")
if isinstance(last_docs_update_raw, str):
last_docs_update_ts = self._parse_ts(last_docs_update_raw)
doc_drift = False
doc_drift_days = None
if status in ("in_progress", "complete"):
if not docs_ready:
doc_drift = True
if last_entry_ts and not last_docs_update_ts:
doc_drift = True
elif last_entry_ts and last_docs_update_ts:
diff_days = (last_entry_ts - last_docs_update_ts).total_seconds() / 86400.0
doc_drift_days = diff_days
if diff_days >= 7.0:
doc_drift = True
if doc_drift_days is not None:
docs_meta["doc_drift_days_since_update"] = doc_drift_days
# Drift score: single scalar for how "bad" drift is.
drift_score = 0.0
if doc_drift:
if doc_drift_days is not None:
drift_score += max(0.0, doc_drift_days)
if not docs_ready:
drift_score += 5.0
docs_meta["drift_score"] = drift_score
flags["doc_drift_suspected"] = doc_drift
docs_meta["flags"] = flags
if docs_meta:
meta["docs"] = docs_meta
meta.setdefault("activity", activity_meta)
return ProjectInfo(
project_slug=row["project_slug"],
project_name=row["project_name"],
description=row["description"],
status=row["status"],
created_at=self._parse_ts(row["created_at"]),
last_entry_at=self._parse_ts(row["last_entry_at"]),
last_access_at=self._parse_ts(row["last_access_at"]),
last_status_change=self._parse_ts(row["last_status_change"]),
total_entries=int(row["total_entries"]),
total_files=int(row["total_files"]),
total_phases=int(row["total_phases"]),
tags=tags,
meta=meta,
)
__all__ = ["ProjectInfo", "ProjectRegistry"]
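

# Minimal smoke-test sketch (illustrative; assumes the scribe_mcp package
# imports cleanly and the process can write to a temp directory):
if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        registry = ProjectRegistry(db_path=Path(tmp) / "registry.sqlite3")
        print(registry.list_projects(limit=5))  # -> [] on a fresh database
        print(registry.get_project("missing"))  # -> None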