Skip to main content
Glama
database.py67 kB
"""Database interaction logic using sqlite3.""" import sqlite3 import json import os from pathlib import Path from typing import List, Optional, Dict, Any, Tuple from datetime import datetime, timedelta, timezone from alembic.config import Config from alembic import command import logging from ..core.config import get_database_path from ..core.exceptions import DatabaseError, ConfigurationError from . import models # Import models from the same directory import shutil # For copying directories import inspect log = logging.getLogger(__name__) # Get a logger for this module # --- SQLite datetime adapters/converters (UTC) --- def _adapt_datetime(dt: datetime) -> str: if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) else: dt = dt.astimezone(timezone.utc) return dt.isoformat(timespec="seconds") def _convert_datetime(b: bytes) -> datetime: s = b.decode() try: dt = datetime.fromisoformat(s) except ValueError: # Fallback for SQLite CURRENT_TIMESTAMP format 'YYYY-MM-DD HH:MM:SS' dt = datetime.fromisoformat(s.replace(" ", "T")) if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) sqlite3.register_adapter(datetime, _adapt_datetime) sqlite3.register_converter("DATETIME", _convert_datetime) sqlite3.register_converter("TIMESTAMP", _convert_datetime) # --- Alembic File Content Constants --- ALEMBIC_INI_CONTENT = """ # A generic Alembic configuration file. [alembic] # path to migration scripts script_location = alembic # The database URL is now set dynamically by ConPort's run_migrations function. # sqlalchemy.url = sqlite:///your_database.db # ... other Alembic settings ... [loggers] keys = root,sqlalchemy,alembic [handlers] keys = console [formatters] keys = generic [logger_root] level = WARN handlers = console qualname = [logger_sqlalchemy] level = WARN handlers = qualname = sqlalchemy.engine [logger_alembic] level = INFO handlers = qualname = alembic [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic [formatter_generic] format = %(levelname)-5.5s [%(name)s] %(message)s datefmt = %H:%M:%S """ ENV_PY_CONTENT = """ from logging.config import fileConfig from sqlalchemy import engine_from_config from sqlalchemy import pool from alembic import context # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config # Interpret the config file for Python logging. # This line prevents the need to have a separate logging config file. if config.config_file_name is not None: fileConfig(config.config_file_name) # add your model's MetaData object here # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata target_metadata = None # other values from the config, defined by the needs of env.py, # can be acquired: # my_important_option = config.get_main_option("my_important_option") # ... etc. def run_migrations_offline() -> None: \"\"\"Run migrations in 'offline' mode. This configures the context with just a URL and not an Engine, though an Engine is acceptable here as well. By skipping the Engine creation we don't even need a DBAPI to be available. Calls to context.execute() here emit the given string to the script output. \"\"\" url = config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations() def run_migrations_online() -> None: \"\"\"Run migrations in 'online' mode. In this scenario we need to create an Engine and associate a connection with the context. \"\"\" connectable = engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, ) with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata ) with context.begin_transaction(): context.run_migrations() if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online() """ INITIAL_SCHEMA_CONTENT = """ \"\"\"Initial schema Revision ID: 20250617 Revises: Create Date: 2025-06-17 15:00:00.000000 \"\"\" from alembic import op import sqlalchemy as sa import json # revision identifiers, used by Alembic. revision = '20250617' down_revision = None branch_labels = None depends_on = None def upgrade() -> None: # ### commands auto-generated by Alembic - please adjust! ### op.create_table('active_context', sa.Column('id', sa.Integer(), nullable=False), sa.Column('content', sa.Text(), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_table('active_context_history', sa.Column('id', sa.Integer(), nullable=False), sa.Column('timestamp', sa.DateTime(), nullable=False), sa.Column('version', sa.Integer(), nullable=False), sa.Column('content', sa.Text(), nullable=False), sa.Column('change_source', sa.String(length=255), nullable=True), sa.PrimaryKeyConstraint('id') ) op.create_table('context_links', sa.Column('id', sa.Integer(), nullable=False), sa.Column('workspace_id', sa.String(length=1024), nullable=False), sa.Column('source_item_type', sa.String(length=255), nullable=False), sa.Column('source_item_id', sa.String(length=255), nullable=False), sa.Column('target_item_type', sa.String(length=255), nullable=False), sa.Column('target_item_id', sa.String(length=255), nullable=False), sa.Column('relationship_type', sa.String(length=255), nullable=False), sa.Column('description', sa.Text(), nullable=True), sa.Column('timestamp', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_context_links_source_item_id'), 'context_links', ['source_item_id'], unique=False) op.create_index(op.f('ix_context_links_source_item_type'), 'context_links', ['source_item_type'], unique=False) op.create_index(op.f('ix_context_links_target_item_id'), 'context_links', ['target_item_id'], unique=False) op.create_index(op.f('ix_context_links_target_item_type'), 'context_links', ['target_item_type'], unique=False) op.create_table('custom_data', sa.Column('id', sa.Integer(), nullable=False), sa.Column('timestamp', sa.DateTime(), nullable=False), sa.Column('category', sa.String(length=255), nullable=False), sa.Column('key', sa.String(length=255), nullable=False), sa.Column('value', sa.Text(), nullable=False), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('category', 'key') ) op.create_table('decisions', sa.Column('id', sa.Integer(), nullable=False), sa.Column('timestamp', sa.DateTime(), nullable=False), sa.Column('summary', sa.Text(), nullable=False), sa.Column('rationale', sa.Text(), nullable=True), sa.Column('implementation_details', sa.Text(), nullable=True), sa.Column('tags', sa.Text(), nullable=True), sa.PrimaryKeyConstraint('id') ) op.create_table('product_context', sa.Column('id', sa.Integer(), nullable=False), sa.Column('content', sa.Text(), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_table('product_context_history', sa.Column('id', sa.Integer(), nullable=False), sa.Column('timestamp', sa.DateTime(), nullable=False), sa.Column('version', sa.Integer(), nullable=False), sa.Column('content', sa.Text(), nullable=False), sa.Column('change_source', sa.String(length=255), nullable=True), sa.PrimaryKeyConstraint('id') ) op.create_table('progress_entries', sa.Column('id', sa.Integer(), nullable=False), sa.Column('timestamp', sa.DateTime(), nullable=False), sa.Column('status', sa.String(length=50), nullable=False), sa.Column('description', sa.Text(), nullable=False), sa.Column('parent_id', sa.Integer(), nullable=True), sa.ForeignKeyConstraint(['parent_id'], ['progress_entries.id'], ondelete='SET NULL'), sa.PrimaryKeyConstraint('id') ) op.create_table('system_patterns', sa.Column('id', sa.Integer(), nullable=False), sa.Column('timestamp', sa.DateTime(), nullable=False), sa.Column('name', sa.String(length=255), nullable=False), sa.Column('description', sa.Text(), nullable=True), sa.Column('tags', sa.Text(), nullable=True), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('name') ) # Seed initial data op.execute("INSERT INTO product_context (id, content) VALUES (1, '{}')") op.execute("INSERT INTO active_context (id, content) VALUES (1, '{}')") # Create FTS5 virtual table for decisions (guarded) try: op.execute(''' CREATE VIRTUAL TABLE decisions_fts USING fts5( summary, rationale, implementation_details, tags, content="decisions", content_rowid="id" ); ''') # Create triggers to keep the FTS table in sync with the decisions table op.execute(''' CREATE TRIGGER decisions_after_insert AFTER INSERT ON decisions BEGIN INSERT INTO decisions_fts (rowid, summary, rationale, implementation_details, tags) VALUES (new.id, new.summary, new.rationale, new.implementation_details, new.tags); END; ''') op.execute(''' CREATE TRIGGER decisions_after_delete AFTER DELETE ON decisions BEGIN INSERT INTO decisions_fts (decisions_fts, rowid, summary, rationale, implementation_details, tags) VALUES ('delete', old.id, old.summary, old.rationale, old.implementation_details, old.tags); END; ''') op.execute(''' CREATE TRIGGER decisions_after_update AFTER UPDATE ON decisions BEGIN INSERT INTO decisions_fts (decisions_fts, rowid, summary, rationale, implementation_details, tags) VALUES ('delete', old.id, old.summary, old.rationale, old.implementation_details, old.tags); INSERT INTO decisions_fts (rowid, summary, rationale, implementation_details, tags) VALUES (new.id, new.summary, new.rationale, new.implementation_details, new.tags); END; ''') except Exception as e: print(f"Warning: decisions FTS5 not created: {e}") # Create FTS5 virtual table for custom_data (guarded) try: op.execute(''' CREATE VIRTUAL TABLE custom_data_fts USING fts5( category, key, value_text, content="custom_data", content_rowid="id" ); ''') # Create triggers for custom_data_fts op.execute(''' CREATE TRIGGER custom_data_after_insert AFTER INSERT ON custom_data BEGIN INSERT INTO custom_data_fts (rowid, category, key, value_text) VALUES (new.id, new.category, new.key, new.value); END; ''') op.execute(''' CREATE TRIGGER custom_data_after_delete AFTER DELETE ON custom_data BEGIN INSERT INTO custom_data_fts (custom_data_fts, rowid, category, key, value_text) VALUES ('delete', old.id, old.category, old.key, old.value); END; ''') op.execute(''' CREATE TRIGGER custom_data_after_update AFTER UPDATE ON custom_data BEGIN INSERT INTO custom_data_fts (custom_data_fts, rowid, category, key, value_text) VALUES ('delete', old.id, old.category, old.key, old.value); INSERT INTO custom_data_fts (rowid, category, key, value_text) VALUES (new.id, new.category, new.key, new.value); END; ''') except Exception as e: print(f"Warning: custom_data FTS5 not created: {e}") # ### end Alembic commands ### def downgrade() -> None: # ### commands auto-generated by Alembic - please adjust! ### op.drop_table('system_patterns') op.drop_table('progress_entries') op.drop_table('product_context_history') op.drop_table('product_context') op.drop_table('decisions') op.drop_table('custom_data') op.drop_index(op.f('ix_context_links_target_item_type'), table_name='context_links') op.drop_index(op.f('ix_context_links_target_item_id'), table_name='context_links') op.drop_index(op.f('ix_context_links_source_item_type'), table_name='context_links') op.drop_index(op.f('ix_context_links_source_item_id'), table_name='context_links') op.drop_table('context_links') op.drop_table('active_context_history') op.drop_table('active_context') # ### end Alembic commands ### """ # --- Connection Handling --- _connections: Dict[str, sqlite3.Connection] = {} def get_db_connection(workspace_id: str) -> sqlite3.Connection: """ Gets or creates a database connection for the given workspace. This function now orchestrates the entire workspace initialization on first call. """ if workspace_id in _connections: return _connections[workspace_id] # 1. Ensure all necessary directories and Alembic files are present. # This is the core of the deferred initialization. ensure_alembic_files_exist(workspace_id) # 2. Get the database path (which should now exist within the created directories). db_path = get_database_path(workspace_id) # 3. Run migrations to create/update the database schema. run_migrations(db_path) # 4. Establish and cache the database connection. try: conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) conn.row_factory = sqlite3.Row # Access columns by name _connections[workspace_id] = conn log.info(f"Successfully initialized and connected to database for workspace: {workspace_id}") return conn except ConfigurationError as e: log.error(f"Configuration error during DB connection for {workspace_id}: {e}") raise DatabaseError(f"Configuration error getting DB path for {workspace_id}: {e}") except sqlite3.Error as e: log.error(f"SQLite error during DB connection for {workspace_id} at {db_path}: {e}") raise DatabaseError(f"Failed to connect to database for {workspace_id} at {db_path}: {e}") def close_db_connection(workspace_id: str): """Closes the database connection for the given workspace, if open.""" if workspace_id in _connections: _connections[workspace_id].close() del _connections[workspace_id] def close_all_connections(): """Closes all active database connections.""" for workspace_id in list(_connections.keys()): close_db_connection(workspace_id) # --- Alembic Migration Integration --- def ensure_alembic_files_exist(workspace_id: str): """ Ensures that alembic.ini and the alembic/ directory exist within the database directory. If not, copies them from the server's internal templates. """ # The actual directory where context.db resides and where Alembic files should be # Get the database path to determine where Alembic files should be located from ..core.config import get_database_path db_path = get_database_path(workspace_id) conport_db_dir = db_path.parent conport_db_dir.mkdir(exist_ok=True, parents=True) # Ensure this directory exists alembic_ini_path = conport_db_dir / "alembic.ini" alembic_dir_path = conport_db_dir / "alembic" # Check for alembic.ini if not alembic_ini_path.exists(): log.info(f"Alembic.ini not found. Creating at {alembic_ini_path}") try: with open(alembic_ini_path, 'w') as f: f.write(ALEMBIC_INI_CONTENT) except IOError as e: log.error(f"Failed to write alembic.ini at {alembic_ini_path}: {e}") raise DatabaseError(f"Could not create alembic.ini: {e}") # Check for alembic/env.py alembic_env_py_path = alembic_dir_path / "env.py" if not alembic_env_py_path.exists(): log.info(f"Alembic env.py not found. Creating at {alembic_env_py_path}") try: # ensure parent directory exists os.makedirs(alembic_dir_path, exist_ok=True) with open(alembic_env_py_path, 'w') as f: f.write(ENV_PY_CONTENT) except IOError as e: log.error(f"Failed to write env.py at {alembic_env_py_path}: {e}") raise DatabaseError(f"Could not create env.py: {e}") # Check for alembic/versions directory and initial schema alembic_versions_path = alembic_dir_path / "versions" initial_schema_path = alembic_versions_path / "2025_06_17_initial_schema.py" if not initial_schema_path.exists(): log.info(f"Initial schema not found. Creating at {initial_schema_path}") try: os.makedirs(alembic_versions_path, exist_ok=True) with open(initial_schema_path, 'w') as f: f.write(INITIAL_SCHEMA_CONTENT) except OSError as e: log.error(f"Failed to create initial schema at {initial_schema_path}: {e}") raise DatabaseError(f"Could not create initial schema: {e}") def run_migrations(db_path: Path): """ Runs Alembic migrations to upgrade the database to the latest version. This function is called on database connection to ensure schema is up-to-date. Alembic files are expected to be in the same directory as the database file. """ # The directory where alembic.ini and alembic/ scripts are expected to be, # which is now the same directory as the database file. alembic_config_and_scripts_dir = db_path.parent alembic_ini_path = alembic_config_and_scripts_dir / "alembic.ini" alembic_scripts_path = alembic_config_and_scripts_dir / "alembic" # Initialize Alembic Config with the path to alembic.ini log.debug(f"Alembic: Current working directory (os.getcwd()): {os.getcwd()}") log.debug(f"Alembic: Initializing Config with alembic_ini_path = {alembic_ini_path.resolve()}") log.debug(f"Alembic: Setting script_location to alembic_scripts_path = {alembic_scripts_path.resolve()}") alembic_cfg = Config(str(alembic_ini_path)) # Explicitly set the script location as a main option. # This is often more robust than relying on the .ini file or cmd_opts for this specific setting. alembic_cfg.set_main_option("script_location", alembic_scripts_path.as_posix()) # Override sqlalchemy.url in alembic.ini to point to the specific workspace's DB # This is crucial for multi-workspace support. alembic_cfg.set_main_option("sqlalchemy.url", f"sqlite:///{db_path.as_posix()}") # Configure logging for Alembic (optional, can be done via Python's root logger) # The fileConfig call was causing issues and is not strictly necessary if alembic.ini # is only used for script_location and sqlalchemy.url. # Alembic's command.upgrade will handle its own logging if not explicitly configured. log.debug(f"Alembic Config: script_location = {alembic_cfg.get_main_option('script_location')}") log.debug(f"Alembic Config: sqlalchemy.url = {alembic_cfg.get_main_option('sqlalchemy.url')}") # Add explicit path existence check resolved_script_path = Path(alembic_cfg.get_main_option('script_location')) log.debug(f"Alembic: Resolved script path for existence check: {resolved_script_path}") if not resolved_script_path.exists(): log.error(f"Alembic: CRITICAL - Script directory {resolved_script_path} does NOT exist according to Python!") raise DatabaseError(f"Alembic scripts directory not found: {resolved_script_path}") else: log.info(f"Alembic: Script directory {resolved_script_path} confirmed to exist by Python.") log.info(f"Running Alembic migrations for database: {db_path}") try: command.upgrade(alembic_cfg, "head") log.info(f"Alembic migrations completed successfully for {db_path}.") except Exception as e: log.error(f"Alembic migration failed for {db_path}: {e}", exc_info=True) raise DatabaseError(f"Database migration failed: {e}") # --- Helper functions for history --- def _get_latest_context_version(cursor: sqlite3.Cursor, table_name: str) -> int: """Retrieves the latest version number from a history table.""" try: cursor.execute(f"SELECT MAX(version) FROM {table_name}") row = cursor.fetchone() return row[0] if row and row[0] is not None else 0 except sqlite3.Error as e: # Log this error appropriately in a real application print(f"Error getting latest version from {table_name}: {e}") return 0 # Default to 0 if error or no versions found def _add_context_history_entry( cursor: sqlite3.Cursor, history_table_name: str, version: int, content_dict: Dict[str, Any], change_source: Optional[str] ) -> None: """Adds an entry to the specified context history table.""" content_json = json.dumps(content_dict) timestamp = datetime.now(timezone.utc) try: cursor.execute( f""" INSERT INTO {history_table_name} (timestamp, version, content, change_source) VALUES (?, ?, ?, ?) """, (timestamp, version, content_json, change_source) ) except sqlite3.Error as e: # This error should be handled by the calling function's rollback raise DatabaseError(f"Failed to add entry to {history_table_name}: {e}") # --- CRUD Operations --- def get_product_context(workspace_id: str) -> models.ProductContext: """Retrieves the product context.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block try: cursor = conn.cursor() cursor.execute("SELECT id, content FROM product_context WHERE id = 1") row = cursor.fetchone() if row: content_dict = json.loads(row['content']) return models.ProductContext(id=row['id'], content=content_dict) else: # Should not happen if initialized correctly, but handle defensively raise DatabaseError("Product context row not found.") except (sqlite3.Error, json.JSONDecodeError) as e: raise DatabaseError(f"Failed to retrieve product context: {e}") finally: if cursor: cursor.close() def update_product_context(workspace_id: str, update_args: models.UpdateContextArgs) -> None: """Updates the product context using either full content or a patch.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block try: cursor = conn.cursor() # Fetch current content to log to history cursor.execute("SELECT content FROM product_context WHERE id = 1") current_row = cursor.fetchone() if not current_row: raise DatabaseError("Product context row not found for updating (cannot log history).") current_content_dict = json.loads(current_row['content']) # Determine new content new_final_content = {} if update_args.content is not None: new_final_content = update_args.content elif update_args.patch_content is not None: # Apply patch to a copy of current_content_dict for the new state new_final_content = current_content_dict.copy() # Iterate over patch_content to handle __DELETE__ sentinel for key, value in update_args.patch_content.items(): if value == "__DELETE__": new_final_content.pop(key, None) # Remove key, do nothing if key not found else: new_final_content[key] = value else: # This case should be prevented by Pydantic model validation, but handle defensively raise ValueError("No content or patch_content provided for update.") # Log previous version to history latest_version = _get_latest_context_version(cursor, "product_context_history") new_version = latest_version + 1 _add_context_history_entry( cursor, "product_context_history", new_version, current_content_dict, # Log the content *before* the update "update_product_context" # Basic change source ) # Update the main product_context table new_content_json = json.dumps(new_final_content) cursor.execute("UPDATE product_context SET content = ? WHERE id = 1", (new_content_json,)) conn.commit() # No need to check rowcount here as history is logged regardless of content identity except (sqlite3.Error, TypeError, json.JSONDecodeError, DatabaseError) as e: # Added DatabaseError conn.rollback() raise DatabaseError(f"Failed to update product_context: {e}") finally: if cursor: cursor.close() def get_active_context(workspace_id: str) -> models.ActiveContext: """Retrieves the active context.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block try: cursor = conn.cursor() cursor.execute("SELECT id, content FROM active_context WHERE id = 1") row = cursor.fetchone() if row: content_dict = json.loads(row['content']) return models.ActiveContext(id=row['id'], content=content_dict) else: raise DatabaseError("Active context row not found.") except (sqlite3.Error, json.JSONDecodeError) as e: raise DatabaseError(f"Failed to retrieve active context: {e}") finally: if cursor: cursor.close() def update_active_context(workspace_id: str, update_args: models.UpdateContextArgs) -> None: """Updates the active context using either full content or a patch.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block try: cursor = conn.cursor() # Fetch current content to log to history cursor.execute("SELECT content FROM active_context WHERE id = 1") current_row = cursor.fetchone() if not current_row: raise DatabaseError("Active context row not found for updating (cannot log history).") current_content_dict = json.loads(current_row['content']) # Determine new content new_final_content = {} if update_args.content is not None: new_final_content = update_args.content elif update_args.patch_content is not None: new_final_content = current_content_dict.copy() # Iterate over patch_content to handle __DELETE__ sentinel for key, value in update_args.patch_content.items(): if value == "__DELETE__": new_final_content.pop(key, None) # Remove key, do nothing if key not found else: new_final_content[key] = value else: # This case should be prevented by Pydantic model validation, but handle defensively raise ValueError("No content or patch_content provided for update.") # Log previous version to history latest_version = _get_latest_context_version(cursor, "active_context_history") new_version = latest_version + 1 _add_context_history_entry( cursor, "active_context_history", new_version, current_content_dict, # Log the content *before* the update "update_active_context" # Basic change source ) # Update the main active_context table new_content_json = json.dumps(new_final_content) cursor.execute("UPDATE active_context SET content = ? WHERE id = 1", (new_content_json,)) conn.commit() except (sqlite3.Error, TypeError, json.JSONDecodeError, DatabaseError) as e: # Added DatabaseError conn.rollback() raise DatabaseError(f"Failed to update active context: {e}") finally: if cursor: cursor.close() # --- Add more CRUD functions for other models (ActiveContext, Decision, etc.) --- # Example: log_decision def log_decision(workspace_id: str, decision_data: models.Decision) -> models.Decision: """Logs a new decision.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = """ INSERT INTO decisions (timestamp, summary, rationale, implementation_details, tags) VALUES (?, ?, ?, ?, ?) """ tags_json = json.dumps(decision_data.tags) if decision_data.tags is not None else None params = ( decision_data.timestamp, decision_data.summary, decision_data.rationale, decision_data.implementation_details, tags_json ) try: cursor = conn.cursor() cursor.execute(sql, params) decision_id = cursor.lastrowid conn.commit() # Return the full decision object including the new ID decision_data.id = decision_id return decision_data except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to log decision: {e}") finally: if cursor: cursor.close() def get_decisions( workspace_id: str, limit: Optional[int] = None, tags_filter_include_all: Optional[List[str]] = None, tags_filter_include_any: Optional[List[str]] = None ) -> List[models.Decision]: """Retrieves decisions, optionally limited, and filtered by tags.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block base_sql = "SELECT id, timestamp, summary, rationale, implementation_details, tags FROM decisions" conditions = [] params_list: List[Any] = [] if tags_filter_include_all: # For each tag in the list, we need to ensure it exists in the 'tags' JSON array. # This is tricky with pure SQL LIKE on a JSON array string. # A more robust way is to fetch and filter in Python, or use json_each if available and suitable. # For simplicity here, we'll filter in Python after fetching. # This means 'limit' will apply before this specific tag filter. # A true SQL solution would be more complex, e.g., using json_tree or json_each and subqueries. pass # Will be handled post-query if tags_filter_include_any: # Similar to above, this is easier to handle post-query for now. pass # Will be handled post-query # ORDER BY must come before LIMIT order_by_clause = " ORDER BY timestamp DESC" limit_clause = "" if limit is not None and limit > 0: limit_clause = " LIMIT ?" params_list.append(limit) # Construct the SQL query # Since tag filtering will be done in Python for now, conditions list remains empty for SQL sql = base_sql if conditions: # This block will not be hit with current Python-based tag filtering sql += " WHERE " + " AND ".join(conditions) sql += order_by_clause + limit_clause params_tuple = tuple(params_list) try: cursor = conn.cursor() cursor.execute(sql, params_tuple) rows = cursor.fetchall() decisions = [ models.Decision( id=row['id'], timestamp=row['timestamp'], summary=row['summary'], rationale=row['rationale'], implementation_details=row['implementation_details'], tags=json.loads(row['tags']) if row['tags'] else None ) for row in rows ] # Python-based filtering for tags if tags_filter_include_all: decisions = [ d for d in decisions if d.tags and all(tag in d.tags for tag in tags_filter_include_all) ] if tags_filter_include_any: decisions = [ d for d in decisions if d.tags and any(tag in d.tags for tag in tags_filter_include_any) ] return decisions except (sqlite3.Error, json.JSONDecodeError) as e: # Added JSONDecodeError raise DatabaseError(f"Failed to retrieve decisions: {e}") finally: if cursor: cursor.close() def search_decisions_fts(workspace_id: str, query_term: str, limit: Optional[int] = 10) -> List[models.Decision]: """Searches decisions using FTS5 for the given query term.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block # The MATCH operator is used for FTS queries. # We join back to the original 'decisions' table to get all columns. # 'rank' is an FTS5 auxiliary function that indicates relevance. sql = """ SELECT d.id, d.timestamp, d.summary, d.rationale, d.implementation_details, d.tags FROM decisions_fts f JOIN decisions d ON f.rowid = d.id WHERE f.decisions_fts MATCH ? ORDER BY rank """ params_list = [query_term] if limit is not None and limit > 0: sql += " LIMIT ?" params_list.append(limit) try: cursor = conn.cursor() cursor.execute(sql, tuple(params_list)) rows = cursor.fetchall() decisions_found = [ models.Decision( id=row['id'], timestamp=row['timestamp'], summary=row['summary'], rationale=row['rationale'], implementation_details=row['implementation_details'], tags=json.loads(row['tags']) if row['tags'] else None ) for row in rows ] return decisions_found except sqlite3.Error as e: raise DatabaseError(f"Failed FTS search on decisions for term '{query_term}': {e}") finally: if cursor: cursor.close() def delete_decision_by_id(workspace_id: str, decision_id: int) -> bool: """Deletes a decision by its ID. Returns True if deleted, False otherwise.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = "DELETE FROM decisions WHERE id = ?" try: cursor = conn.cursor() cursor.execute(sql, (decision_id,)) # The FTS table 'decisions_fts' should be updated automatically by its AFTER DELETE trigger. conn.commit() return cursor.rowcount > 0 except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to delete decision with ID {decision_id}: {e}") finally: if cursor: cursor.close() def log_progress(workspace_id: str, progress_data: models.ProgressEntry) -> models.ProgressEntry: """Logs a new progress entry.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = """ INSERT INTO progress_entries (timestamp, status, description, parent_id) VALUES (?, ?, ?, ?) """ params = ( progress_data.timestamp, progress_data.status, progress_data.description, progress_data.parent_id ) try: cursor = conn.cursor() cursor.execute(sql, params) progress_id = cursor.lastrowid conn.commit() progress_data.id = progress_id return progress_data except sqlite3.Error as e: conn.rollback() # Consider checking for foreign key constraint errors if parent_id is invalid raise DatabaseError(f"Failed to log progress entry: {e}") finally: if cursor: cursor.close() def get_progress( workspace_id: str, status_filter: Optional[str] = None, parent_id_filter: Optional[int] = None, limit: Optional[int] = None ) -> List[models.ProgressEntry]: """Retrieves progress entries, optionally filtered and limited.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = "SELECT id, timestamp, status, description, parent_id FROM progress_entries" conditions = [] params_list = [] if status_filter: conditions.append("status = ?") params_list.append(status_filter) if parent_id_filter is not None: # Check for None explicitly as 0 could be a valid parent_id conditions.append("parent_id = ?") params_list.append(parent_id_filter) # Add more filters if needed (e.g., date range) if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY timestamp DESC" # Default order: newest first if limit is not None and limit > 0: sql += " LIMIT ?" params_list.append(limit) params = tuple(params_list) try: cursor = conn.cursor() cursor.execute(sql, params) rows = cursor.fetchall() progress_entries = [ models.ProgressEntry( id=row['id'], timestamp=row['timestamp'], status=row['status'], description=row['description'], parent_id=row['parent_id'] ) for row in rows ] # progress_entries.reverse() # Optional: uncomment to return oldest first return progress_entries except sqlite3.Error as e: raise DatabaseError(f"Failed to retrieve progress entries: {e}") finally: if cursor: cursor.close() def update_progress_entry(workspace_id: str, update_args: models.UpdateProgressArgs) -> bool: """ Updates an existing progress entry by its ID. Returns True if the entry was found and updated, False otherwise. """ conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = "UPDATE progress_entries SET" updates = [] params_list: List[Any] = [] if update_args.status is not None: updates.append("status = ?") params_list.append(update_args.status) if update_args.description is not None: updates.append("description = ?") params_list.append(update_args.description) # Handle parent_id update, including setting to NULL if explicitly None is intended (though Pydantic allows Optional[int]) # If parent_id is provided as 0 or a positive int, update it. # If parent_id is provided as None, set the DB column to NULL. # If parent_id is NOT provided in args (remains default None), do not include in update. # The Pydantic model check_at_least_one_field ensures at least one field is provided, # so we don't need to worry about an empty updates list here. if 'parent_id' in update_args.model_fields_set: # Check if parent_id was explicitly set in the input args updates.append("parent_id = ?") params_list.append(update_args.parent_id) # SQLite handles Python None as NULL if not updates: # This case should be prevented by Pydantic model validation, but as a safeguard raise ValueError("No fields provided for update.") sql += " " + ", ".join(updates) + " WHERE id = ?" params_list.append(update_args.progress_id) params = tuple(params_list) try: cursor = conn.cursor() cursor.execute(sql, params) conn.commit() return cursor.rowcount > 0 # Return True if one row was updated except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to update progress entry with ID {update_args.progress_id}: {e}") finally: if cursor: cursor.close() def delete_progress_entry_by_id(workspace_id: str, progress_id: int) -> bool: """ Deletes a progress entry by its ID. Note: This will also set the parent_id of any child tasks to NULL due to FOREIGN KEY ON DELETE SET NULL. Returns True if deleted, False otherwise. """ conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = "DELETE FROM progress_entries WHERE id = ?" try: cursor = conn.cursor() cursor.execute(sql, (progress_id,)) conn.commit() return cursor.rowcount > 0 # Return True if one row was deleted except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to delete progress entry with ID {progress_id}: {e}") finally: if cursor: cursor.close() def log_system_pattern(workspace_id: str, pattern_data: models.SystemPattern) -> models.SystemPattern: """Logs or updates a system pattern. Uses INSERT OR REPLACE based on unique name.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block # Use INSERT OR REPLACE to handle unique constraint on 'name' # This will overwrite the description and tags if the name already exists. sql = """ INSERT OR REPLACE INTO system_patterns (timestamp, name, description, tags) VALUES (?, ?, ?, ?) """ tags_json = json.dumps(pattern_data.tags) if pattern_data.tags is not None else None params = ( pattern_data.timestamp, pattern_data.name, pattern_data.description, tags_json ) try: cursor = conn.cursor() cursor.execute(sql, params) # We might not get the correct lastrowid if it replaced, # so we need to query back to get the ID if needed. # For now, just commit and assume success or handle error. # If returning the model with ID is critical, add a SELECT query here. conn.commit() # Query back to get the ID (optional, adds overhead) cursor.execute("SELECT id FROM system_patterns WHERE name = ?", (pattern_data.name,)) row = cursor.fetchone() if row: pattern_data.id = row['id'] return pattern_data # Return original data, possibly updated with ID except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to log system pattern '{pattern_data.name}': {e}") finally: if cursor: cursor.close() def get_system_patterns( workspace_id: str, tags_filter_include_all: Optional[List[str]] = None, tags_filter_include_any: Optional[List[str]] = None # limit: Optional[int] = None, # Add if pagination is desired ) -> List[models.SystemPattern]: """Retrieves system patterns, optionally filtered by tags.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block base_sql = "SELECT id, timestamp, name, description, tags FROM system_patterns" order_by_clause = " ORDER BY name ASC" # params_list: List[Any] = [] # Not used for SQL filtering of tags for now # limit_clause = "" # if limit is not None and limit > 0: # limit_clause = " LIMIT ?" # params_list.append(limit) sql = base_sql + order_by_clause # + limit_clause # params_tuple = tuple(params_list) try: cursor = conn.cursor() cursor.execute(sql) #, params_tuple) rows = cursor.fetchall() patterns = [ models.SystemPattern( id=row['id'], timestamp=row['timestamp'], name=row['name'], description=row['description'], tags=json.loads(row['tags']) if row['tags'] else None ) for row in rows ] # Python-based filtering for tags if tags_filter_include_all: patterns = [ p for p in patterns if p.tags and all(tag in p.tags for tag in tags_filter_include_all) ] if tags_filter_include_any: patterns = [ p for p in patterns if p.tags and any(tag in p.tags for tag in tags_filter_include_any) ] return patterns except (sqlite3.Error, json.JSONDecodeError) as e: # Added JSONDecodeError raise DatabaseError(f"Failed to retrieve system patterns: {e}") finally: if cursor: cursor.close() def delete_system_pattern_by_id(workspace_id: str, pattern_id: int) -> bool: """Deletes a system pattern by its ID. Returns True if deleted, False otherwise.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = "DELETE FROM system_patterns WHERE id = ?" # Note: System patterns do not currently have an FTS table, so no trigger concerns here. try: cursor = conn.cursor() cursor.execute(sql, (pattern_id,)) conn.commit() return cursor.rowcount > 0 except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to delete system pattern with ID {pattern_id}: {e}") finally: if cursor: cursor.close() def log_custom_data(workspace_id: str, data: models.CustomData) -> models.CustomData: """Logs or updates a custom data entry. Uses INSERT OR REPLACE based on unique (category, key).""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = """ INSERT OR REPLACE INTO custom_data (timestamp, category, key, value) VALUES (?, ?, ?, ?) """ try: cursor = conn.cursor() # Ensure value is serialized to JSON string value_json = json.dumps(data.value) params = ( data.timestamp, data.category, data.key, value_json ) cursor.execute(sql, params) conn.commit() # Query back to get ID if needed (similar to log_system_pattern) cursor.execute("SELECT id FROM custom_data WHERE category = ? AND key = ?", (data.category, data.key)) row = cursor.fetchone() if row: data.id = row['id'] return data except (sqlite3.Error, TypeError) as e: # TypeError for json.dumps conn.rollback() raise DatabaseError(f"Failed to log custom data for '{data.category}/{data.key}': {e}") finally: if cursor: cursor.close() def get_custom_data( workspace_id: str, category: Optional[str] = None, key: Optional[str] = None ) -> List[models.CustomData]: """Retrieves custom data entries, optionally filtered by category and/or key.""" if key and not category: raise ValueError("Cannot filter by key without specifying a category.") conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = "SELECT id, timestamp, category, key, value FROM custom_data" conditions = [] params_list = [] if category: conditions.append("category = ?") params_list.append(category) if key: # We already ensured category is present if key is conditions.append("key = ?") params_list.append(key) if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY category ASC, key ASC" # Consistent ordering params = tuple(params_list) try: cursor = conn.cursor() cursor.execute(sql, params) rows = cursor.fetchall() custom_data_list = [] for row in rows: try: # Deserialize value from JSON string value_data = json.loads(row['value']) custom_data_list.append( models.CustomData( id=row['id'], timestamp=row['timestamp'], category=row['category'], key=row['key'], value=value_data ) ) except json.JSONDecodeError as e: # Log or handle error for specific row if JSON is invalid print(f"Warning: Failed to decode JSON for custom_data id={row['id']}: {e}") # Replace with proper logging continue # Skip this row return custom_data_list except sqlite3.Error as e: raise DatabaseError(f"Failed to retrieve custom data: {e}") finally: if cursor: cursor.close() def delete_custom_data(workspace_id: str, category: str, key: str) -> bool: """Deletes a specific custom data entry by category and key. Returns True if deleted, False otherwise.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = "DELETE FROM custom_data WHERE category = ? AND key = ?" params = (category, key) try: cursor = conn.cursor() cursor.execute(sql, params) conn.commit() return cursor.rowcount > 0 # Return True if one row was deleted except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to delete custom data for '{category}/{key}': {e}") finally: if cursor: cursor.close() def log_context_link(workspace_id: str, link_data: models.ContextLink) -> models.ContextLink: """Logs a new context link.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = """ INSERT INTO context_links ( workspace_id, source_item_type, source_item_id, target_item_type, target_item_id, relationship_type, description, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """ # Use link_data.timestamp if provided (e.g. from an import), else it defaults in DB # However, our Pydantic model ContextLink has default_factory=datetime.utcnow for timestamp # So, link_data.timestamp will always be populated. params = ( workspace_id, # Storing workspace_id explicitly in the table link_data.source_item_type, str(link_data.source_item_id), # Ensure IDs are stored as text link_data.target_item_type, str(link_data.target_item_id), # Ensure IDs are stored as text link_data.relationship_type, link_data.description, link_data.timestamp # Pydantic model ensures this is set ) try: cursor = conn.cursor() cursor.execute(sql, params) link_id = cursor.lastrowid conn.commit() link_data.id = link_id # The timestamp from the DB default might be slightly different if we didn't pass it, # but since our Pydantic model sets it, what we have in link_data.timestamp is accurate. return link_data except sqlite3.Error as e: conn.rollback() raise DatabaseError(f"Failed to log context link: {e}") finally: if cursor: cursor.close() def get_context_links( workspace_id: str, item_type: str, item_id: str, relationship_type_filter: Optional[str] = None, linked_item_type_filter: Optional[str] = None, limit: Optional[int] = None ) -> List[models.ContextLink]: """ Retrieves links for a given item, with optional filters. Finds links where the given item is EITHER the source OR the target. """ conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block # Ensure item_id is treated as string for consistent querying with TEXT columns str_item_id = str(item_id) base_sql = """ SELECT id, timestamp, workspace_id, source_item_type, source_item_id, target_item_type, target_item_id, relationship_type, description FROM context_links """ conditions = [] params_list = [] # Main condition: item is either source or target conditions.append( "((source_item_type = ? AND source_item_id = ?) OR (target_item_type = ? AND target_item_id = ?))" ) params_list.extend([item_type, str_item_id, item_type, str_item_id]) # Add workspace_id filter for safety, though connection is already workspace-specific conditions.append("workspace_id = ?") params_list.append(workspace_id) if relationship_type_filter: conditions.append("relationship_type = ?") params_list.append(relationship_type_filter) if linked_item_type_filter: # This filter applies to the "other end" of the link conditions.append( "((source_item_type = ? AND source_item_id = ? AND target_item_type = ?) OR " + "(target_item_type = ? AND target_item_id = ? AND source_item_type = ?))" ) params_list.extend([item_type, str_item_id, linked_item_type_filter, item_type, str_item_id, linked_item_type_filter]) if conditions: sql = base_sql + " WHERE " + " AND ".join(conditions) else: # Should not happen due to main condition and workspace_id sql = base_sql sql += " ORDER BY timestamp DESC" if limit is not None and limit > 0: sql += " LIMIT ?" params_list.append(limit) params = tuple(params_list) try: cursor = conn.cursor() cursor.execute(sql, params) rows = cursor.fetchall() links = [ models.ContextLink( id=row['id'], timestamp=row['timestamp'], # workspace_id=row['workspace_id'], # Not part of ContextLink Pydantic model source_item_type=row['source_item_type'], source_item_id=row['source_item_id'], target_item_type=row['target_item_type'], target_item_id=row['target_item_id'], relationship_type=row['relationship_type'], description=row['description'] ) for row in rows ] return links except sqlite3.Error as e: raise DatabaseError(f"Failed to retrieve context links: {e}") finally: if cursor: cursor.close() def search_project_glossary_fts(workspace_id: str, query_term: str, limit: Optional[int] = 10) -> List[models.CustomData]: """Searches ProjectGlossary entries in custom_data using FTS5.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block # Updated to use the new general custom_data_fts table structure sql = """ SELECT cd.id, cd.category, cd.key, cd.value FROM custom_data_fts fts JOIN custom_data cd ON fts.rowid = cd.id WHERE fts.custom_data_fts MATCH ? AND fts.category = 'ProjectGlossary' ORDER BY rank """ # The MATCH query will search category, key, and value_text. # We explicitly filter for ProjectGlossary category after the FTS match. # Note: The MATCH query will search across 'term' and 'definition_text' columns in custom_data_fts params_list = [query_term] if limit is not None and limit > 0: sql += " LIMIT ?" params_list.append(limit) try: cursor = conn.cursor() cursor.execute(sql, tuple(params_list)) rows = cursor.fetchall() glossary_entries = [] for row in rows: try: value_data = json.loads(row['value']) glossary_entries.append( models.CustomData( id=row['id'], category=row['category'], key=row['key'], value=value_data ) ) except json.JSONDecodeError as e: # Log or handle error for specific row if JSON is invalid print(f"Warning: Failed to decode JSON for glossary item id={row['id']}: {e}") # Replace with proper logging continue # Skip this row return glossary_entries except sqlite3.Error as e: raise DatabaseError(f"Failed FTS search on ProjectGlossary for term '{query_term}': {e}") finally: if cursor: cursor.close() def search_custom_data_value_fts( workspace_id: str, query_term: str, category_filter: Optional[str] = None, limit: Optional[int] = 10 ) -> List[models.CustomData]: """Searches all custom_data entries using FTS5 on category, key, and value. Optionally filters by category after FTS.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block sql = """ SELECT cd.id, cd.timestamp, cd.category, cd.key, cd.value FROM custom_data_fts fts JOIN custom_data cd ON fts.rowid = cd.id WHERE fts.custom_data_fts MATCH ? """ params_list = [query_term] if category_filter: sql += " AND fts.category = ?" # Filter by category on the FTS table params_list.append(category_filter) sql += " ORDER BY rank" if limit is not None and limit > 0: sql += " LIMIT ?" params_list.append(limit) try: cursor = conn.cursor() cursor.execute(sql, tuple(params_list)) rows = cursor.fetchall() results = [] for row in rows: try: cursor = conn.cursor() value_data = json.loads(row['value']) results.append( models.CustomData( id=row['id'], timestamp=row['timestamp'], category=row['category'], key=row['key'], value=value_data ) ) except json.JSONDecodeError as e: print(f"Warning: Failed to decode JSON for custom_data id={row['id']} (search_custom_data_value_fts): {e}") continue return results except sqlite3.Error as e: raise DatabaseError(f"Failed FTS search on custom_data for term '{query_term}': {e}") finally: if cursor: cursor.close() def get_item_history( workspace_id: str, args: models.GetItemHistoryArgs ) -> List[Dict[str, Any]]: # Returning list of dicts for now, could be Pydantic models """Retrieves history for product_context or active_context.""" conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block if args.item_type == "product_context": history_table_name = "product_context_history" # history_model = models.ProductContextHistory # If returning Pydantic models elif args.item_type == "active_context": history_table_name = "active_context_history" # history_model = models.ActiveContextHistory # If returning Pydantic models else: # This should be caught by Pydantic validation in GetItemHistoryArgs raise ValueError("Invalid item_type for history retrieval.") sql = f"SELECT id, timestamp, version, content, change_source FROM {history_table_name}" conditions = [] params_list = [] if args.version is not None: conditions.append("version = ?") params_list.append(args.version) if args.before_timestamp: conditions.append("timestamp < ?") params_list.append(args.before_timestamp) if args.after_timestamp: conditions.append("timestamp > ?") params_list.append(args.after_timestamp) # Add workspace_id filter if it were part of the history table (it's not currently) # conditions.append("workspace_id = ?") # params_list.append(workspace_id) if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY version DESC, timestamp DESC" # Most recent version/timestamp first if args.limit is not None and args.limit > 0: sql += " LIMIT ?" params_list.append(args.limit) params = tuple(params_list) try: cursor = conn.cursor() cursor.execute(sql, params) rows = cursor.fetchall() history_entries = [] for row in rows: content_dict = json.loads(row['content']) history_entries.append({ "id": row['id'], "timestamp": row['timestamp'], # Already datetime object "version": row['version'], "content": content_dict, "change_source": row['change_source'] }) # Or if using Pydantic models: # history_entries.append(history_model(id=row['id'], timestamp=row['timestamp'], ...)) return history_entries except (sqlite3.Error, json.JSONDecodeError) as e: raise DatabaseError(f"Failed to retrieve history for {args.item_type}: {e}") finally: if cursor: cursor.close() # --- Recent Activity Summary --- def get_recent_activity_summary_data( workspace_id: str, hours_ago: Optional[int] = None, since_timestamp: Optional[datetime] = None, limit_per_type: int = 5 ) -> Dict[str, Any]: """ Retrieves a summary of recent activity across various ConPort items. """ conn = get_db_connection(workspace_id) cursor = None # Initialize cursor for finally block summary_results: Dict[str, Any] = { "recent_decisions": [], "recent_progress_entries": [], "recent_product_context_updates": [], "recent_active_context_updates": [], "recent_links_created": [], "recent_system_patterns": [], # Added for System Patterns "notes": [] } now_utc = datetime.now(timezone.utc) summary_results["summary_period_end"] = now_utc.isoformat() if since_timestamp: start_datetime = since_timestamp elif hours_ago: start_datetime = now_utc - timedelta(hours=hours_ago) else: start_datetime = now_utc - timedelta(hours=24) # Default to last 24 hours summary_results["summary_period_start"] = start_datetime.isoformat() try: cursor = conn.cursor() # Recent Decisions cursor.execute( """ SELECT id, timestamp, summary, rationale, implementation_details, tags FROM decisions WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT ? """, (start_datetime, limit_per_type) ) rows = cursor.fetchall() summary_results["recent_decisions"] = [ models.Decision( id=row['id'], timestamp=row['timestamp'], summary=row['summary'], rationale=row['rationale'], implementation_details=row['implementation_details'], tags=json.loads(row['tags']) if row['tags'] else None ).model_dump(mode='json') for row in rows ] # Recent Progress Entries cursor.execute( """ SELECT id, timestamp, status, description, parent_id FROM progress_entries WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT ? """, (start_datetime, limit_per_type) ) rows = cursor.fetchall() summary_results["recent_progress_entries"] = [ models.ProgressEntry( id=row['id'], timestamp=row['timestamp'], status=row['status'], description=row['description'], parent_id=row['parent_id'] ).model_dump(mode='json') for row in rows ] # Recent Product Context Updates (from history) cursor.execute( """ SELECT id, timestamp, version, content, change_source FROM product_context_history WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT ? """, (start_datetime, limit_per_type) ) rows = cursor.fetchall() summary_results["recent_product_context_updates"] = [ models.ProductContextHistory( id=row['id'], timestamp=row['timestamp'], version=row['version'], content=json.loads(row['content']), change_source=row['change_source'] ).model_dump(mode='json') for row in rows ] # Recent Active Context Updates (from history) cursor.execute( """ SELECT id, timestamp, version, content, change_source FROM active_context_history WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT ? """, (start_datetime, limit_per_type) ) rows = cursor.fetchall() summary_results["recent_active_context_updates"] = [ models.ActiveContextHistory( id=row['id'], timestamp=row['timestamp'], version=row['version'], content=json.loads(row['content']), change_source=row['change_source'] ).model_dump(mode='json') for row in rows ] # Recent Links Created cursor.execute( """ SELECT id, timestamp, source_item_type, source_item_id, target_item_type, target_item_id, relationship_type, description FROM context_links WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT ? """, (start_datetime, limit_per_type) ) rows = cursor.fetchall() summary_results["recent_links_created"] = [ models.ContextLink( id=row['id'], timestamp=row['timestamp'], source_item_type=row['source_item_type'], source_item_id=row['source_item_id'], target_item_type=row['target_item_type'], target_item_id=row['target_item_id'], relationship_type=row['relationship_type'], description=row['description'] ).model_dump(mode='json') for row in rows ] # Recent System Patterns cursor.execute( """ SELECT id, timestamp, name, description, tags FROM system_patterns WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT ? """, (start_datetime, limit_per_type) ) rows = cursor.fetchall() summary_results["recent_system_patterns"] = [ models.SystemPattern( id=row['id'], timestamp=row['timestamp'], name=row['name'], description=row['description'], tags=json.loads(row['tags']) if row['tags'] else None ).model_dump(mode='json') for row in rows ] # Note about missing timestamps (removed as all now have timestamps) # summary_results["notes"].append( # "General Custom Data entries are not included in this summary " # "as they currently do not have creation/update timestamps in the database." # ) return summary_results except (sqlite3.Error, json.JSONDecodeError) as e: raise DatabaseError(f"Failed to retrieve recent activity summary: {e}") finally: if cursor: cursor.close() # (All planned CRUD functions implemented) # --- Cleanup --- # Consider using a context manager or atexit to ensure connections are closed import atexit atexit.register(close_all_connections)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GreatScottyMac/context-portal'

If you have feedback or need assistance with the MCP directory API, please join our Discord server