Skip to main content
Glama

Scribe MCP Server

by paxocial
sqlite.py46.8 kB
"""SQLite storage backend (default).""" from __future__ import annotations import asyncio import json import sqlite3 from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from scribe_mcp.storage.base import StorageBackend from scribe_mcp.storage.models import ( ProjectRecord, DevPlanRecord, PhaseRecord, MilestoneRecord, BenchmarkRecord, ChecklistRecord, PerformanceMetricsRecord, DocumentSectionRecord, CustomTemplateRecord, DocumentChangeRecord, SyncStatusRecord ) from scribe_mcp.utils.time import format_utc, utcnow from scribe_mcp.utils.search import message_matches SQLITE_TIMEOUT_SECONDS = 30 SQLITE_BUSY_TIMEOUT_MS = 5000 class SQLiteStorage(StorageBackend): """SQLite-backed persistence with lazy connections.""" def __init__(self, db_path: Path | str) -> None: self._path = Path(db_path).expanduser() self._init_lock = asyncio.Lock() self._write_lock = asyncio.Lock() self._initialised = False async def setup(self) -> None: await self._initialise() async def close(self) -> None: # pragma: no cover - nothing persistent to close # Connections are short-lived; nothing to do. return None async def upsert_project( self, *, name: str, repo_root: str, progress_log_path: str, ) -> ProjectRecord: await self._initialise() async with self._write_lock: await self._execute( """ INSERT INTO scribe_projects (name, repo_root, progress_log_path) VALUES (?, ?, ?) ON CONFLICT(name) DO UPDATE SET repo_root = excluded.repo_root, progress_log_path = excluded.progress_log_path; """, (name, repo_root, progress_log_path), ) row = await self._fetchone( """ SELECT id, name, repo_root, progress_log_path FROM scribe_projects WHERE name = ?; """, (name,), ) return ProjectRecord( id=row["id"], name=row["name"], repo_root=row["repo_root"], progress_log_path=row["progress_log_path"], ) async def fetch_project(self, name: str) -> Optional[ProjectRecord]: await self._initialise() row = await self._fetchone( """ SELECT id, name, repo_root, progress_log_path FROM scribe_projects WHERE name = ?; """, (name,), ) if not row: return None return ProjectRecord( id=row["id"], name=row["name"], repo_root=row["repo_root"], progress_log_path=row["progress_log_path"], ) async def list_projects(self) -> List[ProjectRecord]: await self._initialise() rows = await self._fetchall( """ SELECT id, name, repo_root, progress_log_path FROM scribe_projects ORDER BY name; """ ) records: List[ProjectRecord] = [] for row in rows: records.append( ProjectRecord( id=row["id"], name=row["name"], repo_root=row["repo_root"], progress_log_path=row["progress_log_path"], ) ) return records async def insert_entry( self, *, entry_id: str, project: ProjectRecord, ts: datetime, emoji: str, agent: Optional[str], message: str, meta: Optional[Dict[str, Any]], raw_line: str, sha256: str, ) -> None: await self._initialise() ts_iso = ts.isoformat() meta_json = json.dumps(meta or {}, sort_keys=True) async with self._write_lock: await self._execute( """ INSERT OR IGNORE INTO scribe_entries (id, project_id, ts, emoji, agent, message, meta, raw_line, sha256, ts_iso) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?); """, ( entry_id, project.id, format_utc(ts), emoji, agent, message, meta_json, raw_line, sha256, ts_iso, ), ) await self._execute( """ INSERT INTO scribe_metrics (project_id, total_entries, success_count, warn_count, error_count, last_update) VALUES (?, 1, ?, ?, ?, ?) ON CONFLICT(project_id) DO UPDATE SET total_entries = scribe_metrics.total_entries + 1, success_count = scribe_metrics.success_count + excluded.success_count, warn_count = scribe_metrics.warn_count + excluded.warn_count, error_count = scribe_metrics.error_count + excluded.error_count, last_update = excluded.last_update; """, ( project.id, 1 if emoji == "✅" else 0, 1 if emoji == "⚠️" else 0, 1 if emoji == "❌" else 0, utcnow().isoformat(), ), ) async def record_doc_change( self, project: ProjectRecord, *, doc: str, section: Optional[str], action: str, agent: Optional[str], metadata: Optional[Dict[str, Any]], sha_before: str, sha_after: str, ) -> None: await self._initialise() meta_json = json.dumps(metadata or {}, sort_keys=True) async with self._write_lock: await self._execute( """ INSERT INTO doc_changes (project_id, doc_name, section, action, agent, metadata, sha_before, sha_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?); """, ( project.id, doc, section, action, agent, meta_json, sha_before, sha_after, ), ) await self._execute( """ DELETE FROM doc_changes WHERE id IN ( SELECT id FROM doc_changes WHERE project_id = ? ORDER BY created_at DESC LIMIT -1 OFFSET 500 ); """, (project.id,), ) async def fetch_recent_entries( self, *, project: ProjectRecord, limit: int, filters: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: await self._initialise() filters = filters or {} clauses = ["project_id = ?"] params: List[Any] = [project.id] agent = filters.get("agent") if agent: clauses.append("agent = ?") params.append(agent) emoji = filters.get("emoji") if emoji: clauses.append("emoji = ?") params.append(emoji) where_clause = " AND ".join(clauses) rows = await self._fetchall( f""" SELECT id, ts, emoji, agent, message, meta, raw_line FROM scribe_entries WHERE {where_clause} ORDER BY ts_iso DESC LIMIT ?; """, (*params, limit), ) results: List[Dict[str, Any]] = [] for row in rows: meta_value = json.loads(row["meta"]) if row["meta"] else {} results.append( { "id": row["id"], "ts": row["ts"], "emoji": row["emoji"], "agent": row["agent"], "message": row["message"], "meta": meta_value, "raw_line": row["raw_line"], } ) return results async def query_entries( self, *, project: ProjectRecord, limit: int, start: Optional[str] = None, end: Optional[str] = None, agents: Optional[List[str]] = None, emojis: Optional[List[str]] = None, message: Optional[str] = None, message_mode: str = "substring", case_sensitive: bool = False, meta_filters: Optional[Dict[str, str]] = None, ) -> List[Dict[str, Any]]: await self._initialise() limit = max(1, min(limit, 500)) fetch_limit = min(max(limit * 3, limit), 1000) clauses = ["project_id = ?"] params: List[Any] = [project.id] if start: clauses.append("ts_iso >= ?") params.append(start) if end: clauses.append("ts_iso <= ?") params.append(end) if agents: agent_placeholders = ", ".join("?" for _ in agents) clauses.append(f"agent IN ({agent_placeholders})") params.extend(agents) if emojis: emoji_placeholders = ", ".join("?" for _ in emojis) clauses.append(f"emoji IN ({emoji_placeholders})") params.extend(emojis) if meta_filters: for key, value in sorted(meta_filters.items()): clauses.append("json_extract(meta, ?) = ?") params.append(f"$.{key}") params.append(value) where_clause = " AND ".join(clauses) rows = await self._fetchall( f""" SELECT id, ts, ts_iso, emoji, agent, message, meta, raw_line FROM scribe_entries WHERE {where_clause} ORDER BY ts_iso DESC LIMIT ?; """, (*params, fetch_limit), ) results: List[Dict[str, Any]] = [] for row in rows: meta_value = json.loads(row["meta"]) if row["meta"] else {} entry = { "id": row["id"], "ts": row["ts"], "emoji": row["emoji"], "agent": row["agent"], "message": row["message"], "meta": meta_value, "raw_line": row["raw_line"], } if not message_matches( entry["message"], message, mode=message_mode, case_sensitive=case_sensitive, ): continue results.append(entry) if len(results) >= limit: break return results async def _initialise(self) -> None: async with self._init_lock: if self._initialised: return await asyncio.to_thread(self._path.parent.mkdir, parents=True, exist_ok=True) await self._execute_many( [ """ CREATE TABLE IF NOT EXISTS scribe_projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, repo_root TEXT NOT NULL, progress_log_path TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); """, """ CREATE TABLE IF NOT EXISTS scribe_entries ( id TEXT PRIMARY KEY, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, ts TEXT NOT NULL, ts_iso TEXT NOT NULL, emoji TEXT NOT NULL, agent TEXT, message TEXT NOT NULL, meta TEXT, raw_line TEXT NOT NULL, sha256 TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); """, """ CREATE TABLE IF NOT EXISTS scribe_metrics ( project_id INTEGER PRIMARY KEY REFERENCES scribe_projects(id) ON DELETE CASCADE, total_entries INTEGER NOT NULL DEFAULT 0, success_count INTEGER NOT NULL DEFAULT 0, warn_count INTEGER NOT NULL DEFAULT 0, error_count INTEGER NOT NULL DEFAULT 0, last_update TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); """, """ CREATE TABLE IF NOT EXISTS agent_sessions ( id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, started_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, last_active_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, status TEXT NOT NULL CHECK (status IN ('active','expired')) DEFAULT 'active', metadata TEXT ); """, """ CREATE INDEX IF NOT EXISTS idx_agent_sessions_agent ON agent_sessions(agent_id); """, """ CREATE TABLE IF NOT EXISTS agent_projects ( agent_id TEXT PRIMARY KEY, project_name TEXT, version INTEGER NOT NULL DEFAULT 0, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_by TEXT, session_id TEXT, FOREIGN KEY(project_name) REFERENCES scribe_projects(name) ON DELETE SET NULL ); """, """ CREATE INDEX IF NOT EXISTS idx_agent_projects_updated_at ON agent_projects(updated_at DESC); """, """ CREATE TABLE IF NOT EXISTS agent_project_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, session_id TEXT NOT NULL, event_type TEXT NOT NULL CHECK (event_type IN ('project_set', 'project_switched', 'session_started', 'session_ended', 'conflict_detected')), from_project TEXT, to_project TEXT NOT NULL, expected_version INTEGER, actual_version INTEGER, success BOOLEAN NOT NULL DEFAULT 1, error_message TEXT, metadata TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); """, """ CREATE INDEX IF NOT EXISTS idx_agent_project_events_agent_id ON agent_project_events(agent_id); """, """ CREATE INDEX IF NOT EXISTS idx_agent_project_events_created_at ON agent_project_events(created_at); """, """ CREATE TABLE IF NOT EXISTS doc_changes ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, doc_name TEXT NOT NULL, section TEXT, action TEXT NOT NULL, agent TEXT, metadata TEXT, sha_before TEXT, sha_after TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); """, """ CREATE INDEX IF NOT EXISTS idx_doc_changes_project ON doc_changes(project_id, created_at DESC); """, """ CREATE TABLE IF NOT EXISTS dev_plans ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, project_name TEXT NOT NULL, plan_type TEXT NOT NULL CHECK (plan_type IN ('architecture', 'phase_plan', 'checklist', 'progress_log')), file_path TEXT NOT NULL, version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, metadata TEXT, UNIQUE(project_id, plan_type) ); """, """ CREATE TABLE IF NOT EXISTS phases ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, dev_plan_id INTEGER NOT NULL REFERENCES dev_plans(id) ON DELETE CASCADE, phase_number INTEGER NOT NULL, phase_name TEXT NOT NULL, status TEXT NOT NULL CHECK (status IN ('planned', 'in_progress', 'completed', 'blocked')) DEFAULT 'planned', start_date TEXT, end_date TEXT, deliverables_count INTEGER NOT NULL DEFAULT 0, deliverables_completed INTEGER NOT NULL DEFAULT 0, confidence_score REAL NOT NULL DEFAULT 0.0 CHECK (confidence_score >= 0.0 AND confidence_score <= 1.0), metadata TEXT, UNIQUE(project_id, phase_number) ); """, """ CREATE TABLE IF NOT EXISTS milestones ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, phase_id INTEGER REFERENCES phases(id) ON DELETE SET NULL, milestone_name TEXT NOT NULL, description TEXT, status TEXT NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'overdue')) DEFAULT 'pending', target_date TEXT, completed_date TEXT, evidence_url TEXT, metadata TEXT ); """, """ CREATE TABLE IF NOT EXISTS benchmarks ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, benchmark_type TEXT NOT NULL CHECK (benchmark_type IN ('hash_performance', 'throughput', 'latency', 'stress_test', 'integrity', 'concurrency')), test_name TEXT NOT NULL, metric_name TEXT NOT NULL, metric_value REAL NOT NULL, metric_unit TEXT NOT NULL, test_parameters TEXT, environment_info TEXT, test_timestamp TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, requirement_target REAL, requirement_met BOOLEAN NOT NULL DEFAULT FALSE ); """, """ CREATE TABLE IF NOT EXISTS checklists ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, phase_id INTEGER REFERENCES phases(id) ON DELETE SET NULL, checklist_item TEXT NOT NULL, status TEXT NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'blocked')) DEFAULT 'pending', acceptance_criteria TEXT NOT NULL, proof_required BOOLEAN NOT NULL DEFAULT TRUE, proof_url TEXT, assignee TEXT, priority TEXT NOT NULL CHECK (priority IN ('low', 'medium', 'high', 'critical')) DEFAULT 'medium', created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, completed_at TEXT, metadata TEXT ); """, """ CREATE TABLE IF NOT EXISTS performance_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, metric_category TEXT NOT NULL CHECK (metric_category IN ('development', 'testing', 'deployment', 'operations')), metric_name TEXT NOT NULL, metric_value REAL NOT NULL, metric_unit TEXT NOT NULL, baseline_value REAL, improvement_percentage REAL, collection_timestamp TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, metadata TEXT ); """, # Indexes for performance "CREATE INDEX IF NOT EXISTS idx_entries_project_ts ON scribe_entries(project_id, ts_iso DESC);", "CREATE INDEX IF NOT EXISTS idx_dev_plans_project_type ON dev_plans(project_id, plan_type);", "CREATE INDEX IF NOT EXISTS idx_phases_project_status ON phases(project_id, status);", "CREATE INDEX IF NOT EXISTS idx_milestones_project_status ON milestones(project_id, status);", "CREATE INDEX IF NOT EXISTS idx_benchmarks_project_type ON benchmarks(project_id, benchmark_type);", "CREATE INDEX IF NOT EXISTS idx_benchmarks_timestamp ON benchmarks(test_timestamp DESC);", "CREATE INDEX IF NOT EXISTS idx_checklists_project_status ON checklists(project_id, status);", "CREATE INDEX IF NOT EXISTS idx_checklists_phase ON checklists(phase_id);", "CREATE INDEX IF NOT EXISTS idx_metrics_project_category ON performance_metrics(project_id, metric_category);", "CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON performance_metrics(collection_timestamp DESC);", # Document Management 2.0 Tables """ CREATE TABLE IF NOT EXISTS document_sections ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, document_type TEXT NOT NULL, section_id TEXT NOT NULL, content TEXT NOT NULL, file_hash TEXT NOT NULL, metadata TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, UNIQUE(project_id, document_type, section_id) ); """, """ CREATE TABLE IF NOT EXISTS custom_templates ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, template_name TEXT NOT NULL, template_content TEXT NOT NULL, variables TEXT, is_global BOOLEAN NOT NULL DEFAULT FALSE, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, UNIQUE(project_id, template_name) ); """, """ CREATE TABLE IF NOT EXISTS document_changes ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, document_path TEXT NOT NULL, change_type TEXT NOT NULL, old_content_hash TEXT, new_content_hash TEXT, change_summary TEXT NOT NULL, metadata TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); """, """ CREATE TABLE IF NOT EXISTS sync_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL REFERENCES scribe_projects(id) ON DELETE CASCADE, file_path TEXT NOT NULL, last_sync_at TEXT, last_file_hash TEXT, last_db_hash TEXT, sync_status TEXT NOT NULL DEFAULT 'synced', conflict_details TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, UNIQUE(project_id, file_path) ); """, # Document Management 2.0 Indexes "CREATE INDEX IF NOT EXISTS idx_document_sections_project ON document_sections(project_id);", "CREATE INDEX IF NOT EXISTS idx_document_sections_updated ON document_sections(updated_at);", "CREATE INDEX IF NOT EXISTS idx_document_changes_project ON document_changes(project_id);", "CREATE INDEX IF NOT EXISTS idx_document_changes_created ON document_changes(created_at);", "CREATE INDEX IF NOT EXISTS idx_sync_status_project ON sync_status(project_id);", "CREATE INDEX IF NOT EXISTS idx_sync_status_status ON sync_status(sync_status);", # Full-text search for document content """ CREATE VIRTUAL TABLE IF NOT EXISTS document_sections_fts USING fts5(document_type, section_id, content, content=document_sections, content_rowid=id) """, """ CREATE TRIGGER IF NOT EXISTS document_sections_fts_insert AFTER INSERT ON document_sections BEGIN INSERT INTO document_sections_fts(rowid, document_type, section_id, content) VALUES (new.id, new.document_type, new.section_id, new.content); END """, """ CREATE TRIGGER IF NOT EXISTS document_sections_fts_delete AFTER DELETE ON document_sections BEGIN INSERT INTO document_sections_fts(document_sections_fts, rowid, document_type, section_id, content) VALUES ('delete', old.id, old.document_type, old.section_id, old.content); END """, """ CREATE TRIGGER IF NOT EXISTS document_sections_fts_update AFTER UPDATE ON document_sections BEGIN INSERT INTO document_sections_fts(document_sections_fts, rowid, document_type, section_id, content) VALUES ('delete', old.id, old.document_type, old.section_id, old.content); INSERT INTO document_sections_fts(rowid, document_type, section_id, content) VALUES (new.id, new.document_type, new.section_id, new.content); END """, ] ) self._initialised = True async def _execute(self, query: str, params: tuple[Any, ...]) -> None: await asyncio.to_thread(self._execute_sync, query, params) def _execute_sync(self, query: str, params: tuple[Any, ...]) -> None: conn = self._connect() try: conn.execute(query, params) conn.commit() finally: conn.close() async def _execute_many(self, statements: List[str]) -> None: await asyncio.to_thread(self._execute_many_sync, statements) def _execute_many_sync(self, statements: List[str]) -> None: conn = self._connect() try: for statement in statements: conn.execute(statement) conn.commit() finally: conn.close() async def _fetchone(self, query: str, params: tuple[Any, ...]) -> Optional[sqlite3.Row]: return await asyncio.to_thread(self._fetchone_sync, query, params) def _fetchone_sync(self, query: str, params: tuple[Any, ...]) -> Optional[sqlite3.Row]: conn = self._connect() try: cursor = conn.execute(query, params) row = cursor.fetchone() return row finally: conn.close() async def _fetchall(self, query: str, params: tuple[Any, ...] | tuple = ()) -> List[sqlite3.Row]: return await asyncio.to_thread(self._fetchall_sync, query, params) def _fetchall_sync(self, query: str, params: tuple[Any, ...] | tuple = ()) -> List[sqlite3.Row]: conn = self._connect() try: cursor = conn.execute(query, params) rows = cursor.fetchall() return rows finally: conn.close() def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect( self._path, detect_types=sqlite3.PARSE_DECLTYPES, timeout=SQLITE_TIMEOUT_SECONDS, check_same_thread=False, ) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON;") conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS};") return conn # Development Plan Tracking Methods async def upsert_dev_plan( self, *, project_id: int, project_name: str, plan_type: str, file_path: str, version: str = "1.0", metadata: Optional[Dict[str, Any]] = None, ) -> DevPlanRecord: """Insert or update a development plan record.""" await self._initialise() meta_json = json.dumps(metadata or {}, sort_keys=True) async with self._write_lock: await self._execute( """ INSERT INTO dev_plans (project_id, project_name, plan_type, file_path, version, metadata, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(project_id, plan_type) DO UPDATE SET file_path = excluded.file_path, version = excluded.version, metadata = excluded.metadata, updated_at = excluded.updated_at; """, (project_id, project_name, plan_type, file_path, version, meta_json, utcnow().isoformat()), ) row = await self._fetchone( """ SELECT id, project_id, project_name, plan_type, file_path, version, created_at, updated_at, metadata FROM dev_plans WHERE project_id = ? AND plan_type = ?; """, (project_id, plan_type), ) return DevPlanRecord( id=row["id"], project_id=row["project_id"], project_name=row["project_name"], plan_type=row["plan_type"], file_path=row["file_path"], version=row["version"], created_at=datetime.fromisoformat(row["created_at"]), updated_at=datetime.fromisoformat(row["updated_at"]), metadata=json.loads(row["metadata"]) if row["metadata"] else None, ) async def upsert_phase( self, *, project_id: int, dev_plan_id: int, phase_number: int, phase_name: str, status: str = "planned", start_date: Optional[str] = None, end_date: Optional[str] = None, deliverables_count: int = 0, deliverables_completed: int = 0, confidence_score: float = 0.0, metadata: Optional[Dict[str, Any]] = None, ) -> PhaseRecord: """Insert or update a phase record.""" await self._initialise() meta_json = json.dumps(metadata or {}, sort_keys=True) async with self._write_lock: await self._execute( """ INSERT INTO phases (project_id, dev_plan_id, phase_number, phase_name, status, start_date, end_date, deliverables_count, deliverables_completed, confidence_score, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(project_id, phase_number) DO UPDATE SET phase_name = excluded.phase_name, status = excluded.status, start_date = excluded.start_date, end_date = excluded.end_date, deliverables_count = excluded.deliverables_count, deliverables_completed = excluded.deliverables_completed, confidence_score = excluded.confidence_score, metadata = excluded.metadata; """, (project_id, dev_plan_id, phase_number, phase_name, status, start_date, end_date, deliverables_count, deliverables_completed, confidence_score, meta_json), ) row = await self._fetchone( """ SELECT id, project_id, dev_plan_id, phase_number, phase_name, status, start_date, end_date, deliverables_count, deliverables_completed, confidence_score, metadata FROM phases WHERE project_id = ? AND phase_number = ?; """, (project_id, phase_number), ) return PhaseRecord( id=row["id"], project_id=row["project_id"], dev_plan_id=row["dev_plan_id"], phase_number=row["phase_number"], phase_name=row["phase_name"], status=row["status"], start_date=datetime.fromisoformat(row["start_date"]) if row["start_date"] else None, end_date=datetime.fromisoformat(row["end_date"]) if row["end_date"] else None, deliverables_count=row["deliverables_count"], deliverables_completed=row["deliverables_completed"], confidence_score=row["confidence_score"], metadata=json.loads(row["metadata"]) if row["metadata"] else None, ) async def store_benchmark( self, *, project_id: int, benchmark_type: str, test_name: str, metric_name: str, metric_value: float, metric_unit: str, test_parameters: Optional[Dict[str, Any]] = None, environment_info: Optional[Dict[str, Any]] = None, requirement_target: Optional[float] = None, ) -> BenchmarkRecord: """Store a benchmark result.""" await self._initialise() test_params_json = json.dumps(test_parameters or {}, sort_keys=True) env_info_json = json.dumps(environment_info or {}, sort_keys=True) requirement_met = (requirement_target is not None and ((benchmark_type in ['throughput', 'hash_performance'] and metric_value >= requirement_target) or (benchmark_type in ['latency', 'time'] and metric_value <= requirement_target) or (requirement_target > 0 and metric_value <= requirement_target) or (requirement_target < 0 and metric_value >= requirement_target))) row = await self._fetchone( """ INSERT INTO benchmarks (project_id, benchmark_type, test_name, metric_name, metric_value, metric_unit, test_parameters, environment_info, requirement_target, requirement_met) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id, project_id, benchmark_type, test_name, metric_name, metric_value, metric_unit, test_parameters, environment_info, test_timestamp, requirement_target, requirement_met; """, (project_id, benchmark_type, test_name, metric_name, metric_value, metric_unit, test_params_json, env_info_json, requirement_target, requirement_met), ) return BenchmarkRecord( id=row["id"], project_id=row["project_id"], benchmark_type=row["benchmark_type"], test_name=row["test_name"], metric_name=row["metric_name"], metric_value=row["metric_value"], metric_unit=row["metric_unit"], test_parameters=json.loads(row["test_parameters"]) if row["test_parameters"] else None, environment_info=json.loads(row["environment_info"]) if row["environment_info"] else None, test_timestamp=datetime.fromisoformat(row["test_timestamp"]), requirement_target=row["requirement_target"], requirement_met=bool(row["requirement_met"]), ) async def get_project_benchmarks( self, *, project_id: int, benchmark_type: Optional[str] = None, limit: int = 100, ) -> List[BenchmarkRecord]: """Get benchmark results for a project.""" await self._initialise() params = [project_id] query = """ SELECT id, project_id, benchmark_type, test_name, metric_name, metric_value, metric_unit, test_parameters, environment_info, test_timestamp, requirement_target, requirement_met FROM benchmarks WHERE project_id = ? """ if benchmark_type: query += " AND benchmark_type = ?" params.append(benchmark_type) query += " ORDER BY test_timestamp DESC LIMIT ?" params.append(limit) rows = await self._fetchall(query, tuple(params)) results = [] for row in rows: results.append(BenchmarkRecord( id=row["id"], project_id=row["project_id"], benchmark_type=row["benchmark_type"], test_name=row["test_name"], metric_name=row["metric_name"], metric_value=row["metric_value"], metric_unit=row["metric_unit"], test_parameters=json.loads(row["test_parameters"]) if row["test_parameters"] else None, environment_info=json.loads(row["environment_info"]) if row["environment_info"] else None, test_timestamp=datetime.fromisoformat(row["test_timestamp"]), requirement_target=row["requirement_target"], requirement_met=bool(row["requirement_met"]), )) return results async def store_performance_metric( self, *, project_id: int, metric_category: str, metric_name: str, metric_value: float, metric_unit: str, baseline_value: Optional[float] = None, metadata: Optional[Dict[str, Any]] = None, ) -> PerformanceMetricsRecord: """Store a performance metric.""" await self._initialise() meta_json = json.dumps(metadata or {}, sort_keys=True) # Calculate improvement percentage if baseline provided improvement_percentage = None if baseline_value is not None and baseline_value != 0: improvement_percentage = ((metric_value - baseline_value) / abs(baseline_value)) * 100 row = await self._fetchone( """ INSERT INTO performance_metrics (project_id, metric_category, metric_name, metric_value, metric_unit, baseline_value, improvement_percentage, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id, project_id, metric_category, metric_name, metric_value, metric_unit, baseline_value, improvement_percentage, collection_timestamp, metadata; """, (project_id, metric_category, metric_name, metric_value, metric_unit, baseline_value, improvement_percentage, meta_json), ) return PerformanceMetricsRecord( id=row["id"], project_id=row["project_id"], metric_category=row["metric_category"], metric_name=row["metric_name"], metric_value=row["metric_value"], metric_unit=row["metric_unit"], baseline_value=row["baseline_value"], improvement_percentage=row["improvement_percentage"], collection_timestamp=datetime.fromisoformat(row["collection_timestamp"]), metadata=json.loads(row["metadata"]) if row["metadata"] else None, ) # Agent session and project context management methods async def upsert_agent_session(self, agent_id: str, session_id: str, metadata: Optional[Dict[str, Any]]) -> None: """Create or update an agent session.""" await self._initialise() metadata_json = json.dumps(metadata or {}) if metadata else None async with self._write_lock: await self._execute( """ INSERT INTO agent_sessions (id, agent_id, started_at, last_active_at, status, metadata) VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 'active', ?) ON CONFLICT(id) DO UPDATE SET last_active_at = CURRENT_TIMESTAMP, status = 'active', metadata = excluded.metadata; """, (session_id, agent_id, metadata_json) ) async def heartbeat_session(self, session_id: str) -> None: """Update session last_active_at timestamp.""" await self._initialise() async with self._write_lock: await self._execute( """ UPDATE agent_sessions SET last_active_at = CURRENT_TIMESTAMP WHERE id = ? AND status = 'active'; """, (session_id,) ) async def end_session(self, session_id: str) -> None: """Mark a session as expired.""" await self._initialise() async with self._write_lock: await self._execute( """ UPDATE agent_sessions SET status = 'expired', last_active_at = CURRENT_TIMESTAMP WHERE id = ?; """, (session_id,) ) async def get_agent_project(self, agent_id: str) -> Optional[Dict[str, Any]]: """Get an agent's current project with version info.""" await self._initialise() row = await self._fetchone( """ SELECT agent_id, project_name, version, updated_at, updated_by, session_id FROM agent_projects WHERE agent_id = ?; """, (agent_id,) ) if not row: return None return { "agent_id": row["agent_id"], "project_name": row["project_name"], "version": row["version"], "updated_at": row["updated_at"], "updated_by": row["updated_by"], "session_id": row["session_id"] } async def set_agent_project(self, agent_id: str, project_name: Optional[str], expected_version: Optional[int], updated_by: str, session_id: str) -> Dict[str, Any]: """Set an agent's current project with optimistic concurrency control.""" await self._initialise() async with self._write_lock: if expected_version is not None: # Optimistic concurrency check cursor = await self._fetchone( """ UPDATE agent_projects SET project_name = ?, version = version + 1, updated_at = CURRENT_TIMESTAMP, updated_by = ?, session_id = ? WHERE agent_id = ? AND version = ? RETURNING agent_id, project_name, version, updated_at, updated_by, session_id; """, (project_name, updated_by, session_id, agent_id, expected_version) ) if not cursor: from scribe_mcp.storage.base import ConflictError raise ConflictError(f"Version conflict for agent {agent_id}: expected version {expected_version}") result = cursor else: # First time or no version check await self._execute( """ INSERT INTO agent_projects (agent_id, project_name, version, updated_by, session_id) VALUES (?, ?, 1, ?, ?) ON CONFLICT(agent_id) DO UPDATE SET project_name = excluded.project_name, version = version + 1, updated_at = CURRENT_TIMESTAMP, updated_by = excluded.updated_by, session_id = excluded.session_id; """, (agent_id, project_name, updated_by, session_id) ) result = await self.get_agent_project(agent_id) return result or await self.get_agent_project(agent_id)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paxocial/scribe_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server