Skip to main content
Glama

MCP Standards

by airmcp-com
schema_migration.py11.2 kB
"""Database Schema Migration for Self-Learning Features Extends the existing MCP Standards schema with new tables for: - Tool execution tracking - Pattern frequency analysis - Tool preferences (learned rules) - Agent performance tracking - Validation gates - Knowledge evolution (temporal tracking) - Spec validation """ import sqlite3 from pathlib import Path from datetime import datetime class SchemaMigration: """Handles database schema migrations""" def __init__(self, db_path: Path): self.db_path = db_path def migrate(self) -> bool: """ Run all migrations Returns: True if migrations successful, False otherwise """ try: with sqlite3.connect(self.db_path) as conn: # Create migrations table if it doesn't exist conn.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( id INTEGER PRIMARY KEY, version INTEGER NOT NULL UNIQUE, description TEXT, applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Run migrations in order self._migrate_v1_tool_executions(conn) self._migrate_v2_pattern_frequency(conn) self._migrate_v3_tool_preferences(conn) self._migrate_v4_agent_performance(conn) self._migrate_v5_validation_gates(conn) self._migrate_v6_knowledge_evolution(conn) self._migrate_v7_spec_validation(conn) conn.commit() return True except Exception as e: print(f"Migration error: {e}") return False def _is_migration_applied(self, conn: sqlite3.Connection, version: int) -> bool: """Check if a migration version has been applied""" cursor = conn.execute(""" SELECT COUNT(*) FROM schema_migrations WHERE version = ? """, (version,)) return cursor.fetchone()[0] > 0 def _record_migration(self, conn: sqlite3.Connection, version: int, description: str): """Record that a migration has been applied""" conn.execute(""" INSERT INTO schema_migrations (version, description) VALUES (?, ?) """, (version, description)) def _migrate_v1_tool_executions(self, conn: sqlite3.Connection): """Migration 1: Tool executions table""" if self._is_migration_applied(conn, 1): return conn.execute(""" CREATE TABLE IF NOT EXISTS tool_executions ( id INTEGER PRIMARY KEY AUTOINCREMENT, tool_name TEXT NOT NULL, args TEXT NOT NULL, result TEXT, significance REAL NOT NULL, project_path TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, session_id TEXT ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_tool_exec_sig ON tool_executions(significance DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_tool_exec_time ON tool_executions(timestamp DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_tool_exec_project ON tool_executions(project_path)") self._record_migration(conn, 1, "Tool executions tracking table") def _migrate_v2_pattern_frequency(self, conn: sqlite3.Connection): """Migration 2: Pattern frequency tracking""" if self._is_migration_applied(conn, 2): return conn.execute(""" CREATE TABLE IF NOT EXISTS pattern_frequency ( id INTEGER PRIMARY KEY AUTOINCREMENT, pattern_key TEXT NOT NULL UNIQUE, tool_name TEXT NOT NULL, pattern_type TEXT, pattern_description TEXT, occurrence_count INTEGER DEFAULT 1, first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, promoted_to_preference BOOLEAN DEFAULT FALSE, confidence REAL DEFAULT 0.0, examples TEXT ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_pattern_freq ON pattern_frequency(occurrence_count DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_pattern_tool ON pattern_frequency(tool_name)") conn.execute("CREATE INDEX IF NOT EXISTS idx_pattern_type ON pattern_frequency(pattern_type)") self._record_migration(conn, 2, "Pattern frequency tracking") def _migrate_v3_tool_preferences(self, conn: sqlite3.Connection): """Migration 3: Tool preferences (learned rules)""" if self._is_migration_applied(conn, 3): return conn.execute(""" CREATE TABLE IF NOT EXISTS tool_preferences ( id INTEGER PRIMARY KEY AUTOINCREMENT, category TEXT NOT NULL, context TEXT NOT NULL, preference TEXT NOT NULL, confidence REAL DEFAULT 0.5, examples TEXT, learned_from TEXT, last_validated TIMESTAMP, last_applied TIMESTAMP, apply_count INTEGER DEFAULT 0, project_specific BOOLEAN DEFAULT FALSE, project_path TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_pref_confidence ON tool_preferences(confidence DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_pref_category ON tool_preferences(category)") conn.execute("CREATE INDEX IF NOT EXISTS idx_pref_project ON tool_preferences(project_path)") self._record_migration(conn, 3, "Tool preferences (learned rules)") def _migrate_v4_agent_performance(self, conn: sqlite3.Connection): """Migration 4: Agent performance tracking""" if self._is_migration_applied(conn, 4): return conn.execute(""" CREATE TABLE IF NOT EXISTS agent_performance ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_type TEXT NOT NULL, task_category TEXT, success BOOLEAN, correction_count INTEGER DEFAULT 0, execution_time REAL, context TEXT, project_path TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_agent_perf_type ON agent_performance(agent_type, success)") conn.execute("CREATE INDEX IF NOT EXISTS idx_agent_perf_time ON agent_performance(timestamp DESC)") self._record_migration(conn, 4, "Agent performance tracking") def _migrate_v5_validation_gates(self, conn: sqlite3.Connection): """Migration 5: Validation gates""" if self._is_migration_applied(conn, 5): return conn.execute(""" CREATE TABLE IF NOT EXISTS validation_gates ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_path TEXT NOT NULL, gate_type TEXT NOT NULL, required BOOLEAN DEFAULT TRUE, auto_check BOOLEAN DEFAULT TRUE, check_command TEXT, failure_count INTEGER DEFAULT 0, last_checked TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE UNIQUE INDEX IF NOT EXISTS idx_gate_project_type ON validation_gates(project_path, gate_type) """) self._record_migration(conn, 5, "Validation gates") def _migrate_v6_knowledge_evolution(self, conn: sqlite3.Connection): """Migration 6: Knowledge evolution (temporal tracking)""" if self._is_migration_applied(conn, 6): return conn.execute(""" CREATE TABLE IF NOT EXISTS knowledge_evolution ( id INTEGER PRIMARY KEY AUTOINCREMENT, preference_id INTEGER, change_type TEXT, old_value TEXT, new_value TEXT, reason TEXT, event_time TIMESTAMP, record_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(preference_id) REFERENCES tool_preferences(id) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_evo_pref ON knowledge_evolution(preference_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_evo_time ON knowledge_evolution(event_time)") self._record_migration(conn, 6, "Knowledge evolution (temporal tracking)") def _migrate_v7_spec_validation(self, conn: sqlite3.Connection): """Migration 7: Spec validation tracking""" if self._is_migration_applied(conn, 7): return conn.execute(""" CREATE TABLE IF NOT EXISTS spec_validations ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_description TEXT NOT NULL, spec_captured TEXT NOT NULL, deliverable_summary TEXT, validation_status TEXT, gaps_found TEXT, project_path TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_spec_val_status ON spec_validations(validation_status)") conn.execute("CREATE INDEX IF NOT EXISTS idx_spec_val_project ON spec_validations(project_path)") self._record_migration(conn, 7, "Spec validation tracking") def get_schema_version(self) -> int: """Get current schema version""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute(""" SELECT MAX(version) FROM schema_migrations """) result = cursor.fetchone() return result[0] if result[0] is not None else 0 except sqlite3.OperationalError: # Table doesn't exist yet return 0 def migrate_database(db_path: Path = None) -> bool: """ Convenience function to migrate database Args: db_path: Path to database (defaults to ~/.mcp-standards/knowledge.db) Returns: True if successful, False otherwise """ if db_path is None: db_path = Path.home() / ".mcp-standards" / "knowledge.db" db_path.parent.mkdir(parents=True, exist_ok=True) migration = SchemaMigration(db_path) success = migration.migrate() if success: version = migration.get_schema_version() print(f"✓ Database migrated to version {version}") else: print("✗ Migration failed") return success if __name__ == "__main__": # Run migration when script is executed directly import sys if len(sys.argv) > 1: db_path = Path(sys.argv[1]) else: db_path = None success = migrate_database(db_path) sys.exit(0 if success else 1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/airmcp-com/mcp-standards'

If you have feedback or need assistance with the MCP directory API, please join our Discord server