db.py•25.4 kB
"""SQLite database operations for project tracking."""
import json
import sqlite3
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any
from .models import (
Complexity,
Note,
NoteCreate,
Org,
OrgCreate,
OrgView,
Priority,
Project,
ProjectCreate,
ProjectView,
RoadmapView,
Task,
TaskCreate,
TaskStatus,
TaskUpdate,
TaskView,
Ticket,
TicketCreate,
TicketStatus,
TicketUpdate,
TicketView,
)
# Default database path
DEFAULT_DB_PATH = Path.home() / ".local" / "share" / "tpm-mcp" / "tpm.db"
def get_db_path() -> Path:
"""Get database path, creating parent directories if needed."""
db_path = Path(DEFAULT_DB_PATH)
db_path.parent.mkdir(parents=True, exist_ok=True)
return db_path
def init_db(db_path: Path | None = None) -> sqlite3.Connection:
"""Initialize database with schema."""
db_path = db_path or get_db_path()
conn = sqlite3.connect(str(db_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
# Read and execute schema
schema_path = Path(__file__).parent / "schema.sql"
with open(schema_path) as f:
conn.executescript(f.read())
return conn
def _to_json(value: Any) -> str | None:
"""Convert a value to JSON string for storage."""
if value is None:
return None
return json.dumps(value)
def _from_json(value: str | None) -> Any:
"""Parse a JSON string from storage."""
if value is None:
return None
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return None
def _normalize_ticket_status(status: str) -> str:
"""Normalize ticket status (completed -> done)."""
if status == "completed":
return "done"
return status
def _normalize_task_status(status: str) -> str:
"""Normalize task status (completed -> done)."""
if status == "completed":
return "done"
return status
class TrackerDB:
"""Database operations for project tracking."""
def __init__(self, db_path: Path | None = None):
self.conn = init_db(db_path)
def _gen_id(self) -> str:
return str(uuid.uuid4())[:8]
def _now(self) -> str:
return datetime.utcnow().isoformat()
# --- Orgs ---
def create_org(self, data: OrgCreate) -> Org:
id = self._gen_id()
now = self._now()
self.conn.execute(
"INSERT INTO orgs (id, name, created_at) VALUES (?, ?, ?)", (id, data.name, now)
)
self.conn.commit()
return Org(id=id, name=data.name, created_at=datetime.fromisoformat(now))
def create_org_with_id(self, id: str, name: str, created_at: str | None = None) -> Org:
"""Create org with specific ID (for migration)."""
now = created_at or self._now()
self.conn.execute(
"INSERT OR REPLACE INTO orgs (id, name, created_at) VALUES (?, ?, ?)", (id, name, now)
)
self.conn.commit()
return Org(id=id, name=name, created_at=datetime.fromisoformat(now))
def get_org(self, org_id: str) -> Org | None:
row = self.conn.execute("SELECT * FROM orgs WHERE id = ?", (org_id,)).fetchone()
if row:
return Org(
id=row["id"], name=row["name"], created_at=datetime.fromisoformat(row["created_at"])
)
return None
def list_orgs(self) -> list[Org]:
rows = self.conn.execute("SELECT * FROM orgs ORDER BY name").fetchall()
return [
Org(id=r["id"], name=r["name"], created_at=datetime.fromisoformat(r["created_at"]))
for r in rows
]
# --- Projects ---
def create_project(self, data: ProjectCreate) -> Project:
id = self._gen_id()
now = self._now()
self.conn.execute(
"""INSERT INTO projects (id, org_id, name, repo_path, description, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(id, data.org_id, data.name, data.repo_path, data.description, now),
)
self.conn.commit()
return Project(
id=id,
org_id=data.org_id,
name=data.name,
repo_path=data.repo_path,
description=data.description,
created_at=datetime.fromisoformat(now),
)
def create_project_with_id(
self,
id: str,
org_id: str,
name: str,
repo_path: str | None = None,
description: str | None = None,
created_at: str | None = None,
) -> Project:
"""Create project with specific ID (for migration)."""
now = created_at or self._now()
self.conn.execute(
"""INSERT OR REPLACE INTO projects (id, org_id, name, repo_path, description, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(id, org_id, name, repo_path, description, now),
)
self.conn.commit()
return Project(
id=id,
org_id=org_id,
name=name,
repo_path=repo_path,
description=description,
created_at=datetime.fromisoformat(now),
)
def get_project(self, project_id: str) -> Project | None:
row = self.conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
if row:
return Project(
id=row["id"],
org_id=row["org_id"],
name=row["name"],
repo_path=row["repo_path"],
description=row["description"],
created_at=datetime.fromisoformat(row["created_at"]),
)
return None
def list_projects(self, org_id: str | None = None) -> list[Project]:
if org_id:
rows = self.conn.execute(
"SELECT * FROM projects WHERE org_id = ? ORDER BY name", (org_id,)
).fetchall()
else:
rows = self.conn.execute("SELECT * FROM projects ORDER BY name").fetchall()
return [
Project(
id=r["id"],
org_id=r["org_id"],
name=r["name"],
repo_path=r["repo_path"],
description=r["description"],
created_at=datetime.fromisoformat(r["created_at"]),
)
for r in rows
]
# --- Tickets ---
def _get_next_ticket_number(self, prefix: str) -> int:
"""Get the next sequential number for tickets with given prefix."""
# Find max existing number for this prefix (e.g., SENTRY-003 -> 3)
rows = self.conn.execute(
"SELECT id FROM tickets WHERE id LIKE ?", (f"{prefix}-%",)
).fetchall()
max_num = 0
for row in rows:
try:
# Extract number after prefix (e.g., "SENTRY-003" -> 3)
num_str = row["id"].split("-")[-1]
num = int(num_str)
max_num = max(max_num, num)
except (ValueError, IndexError):
continue
return max_num + 1
def create_ticket(self, data: TicketCreate) -> Ticket:
if data.id:
id = data.id
else:
# Auto-generate ID: PROJECT_ID-NNN (e.g., SENTRY-001, BACKEND-042)
project = self.get_project(data.project_id)
if project:
# Use project ID uppercased, replace spaces/special chars with nothing
prefix = project.id.upper().replace(" ", "").replace("-", "").replace("_", "")
else:
prefix = "TICKET"
next_num = self._get_next_ticket_number(prefix)
id = f"{prefix}-{next_num:03d}"
now = self._now()
self.conn.execute(
"""INSERT INTO tickets (id, project_id, title, description, status, priority, created_at,
assignees, tags, related_repos, acceptance_criteria, blockers, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
id,
data.project_id,
data.title,
data.description,
data.status.value,
data.priority.value,
now,
_to_json(data.assignees),
_to_json(data.tags),
_to_json(data.related_repos),
_to_json(data.acceptance_criteria),
_to_json(data.blockers),
_to_json(data.metadata),
),
)
self.conn.commit()
return Ticket(
id=id,
project_id=data.project_id,
title=data.title,
description=data.description,
status=data.status,
priority=data.priority,
created_at=datetime.fromisoformat(now),
assignees=data.assignees,
tags=data.tags,
related_repos=data.related_repos,
acceptance_criteria=data.acceptance_criteria,
blockers=data.blockers,
metadata=data.metadata,
)
def create_ticket_with_id(
self,
id: str,
project_id: str,
title: str,
description: str | None = None,
status: str = "backlog",
priority: str = "medium",
created_at: str | None = None,
started_at: str | None = None,
completed_at: str | None = None,
assignees: list | None = None,
tags: list | None = None,
related_repos: list | None = None,
acceptance_criteria: list | None = None,
blockers: list | None = None,
metadata: dict | None = None,
) -> Ticket:
"""Create ticket with specific ID (for migration)."""
now = created_at or self._now()
status = _normalize_ticket_status(status)
self.conn.execute(
"""INSERT OR REPLACE INTO tickets (id, project_id, title, description, status, priority,
created_at, started_at, completed_at, assignees, tags, related_repos,
acceptance_criteria, blockers, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
id,
project_id,
title,
description,
status,
priority,
now,
started_at,
completed_at,
_to_json(assignees),
_to_json(tags),
_to_json(related_repos),
_to_json(acceptance_criteria),
_to_json(blockers),
_to_json(metadata),
),
)
self.conn.commit()
return self.get_ticket(id)
def get_ticket(self, ticket_id: str) -> Ticket | None:
row = self.conn.execute("SELECT * FROM tickets WHERE id = ?", (ticket_id,)).fetchone()
if row:
return self._row_to_ticket(row)
return None
def _row_to_ticket(self, row) -> Ticket:
status = _normalize_ticket_status(row["status"])
return Ticket(
id=row["id"],
project_id=row["project_id"],
title=row["title"],
description=row["description"],
status=TicketStatus(status),
priority=Priority(row["priority"]),
created_at=datetime.fromisoformat(row["created_at"]),
started_at=datetime.fromisoformat(row["started_at"]) if row["started_at"] else None,
completed_at=datetime.fromisoformat(row["completed_at"])
if row["completed_at"]
else None,
assignees=_from_json(row["assignees"]),
tags=_from_json(row["tags"]),
related_repos=_from_json(row["related_repos"]),
acceptance_criteria=_from_json(row["acceptance_criteria"]),
blockers=_from_json(row["blockers"]),
metadata=_from_json(row["metadata"]),
)
def list_tickets(
self, project_id: str | None = None, status: TicketStatus | None = None
) -> list[Ticket]:
query = "SELECT * FROM tickets WHERE 1=1"
params = []
if project_id:
query += " AND project_id = ?"
params.append(project_id)
if status:
query += " AND status = ?"
params.append(status.value)
query += " ORDER BY priority, created_at"
rows = self.conn.execute(query, params).fetchall()
return [self._row_to_ticket(r) for r in rows]
def update_ticket(self, ticket_id: str, data: TicketUpdate) -> Ticket | None:
updates = []
params = []
if data.title is not None:
updates.append("title = ?")
params.append(data.title)
if data.description is not None:
updates.append("description = ?")
params.append(data.description)
if data.status is not None:
updates.append("status = ?")
params.append(data.status.value)
if data.status == TicketStatus.IN_PROGRESS:
updates.append("started_at = ?")
params.append(self._now())
elif data.status in (TicketStatus.DONE, TicketStatus.COMPLETED):
updates.append("completed_at = ?")
params.append(self._now())
if data.priority is not None:
updates.append("priority = ?")
params.append(data.priority.value)
if data.assignees is not None:
updates.append("assignees = ?")
params.append(_to_json(data.assignees))
if data.tags is not None:
updates.append("tags = ?")
params.append(_to_json(data.tags))
if data.related_repos is not None:
updates.append("related_repos = ?")
params.append(_to_json(data.related_repos))
if data.acceptance_criteria is not None:
updates.append("acceptance_criteria = ?")
params.append(_to_json(data.acceptance_criteria))
if data.blockers is not None:
updates.append("blockers = ?")
params.append(_to_json(data.blockers))
if data.metadata is not None:
updates.append("metadata = ?")
params.append(_to_json(data.metadata))
if not updates:
return self.get_ticket(ticket_id)
params.append(ticket_id)
self.conn.execute(f"UPDATE tickets SET {', '.join(updates)} WHERE id = ?", params)
self.conn.commit()
return self.get_ticket(ticket_id)
# --- Tasks ---
def create_task(self, data: TaskCreate) -> Task:
# Get ticket to extract prefix
ticket = self.get_ticket(data.ticket_id)
if not ticket:
raise ValueError(f"Ticket {data.ticket_id} not found")
# Count existing tasks for this ticket to generate task number
count = self.conn.execute(
"SELECT COUNT(*) FROM tasks WHERE ticket_id = ?", (data.ticket_id,)
).fetchone()[0]
# Generate task ID like TASK-TICKET-001-1
ticket_num = ticket.id.replace("TICKET-", "").replace("FEAT-", "").replace("ISSUE-", "")
id = f"TASK-{ticket_num}-{count + 1}"
now = self._now()
self.conn.execute(
"""INSERT INTO tasks (id, ticket_id, title, details, status, priority, complexity,
created_at, acceptance_criteria, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
id,
data.ticket_id,
data.title,
data.details,
data.status.value,
data.priority.value,
data.complexity.value,
now,
_to_json(data.acceptance_criteria),
_to_json(data.metadata),
),
)
self.conn.commit()
return Task(
id=id,
ticket_id=data.ticket_id,
title=data.title,
details=data.details,
status=data.status,
priority=data.priority,
complexity=data.complexity,
created_at=datetime.fromisoformat(now),
acceptance_criteria=data.acceptance_criteria,
metadata=data.metadata,
)
def create_task_with_id(
self,
id: str,
ticket_id: str,
title: str,
details: str | None = None,
status: str = "pending",
priority: str = "medium",
complexity: str = "medium",
created_at: str | None = None,
completed_at: str | None = None,
acceptance_criteria: list | None = None,
metadata: dict | None = None,
) -> Task:
"""Create task with specific ID (for migration)."""
now = created_at or self._now()
status = _normalize_task_status(status)
self.conn.execute(
"""INSERT OR REPLACE INTO tasks (id, ticket_id, title, details, status, priority, complexity,
created_at, completed_at, acceptance_criteria, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
id,
ticket_id,
title,
details,
status,
priority,
complexity,
now,
completed_at,
_to_json(acceptance_criteria),
_to_json(metadata),
),
)
self.conn.commit()
return self.get_task(id)
def get_task(self, task_id: str) -> Task | None:
row = self.conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
if row:
return self._row_to_task(row)
return None
def _row_to_task(self, row) -> Task:
status = _normalize_task_status(row["status"])
return Task(
id=row["id"],
ticket_id=row["ticket_id"],
title=row["title"],
details=row["details"],
status=TaskStatus(status),
priority=Priority(row["priority"] or "medium"),
complexity=Complexity(row["complexity"] or "medium"),
created_at=datetime.fromisoformat(row["created_at"]),
completed_at=datetime.fromisoformat(row["completed_at"])
if row["completed_at"]
else None,
acceptance_criteria=_from_json(row["acceptance_criteria"]),
metadata=_from_json(row["metadata"]),
)
def list_tasks(
self, ticket_id: str | None = None, status: TaskStatus | None = None
) -> list[Task]:
query = "SELECT * FROM tasks WHERE 1=1"
params = []
if ticket_id:
query += " AND ticket_id = ?"
params.append(ticket_id)
if status:
query += " AND status = ?"
params.append(status.value)
query += " ORDER BY created_at"
rows = self.conn.execute(query, params).fetchall()
return [self._row_to_task(r) for r in rows]
def update_task(self, task_id: str, data: TaskUpdate) -> Task | None:
updates = []
params = []
if data.title is not None:
updates.append("title = ?")
params.append(data.title)
if data.details is not None:
updates.append("details = ?")
params.append(data.details)
if data.status is not None:
updates.append("status = ?")
params.append(data.status.value)
if data.status in (TaskStatus.DONE, TaskStatus.COMPLETED):
updates.append("completed_at = ?")
params.append(self._now())
if data.priority is not None:
updates.append("priority = ?")
params.append(data.priority.value)
if data.complexity is not None:
updates.append("complexity = ?")
params.append(data.complexity.value)
if data.acceptance_criteria is not None:
updates.append("acceptance_criteria = ?")
params.append(_to_json(data.acceptance_criteria))
if data.metadata is not None:
updates.append("metadata = ?")
params.append(_to_json(data.metadata))
if not updates:
return self.get_task(task_id)
params.append(task_id)
self.conn.execute(f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?", params)
self.conn.commit()
return self.get_task(task_id)
# --- Task Dependencies ---
def add_task_dependency(self, task_id: str, depends_on_id: str) -> bool:
"""Add a dependency between tasks."""
try:
self.conn.execute(
"INSERT INTO task_dependencies (task_id, depends_on_id) VALUES (?, ?)",
(task_id, depends_on_id),
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get_task_dependencies(self, task_id: str) -> list[str]:
"""Get IDs of tasks that this task depends on."""
rows = self.conn.execute(
"SELECT depends_on_id FROM task_dependencies WHERE task_id = ?", (task_id,)
).fetchall()
return [r["depends_on_id"] for r in rows]
# --- Notes ---
def add_note(self, data: NoteCreate) -> Note:
id = self._gen_id()
now = self._now()
self.conn.execute(
"INSERT INTO notes (id, entity_type, entity_id, content, created_at) VALUES (?, ?, ?, ?, ?)",
(id, data.entity_type, data.entity_id, data.content, now),
)
self.conn.commit()
return Note(
id=id,
entity_type=data.entity_type,
entity_id=data.entity_id,
content=data.content,
created_at=datetime.fromisoformat(now),
)
def get_notes(self, entity_type: str, entity_id: str) -> list[Note]:
rows = self.conn.execute(
"SELECT * FROM notes WHERE entity_type = ? AND entity_id = ? ORDER BY created_at",
(entity_type, entity_id),
).fetchall()
return [
Note(
id=r["id"],
entity_type=r["entity_type"],
entity_id=r["entity_id"],
content=r["content"],
created_at=datetime.fromisoformat(r["created_at"]),
)
for r in rows
]
# --- Roadmap View ---
def get_roadmap(self, org_id: str | None = None) -> RoadmapView:
"""Get full roadmap view with stats."""
orgs = self.list_orgs()
if org_id:
orgs = [o for o in orgs if o.id == org_id]
org_views = []
total_tickets = 0
tickets_done = 0
total_tasks = 0
tasks_done = 0
for org in orgs:
projects = self.list_projects(org.id)
project_views = []
for proj in projects:
tickets = self.list_tickets(proj.id)
ticket_views = []
proj_tickets_done = 0
for ticket in tickets:
tasks = self.list_tasks(ticket.id)
task_views = [
TaskView(
id=t.id,
title=t.title,
status=t.status,
priority=t.priority,
complexity=t.complexity,
)
for t in tasks
]
ticket_tasks_done = sum(
1 for t in tasks if t.status in (TaskStatus.DONE, TaskStatus.COMPLETED)
)
ticket_views.append(
TicketView(
id=ticket.id,
title=ticket.title,
status=ticket.status,
priority=ticket.priority,
tags=ticket.tags,
task_count=len(tasks),
tasks_done=ticket_tasks_done,
tasks=task_views,
)
)
total_tasks += len(tasks)
tasks_done += ticket_tasks_done
if ticket.status in (TicketStatus.DONE, TicketStatus.COMPLETED):
proj_tickets_done += 1
project_views.append(
ProjectView(
id=proj.id,
name=proj.name,
description=proj.description,
ticket_count=len(tickets),
tickets_done=proj_tickets_done,
tickets=ticket_views,
)
)
total_tickets += len(tickets)
tickets_done += proj_tickets_done
org_views.append(OrgView(id=org.id, name=org.name, projects=project_views))
return RoadmapView(
orgs=org_views,
stats={
"total_tickets": total_tickets,
"tickets_done": tickets_done,
"total_tasks": total_tasks,
"tasks_done": tasks_done,
"completion_pct": round(tasks_done / total_tasks * 100, 1)
if total_tasks > 0
else 0,
},
)