"""
Migration utilities for upgrading from ChromaDB to PostgreSQL.
This module provides utilities to help users migrate from the old ChromaDB-based
semantic search system to the new PostgreSQL + pg-vector implementation.
"""
import os
import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List
import shutil
from .db_schema import DatabaseManager, create_database_manager
from .embedding_service import create_embedding_provider
from .semantic_search import create_semantic_search
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
class MigrationError(Exception):
    """Signal that a step of the ChromaDB-to-PostgreSQL migration failed."""
class ZoteroMigrationManager:
    """Manages migration from ChromaDB to PostgreSQL system.

    The manager detects a legacy ChromaDB installation, optionally backs it
    up, validates and initializes the PostgreSQL target, writes the new
    configuration file and can remove the legacy data afterwards.  Most
    steps report their outcome as a plain dict (with an ``errors`` list)
    instead of raising, so callers can inspect partial results.
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize migration manager.

        Args:
            config_path: Path to configuration file.  Defaults to
                ``~/.config/zotero-mcp/config.json`` when omitted or empty.
        """
        self.config_path = config_path or str(
            Path.home() / ".config" / "zotero-mcp" / "config.json"
        )
        # Backups and the default ChromaDB location live next to the config file.
        self.config_dir = Path(self.config_path).parent

    def _candidate_chroma_paths(self) -> List[Path]:
        """Return the directories where legacy ChromaDB data may live, in priority order."""
        return [
            self.config_dir / "chroma_db",
            Path.home() / ".local" / "share" / "zotero-mcp" / "chroma_db",
            Path.home() / ".cache" / "zotero-mcp" / "chroma_db",
        ]

    def _find_chroma_db(self) -> Optional[Path]:
        """Return the first existing ChromaDB directory, or None if there is none."""
        for candidate in self._candidate_chroma_paths():
            if candidate.is_dir():
                return candidate
        return None

    def detect_old_installation(self) -> Dict[str, Any]:
        """
        Detect if there's an old ChromaDB installation.

        Returns:
            Dict with detection results: ``has_old_chroma_db``,
            ``chroma_db_path``, ``has_old_config``, ``old_config_path``,
            ``estimated_items`` and a list of ``recommendations``.
        """
        detection = {
            "has_old_chroma_db": False,
            "chroma_db_path": None,
            "has_old_config": False,
            "old_config_path": None,
            "estimated_items": 0,
            "recommendations": []
        }

        # Look for ChromaDB data directories (first match wins).
        chroma_path = self._find_chroma_db()
        if chroma_path is not None:
            detection["has_old_chroma_db"] = True
            detection["chroma_db_path"] = str(chroma_path)
            # Estimate number of items; this is best-effort, so a failure to
            # walk the directory only leaves the estimate at 0.
            try:
                db_files = list(chroma_path.rglob("*.sqlite*"))
            except OSError as exc:
                logger.debug("Could not inspect ChromaDB files in %s: %s", chroma_path, exc)
            else:
                # Rough estimate: 100 items per SQLite file found.
                detection["estimated_items"] = len(db_files) * 100

        # Look for old configuration
        if os.path.exists(self.config_path):
            try:
                with open(self.config_path, "r", encoding="utf-8") as f:
                    config = json.load(f)
            except (OSError, ValueError) as exc:
                # Unreadable or malformed config: treat as "no old config"
                # but leave a trace instead of failing silently.
                logger.warning(
                    "Could not read configuration at %s: %s", self.config_path, exc
                )
            else:
                # Old-style configuration keeps "embedding_model" under
                # the "semantic_search" section.
                old_config = config.get("semantic_search") if isinstance(config, dict) else None
                if isinstance(old_config, dict) and "embedding_model" in old_config:
                    detection["has_old_config"] = True
                    detection["old_config_path"] = self.config_path

        # Generate recommendations
        if detection["has_old_chroma_db"]:
            detection["recommendations"].append(
                "Old ChromaDB database found. You'll need to rebuild the semantic search database."
            )
        if detection["has_old_config"]:
            detection["recommendations"].append(
                "Old configuration found. Migration will update to new PostgreSQL configuration."
            )
        if not detection["has_old_chroma_db"] and not detection["has_old_config"]:
            detection["recommendations"].append(
                "No old installation detected. You can proceed with fresh setup."
            )
        return detection

    def backup_old_data(self) -> Dict[str, Any]:
        """
        Backup old ChromaDB data and configuration.

        A timestamped ``backup_chromadb_<epoch>`` directory is created next to
        the configuration file.  The configuration file (if present) and the
        first ChromaDB directory found in the known locations are copied
        into it.

        Returns:
            Dict with backup results: ``config_backed_up``, ``data_backed_up``,
            ``backup_dir`` and a list of ``errors`` (empty on success).
        """
        # Local import keeps this block self-contained; the original called the
        # non-existent ``os.time.time()``, which made every backup fail.
        import time

        backup_results = {
            "config_backed_up": False,
            "data_backed_up": False,
            "backup_dir": None,
            "errors": []
        }
        try:
            # Create backup directory
            backup_dir = self.config_dir / f"backup_chromadb_{int(time.time())}"
            backup_dir.mkdir(parents=True, exist_ok=True)
            backup_results["backup_dir"] = str(backup_dir)

            # Backup configuration
            if os.path.exists(self.config_path):
                config_backup = backup_dir / "config.json.backup"
                shutil.copy2(self.config_path, config_backup)
                backup_results["config_backed_up"] = True
                logger.info(f"Backed up configuration to {config_backup}")

            # Backup ChromaDB data from wherever detection would find it,
            # not only from the directory next to the config file.
            chroma_db_path = self._find_chroma_db()
            if chroma_db_path is not None:
                data_backup = backup_dir / "chroma_db"
                shutil.copytree(chroma_db_path, data_backup)
                backup_results["data_backed_up"] = True
                logger.info(f"Backed up ChromaDB data to {data_backup}")
        except Exception as e:
            # Boundary handler: report the failure in the result dict.
            error_msg = f"Error during backup: {e}"
            backup_results["errors"].append(error_msg)
            logger.error(error_msg)
        return backup_results

    def validate_postgresql_setup(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate PostgreSQL setup and connectivity.

        Args:
            config: Full configuration dict; its ``"database"`` section is
                passed to ``create_database_manager``.

        Returns:
            Dict with validation results: ``connection_successful``,
            ``pg_vector_available``, ``schema_exists``, ``errors`` and
            ``recommendations``.
        """
        validation = {
            "connection_successful": False,
            "pg_vector_available": False,
            "schema_exists": False,
            "errors": [],
            "recommendations": []
        }
        try:
            # Test database connection
            db_config = config.get("database", {})
            db_manager = create_database_manager(db_config)

            # Check if database exists
            if db_manager.check_database_exists():
                validation["connection_successful"] = True

                # Check pg-vector extension
                if db_manager.check_pg_vector_extension():
                    validation["pg_vector_available"] = True
                else:
                    validation["recommendations"].append(
                        "pg-vector extension needs to be installed: CREATE EXTENSION vector;"
                    )

                # Check if schema exists
                status = db_manager.get_database_status()
                # NOTE(review): a missing "total_items" key also passes this
                # check; presumably get_database_status() raises when the
                # schema is absent - confirm against db_schema.
                if status.get("total_items", 0) >= 0:  # Schema exists if we can query
                    validation["schema_exists"] = True
            else:
                validation["errors"].append("Cannot connect to PostgreSQL database")
                # Safe lookup: a config without a database name must not turn
                # this hint into a KeyError.
                db_name = db_config.get("database", "<unspecified>")
                validation["recommendations"].append(
                    f"Ensure PostgreSQL is running and database '{db_name}' exists"
                )
        except Exception as e:
            validation["errors"].append(str(e))
            validation["recommendations"].append(
                "Check PostgreSQL connection settings and ensure database is accessible"
            )
        return validation

    def create_migration_config(self,
                                database_config: Dict[str, Any],
                                embedding_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create new configuration for PostgreSQL-based system.

        Nothing is written to disk here; see ``save_migration_config``.

        Args:
            database_config: PostgreSQL database configuration; missing keys
                fall back to the defaults below.
            embedding_config: Embedding provider configuration, stored as-is
                under ``"embedding"``.

        Returns:
            Complete configuration dictionary
        """
        # NOTE(review): the fallback host and password below look like values
        # from a development environment.  They are kept so existing callers
        # see no behavior change, but credentials should not live in source -
        # consider requiring them from the caller.
        config = {
            "database": {
                "host": database_config.get("host", "192.168.1.173"),
                "port": database_config.get("port", 5432),
                "database": database_config.get("database", "zotero_mcp_test"),
                "username": database_config.get("username", "zotero_mcp_test"),
                "password": database_config.get("password", "jt24jtiowjeiofjoi"),
                "schema": database_config.get("schema", "public"),
                "pool_size": database_config.get("pool_size", 5)
            },
            "embedding": embedding_config,
            "chunking": {
                "chunk_size": 1000,
                "overlap": 100,
                "min_chunk_size": 100,
                "max_chunks_per_item": 10,
                "chunking_strategy": "sentences"
            },
            "semantic_search": {
                "similarity_threshold": 0.7,
                "max_results": 50,
                "update_config": {
                    "auto_update": False,
                    "update_frequency": "manual",
                    "batch_size": 50,
                    "parallel_workers": 4
                }
            }
        }
        return config

    def save_migration_config(self, config: Dict[str, Any]) -> bool:
        """
        Save new configuration to file.

        The file at ``self.config_path`` is overwritten.

        Args:
            config: Configuration dictionary

        Returns:
            True if successful, False otherwise
        """
        try:
            # Ensure config directory exists
            self.config_dir.mkdir(parents=True, exist_ok=True)
            # Save configuration
            with open(self.config_path, "w", encoding="utf-8") as f:
                json.dump(config, f, indent=2)
            logger.info(f"Migration configuration saved to {self.config_path}")
            return True
        except Exception as e:
            logger.error(f"Error saving migration configuration: {e}")
            return False

    def initialize_postgresql_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Initialize PostgreSQL database with schema.

        Args:
            config: Full configuration dict; its ``"database"`` section is
                passed to ``create_database_manager``.

        Returns:
            Dict with initialization results: ``schema_created``,
            ``extension_installed`` and ``errors``.
        """
        results = {
            "schema_created": False,
            "extension_installed": False,
            "errors": []
        }
        try:
            db_manager = create_database_manager(config.get("database", {}))
            # Initialize database
            db_manager.initialize_database()
            # Both flags are set together once initialize_database() returns
            # without raising.
            # NOTE(review): presumably that call also installs the pg-vector
            # extension - confirm against db_schema.
            results["schema_created"] = True
            results["extension_installed"] = True
            logger.info("PostgreSQL database initialized successfully")
        except Exception as e:
            error_msg = f"Error initializing PostgreSQL database: {e}"
            results["errors"].append(error_msg)
            logger.error(error_msg)
        return results

    def cleanup_old_data(self, remove_chroma_db: bool = False) -> Dict[str, Any]:
        """
        Clean up old ChromaDB data.

        Cache directories are always cleared, except that a cache directory
        holding ChromaDB data is kept when ``remove_chroma_db`` is False so
        the data is never deleted without being asked for.

        Args:
            remove_chroma_db: Whether to remove ChromaDB directory

        Returns:
            Dict with cleanup results: ``chroma_db_removed``,
            ``cache_cleared`` and ``errors``.
        """
        cleanup_results = {
            "chroma_db_removed": False,
            "cache_cleared": False,
            "errors": []
        }
        try:
            # Remove ChromaDB directory if requested
            if remove_chroma_db:
                chroma_db_path = self._find_chroma_db()
                if chroma_db_path is not None:
                    shutil.rmtree(chroma_db_path)
                    cleanup_results["chroma_db_removed"] = True
                    logger.info(f"Removed old ChromaDB data at {chroma_db_path}")
                protected = []
            else:
                # One ChromaDB candidate location lives under ~/.cache/zotero-mcp;
                # it must survive cache clearing unless removal was requested.
                protected = [p for p in self._candidate_chroma_paths() if p.is_dir()]

            # Clear any cache directories
            cache_dirs = [
                self.config_dir / "cache",
                Path.home() / ".cache" / "zotero-mcp"
            ]
            for cache_dir in cache_dirs:
                if not cache_dir.exists():
                    continue
                if any(cache_dir in kept.parents for kept in protected):
                    logger.info(
                        "Keeping cache directory %s because it contains ChromaDB data",
                        cache_dir,
                    )
                    continue
                shutil.rmtree(cache_dir)
                cleanup_results["cache_cleared"] = True
                logger.info(f"Cleared cache directory {cache_dir}")
        except Exception as e:
            error_msg = f"Error during cleanup: {e}"
            cleanup_results["errors"].append(error_msg)
            logger.error(error_msg)
        return cleanup_results

    def run_full_migration(self,
                           database_config: Dict[str, Any],
                           embedding_config: Dict[str, Any],
                           backup_old_data: bool = True,
                           remove_old_data: bool = False) -> Dict[str, Any]:
        """
        Run complete migration from ChromaDB to PostgreSQL.

        Steps: detect, backup, validate PostgreSQL, save the new
        configuration, initialize the database, then optionally clean up.
        A failed validation, save or initialization aborts the run; the
        failure is reported in the returned dict rather than raised.

        Args:
            database_config: PostgreSQL configuration
            embedding_config: Embedding provider configuration
            backup_old_data: Whether to backup old data
            remove_old_data: Whether to remove old data after migration

        Returns:
            Dict with complete migration results: per-step result dicts plus
            ``config_created``, ``database_initialized``, ``success``,
            ``errors`` and ``recommendations``.
        """
        migration_results = {
            "detection": {},
            "backup": {},
            "validation": {},
            "config_created": False,
            "database_initialized": False,
            "cleanup": {},
            "success": False,
            "errors": [],
            "recommendations": []
        }
        try:
            # Step 1: Detect old installation
            logger.info("Step 1: Detecting old installation...")
            detection = self.detect_old_installation()
            migration_results["detection"] = detection

            # Step 2: Backup old data if requested.  An old config alone also
            # warrants a backup, because step 4 overwrites that file.
            has_old_install = detection["has_old_chroma_db"] or detection["has_old_config"]
            if backup_old_data and has_old_install:
                logger.info("Step 2: Backing up old data...")
                migration_results["backup"] = self.backup_old_data()
                if migration_results["backup"]["errors"]:
                    migration_results["errors"].extend(migration_results["backup"]["errors"])
            backup_failed = bool(migration_results["backup"].get("errors"))

            # Step 3: Validate PostgreSQL setup
            logger.info("Step 3: Validating PostgreSQL setup...")
            config = self.create_migration_config(database_config, embedding_config)
            migration_results["validation"] = self.validate_postgresql_setup(config)
            if not migration_results["validation"]["connection_successful"]:
                raise MigrationError("PostgreSQL validation failed")

            # Step 4: Create new configuration
            logger.info("Step 4: Creating migration configuration...")
            if self.save_migration_config(config):
                migration_results["config_created"] = True
            else:
                raise MigrationError("Failed to save migration configuration")

            # Step 5: Initialize PostgreSQL database
            logger.info("Step 5: Initializing PostgreSQL database...")
            init_results = self.initialize_postgresql_database(config)
            migration_results["database_initialized"] = init_results["schema_created"]
            if init_results["errors"]:
                migration_results["errors"].extend(init_results["errors"])
            if not migration_results["database_initialized"]:
                raise MigrationError("Failed to initialize PostgreSQL database")

            # Step 6: Cleanup old data if requested - but never delete data
            # whose requested backup did not succeed.
            if remove_old_data and backup_failed:
                skip_msg = (
                    "Old data was not removed because the backup did not complete successfully"
                )
                logger.warning(skip_msg)
                migration_results["recommendations"].append(skip_msg)
            elif remove_old_data:
                logger.info("Step 6: Cleaning up old data...")
                migration_results["cleanup"] = self.cleanup_old_data(remove_chroma_db=True)
                if migration_results["cleanup"]["errors"]:
                    migration_results["errors"].extend(migration_results["cleanup"]["errors"])

            # Step 7: Final recommendations
            migration_results["recommendations"].extend([
                "Migration completed successfully!",
                "Run 'zotero-mcp update-database --force-rebuild' to populate the new database",
                "The semantic search will use the new PostgreSQL + pg-vector backend",
                f"Configuration saved to: {self.config_path}"
            ])
            if backup_old_data and migration_results["backup"].get("backup_dir"):
                migration_results["recommendations"].append(
                    f"Old data backed up to: {migration_results['backup']['backup_dir']}"
                )
            migration_results["success"] = True
            logger.info("Migration completed successfully!")
        except Exception as e:
            # Top-level boundary: any failure (including MigrationError) is
            # reported through the result dict.
            error_msg = f"Migration failed: {e}"
            migration_results["errors"].append(error_msg)
            migration_results["success"] = False
            logger.error(error_msg)
        return migration_results
def create_migration_manager(config_path: Optional[str] = None) -> ZoteroMigrationManager:
    """
    Build a migration manager for the given configuration file.

    Args:
        config_path: Path to configuration file; passed through unchanged
            to ZoteroMigrationManager.

    Returns:
        A new ZoteroMigrationManager instance
    """
    manager = ZoteroMigrationManager(config_path)
    return manager
def detect_migration_needed() -> bool:
    """
    Report whether a legacy installation is present.

    Uses a manager created with the default configuration path.

    Returns:
        True if old ChromaDB data or an old-style configuration was detected
    """
    findings = create_migration_manager().detect_old_installation()
    if findings["has_old_chroma_db"]:
        return findings["has_old_chroma_db"]
    return findings["has_old_config"]