init_db.pyโข7.86 kB
"""
Database initialization script for TimeLooker MCP Server.
Handles database creation, migration, and sample data setup.
Supports both local SQLite and AWS cloud PostgreSQL modes.
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.core.models import db_manager, Task, SearchResult, Execution
from src.utils.logging_config import setup_logging
from src.utils.aws_secrets import setup_environment_from_secrets, is_cloud_mode
logger = setup_logging(__name__)
def init_database():
"""Initialize the database with tables."""
logger.info("Initializing database...")
# Setup environment from AWS Secrets Manager if in cloud mode
if is_cloud_mode():
logger.info("๐ฉ๏ธ Cloud mode detected - retrieving credentials from AWS Secrets Manager...")
setup_environment_from_secrets()
else:
logger.info("๐ Local mode detected - using local configuration...")
try:
# Create all tables
db_manager.create_tables()
logger.info("โ Database tables created successfully")
# Test database connection with proper session management
session = db_manager.get_session()
try:
# Check if tables exist by trying to query them
task_count = session.query(Task).count()
result_count = session.query(SearchResult).count()
execution_count = session.query(Execution).count()
logger.info("โ Database connection test successful")
logger.info(f" - Tasks: {task_count}")
logger.info(f" - Search Results: {result_count}")
logger.info(f" - Executions: {execution_count}")
return True
finally:
session.close()
except Exception as e:
logger.error(f"โ Database initialization failed: {e}")
return False
def reset_database():
"""Reset the database by dropping and recreating all tables."""
print("Resetting database...")
try:
# Drop all tables
db_manager.drop_tables()
print("โ Dropped existing tables")
# Create all tables
db_manager.create_tables()
print("โ Recreated all tables")
except Exception as e:
print(f"โ Database reset failed: {e}")
sys.exit(1)
def create_sample_task():
"""Create a sample task for testing using core TaskManager."""
logger.info("Creating sample task...")
try:
from src.core.task_manager import TaskManager
task_manager = TaskManager()
task_id = task_manager.create_task(
task_description="AI Ethics and Safety openings fit for a PhD in Computer Science",
frequency_minutes=60,
runtime_minutes=300,
recipient_email="user@example.com",
sender_email="test@example.com"
)
logger.info(f"โ Created sample task with ID: {task_id}")
return task_id
except Exception as e:
logger.error(f"โ Failed to create sample task: {e}")
return None
def check_schema_version():
"""Check current database schema version."""
logger.info("Checking database schema version...")
try:
session = db_manager.get_session()
try:
# Try to check if we have any version tracking table
# For now, just check if basic tables exist
task_count = session.query(Task).count()
logger.info(f"โ Schema check complete - {task_count} tasks in database")
return "1.0.0" # Current version
finally:
session.close()
except Exception as e:
logger.warning(f"Schema version check failed: {e}")
return None
def validate_database_integrity():
"""Validate database integrity and relationships."""
logger.info("Validating database integrity...")
try:
session = db_manager.get_session()
try:
# Check for orphaned records
tasks_with_results = session.query(SearchResult.task_id).distinct().all()
tasks_with_executions = session.query(Execution.task_id).distinct().all()
all_task_ids = [t.id for t in session.query(Task.id).all()]
orphaned_results = [r[0] for r in tasks_with_results if r[0] not in all_task_ids]
orphaned_executions = [e[0] for e in tasks_with_executions if e[0] not in all_task_ids]
if orphaned_results:
logger.warning(f"Found {len(orphaned_results)} orphaned search results")
if orphaned_executions:
logger.warning(f"Found {len(orphaned_executions)} orphaned executions")
if not orphaned_results and not orphaned_executions:
logger.info("โ Database integrity check passed")
return True
else:
logger.warning("โ ๏ธ Database integrity issues found")
return False
finally:
session.close()
except Exception as e:
logger.error(f"Database integrity check failed: {e}")
return False
def migrate_database():
"""Handle database migrations for schema changes."""
logger.info("Checking for database migrations...")
current_version = check_schema_version()
target_version = "1.0.0" # Current target version
if current_version == target_version:
logger.info(f"โ Database is up to date (version {current_version})")
return True
if current_version is None:
logger.info("No existing schema found - creating new database")
return init_database()
# Future migrations would go here
logger.info(f"Migration from {current_version} to {target_version} not needed")
return True
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Database management for TimeLooker MCP Server")
parser.add_argument("--reset", action="store_true", help="Reset database (drop and recreate tables)")
parser.add_argument("--sample", action="store_true", help="Create sample task")
parser.add_argument("--migrate", action="store_true", help="Run database migrations")
parser.add_argument("--validate", action="store_true", help="Validate database integrity")
parser.add_argument("--version", action="store_true", help="Check schema version")
args = parser.parse_args()
success = True
if args.version:
version = check_schema_version()
if version:
logger.info(f"Current schema version: {version}")
else:
logger.error("Could not determine schema version")
success = False
elif args.validate:
success = validate_database_integrity()
elif args.migrate:
success = migrate_database()
elif args.reset:
logger.info("Resetting database...")
try:
db_manager.drop_tables()
logger.info("โ Dropped existing tables")
success = init_database()
except Exception as e:
logger.error(f"โ Database reset failed: {e}")
success = False
else:
success = init_database()
if args.sample and success:
sample_task_id = create_sample_task()
if sample_task_id:
logger.info(f"Sample task created with ID: {sample_task_id}")
else:
success = False
if success:
logger.info("โ Database operations completed successfully!")
else:
logger.error("โ Database operations failed!")
sys.exit(1)