"""SQLite database connection management and migrations.
Provides thread-safe connection pooling and automatic schema migrations.
"""
from __future__ import annotations
import sqlite3
import threading
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from typing import Any, cast
from loguru import logger
# Schema version for migrations. Bump this constant and add a matching step
# in Database._run_migrations whenever the schema changes.
SCHEMA_VERSION = 2
# Migration SQL for v1 to v2: Training tables.
# Idempotent: every statement uses IF NOT EXISTS, so re-running it on an
# already-migrated database is a no-op.
MIGRATION_V1_TO_V2 = """
-- Training runs table for OpenPipe ART
CREATE TABLE IF NOT EXISTS training_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT UNIQUE NOT NULL,
status TEXT DEFAULT 'pending'
CHECK (status IN ('pending', 'generating', 'collecting', 'scoring',
'training', 'completed', 'failed', 'cancelled')),
model_name TEXT NOT NULL,
project_name TEXT NOT NULL,
trained_model_name TEXT,
task_description TEXT NOT NULL,
num_examples INTEGER DEFAULT 100,
rollouts_per_group INTEGER DEFAULT 4,
learning_rate REAL DEFAULT 0.00001,
num_epochs INTEGER DEFAULT 1,
max_steps INTEGER,
system_prompt TEXT,
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
metrics TEXT,
error TEXT,
config TEXT
);
-- Trajectories table for storing rollout results
CREATE TABLE IF NOT EXISTS trajectories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT UNIQUE NOT NULL,
run_id INTEGER NOT NULL,
task_input TEXT NOT NULL,
messages TEXT NOT NULL,
reward REAL,
metadata TEXT,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (run_id) REFERENCES training_runs(id) ON DELETE CASCADE
);
-- Indexes for training tables
CREATE INDEX IF NOT EXISTS idx_training_runs_status ON training_runs(status);
CREATE INDEX IF NOT EXISTS idx_training_runs_project ON training_runs(project_name);
CREATE INDEX IF NOT EXISTS idx_trajectories_run ON trajectories(run_id);
"""
# SQL schema definition for MCP Task Aggregator.
# This is the complete, current (v2) schema: it already includes the training
# tables that MIGRATION_V1_TO_V2 adds, so a fresh database created from this
# script needs no migrations. Every statement uses IF NOT EXISTS, making the
# script safe to re-run on an existing database.
SCHEMA_SQL = """
-- Extended todos table with external system fields
CREATE TABLE IF NOT EXISTS todos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT UNIQUE NOT NULL,
content TEXT NOT NULL,
status TEXT DEFAULT 'todo'
CHECK (status IN ('todo', 'in_progress', 'done', 'blocked', 'in_review', 'cancelled')),
priority INTEGER DEFAULT 0,
due_date TEXT,
completed_at TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
-- External system fields
source_system TEXT DEFAULT 'local'
CHECK (source_system IN ('local', 'jira', 'github', 'linear', 'markdown', 'stm')),
source_id TEXT,
source_url TEXT,
external_metadata TEXT, -- JSON blob for system-specific metadata
last_synced_at TEXT,
sync_hash TEXT, -- Hash for change detection
-- Unique constraint for external tasks
UNIQUE (source_system, source_id)
);
-- Agent metadata table for AI suggestions
CREATE TABLE IF NOT EXISTS agent_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
todo_id INTEGER NOT NULL,
agent_type TEXT NOT NULL,
confidence_score REAL,
suggestions TEXT, -- JSON blob
auto_applied INTEGER DEFAULT 0,
human_approved INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (todo_id) REFERENCES todos(id) ON DELETE CASCADE
);
-- Sync log table for operation history
CREATE TABLE IF NOT EXISTS sync_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_system TEXT NOT NULL
CHECK (source_system IN ('jira', 'github', 'linear', 'markdown', 'stm')),
sync_type TEXT NOT NULL
CHECK (sync_type IN ('full', 'incremental')),
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
tasks_synced INTEGER DEFAULT 0,
tasks_created INTEGER DEFAULT 0,
tasks_updated INTEGER DEFAULT 0,
errors TEXT, -- JSON blob for error details
status TEXT DEFAULT 'running'
CHECK (status IN ('running', 'completed', 'failed', 'partial'))
);
-- Tags table
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
-- Junction table for todo tags
CREATE TABLE IF NOT EXISTS todo_tags (
todo_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (todo_id, tag_id),
FOREIGN KEY (todo_id) REFERENCES todos(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
);
-- Schema version tracking
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
);
-- Training runs table for OpenPipe ART
CREATE TABLE IF NOT EXISTS training_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT UNIQUE NOT NULL,
status TEXT DEFAULT 'pending'
CHECK (status IN ('pending', 'generating', 'collecting', 'scoring', 'training', 'completed', 'failed', 'cancelled')),
model_name TEXT NOT NULL,
project_name TEXT NOT NULL,
trained_model_name TEXT,
task_description TEXT NOT NULL,
num_examples INTEGER DEFAULT 100,
rollouts_per_group INTEGER DEFAULT 4,
learning_rate REAL DEFAULT 0.00001,
num_epochs INTEGER DEFAULT 1,
max_steps INTEGER,
system_prompt TEXT,
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
metrics TEXT, -- JSON blob for TrainingMetrics
error TEXT,
config TEXT -- JSON blob for additional config
);
-- Trajectories table for storing rollout results
CREATE TABLE IF NOT EXISTS trajectories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT UNIQUE NOT NULL,
run_id INTEGER NOT NULL,
task_input TEXT NOT NULL, -- JSON blob for TaskInput
messages TEXT NOT NULL, -- JSON array of messages
reward REAL,
metadata TEXT, -- JSON blob for additional metadata
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (run_id) REFERENCES training_runs(id) ON DELETE CASCADE
);
-- Indexes for common queries
CREATE INDEX IF NOT EXISTS idx_todos_status ON todos(status);
CREATE INDEX IF NOT EXISTS idx_todos_source_system ON todos(source_system);
CREATE INDEX IF NOT EXISTS idx_todos_source_id ON todos(source_system, source_id);
CREATE INDEX IF NOT EXISTS idx_todos_priority ON todos(priority);
CREATE INDEX IF NOT EXISTS idx_todos_due_date ON todos(due_date);
CREATE INDEX IF NOT EXISTS idx_agent_metadata_todo ON agent_metadata(todo_id);
CREATE INDEX IF NOT EXISTS idx_sync_log_source ON sync_log(source_system);
CREATE INDEX IF NOT EXISTS idx_sync_log_started ON sync_log(started_at);
CREATE INDEX IF NOT EXISTS idx_training_runs_status ON training_runs(status);
CREATE INDEX IF NOT EXISTS idx_training_runs_project ON training_runs(project_name);
CREATE INDEX IF NOT EXISTS idx_trajectories_run ON trajectories(run_id);
-- Composite indexes for tag filtering (batch queries)
CREATE INDEX IF NOT EXISTS idx_todo_tags_tag_todo ON todo_tags(tag_id, todo_id);
CREATE INDEX IF NOT EXISTS idx_todo_tags_todo_tag ON todo_tags(todo_id, tag_id);
CREATE INDEX IF NOT EXISTS idx_tags_name ON tags(name);
"""
def get_default_db_path() -> Path:
    """Return the default database location (~/.mcp-task-aggregator/tasks.db)."""
    return Path.home().joinpath(".mcp-task-aggregator", "tasks.db")
