Skip to main content
Glama

TimeLooker MCP Server

init_db.py • 7.86 kB
""" Database initialization script for TimeLooker MCP Server. Handles database creation, migration, and sample data setup. Supports both local SQLite and AWS cloud PostgreSQL modes. """ import os import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.core.models import db_manager, Task, SearchResult, Execution from src.utils.logging_config import setup_logging from src.utils.aws_secrets import setup_environment_from_secrets, is_cloud_mode logger = setup_logging(__name__) def init_database(): """Initialize the database with tables.""" logger.info("Initializing database...") # Setup environment from AWS Secrets Manager if in cloud mode if is_cloud_mode(): logger.info("๐ŸŒฉ๏ธ Cloud mode detected - retrieving credentials from AWS Secrets Manager...") setup_environment_from_secrets() else: logger.info("๐Ÿ  Local mode detected - using local configuration...") try: # Create all tables db_manager.create_tables() logger.info("โœ“ Database tables created successfully") # Test database connection with proper session management session = db_manager.get_session() try: # Check if tables exist by trying to query them task_count = session.query(Task).count() result_count = session.query(SearchResult).count() execution_count = session.query(Execution).count() logger.info("โœ“ Database connection test successful") logger.info(f" - Tasks: {task_count}") logger.info(f" - Search Results: {result_count}") logger.info(f" - Executions: {execution_count}") return True finally: session.close() except Exception as e: logger.error(f"โœ— Database initialization failed: {e}") return False def reset_database(): """Reset the database by dropping and recreating all tables.""" print("Resetting database...") try: # Drop all tables db_manager.drop_tables() print("โœ“ Dropped existing tables") # Create all tables db_manager.create_tables() print("โœ“ Recreated all tables") except Exception as e: print(f"โœ— Database reset failed: {e}") sys.exit(1) def 
create_sample_task(): """Create a sample task for testing using core TaskManager.""" logger.info("Creating sample task...") try: from src.core.task_manager import TaskManager task_manager = TaskManager() task_id = task_manager.create_task( task_description="AI Ethics and Safety openings fit for a PhD in Computer Science", frequency_minutes=60, runtime_minutes=300, recipient_email="user@example.com", sender_email="test@example.com" ) logger.info(f"โœ“ Created sample task with ID: {task_id}") return task_id except Exception as e: logger.error(f"โœ— Failed to create sample task: {e}") return None def check_schema_version(): """Check current database schema version.""" logger.info("Checking database schema version...") try: session = db_manager.get_session() try: # Try to check if we have any version tracking table # For now, just check if basic tables exist task_count = session.query(Task).count() logger.info(f"โœ“ Schema check complete - {task_count} tasks in database") return "1.0.0" # Current version finally: session.close() except Exception as e: logger.warning(f"Schema version check failed: {e}") return None def validate_database_integrity(): """Validate database integrity and relationships.""" logger.info("Validating database integrity...") try: session = db_manager.get_session() try: # Check for orphaned records tasks_with_results = session.query(SearchResult.task_id).distinct().all() tasks_with_executions = session.query(Execution.task_id).distinct().all() all_task_ids = [t.id for t in session.query(Task.id).all()] orphaned_results = [r[0] for r in tasks_with_results if r[0] not in all_task_ids] orphaned_executions = [e[0] for e in tasks_with_executions if e[0] not in all_task_ids] if orphaned_results: logger.warning(f"Found {len(orphaned_results)} orphaned search results") if orphaned_executions: logger.warning(f"Found {len(orphaned_executions)} orphaned executions") if not orphaned_results and not orphaned_executions: logger.info("โœ“ Database integrity 
check passed") return True else: logger.warning("โš ๏ธ Database integrity issues found") return False finally: session.close() except Exception as e: logger.error(f"Database integrity check failed: {e}") return False def migrate_database(): """Handle database migrations for schema changes.""" logger.info("Checking for database migrations...") current_version = check_schema_version() target_version = "1.0.0" # Current target version if current_version == target_version: logger.info(f"โœ“ Database is up to date (version {current_version})") return True if current_version is None: logger.info("No existing schema found - creating new database") return init_database() # Future migrations would go here logger.info(f"Migration from {current_version} to {target_version} not needed") return True if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Database management for TimeLooker MCP Server") parser.add_argument("--reset", action="store_true", help="Reset database (drop and recreate tables)") parser.add_argument("--sample", action="store_true", help="Create sample task") parser.add_argument("--migrate", action="store_true", help="Run database migrations") parser.add_argument("--validate", action="store_true", help="Validate database integrity") parser.add_argument("--version", action="store_true", help="Check schema version") args = parser.parse_args() success = True if args.version: version = check_schema_version() if version: logger.info(f"Current schema version: {version}") else: logger.error("Could not determine schema version") success = False elif args.validate: success = validate_database_integrity() elif args.migrate: success = migrate_database() elif args.reset: logger.info("Resetting database...") try: db_manager.drop_tables() logger.info("โœ“ Dropped existing tables") success = init_database() except Exception as e: logger.error(f"โœ— Database reset failed: {e}") success = False else: success = init_database() if args.sample 
and success: sample_task_id = create_sample_task() if sample_task_id: logger.info(f"Sample task created with ID: {sample_task_id}") else: success = False if success: logger.info("โœ“ Database operations completed successfully!") else: logger.error("โŒ Database operations failed!") sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.