"""
Reminder System Monitoring and Validation Utilities (Stage 6)
Provides diagnostic and validation functions for DB-backed reminder tracking:
- Performance benchmarking (validate <5ms SLA)
- Session isolation verification
- Reminder statistics and health checks
Usage:
from utils.reminder_monitoring import validate_db_performance, validate_session_isolation
# Check performance
results = await validate_db_performance()
# Verify session isolation
isolation_check = await validate_session_isolation()
# Get statistics
stats = await get_reminder_statistics()
"""
import asyncio
import time
from typing import Dict, Any, List
from datetime import datetime, timedelta
from pathlib import Path
from scribe_mcp.storage.sqlite import SQLiteStorage
# Get project root dynamically
SCRIBE_ROOT = Path(__file__).parent.parent
async def validate_db_performance() -> Dict[str, Any]:
"""
Benchmark reminder DB queries to ensure <20ms SLA.
Tests all critical reminder operations:
- record_reminder_shown() performance
- check_reminder_cooldown() performance
- cleanup_reminder_history() performance
Returns:
Dict with performance metrics:
{
"ok": True/False,
"operations": {
"record_reminder": {"avg_ms": X, "max_ms": Y, "min_ms": Z, "p95_ms": W},
"check_cooldown": {...},
"cleanup": {...}
},
"sla_compliance": {"passed": True/False, "failed_operations": []},
"timestamp": "2026-01-03T12:00:00Z"
}
SLA: p95 latency <20ms for record/check operations, <100ms for cleanup
Note: 20ms SLA reflects asyncio.to_thread() overhead (9-16ms minimum).
"""
results = {
"ok": True,
"operations": {},
"sla_compliance": {"passed": True, "failed_operations": []},
"timestamp": datetime.utcnow().isoformat() + "Z"
}
try:
# Initialize storage
storage = SQLiteStorage(str(SCRIBE_ROOT / ".scribe" / "data" / "scribe.db"))
await storage.setup()
# Benchmark record_reminder_shown (100 iterations)
record_times: List[float] = []
for i in range(100):
# Create session first (required for FK constraint)
await storage._execute(
"INSERT OR IGNORE INTO scribe_sessions (session_id, started_at) VALUES (?, datetime('now'))",
(f"test_session_{i}",)
)
start = time.perf_counter()
await storage.record_reminder_shown(
session_id=f"test_session_{i}",
reminder_hash=f"test_hash_{i}",
reminder_key="teaching",
operation_status="success"
)
elapsed = (time.perf_counter() - start) * 1000 # Convert to ms
record_times.append(elapsed)
# Benchmark check_reminder_cooldown (100 iterations)
cooldown_times: List[float] = []
for i in range(100):
start = time.perf_counter()
await storage.check_reminder_cooldown(
session_id=f"test_session_{i}",
reminder_hash=f"test_hash_{i}",
cooldown_minutes=45
)
elapsed = (time.perf_counter() - start) * 1000
cooldown_times.append(elapsed)
# Benchmark cleanup_reminder_history (1 iteration)
cleanup_times: List[float] = []
start = time.perf_counter()
await storage.cleanup_reminder_history(cutoff_hours=90*24) # 90 days = 2160 hours
elapsed = (time.perf_counter() - start) * 1000
cleanup_times.append(elapsed)
# Calculate statistics
def calc_stats(times: List[float]) -> Dict[str, float]:
times_sorted = sorted(times)
return {
"avg_ms": sum(times) / len(times),
"max_ms": max(times),
"min_ms": min(times),
"p95_ms": times_sorted[int(len(times) * 0.95)] if times else 0,
"p99_ms": times_sorted[int(len(times) * 0.99)] if times else 0
}
results["operations"]["record_reminder"] = calc_stats(record_times)
results["operations"]["check_cooldown"] = calc_stats(cooldown_times)
results["operations"]["cleanup"] = calc_stats(cleanup_times)
# Check SLA compliance
sla_threshold = 20.0 # 20ms for record/check operations (reflects asyncio.to_thread overhead)
cleanup_threshold = 100.0 # 100ms for cleanup
if results["operations"]["record_reminder"]["p95_ms"] > sla_threshold:
results["sla_compliance"]["passed"] = False
results["sla_compliance"]["failed_operations"].append({
"operation": "record_reminder",
"p95_ms": results["operations"]["record_reminder"]["p95_ms"],
"threshold_ms": sla_threshold
})
if results["operations"]["check_cooldown"]["p95_ms"] > sla_threshold:
results["sla_compliance"]["passed"] = False
results["sla_compliance"]["failed_operations"].append({
"operation": "check_cooldown",
"p95_ms": results["operations"]["check_cooldown"]["p95_ms"],
"threshold_ms": sla_threshold
})
if results["operations"]["cleanup"]["max_ms"] > cleanup_threshold:
results["sla_compliance"]["passed"] = False
results["sla_compliance"]["failed_operations"].append({
"operation": "cleanup",
"max_ms": results["operations"]["cleanup"]["max_ms"],
"threshold_ms": cleanup_threshold
})
results["ok"] = results["sla_compliance"]["passed"]
# Cleanup test data
await storage._execute(
"DELETE FROM reminder_history WHERE session_id LIKE 'test_session_%'",
()
)
# Storage cleanup handled automatically
except Exception as e:
results["ok"] = False
results["error"] = str(e)
return results
async def validate_session_isolation() -> Dict[str, Any]:
"""
Verify different sessions get different reminder hashes.
Creates two sessions and verifies:
1. Same reminder type/project generates different hashes per session
2. Cooldown tracking is session-isolated
3. Session reset clears cooldown for new session
Returns:
Dict with validation results:
{
"ok": True/False,
"session_isolation_working": True/False,
"hash_uniqueness": {"session1_hash": "...", "session2_hash": "..."},
"cooldown_isolation": True/False,
"details": {...}
}
"""
results = {
"ok": True,
"session_isolation_working": False,
"hash_uniqueness": {},
"cooldown_isolation": False,
"details": {},
"timestamp": datetime.utcnow().isoformat() + "Z"
}
try:
from scribe_mcp.utils.reminder_engine import ReminderEngine, ReminderContext
from scribe_mcp.config.settings import SCRIBE_ROOT
# Create two different sessions
session1_id = "test_session_1"
session2_id = "test_session_2"
# Initialize storage
storage = SQLiteStorage(str(SCRIBE_ROOT / ".scribe" / "data" / "scribe.db"))
await storage.setup()
# Create reminder engine
engine = ReminderEngine(storage=storage)
# Create contexts for same reminder in different sessions
context1 = ReminderContext(
session_id=session1_id,
project_name="test_project",
operation_status="success",
active_project_selected=True,
reminder_types=["teaching"]
)
context2 = ReminderContext(
session_id=session2_id,
project_name="test_project",
operation_status="success",
active_project_selected=True,
reminder_types=["teaching"]
)
# Generate hashes using session-aware mode
# Note: This requires use_session_aware_hashes=True to be tested properly
# For now, test the infrastructure exists
hash1 = await engine._get_reminder_hash(
reminder_type="teaching",
reminder_text="Test reminder",
context=context1
)
hash2 = await engine._get_reminder_hash(
reminder_type="teaching",
reminder_text="Test reminder",
context=context2
)
results["hash_uniqueness"]["session1_hash"] = hash1
results["hash_uniqueness"]["session2_hash"] = hash2
# Check if hashes are different when session-aware mode is enabled
# If feature flag is OFF, hashes will be the same (expected)
if hash1 != hash2:
results["session_isolation_working"] = True
results["details"]["hash_isolation"] = "PASS - Different hashes for different sessions"
else:
results["session_isolation_working"] = False
results["details"]["hash_isolation"] = "SKIP - Session-aware hashing not enabled (feature flag OFF)"
# Test cooldown isolation
# Record reminder for session 1
await storage.record_reminder_shown(
session_id=session1_id,
reminder_hash=hash1,
project_root=None,
agent_id="test_agent",
tool_name="test_tool",
reminder_key="teaching",
operation_status="success"
)
# Check cooldown for session 1 (should be in cooldown)
cooldown1 = await storage.check_reminder_cooldown(
session_id=session1_id,
reminder_hash=hash1,
cooldown_minutes=45
)
# Check cooldown for session 2 with same hash (should NOT be in cooldown if isolated)
cooldown2 = await storage.check_reminder_cooldown(
session_id=session2_id,
reminder_hash=hash1, # Same hash, different session
cooldown_minutes=45
)
if cooldown1 and not cooldown2:
results["cooldown_isolation"] = True
results["details"]["cooldown_isolation"] = "PASS - Sessions have independent cooldowns"
else:
results["cooldown_isolation"] = False
results["details"]["cooldown_isolation"] = f"INFO - Cooldown behavior: session1={cooldown1}, session2={cooldown2}"
# Overall validation
results["ok"] = results["cooldown_isolation"] # Infrastructure is working if cooldowns are isolated
# Cleanup test data
await storage._execute(
"DELETE FROM reminder_history WHERE session_id IN ('test_session_1', 'test_session_2')",
()
)
# Storage cleanup handled automatically
except Exception as e:
results["ok"] = False
results["error"] = str(e)
import traceback
results["traceback"] = traceback.format_exc()
return results
async def get_reminder_statistics() -> Dict[str, Any]:
"""
Get DB statistics for reminder system monitoring.
Provides visibility into:
- Total reminder records
- Records by session
- Records by status (success/failure)
- Records by type (teaching/doc_hygiene/etc)
- Recent activity (last 24 hours)
- Oldest and newest records
Returns:
Dict with statistics:
{
"ok": True/False,
"total_reminders": 12345,
"by_session": {"session_1": 100, "session_2": 50, ...},
"by_status": {"success": 10000, "failure": 2345},
"by_type": {"teaching": 5000, "doc_hygiene": 3000, ...},
"recent_24h": 234,
"oldest_record": "2025-12-01T10:00:00Z",
"newest_record": "2026-01-03T12:00:00Z",
"active_sessions": 5
}
"""
results = {
"ok": True,
"total_reminders": 0,
"by_session": {},
"by_status": {},
"by_type": {},
"recent_24h": 0,
"oldest_record": None,
"newest_record": None,
"active_sessions": 0,
"timestamp": datetime.utcnow().isoformat() + "Z"
}
try:
# Initialize storage
storage = SQLiteStorage(str(SCRIBE_ROOT / ".scribe" / "data" / "scribe.db"))
await storage.setup()
# Total reminders
total_result = await storage._fetchone(
"SELECT COUNT(*) as count FROM reminder_history",
()
)
results["total_reminders"] = total_result["count"] if total_result else 0
# By session (top 10)
session_results = await storage._fetchall(
"""
SELECT session_id, COUNT(*) as count
FROM reminder_history
GROUP BY session_id
ORDER BY count DESC
LIMIT 10
""",
()
)
for row in session_results:
results["by_session"][row["session_id"]] = row["count"]
# Active sessions (last 24 hours)
active_result = await storage._fetchone(
"""
SELECT COUNT(DISTINCT session_id) as count
FROM reminder_history
WHERE shown_at > datetime('now', '-24 hours')
""",
()
)
results["active_sessions"] = active_result["count"] if active_result else 0
# By status
status_results = await storage._fetchall(
"""
SELECT operation_status, COUNT(*) as count
FROM reminder_history
GROUP BY operation_status
""",
()
)
for row in status_results:
results["by_status"][row["operation_status"]] = row["count"]
# By type
type_results = await storage._fetchall(
"""
SELECT reminder_key, COUNT(*) as count
FROM reminder_history
GROUP BY reminder_key
""",
()
)
for row in type_results:
results["by_type"][row["reminder_key"]] = row["count"]
# Recent 24h
recent_result = await storage._fetchone(
"""
SELECT COUNT(*) as count
FROM reminder_history
WHERE shown_at > datetime('now', '-24 hours')
""",
()
)
results["recent_24h"] = recent_result["count"] if recent_result else 0
# Oldest and newest records
oldest_result = await storage._fetchone(
"SELECT MIN(shown_at) as oldest FROM reminder_history",
()
)
if oldest_result and oldest_result["oldest"]:
results["oldest_record"] = oldest_result["oldest"]
newest_result = await storage._fetchone(
"SELECT MAX(shown_at) as newest FROM reminder_history",
()
)
if newest_result and newest_result["newest"]:
results["newest_record"] = newest_result["newest"]
# Storage cleanup handled automatically
except Exception as e:
results["ok"] = False
results["error"] = str(e)
return results
# Convenience function for CLI usage
async def run_all_validations() -> Dict[str, Any]:
"""
Run all validation checks and return comprehensive report.
Returns:
Dict with all validation results:
{
"performance": {...},
"session_isolation": {...},
"statistics": {...},
"overall_status": "PASS"/"FAIL",
"timestamp": "..."
}
"""
print("Running DB Mode Activation Validation Suite...\n")
print("1. Performance Benchmark...")
perf = await validate_db_performance()
print(f" Result: {'PASS' if perf['ok'] else 'FAIL'}")
if perf["ok"]:
print(f" - record_reminder p95: {perf['operations']['record_reminder']['p95_ms']:.2f}ms")
print(f" - check_cooldown p95: {perf['operations']['check_cooldown']['p95_ms']:.2f}ms")
print(f" - cleanup max: {perf['operations']['cleanup']['max_ms']:.2f}ms")
else:
print(f" - FAILED: {perf.get('error', 'SLA violations')}")
print()
print("2. Session Isolation Check...")
isolation = await validate_session_isolation()
print(f" Result: {'PASS' if isolation['ok'] else 'SKIP/INFO'}")
print(f" - Cooldown isolation: {isolation['cooldown_isolation']}")
print(f" - Details: {isolation['details'].get('cooldown_isolation', 'N/A')}")
print()
print("3. Reminder Statistics...")
stats = await get_reminder_statistics()
print(f" Result: {'PASS' if stats['ok'] else 'FAIL'}")
if stats["ok"]:
print(f" - Total reminders: {stats['total_reminders']}")
print(f" - Active sessions (24h): {stats['active_sessions']}")
print(f" - Recent activity (24h): {stats['recent_24h']}")
print()
overall = "PASS" if perf["ok"] and isolation["ok"] and stats["ok"] else "PARTIAL"
print(f"Overall Status: {overall}")
return {
"performance": perf,
"session_isolation": isolation,
"statistics": stats,
"overall_status": overall,
"timestamp": datetime.utcnow().isoformat() + "Z"
}
if __name__ == "__main__":
# CLI entry point
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--performance":
result = asyncio.run(validate_db_performance())
print(f"Performance Validation: {'PASS' if result['ok'] else 'FAIL'}")
for op, metrics in result["operations"].items():
print(f" {op}: p95={metrics['p95_ms']:.2f}ms avg={metrics['avg_ms']:.2f}ms")
elif len(sys.argv) > 1 and sys.argv[1] == "--isolation":
result = asyncio.run(validate_session_isolation())
print(f"Session Isolation: {'PASS' if result['ok'] else 'SKIP'}")
print(f" Cooldown isolation: {result['cooldown_isolation']}")
print(f" Details: {result['details']}")
elif len(sys.argv) > 1 and sys.argv[1] == "--stats":
result = asyncio.run(get_reminder_statistics())
print("Reminder Statistics:")
print(f" Total: {result['total_reminders']}")
print(f" Active sessions (24h): {result['active_sessions']}")
print(f" Recent (24h): {result['recent_24h']}")
print(f" By status: {result['by_status']}")
print(f" By type: {result['by_type']}")
else:
# Run all validations
asyncio.run(run_all_validations())