class Database:
    """Thread-safe SQLite database manager with connection pooling.

    Each thread gets its own lazily created connection (stored in a
    ``threading.local``), configured with a Row factory, foreign-key
    enforcement, WAL journaling, and NORMAL synchronous mode. The schema is
    created and migrated once during construction, guarded by a lock.

    Args:
        db_path: Path to database file. Defaults to ~/.mcp-task-aggregator/tasks.db
        check_same_thread: SQLite threading check. Set False for multi-threaded use.
    """

    def __init__(
        self,
        db_path: Path | str | None = None,
        *,
        check_same_thread: bool = False,
    ) -> None:
        self._db_path = Path(db_path) if db_path else get_default_db_path()
        self._check_same_thread = check_same_thread
        # One connection per thread, created on first use by _get_connection.
        self._local = threading.local()
        # Serializes schema creation/migration (see _init_schema).
        self._lock = threading.Lock()
        # Ensure parent directory exists
        self._db_path.parent.mkdir(parents=True, exist_ok=True)
        # Initialize schema (creates tables and runs migrations eagerly).
        self._init_schema()

    @property
    def path(self) -> Path:
        """Get the database file path."""
        return self._db_path

    def _get_connection(self) -> sqlite3.Connection:
        """Get or create a thread-local connection.

        The connection is configured once at creation: Row factory for
        name-based column access, foreign-key enforcement (off by default in
        SQLite), WAL journaling, and NORMAL synchronous mode.

        Returns:
            The calling thread's SQLite connection.
        """
        conn: sqlite3.Connection | None = getattr(self._local, "connection", None)
        if conn is None:
            conn = sqlite3.connect(
                str(self._db_path),
                check_same_thread=self._check_same_thread,
            )
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA foreign_keys = ON")
            conn.execute("PRAGMA journal_mode = WAL")  # Better concurrency
            conn.execute("PRAGMA synchronous = NORMAL")  # Faster writes, still safe
            self._local.connection = conn
        return conn

    @contextmanager
    def connection(self) -> Generator[sqlite3.Connection, None, None]:
        """Get a database connection context manager.

        Commits on clean exit of the ``with`` block; rolls back and
        re-raises if the block raises.

        Yields:
            SQLite connection with Row factory enabled.
        """
        conn = self._get_connection()
        try:
            yield conn
        except Exception:
            conn.rollback()
            raise
        else:
            conn.commit()

    @contextmanager
    def cursor(self) -> Generator[sqlite3.Cursor, None, None]:
        """Get a database cursor context manager.

        Commit/rollback semantics come from the underlying connection()
        context; the cursor itself is always closed on exit.

        Yields:
            SQLite cursor.
        """
        with self.connection() as conn:
            cursor = conn.cursor()
            try:
                yield cursor
            finally:
                cursor.close()

    def execute(
        self,
        sql: str,
        params: tuple[Any, ...] | dict[str, Any] | None = None,
    ) -> sqlite3.Cursor:
        """Execute a SQL statement and commit.

        Args:
            sql: SQL statement to execute.
            params: Optional parameters for the statement.

        Returns:
            Cursor with results (the transaction is committed before the
            cursor is returned to the caller).
        """
        with self.connection() as conn:
            # NOTE(review): truthiness check means falsy params (empty
            # tuple/dict) take the no-params path; equivalent for statements
            # without placeholders.
            if params:
                return conn.execute(sql, params)
            return conn.execute(sql)

    def executemany(
        self,
        sql: str,
        params_seq: list[tuple[Any, ...]] | list[dict[str, Any]],
    ) -> sqlite3.Cursor:
        """Execute a SQL statement with multiple parameter sets.

        Args:
            sql: SQL statement to execute.
            params_seq: Sequence of parameter sets.

        Returns:
            Cursor with results.
        """
        with self.connection() as conn:
            return conn.executemany(sql, params_seq)

    def fetchone(
        self,
        sql: str,
        params: tuple[Any, ...] | dict[str, Any] | None = None,
    ) -> sqlite3.Row | None:
        """Execute and fetch a single row.

        Args:
            sql: SQL statement to execute.
            params: Optional parameters.

        Returns:
            Single row or None.
        """
        cursor = self.execute(sql, params)
        return cast("sqlite3.Row | None", cursor.fetchone())

    def fetchall(
        self,
        sql: str,
        params: tuple[Any, ...] | dict[str, Any] | None = None,
    ) -> list[sqlite3.Row]:
        """Execute and fetch all rows.

        Args:
            sql: SQL statement to execute.
            params: Optional parameters.

        Returns:
            List of rows.
        """
        cursor = self.execute(sql, params)
        return cursor.fetchall()

    def _init_schema(self) -> None:
        """Initialize database schema and run migrations.

        Held under self._lock so only one thread performs initialization.
        SCHEMA_SQL uses IF NOT EXISTS throughout, so executing it against an
        existing database is safe.
        """
        with self._lock, self.connection() as conn:
            # Create schema
            conn.executescript(SCHEMA_SQL)
            # Check and record schema version
            cursor = conn.execute("SELECT version FROM schema_version ORDER BY version DESC LIMIT 1")
            row = cursor.fetchone()
            # No row means a brand-new database (version 0).
            current_version = row["version"] if row else 0
            if current_version < SCHEMA_VERSION:
                # Run migrations here if needed
                self._run_migrations(conn, current_version, SCHEMA_VERSION)
                # Record new version
                conn.execute(
                    "INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
                    (SCHEMA_VERSION,),
                )
        logger.debug(f"Database initialized at {self._db_path} (schema v{SCHEMA_VERSION})")

    def _run_migrations(
        self,
        conn: sqlite3.Connection,
        from_version: int,
        to_version: int,
    ) -> None:
        """Run database migrations between versions.

        Each migration step is keyed off from_version, so a database several
        versions behind walks through every applicable step in order.

        Args:
            conn: Database connection.
            from_version: Current schema version.
            to_version: Target schema version.
        """
        _ = to_version  # Used implicitly via version checks
        # Migration from v1 to v2: Add training tables
        if from_version < 2:
            logger.info("Migrating database from v1 to v2: Adding training tables")
            conn.executescript(MIGRATION_V1_TO_V2)

    def close(self) -> None:
        """Close the thread-local connection.

        Only closes the calling thread's connection; connections opened by
        other threads are untouched.
        """
        if hasattr(self._local, "connection") and self._local.connection:
            self._local.connection.close()
            self._local.connection = None