database-migrations.md • 31.4 kB
# Database Migrations
This document provides comprehensive guidance on managing database schema changes in MockLoop MCP through the migration system. The migration system ensures safe, versioned, and reversible database schema updates.
## Overview
MockLoop MCP uses a robust migration system to manage database schema changes across different environments. The system supports:
- **Versioned Migrations**: Each migration has a unique version number
- **Reversible Changes**: All migrations can be rolled back
- **Environment Safety**: Migrations work across SQLite, PostgreSQL, and MySQL
- **Dependency Management**: Migrations can depend on other migrations
- **Validation**: Schema validation before and after migrations
- **Backup Integration**: Automatic backups before major changes
## Migration System Architecture
### Migration Structure
```python
from abc import ABC, abstractmethod
from typing import Optional, List
import logging
class Migration(ABC):
    """Base class for all database migrations.

    Subclasses set ``version`` and ``description``, may list the versions
    they depend on in ``dependencies``, and must implement ``up`` and
    ``down``.
    """

    version: int
    description: str
    # Versions this migration depends on; empty by default.
    dependencies: List[int] = []

    def __init_subclass__(cls, **kwargs) -> None:
        """Give every subclass its own ``dependencies`` list."""
        super().__init_subclass__(**kwargs)
        # Without this copy, every subclass that does not override
        # ``dependencies`` would share (and could mutate) the single list
        # object defined on the base class.
        cls.dependencies = list(cls.dependencies)

    # The connection type is written as a forward reference so that it is
    # not evaluated when the class body is executed.
    @abstractmethod
    def up(self, connection: "DatabaseConnection") -> None:
        """Apply the migration."""

    @abstractmethod
    def down(self, connection: "DatabaseConnection") -> None:
        """Reverse the migration."""

    def validate(self, connection: "DatabaseConnection") -> bool:
        """Validate migration can be applied.

        The default implementation performs no checks and returns True.
        """
        return True

    def backup_required(self) -> bool:
        """Whether this migration requires a backup (False by default)."""
        return False
class MigrationManager:
    """Manages database migrations.

    This is an interface outline: the coroutine methods below are declared
    with their signatures and intent only.
    """

    def __init__(self, database_url: str, migrations_path: str):
        """Open a connection for ``database_url`` and remember where migrations live."""
        self.database = DatabaseConnection(database_url)
        self.migrations_path = migrations_path
        self.logger = logging.getLogger(__name__)

    async def get_current_version(self) -> int:
        """Get current database schema version."""

    async def get_pending_migrations(self) -> List[Migration]:
        """Get list of pending migrations."""

    async def apply_migration(self, migration: Migration) -> bool:
        """Apply a single migration."""

    async def rollback_migration(self, migration: Migration) -> bool:
        """Rollback a single migration."""

    async def migrate_to_version(self, target_version: int) -> bool:
        """Migrate to specific version."""

    # The two methods below are called by MigrationCLI (``migrate`` without a
    # target and ``rollback``) and are therefore part of the manager interface.
    async def migrate_to_latest(self) -> bool:
        """Apply all pending migrations."""

    async def rollback_to_version(self, target_version: int) -> bool:
        """Rollback to specific version."""
```
### Migration Discovery
```python
class MigrationDiscovery:
    """Discovers and loads migration files."""

    def __init__(self, migrations_path: str):
        """Remember the directory that is scanned for ``migration_*.py`` files."""
        self.migrations_path = Path(migrations_path)
        # load_migration() reports failures through this logger, so it has to
        # exist on the instance.
        self.logger = logging.getLogger(__name__)

    def discover_migrations(self) -> List[Migration]:
        """Discover all migration files.

        Returns:
            One migration instance per loadable ``migration_*.py`` file,
            sorted by ascending ``version``. Files that fail to load are
            skipped (the failure is logged by ``load_migration``).
        """
        migrations = []
        for file_path in self.migrations_path.glob("*.py"):
            if not file_path.name.startswith("migration_"):
                continue
            migration = self.load_migration(file_path)
            if migration is not None:
                migrations.append(migration)
        return sorted(migrations, key=lambda m: m.version)

    def load_migration(self, file_path: Path) -> Optional[Migration]:
        """Load migration from file.

        Executes the file as a module and instantiates the first concrete
        ``Migration`` subclass that the file itself defines.

        Returns:
            The migration instance, or None when the file cannot be imported,
            defines no suitable class, or the class cannot be instantiated.
        """
        import inspect  # only needed for the abstract-class check below

        try:
            spec = importlib.util.spec_from_file_location("migration", file_path)
            if spec is None or spec.loader is None:
                raise ImportError(f"no import spec available for {file_path}")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            # Find migration class
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if not (isinstance(attr, type) and issubclass(attr, Migration)):
                    continue
                # Skip classes the file merely imported (Migration itself or a
                # helper base class) and classes that still have abstract
                # methods; otherwise an alphabetically earlier base class
                # would be picked instead of the real migration.
                if attr.__module__ != module.__name__ or inspect.isabstract(attr):
                    continue
                return attr()
        except Exception as e:
            self.logger.error(f"Failed to load migration {file_path}: {e}")
        return None
```
## Creating Migrations
### Migration File Structure
Migration files follow a specific naming convention and structure:
```python
# migrations/migration_001_initial_schema.py
from mockloop_mcp.database.migration import Migration
from mockloop_mcp.database.connection import DatabaseConnection
class InitialSchema(Migration):
    """Migration 1: request_logs, mock_servers and schema_version tables."""

    version = 1
    description = "Create initial database schema"

    def up(self, connection: DatabaseConnection) -> None:
        """Create the initial tables and indexes, then record version 1."""
        # Executed strictly in this order.
        statements = (
            # request_logs table
            """
            CREATE TABLE request_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                method VARCHAR(10) NOT NULL,
                path TEXT NOT NULL,
                query_params TEXT,
                headers TEXT,
                request_body TEXT,
                response_status INTEGER NOT NULL,
                response_headers TEXT,
                response_body TEXT,
                response_time_ms INTEGER,
                server_id VARCHAR(255),
                client_ip VARCHAR(45),
                user_agent TEXT,
                request_id VARCHAR(255),
                scenario_name VARCHAR(255),
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            """,
            # request_logs indexes
            """
            CREATE INDEX idx_request_logs_timestamp ON request_logs(timestamp)
            """,
            """
            CREATE INDEX idx_request_logs_server_id ON request_logs(server_id)
            """,
            # mock_servers table
            """
            CREATE TABLE mock_servers (
                id VARCHAR(255) PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                spec_path TEXT NOT NULL,
                spec_content TEXT,
                output_directory TEXT NOT NULL,
                port INTEGER,
                status VARCHAR(50) NOT NULL DEFAULT 'stopped',
                pid INTEGER,
                config TEXT,
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                started_at DATETIME,
                stopped_at DATETIME
            )
            """,
            # schema_version bookkeeping table
            """
            CREATE TABLE schema_version (
                version INTEGER PRIMARY KEY,
                description TEXT,
                applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            """,
            # record the initial version
            """
            INSERT INTO schema_version (version, description)
            VALUES (1, 'Initial schema')
            """,
        )
        for statement in statements:
            connection.execute(statement)

    def down(self, connection: DatabaseConnection) -> None:
        """Drop every table created by ``up``."""
        for table in ("request_logs", "mock_servers", "schema_version"):
            connection.execute(f"DROP TABLE IF EXISTS {table}")
```
### Adding New Tables
```python
# migrations/migration_002_add_scenarios.py
class AddScenariosTable(Migration):
    """Migration 2: scenarios table, its indexes, and the version record."""

    version = 2
    description = "Add scenarios table for mock response management"
    dependencies = [1]  # Depends on initial schema

    def up(self, connection: DatabaseConnection) -> None:
        """Create the scenarios table and indexes, then record version 2."""
        # Executed strictly in this order.
        statements = (
            # scenarios table
            """
            CREATE TABLE scenarios (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name VARCHAR(255) NOT NULL,
                server_id VARCHAR(255) NOT NULL,
                description TEXT,
                config TEXT NOT NULL,
                is_active BOOLEAN NOT NULL DEFAULT FALSE,
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                created_by VARCHAR(255),
                FOREIGN KEY (server_id) REFERENCES mock_servers(id) ON DELETE CASCADE,
                UNIQUE(name, server_id)
            )
            """,
            # indexes
            """
            CREATE INDEX idx_scenarios_name ON scenarios(name)
            """,
            """
            CREATE INDEX idx_scenarios_server_id ON scenarios(server_id)
            """,
            """
            CREATE INDEX idx_scenarios_is_active ON scenarios(is_active)
            """,
            # schema version record
            """
            INSERT INTO schema_version (version, description)
            VALUES (2, 'Add scenarios table')
            """,
        )
        for statement in statements:
            connection.execute(statement)

    def down(self, connection: DatabaseConnection) -> None:
        """Drop the scenarios table and remove the version 2 record."""
        rollback_statements = (
            "DROP TABLE IF EXISTS scenarios",
            "DELETE FROM schema_version WHERE version = 2",
        )
        for statement in rollback_statements:
            connection.execute(statement)
```
### Modifying Existing Tables
```python
# migrations/migration_003_add_webhook_support.py
class AddWebhookSupport(Migration):
    """Migration 3: webhook tables plus a webhook_url column on request_logs."""

    version = 3
    description = "Add webhook tables and modify request_logs"
    dependencies = [2]

    def backup_required(self) -> bool:
        """Always True: this migration alters the existing request_logs table."""
        return True

    def up(self, connection: DatabaseConnection) -> None:
        """Add the webhook_url column, both webhook tables, their indexes,
        and the version 3 record."""
        # Executed strictly in this order.
        statements = (
            # webhook_url column on request_logs
            """
            ALTER TABLE request_logs
            ADD COLUMN webhook_url TEXT
            """,
            # webhooks table
            """
            CREATE TABLE webhooks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                server_id VARCHAR(255) NOT NULL,
                name VARCHAR(255) NOT NULL,
                url TEXT NOT NULL,
                method VARCHAR(10) NOT NULL DEFAULT 'POST',
                headers TEXT,
                events TEXT NOT NULL,
                secret_key VARCHAR(255),
                is_active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (server_id) REFERENCES mock_servers(id) ON DELETE CASCADE
            )
            """,
            # webhook_deliveries table
            """
            CREATE TABLE webhook_deliveries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                webhook_id INTEGER NOT NULL,
                event_type VARCHAR(255) NOT NULL,
                payload TEXT NOT NULL,
                response_status INTEGER,
                response_body TEXT,
                delivery_time_ms INTEGER,
                attempt_number INTEGER NOT NULL DEFAULT 1,
                success BOOLEAN NOT NULL DEFAULT FALSE,
                error_message TEXT,
                delivered_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (webhook_id) REFERENCES webhooks(id) ON DELETE CASCADE
            )
            """,
            # indexes
            """
            CREATE INDEX idx_webhooks_server_id ON webhooks(server_id)
            """,
            """
            CREATE INDEX idx_webhook_deliveries_webhook_id ON webhook_deliveries(webhook_id)
            """,
            # schema version record
            """
            INSERT INTO schema_version (version, description)
            VALUES (3, 'Add webhook support')
            """,
        )
        for statement in statements:
            connection.execute(statement)

    def down(self, connection: DatabaseConnection) -> None:
        """Drop the webhook tables, remove webhook_url from request_logs,
        and delete the version 3 record."""
        # Deliveries reference webhooks, so they are dropped first.
        for table in ("webhook_deliveries", "webhooks"):
            connection.execute(f"DROP TABLE IF EXISTS {table}")

        if connection.database_type != "sqlite":
            # PostgreSQL/MySQL support DROP COLUMN
            connection.execute("ALTER TABLE request_logs DROP COLUMN webhook_url")
        else:
            # SQLite branch: rebuild request_logs without webhook_url
            # (create the new table, copy the rows, swap, re-index).
            rebuild_statements = (
                """
                CREATE TABLE request_logs_new (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    method VARCHAR(10) NOT NULL,
                    path TEXT NOT NULL,
                    query_params TEXT,
                    headers TEXT,
                    request_body TEXT,
                    response_status INTEGER NOT NULL,
                    response_headers TEXT,
                    response_body TEXT,
                    response_time_ms INTEGER,
                    server_id VARCHAR(255),
                    client_ip VARCHAR(45),
                    user_agent TEXT,
                    request_id VARCHAR(255),
                    scenario_name VARCHAR(255),
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
                """,
                """
                INSERT INTO request_logs_new
                SELECT id, timestamp, method, path, query_params, headers,
                       request_body, response_status, response_headers,
                       response_body, response_time_ms, server_id, client_ip,
                       user_agent, request_id, scenario_name, created_at, updated_at
                FROM request_logs
                """,
                "DROP TABLE request_logs",
                "ALTER TABLE request_logs_new RENAME TO request_logs",
                # The indexes are created again on the rebuilt table.
                "CREATE INDEX idx_request_logs_timestamp ON request_logs(timestamp)",
                "CREATE INDEX idx_request_logs_server_id ON request_logs(server_id)",
            )
            for statement in rebuild_statements:
                connection.execute(statement)

        connection.execute("DELETE FROM schema_version WHERE version = 3")
```
## Database-Specific Migrations
### SQLite Migrations
```python
class SQLiteMigration(Migration):
    """Base class for SQLite-specific migrations."""

    def recreate_table_without_column(self, connection: DatabaseConnection,
                                      table_name: str, column_to_remove: str) -> None:
        """Helper to remove column from SQLite table.

        Builds ``<table_name>_new`` from the remaining columns reported by
        ``PRAGMA table_info``, copies the rows, drops the old table and
        renames the new one into place.

        Only column name, type, NOT NULL, DEFAULT and PRIMARY KEY are
        carried over; nothing else is recreated here (this helper issues no
        CREATE INDEX statements, for example). ``table_name`` is
        interpolated into the SQL verbatim, so it must be a trusted
        identifier.

        Args:
            connection: Connection the statements are executed on.
            table_name: Table to rebuild.
            column_to_remove: Name of the column to leave out.
        """
        # Each table_info row is read as (cid, name, type, notnull, dflt_value, pk).
        table_info = connection.execute(f"PRAGMA table_info({table_name})")
        columns = [row for row in table_info if row[1] != column_to_remove]

        # ``pk`` is 0 for non-key columns, otherwise the 1-based position of
        # the column inside the primary key. A multi-column key must be
        # declared once at table level: repeating an inline PRIMARY KEY on
        # several columns is rejected by SQLite.
        key_columns = sorted((col for col in columns if col[5]), key=lambda col: col[5])
        key_names = [col[1] for col in key_columns]
        inline_primary_key = len(key_names) == 1

        # Create new table
        column_defs = []
        for col in columns:
            col_def = f"{col[1]} {col[2]}"
            if col[3]:  # NOT NULL
                col_def += " NOT NULL"
            if col[4] is not None:  # DEFAULT (None means "no default")
                col_def += f" DEFAULT {col[4]}"
            if col[5] and inline_primary_key:  # single-column PRIMARY KEY
                col_def += " PRIMARY KEY"
            column_defs.append(col_def)
        if len(key_names) > 1:
            column_defs.append(f"PRIMARY KEY ({', '.join(key_names)})")

        new_table_sql = f"""
            CREATE TABLE {table_name}_new (
                {', '.join(column_defs)}
            )
        """
        connection.execute(new_table_sql)

        # Copy data
        column_names = [col[1] for col in columns]
        connection.execute(f"""
            INSERT INTO {table_name}_new ({', '.join(column_names)})
            SELECT {', '.join(column_names)} FROM {table_name}
        """)

        # Replace table
        connection.execute(f"DROP TABLE {table_name}")
        connection.execute(f"ALTER TABLE {table_name}_new RENAME TO {table_name}")
```
### PostgreSQL Migrations
```python
class PostgreSQLMigration(Migration):
    """Base class for PostgreSQL-specific migrations."""

    def create_enum(self, connection: DatabaseConnection, enum_name: str, values: List[str]) -> None:
        """Create PostgreSQL enum type.

        Args:
            connection: Connection the statement is executed on.
            enum_name: Name of the type; interpolated verbatim, so it must be
                a trusted identifier.
            values: Enum labels, in order. An empty list produces
                ``ENUM ()``.
        """
        # Each label becomes its own quoted literal; embedded single quotes
        # are doubled so a label such as "can't" cannot terminate the literal.
        labels = ", ".join("'" + value.replace("'", "''") + "'" for value in values)
        connection.execute(f"CREATE TYPE {enum_name} AS ENUM ({labels})")

    def add_column_with_default(self, connection: DatabaseConnection,
                                table_name: str, column_def: str, default_value: str) -> None:
        """Add a column, then set it to ``default_value`` on every row.

        The column is added with ``column_def`` exactly as given and a
        separate UPDATE fills all existing rows; no DEFAULT clause is added
        to the column itself.

        Args:
            connection: Connection the statements are executed on.
            table_name: Table to alter (interpolated verbatim).
            column_def: Column definition whose first whitespace-separated
                word is the column name, e.g. ``"status VARCHAR(20)"``.
            default_value: SQL expression written into the UPDATE verbatim,
                so string values must already be quoted.
        """
        column_name = column_def.split()[0]
        connection.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_def}")
        connection.execute(f"UPDATE {table_name} SET {column_name} = {default_value}")
```
### MySQL Migrations
```python
class MySQLMigration(Migration):
    """Base class for MySQL-specific migrations."""

    def modify_column(self, connection: DatabaseConnection,
                      table_name: str, column_name: str, new_definition: str) -> None:
        """Modify column definition in MySQL.

        All three string arguments are interpolated into the statement
        verbatim, so they must be trusted identifiers/definitions.
        """
        connection.execute(f"ALTER TABLE {table_name} MODIFY COLUMN {column_name} {new_definition}")

    def add_index_if_not_exists(self, connection: DatabaseConnection,
                                table_name: str, index_name: str, columns: str) -> None:
        """Add index only if it doesn't exist.

        Args:
            connection: Connection the statements are executed on.
            table_name: Table to index (interpolated verbatim).
            index_name: Name of the index to create (interpolated verbatim).
            columns: Comma-separated column list placed inside the
                parentheses of CREATE INDEX.
        """
        # CREATE INDEX on its own fails when the index is already present, so
        # look the name up first and skip creation if it is found.
        existing = connection.execute(
            f"SHOW INDEX FROM {table_name} WHERE Key_name = '{index_name}'"
        ).fetchall()
        if existing:
            return
        connection.execute(f"""
            CREATE INDEX {index_name} ON {table_name} ({columns})
        """)
```
## Migration Management
### Running Migrations
```bash
# Apply all pending migrations
mockloop db migrate
# Migrate to specific version
mockloop db migrate --version 5
# Check migration status
mockloop db status
# Show pending migrations
mockloop db pending
# Validate migrations without applying
mockloop db validate
```
### Migration CLI Commands
```python
class MigrationCLI:
    """Command-line interface for migrations."""

    def __init__(self, migration_manager: "MigrationManager"):
        """Wrap the manager that performs the actual migration work."""
        self.manager = migration_manager

    async def migrate(self, target_version: Optional[int] = None) -> None:
        """Apply migrations.

        Args:
            target_version: Version to migrate to. None (the default) means
                "apply everything that is pending".
        """
        # Compare against None explicitly: version 0 is a valid target and
        # must not be mistaken for "no target given".
        if target_version is not None:
            await self.manager.migrate_to_version(target_version)
        else:
            await self.manager.migrate_to_latest()

    async def rollback(self, target_version: int) -> None:
        """Rollback to specific version.

        Does nothing (apart from printing a message) when ``target_version``
        is not lower than the current version, or when the user declines the
        confirmation prompt.
        """
        current_version = await self.manager.get_current_version()
        if target_version >= current_version:
            print(f"Target version {target_version} is not lower than current version {current_version}")
            return
        # Confirm rollback
        if not self.confirm_rollback(current_version, target_version):
            return
        await self.manager.rollback_to_version(target_version)

    async def status(self) -> None:
        """Show migration status: current version and every pending migration."""
        current_version = await self.manager.get_current_version()
        pending_migrations = await self.manager.get_pending_migrations()
        print(f"Current database version: {current_version}")
        print(f"Pending migrations: {len(pending_migrations)}")
        for migration in pending_migrations:
            print(f"  - Version {migration.version}: {migration.description}")

    def confirm_rollback(self, current_version: int, target_version: int) -> bool:
        """Confirm rollback operation.

        Returns:
            True only when the user answers "yes" (case-insensitive,
            surrounding whitespace ignored).
        """
        print(f"WARNING: This will rollback from version {current_version} to {target_version}")
        print("This operation may result in data loss.")
        response = input("Are you sure you want to continue? (yes/no): ")
        return response.strip().lower() == "yes"
```
### Automated Migration Testing
```python
class MigrationTester:
    """Tests migration up and down operations."""

    def __init__(self, database_url: str):
        """Store the source URL and derive the URL of the scratch test database."""
        self.database_url = database_url
        self.test_database_url = self.create_test_database_url()

    async def test_migration(self, migration: Migration) -> TestResult:
        """Test a migration's up and down operations.

        Applies ``up``, validates, applies ``down``, validates again, and
        returns the combined outcome. The test database is cleaned up even
        when a step raises; the exception itself is not caught here.
        """
        # Create test database
        test_db = DatabaseConnection(self.test_database_url)
        try:
            # Apply migration
            migration.up(test_db)
            # Validate schema
            up_validation = await self.validate_schema_after_up(test_db, migration)
            # Test rollback
            migration.down(test_db)
            # Validate rollback
            down_validation = await self.validate_schema_after_down(test_db, migration)
            return TestResult(
                migration_version=migration.version,
                up_success=up_validation.success,
                down_success=down_validation.success,
                errors=up_validation.errors + down_validation.errors
            )
        finally:
            # Cleanup test database
            await self.cleanup_test_database(test_db)

    async def test_all_migrations(self, migrations_path: str = "./migrations") -> List[TestResult]:
        """Test all migrations.

        Args:
            migrations_path: Directory to discover migrations in. Defaults to
                ``"./migrations"``.

        Returns:
            One result per discovered migration, in discovery order.
        """
        discovery = MigrationDiscovery(migrations_path)
        results = []
        for migration in discovery.discover_migrations():
            results.append(await self.test_migration(migration))
        return results
```
## Data Migrations
### Migrating Existing Data
```python
class DataMigration(Migration):
    """Base class for data migrations."""

    def migrate_data_in_batches(self, connection: DatabaseConnection,
                                query: str, batch_size: int = 1000,
                                key_column: Optional[str] = None) -> None:
        """Migrate data in batches to avoid memory issues.

        Args:
            connection: Connection used for reading and for ``process_batch``.
            query: SELECT statement producing the rows to process.
            batch_size: Maximum number of rows per batch; must be positive.
            key_column: Name of a unique, orderable column that ``query``
                returns as its FIRST column. When given, batches are paged by
                key (``key > last key seen``) instead of by OFFSET. Use this
                whenever ``process_batch`` changes which rows ``query``
                matches: OFFSET paging would then skip rows, because
                processed rows drop out of the result while the offset keeps
                growing.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        if key_column is None:
            self._migrate_by_offset(connection, query, batch_size)
        else:
            self._migrate_by_key(connection, query, batch_size, key_column)

    def _migrate_by_offset(self, connection: DatabaseConnection,
                           query: str, batch_size: int) -> None:
        """Page through ``query`` with LIMIT/OFFSET (original behaviour)."""
        offset = 0
        while True:
            batch_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
            rows = connection.execute(batch_query).fetchall()
            if not rows:
                break
            self.process_batch(connection, rows)
            offset += batch_size

    def _migrate_by_key(self, connection: DatabaseConnection,
                        query: str, batch_size: int, key_column: str) -> None:
        """Page through ``query`` ordered by ``key_column`` (keyset paging)."""
        # The caller's query is wrapped as a subquery so that the key filter,
        # ORDER BY and LIMIT can be added without parsing it.
        source = f"SELECT * FROM ({query}) AS batch_source"
        last_key = None
        while True:
            if last_key is None:
                rows = connection.execute(
                    f"{source} ORDER BY {key_column} LIMIT {batch_size}"
                ).fetchall()
            else:
                rows = connection.execute(
                    f"{source} WHERE {key_column} > ? ORDER BY {key_column} LIMIT {batch_size}",
                    (last_key,),
                ).fetchall()
            if not rows:
                break
            self.process_batch(connection, rows)
            # The key is the first selected column (see migrate_data_in_batches).
            last_key = rows[-1][0]

    def process_batch(self, connection: DatabaseConnection, rows: List[tuple]) -> None:
        """Process a batch of rows. Subclasses must override this."""
        raise NotImplementedError


# Example data migration
class MigrateRequestLogFormat(DataMigration):
    """Migration 4: fill parsed_headers / parsed_query_params on request_logs."""

    version = 4
    description = "Migrate request log format from JSON to structured columns"
    dependencies = [3]

    def backup_required(self) -> bool:
        """Always True: existing request_logs rows are rewritten."""
        return True

    def up(self, connection: DatabaseConnection) -> None:
        """Migrate request log data format."""
        # Add new columns
        connection.execute("ALTER TABLE request_logs ADD COLUMN parsed_headers TEXT")
        connection.execute("ALTER TABLE request_logs ADD COLUMN parsed_query_params TEXT")
        # Migrate existing data. The query filters on a column that
        # process_batch fills in, so page by id rather than by OFFSET.
        self.migrate_data_in_batches(
            connection,
            "SELECT id, headers, query_params FROM request_logs WHERE parsed_headers IS NULL",
            key_column="id",
        )
        # Update schema version
        connection.execute("""
            INSERT INTO schema_version (version, description)
            VALUES (4, 'Migrate request log format')
        """)

    def process_batch(self, connection: DatabaseConnection, rows: List[tuple]) -> None:
        """Process batch of request logs.

        A row that fails to parse or update is logged and skipped so that
        one bad record does not abort the whole migration.
        """
        for row_id, headers_json, query_params_json in rows:
            try:
                # Parse and restructure data
                parsed_headers = self.parse_headers(headers_json)
                parsed_query_params = self.parse_query_params(query_params_json)
                # Update row
                connection.execute("""
                    UPDATE request_logs
                    SET parsed_headers = ?, parsed_query_params = ?
                    WHERE id = ?
                """, (parsed_headers, parsed_query_params, row_id))
            except Exception as e:
                # Migration instances have no ``logger`` attribute of their
                # own, so use the module logger.
                logging.getLogger(__name__).error(f"Failed to migrate row {row_id}: {e}")

    def parse_headers(self, headers_json: str) -> str:
        """Parse headers JSON into structured format.

        Placeholder: not implemented in this example (returns None).
        """
        # Implementation for parsing headers
        pass

    def parse_query_params(self, query_params_json: str) -> str:
        """Parse query parameters JSON into structured format.

        Placeholder: not implemented in this example (returns None).
        """
        # Implementation for parsing query parameters
        pass

    def down(self, connection: DatabaseConnection) -> None:
        """Rollback data migration."""
        # NOTE(review): AddWebhookSupport.down rebuilds the table on SQLite
        # instead of using DROP COLUMN; confirm whether this rollback needs
        # the same treatment for the SQLite versions you support.
        connection.execute("ALTER TABLE request_logs DROP COLUMN parsed_headers")
        connection.execute("ALTER TABLE request_logs DROP COLUMN parsed_query_params")
        connection.execute("DELETE FROM schema_version WHERE version = 4")
```
## Migration Best Practices
### 1. Migration Safety
```python
class SafeMigration(Migration):
    """Template for safe migrations."""

    def validate(self, connection: DatabaseConnection) -> bool:
        """Return True only if every pre-flight check passes.

        The checks run in order (constraints, data integrity, disk space)
        and stop at the first one that fails.
        """
        preflight_checks = (
            self.check_constraints,
            self.check_data_integrity,
            self.check_disk_space,
        )
        return all(check(connection) for check in preflight_checks)

    def check_constraints(self, connection: DatabaseConnection) -> bool:
        """Constraint check hook; this template always returns True."""
        # Verify foreign key constraints
        # Check unique constraints
        # Validate data types
        return True

    def check_data_integrity(self, connection: DatabaseConnection) -> bool:
        """Data-integrity check hook; this template always returns True."""
        # Verify data consistency
        # Check for orphaned records
        # Validate data formats
        return True

    def check_disk_space(self, connection: DatabaseConnection) -> bool:
        """Disk-space check hook; this template always returns True."""
        # Estimate migration space requirements
        # Check available disk space
        return True
```
### 2. Backup Integration
```python
class BackupManager:
    """Manages database backups for migrations."""

    def __init__(self, database_url: str, backup_path: str):
        """Remember the database URL and the directory backups are written to."""
        self.database_url = database_url
        self.backup_path = Path(backup_path)

    async def create_backup(self, migration_version: int) -> str:
        """Create database backup before migration.

        The backend is chosen from the prefix of ``database_url``
        (``sqlite``, ``postgresql`` or ``mysql``).

        Args:
            migration_version: Version number embedded in the file name.

        Returns:
            Path of the backup file, as a string.

        Raises:
            ValueError: If the database URL matches none of the supported
                prefixes. Previously this case returned a path although no
                backup had been written.
        """
        # Create backup based on database type
        if self.database_url.startswith("sqlite"):
            run_backup = self.backup_sqlite
        elif self.database_url.startswith("postgresql"):
            run_backup = self.backup_postgresql
        elif self.database_url.startswith("mysql"):
            run_backup = self.backup_mysql
        else:
            # Report only the scheme: the full URL may contain credentials.
            scheme = self.database_url.split(":", 1)[0]
            raise ValueError(f"Unsupported database type for backup: {scheme!r}")

        # Make sure the target directory exists before a backend writes to it.
        self.backup_path.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"backup_v{migration_version}_{timestamp}.sql"
        backup_file = self.backup_path / backup_filename
        await run_backup(backup_file)
        return str(backup_file)

    async def restore_backup(self, backup_file: str) -> bool:
        """Restore database from backup.

        Placeholder: not implemented in this example (returns None).
        """
        # Implementation for restoring backup
        pass
```
### 3. Migration Monitoring
```python
class MigrationMonitor:
    """Monitors migration progress and performance."""

    def __init__(self):
        """Start with no migration being timed and no metrics collected."""
        self.start_time = None
        self.metrics = {}

    def start_migration(self, migration: "Migration") -> None:
        """Start monitoring migration.

        Resets ``metrics`` to the migration's version, description and the
        start timestamp.
        """
        self.start_time = time.time()
        self.metrics = {
            "version": migration.version,
            "description": migration.description,
            "start_time": self.start_time
        }

    def end_migration(self, success: bool) -> None:
        """End monitoring migration.

        Records end time, duration (seconds) and outcome, then logs the
        metrics.

        Raises:
            RuntimeError: If called before ``start_migration``.
        """
        if self.start_time is None:
            # Without a start time the duration cannot be computed.
            raise RuntimeError("end_migration() called before start_migration()")
        end_time = time.time()
        self.metrics.update({
            "end_time": end_time,
            "duration": end_time - self.start_time,
            "success": success
        })
        # Log metrics
        self.log_metrics()

    def log_metrics(self) -> None:
        """Log migration metrics.

        Logs at INFO level when the migration succeeded (or no outcome has
        been recorded yet) and at ERROR level when it failed. The metrics
        dict is attached to the record through ``extra``.
        """
        log = logging.getLogger(__name__)
        if self.metrics.get("success", True):
            log.info(f"Migration {self.metrics['version']} completed", extra=self.metrics)
        else:
            log.error(f"Migration {self.metrics['version']} failed", extra=self.metrics)
```
## Troubleshooting Migrations
### Common Issues
#### 1. Failed Migration Recovery
```python
class MigrationRecovery:
    """Handles migration failure recovery."""

    async def recover_from_failed_migration(self, migration_version: int) -> bool:
        """Try to recover the given migration based on its current state.

        A "partial" migration is completed, a "failed" one is rolled back;
        any other state is not handled and yields False.
        """
        outcome = await self.check_migration_state(migration_version)
        if outcome not in ("partial", "failed"):
            return False
        if outcome == "partial":
            # Attempt to complete migration
            return await self.complete_partial_migration(migration_version)
        # Rollback failed migration
        return await self.rollback_failed_migration(migration_version)

    async def check_migration_state(self, migration_version: int) -> str:
        """Report the state of a migration.

        Placeholder: the steps below are not implemented in this example,
        so the method returns None.
        """
        # Check schema_version table
        # Verify expected schema changes
        # Check for partial data migration
        return None
```
#### 2. Schema Validation
```python
class SchemaValidator:
    """Validates database schema after migrations."""

    def __init__(self, connection: DatabaseConnection):
        """Keep the connection whose schema is validated."""
        self.connection = connection

    async def validate_schema(self, expected_version: int) -> ValidationResult:
        """Check that the schema is at ``expected_version`` and well-formed.

        The version is compared first; then tables, indexes and constraints
        are validated in that order. The first failing result is returned
        as-is, and later steps are not run.
        """
        actual_version = await self.get_schema_version()
        if actual_version != expected_version:
            return ValidationResult(False, f"Version mismatch: {actual_version} != {expected_version}")

        # Structural checks, looked up and run one at a time.
        for step_name in ("validate_tables", "validate_indexes", "validate_constraints"):
            step_result = await getattr(self, step_name)()
            if not step_result.success:
                return step_result

        return ValidationResult(True, "Schema validation passed")
```
### Migration Debugging
```bash
# Enable debug logging
export MOCKLOOP_LOG_LEVEL=debug
# Run migration with verbose output
mockloop db migrate --verbose
# Check migration history
mockloop db history
# Validate current schema
mockloop db validate-schema
# Show migration details
mockloop db show-migration --version 3
```
## See Also
- **[Database Schema](../api/database-schema.md)**: Complete database schema reference
- **[Configuration Options](../api/configuration.md)**: Database configuration options
- **[Architecture](architecture.md)**: System architecture overview
- **[Troubleshooting](troubleshooting.md)**: General troubleshooting guide