# Aidderall MCP Server - Hierarchical task management for AI assistants
# Copyright (C) 2024 Briam R. <briamr@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
import logging
import os
import shutil
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from .models import MainTask, SubTask, Task, TaskStatus
# Logger for this module, registered under the fixed name "aidderall-mcp".
logger = logging.getLogger("aidderall-mcp")
# Root directory for session storage: each session lives in
# DEFAULT_DB_DIR / <session_id> / "state.db".
DEFAULT_DB_DIR = Path.home() / ".aidderall" / "sessions"
# Schema version written into the schema_version table when that table is
# found empty during store initialisation.
SCHEMA_VERSION = 1
# DDL run via executescript() every time a store is initialised; every
# statement uses IF NOT EXISTS, so re-running it on an existing DB is a no-op.
#   schema_version  - holds the schema version number.
#   global_tasks    - one row per main task: list position + JSON payload.
#   completed_tasks - one row per completed task: list position, task id,
#                     task type ("main" or "sub") and JSON payload.
#   state           - key/value pairs; this module only uses the key
#                     "manual_current_task_id".
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS global_tasks (
position INTEGER NOT NULL,
data TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS completed_tasks (
position INTEGER NOT NULL,
task_id TEXT NOT NULL,
task_type TEXT NOT NULL,
data TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS state (
key TEXT PRIMARY KEY,
value TEXT
);
"""
class StateStore:
    """SQLite-backed persistence for TaskManager state.

    Each server instance gets a unique session-scoped DB to prevent
    concurrent sessions from clobbering each other's state. When no explicit
    path is supplied the DB is created at
    ``DEFAULT_DB_DIR / <session_id> / "state.db"``.
    """

    def __init__(self, db_path: Optional[Path] = None) -> None:
        """Open the state DB, creating the file and schema if needed.

        Args:
            db_path: Location of the SQLite file. When ``None``, a new
                12-hex-character session id is generated and the DB is
                placed under ``DEFAULT_DB_DIR``.
        """
        if db_path is not None:
            self.db_path = db_path
        else:
            session_id = uuid.uuid4().hex[:12]
            self.db_path = DEFAULT_DB_DIR / session_id / "state.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn: Optional[sqlite3.Connection] = None
        self._init_db()

    @staticmethod
    def _session_dir(session_id: str) -> Path:
        """Return the directory for *session_id* under ``DEFAULT_DB_DIR``.

        The id must be a single, non-empty path component. Without this
        check an empty id would resolve to ``DEFAULT_DB_DIR`` itself and an
        id such as ``"../.."`` would escape it, which is dangerous for
        callers that delete the resulting directory.

        Raises:
            ValueError: If *session_id* is not a plain directory name.
        """
        if (
            not isinstance(session_id, str)
            or not session_id
            or session_id in (".", "..")
            or Path(session_id).name != session_id
        ):
            raise ValueError(f"Invalid session id: {session_id!r}")
        return DEFAULT_DB_DIR / session_id

    def _get_conn(self) -> sqlite3.Connection:
        """Return the cached connection, opening and configuring it on first use."""
        if self._conn is None:
            conn = sqlite3.connect(str(self.db_path))
            try:
                conn.execute("PRAGMA journal_mode=WAL")
                conn.execute("PRAGMA foreign_keys=ON")
            except Exception:
                # Do not cache (or leak) a connection that failed to configure.
                conn.close()
                raise
            self._conn = conn
        return self._conn

    def _init_db(self) -> None:
        """Create the tables if missing and record the schema version once."""
        conn = self._get_conn()
        conn.executescript(_SCHEMA_SQL)
        row = conn.execute("SELECT version FROM schema_version").fetchone()
        if row is None:
            conn.execute(
                "INSERT INTO schema_version (version) VALUES (?)",
                (SCHEMA_VERSION,),
            )
        conn.commit()

    def save(
        self,
        global_tasks: List[MainTask],
        completed_tasks: List[Task],
        manual_current_task_id: Optional[str],
    ) -> None:
        """Persist the full TaskManager state to SQLite.

        The previous contents of ``global_tasks`` and ``completed_tasks`` are
        replaced wholesale. All statements run in one transaction: if any of
        them (or the JSON serialisation) fails, the transaction is rolled
        back and the exception is re-raised.

        Args:
            global_tasks: Main tasks, stored in list order.
            completed_tasks: Completed tasks, stored in list order together
                with their id and a "main"/"sub" type tag.
            manual_current_task_id: Id of the manually focused task, or
                ``None``.
        """
        conn = self._get_conn()
        # The connection context manager commits on success and rolls back
        # when the body raises, then lets the exception propagate.
        with conn:
            conn.execute("DELETE FROM global_tasks")
            conn.execute("DELETE FROM completed_tasks")
            for position, main_task in enumerate(global_tasks):
                conn.execute(
                    "INSERT INTO global_tasks (position, data) VALUES (?, ?)",
                    (position, json.dumps(main_task.to_dict())),
                )
            for position, task in enumerate(completed_tasks):
                task_type = "main" if isinstance(task, MainTask) else "sub"
                conn.execute(
                    "INSERT INTO completed_tasks (position, task_id, task_type, data) "
                    "VALUES (?, ?, ?, ?)",
                    (position, task.id, task_type, json.dumps(task.to_dict())),
                )
            conn.execute(
                "INSERT OR REPLACE INTO state (key, value) VALUES (?, ?)",
                ("manual_current_task_id", manual_current_task_id),
            )

    def load(
        self,
    ) -> Tuple[List[MainTask], List[Task], Optional[str]]:
        """Load TaskManager state from SQLite.

        Completed tasks whose id is still present in the loaded hierarchy
        are returned as the very same objects that appear in
        ``global_tasks``; the others are rebuilt from their stored JSON.

        Returns:
            Tuple of (global_tasks, completed_tasks, manual_current_task_id)
        """
        conn = self._get_conn()

        # Load global tasks
        rows = conn.execute(
            "SELECT data FROM global_tasks ORDER BY position"
        ).fetchall()
        global_tasks = [MainTask.from_dict(json.loads(row[0])) for row in rows]

        # Build id->object map for reference sharing with completed_tasks
        obj_map: Dict[str, Task] = {}
        for mt in global_tasks:
            obj_map[mt.id] = mt
            for sub in mt.sub_tasks:
                obj_map[sub.id] = sub

        # Load completed tasks
        rows = conn.execute(
            "SELECT task_id, task_type, data FROM completed_tasks ORDER BY position"
        ).fetchall()
        completed_tasks: List[Task] = []
        for task_id, task_type, data_str in rows:
            if task_id in obj_map:
                # Same object still exists in the hierarchy - reuse the reference
                completed_tasks.append(obj_map[task_id])
                continue
            # Task was removed from hierarchy - reconstruct standalone
            data = json.loads(data_str)
            if task_type == "main":
                completed_tasks.append(MainTask.from_dict(data))
            else:
                completed_tasks.append(SubTask.from_dict(data))

        # Load manual focus; a missing row, NULL or empty string all mean "none"
        row = conn.execute(
            "SELECT value FROM state WHERE key = 'manual_current_task_id'"
        ).fetchone()
        manual_current_task_id = row[0] if row and row[0] else None
        return global_tasks, completed_tasks, manual_current_task_id

    def has_state(self) -> bool:
        """Check if any persisted state exists.

        Returns:
            ``True`` when there is at least one global task, at least one
            completed task, or a non-empty manual focus id.
        """
        conn = self._get_conn()
        for table_query in (
            "SELECT COUNT(*) FROM global_tasks",
            "SELECT COUNT(*) FROM completed_tasks",
        ):
            row = conn.execute(table_query).fetchone()
            if row and row[0] > 0:
                return True
        row = conn.execute(
            "SELECT value FROM state WHERE key = 'manual_current_task_id'"
        ).fetchone()
        return bool(row and row[0])

    @property
    def session_id(self) -> Optional[str]:
        """Return the session ID derived from the DB path, or None if custom path."""
        parent = self.db_path.parent
        if parent.parent == DEFAULT_DB_DIR:
            return parent.name
        return None

    def resume_session(self, session_id: str) -> None:
        """Switch this store to an existing session's DB.

        If the target DB cannot be initialised, the store is pointed back at
        the DB it was using before the call and the error is re-raised.

        Raises:
            ValueError: If *session_id* is not a plain directory name or no
                DB file exists for it.
        """
        target_db = self._session_dir(session_id) / "state.db"
        if not target_db.exists():
            raise ValueError(f"Session '{session_id}' not found")
        previous_db = self.db_path
        self.close()
        self.db_path = target_db
        try:
            self._init_db()
        except Exception:
            # Leave the store usable: drop the failed connection and fall
            # back to the previous DB (reopened lazily on next access).
            self.close()
            self.db_path = previous_db
            raise

    @staticmethod
    def list_sessions() -> List[Dict[str, Any]]:
        """List all session directories with metadata.

        Returns:
            One dict per session found under ``DEFAULT_DB_DIR`` (sorted by
            directory path) with the keys ``session_id``, ``modified_at``
            (UTC ISO-8601 mtime of the DB file), ``task_count`` and
            ``current_task``. The last two are best-effort: if the DB cannot
            be read they keep whatever was gathered before the failure
            (``0`` / ``None`` by default).
        """
        sessions: List[Dict[str, Any]] = []
        if not DEFAULT_DB_DIR.exists():
            return sessions

        for session_dir in sorted(DEFAULT_DB_DIR.iterdir()):
            db_file = session_dir / "state.db"
            if not session_dir.is_dir() or not db_file.exists():
                continue

            modified_ts = db_file.stat().st_mtime
            modified_at = datetime.fromtimestamp(modified_ts, tz=timezone.utc)

            # Peek into the DB for task summary
            task_count = 0
            current_task_title = None
            conn: Optional[sqlite3.Connection] = None
            try:
                conn = sqlite3.connect(str(db_file))
                row = conn.execute("SELECT COUNT(*) FROM global_tasks").fetchone()
                task_count = row[0] if row else 0

                # Find current task title from the data
                rows = conn.execute(
                    "SELECT data FROM global_tasks ORDER BY position"
                ).fetchall()
                for (data_str,) in rows:
                    data = json.loads(data_str)
                    if data.get("status") == "current":
                        current_task_title = data["title"]
                        break
                    for sub in data.get("sub_tasks", []):
                        if sub.get("status") == "current":
                            current_task_title = sub["title"]
                            break
                    if current_task_title:
                        break
            except Exception:
                # Summary is optional: an unreadable or malformed session DB
                # must not hide the remaining sessions, but leave a trace.
                logger.debug(
                    "Could not read session summary from %s",
                    db_file,
                    exc_info=True,
                )
            finally:
                # Always release the handle, including on the error path.
                if conn is not None:
                    conn.close()

            sessions.append({
                "session_id": session_dir.name,
                "modified_at": modified_at.isoformat(),
                "task_count": task_count,
                "current_task": current_task_title,
            })
        return sessions

    @staticmethod
    def delete_session(session_id: str) -> bool:
        """Delete a session directory and its DB.

        Returns:
            ``True`` once the directory has been removed.

        Raises:
            ValueError: If *session_id* is not a plain directory name or the
                session directory does not exist.
        """
        session_dir = StateStore._session_dir(session_id)
        if not session_dir.exists() or not session_dir.is_dir():
            raise ValueError(f"Session '{session_id}' not found")
        shutil.rmtree(session_dir)
        return True

    def close(self) -> None:
        """Close the cached connection, if any; safe to call repeatedly."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None