MCP Memory Service

#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Enhanced ChromaDB to SQLite-vec Migration Script for v5.0.0+

This script provides a robust migration path from ChromaDB to SQLite-vec with:
- Custom data path support
- Proper content hash generation
- Tag format validation and correction
- Progress indicators
- Transaction-based migration with rollback
- Dry-run mode for testing
- Comprehensive error handling
"""

import argparse
import asyncio
import hashlib
import json
import logging
import os
import sqlite3
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Union, Tuple

# Try importing with progress bar support
try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False
    print("Note: Install 'tqdm' for progress bars: pip install tqdm")

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))

# Import storage modules
try:
    from mcp_memory_service.storage.chroma import ChromaMemoryStorage
    from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
    from mcp_memory_service.models.memory import Memory
    from mcp_memory_service.utils.hashing import generate_content_hash
except ImportError as e:
    print(f"Error importing MCP modules: {e}")
    print("Make sure you're running this from the MCP Memory Service directory")
    sys.exit(1)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MigrationConfig:
    """Configuration for migration process."""

    def __init__(self):
        self.chroma_path: Optional[str] = None
        self.sqlite_path: Optional[str] = None
        self.batch_size: int = 50
        self.dry_run: bool = False
        self.skip_duplicates: bool = True
        self.backup_path: Optional[str] = None
        self.verbose: bool = False
        self.validate_only: bool = False
        self.force: bool = False

    @classmethod
    def from_args(cls, args) -> 'MigrationConfig':
        """Create config from command line arguments."""
        config = cls()
        config.chroma_path = args.chroma_path
        config.sqlite_path = args.sqlite_path
        config.batch_size = args.batch_size
        config.dry_run = args.dry_run
        config.skip_duplicates = not args.no_skip_duplicates
        config.backup_path = args.backup
        config.verbose = args.verbose
        config.validate_only = args.validate_only
        config.force = args.force
        return config
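    # Summary of resolve_paths() below: paths are resolved in order of
    # explicit CLI flags, then the MCP_MEMORY_CHROMA_PATH /
    # MCP_MEMORY_SQLITE_PATH environment variables, then platform defaults
    # (including the legacy ~/.mcp_memory_chroma location and ./chroma_db
    # in the current directory).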
    def resolve_paths(self):
        """Resolve and validate data paths."""
        # Resolve ChromaDB path
        if not self.chroma_path:
            # Check environment variable first
            self.chroma_path = os.environ.get('MCP_MEMORY_CHROMA_PATH')

        if not self.chroma_path:
            # Use default locations based on platform
            home = Path.home()
            if sys.platform == 'darwin':  # macOS
                default_base = home / 'Library' / 'Application Support' / 'mcp-memory'
            elif sys.platform == 'win32':  # Windows
                default_base = Path(os.getenv('LOCALAPPDATA', '')) / 'mcp-memory'
            else:  # Linux
                default_base = home / '.local' / 'share' / 'mcp-memory'

            # Try multiple possible locations
            possible_paths = [
                home / '.mcp_memory_chroma',   # Legacy location
                default_base / 'chroma_db',    # New standard location
                Path.cwd() / 'chroma_db',      # Current directory
            ]

            for path in possible_paths:
                if path.exists():
                    self.chroma_path = str(path)
                    logger.info(f"Found ChromaDB at: {path}")
                    break

        if not self.chroma_path:
            raise ValueError(
                "Could not find ChromaDB data. Please specify --chroma-path or "
                "set MCP_MEMORY_CHROMA_PATH environment variable"
            )

        # Resolve SQLite path
        if not self.sqlite_path:
            # Check environment variable first
            self.sqlite_path = os.environ.get('MCP_MEMORY_SQLITE_PATH')

        if not self.sqlite_path:
            # Default to same directory as ChromaDB with different name
            chroma_dir = Path(self.chroma_path).parent
            self.sqlite_path = str(chroma_dir / 'sqlite_vec.db')
            logger.info(f"Using default SQLite path: {self.sqlite_path}")

        # Resolve backup path if needed
        if self.backup_path is None and not self.dry_run and not self.validate_only:
            backup_dir = Path(self.sqlite_path).parent / 'backups'
            backup_dir.mkdir(exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            self.backup_path = str(backup_dir / f'migration_backup_{timestamp}.json')


class EnhancedMigrationTool:
    """Enhanced migration tool with proper error handling and progress tracking."""

    def __init__(self, config: MigrationConfig):
        self.config = config
        self.stats = {
            'total': 0,
            'migrated': 0,
            'skipped': 0,
            'failed': 0,
            'errors': []
        }
        self.chroma_storage = None
        self.sqlite_storage = None

    def generate_proper_content_hash(self, content: str) -> str:
        """Generate a proper SHA256 content hash."""
        return hashlib.sha256(content.encode()).hexdigest()

    def validate_and_fix_tags(self, tags: Any) -> List[str]:
        """Validate and fix tag format."""
        if not tags:
            return []

        if isinstance(tags, str):
            # Handle comma-separated string
            if ',' in tags:
                return [tag.strip() for tag in tags.split(',') if tag.strip()]
            # Handle single tag
            return [tags.strip()] if tags.strip() else []

        if isinstance(tags, list):
            # Clean and validate list tags
            clean_tags = []
            for tag in tags:
                if isinstance(tag, str) and tag.strip():
                    clean_tags.append(tag.strip())
            return clean_tags

        # Unknown format - log warning and return empty
        logger.warning(f"Unknown tag format: {type(tags)} - {tags}")
        return []

    def safe_timestamp_convert(self, timestamp: Any) -> float:
        """Safely convert various timestamp formats to float."""
        if timestamp is None:
            return datetime.now().timestamp()

        # Handle numeric timestamps
        if isinstance(timestamp, (int, float)):
            # Check if timestamp is reasonable (between 2000 and 2100)
            if 946684800 <= float(timestamp) <= 4102444800:
                return float(timestamp)
            else:
                logger.warning(f"Timestamp {timestamp} out of reasonable range, using current time")
                return datetime.now().timestamp()

        # Handle string timestamps
        if isinstance(timestamp, str):
            # Try ISO format
            for fmt in [
                '%Y-%m-%dT%H:%M:%S.%f',
                '%Y-%m-%dT%H:%M:%S',
                '%Y-%m-%d %H:%M:%S.%f',
                '%Y-%m-%d %H:%M:%S',
            ]:
                try:
                    dt = datetime.strptime(timestamp.rstrip('Z'), fmt)
                    return dt.timestamp()
                except ValueError:
                    continue

            # Try parsing as float string
            try:
                ts = float(timestamp)
                if 946684800 <= ts <= 4102444800:
                    return ts
            except ValueError:
                pass

        # Fallback to current time
        logger.warning(f"Could not parse timestamp '{timestamp}', using current time")
        return datetime.now().timestamp()
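    # Summary of the extraction below: older ChromaDB records may store tags
    # under 'tags_str', timestamps under 'timestamp', and the type under
    # 'type'; each lookup checks the current key first and falls back to the
    # legacy one, so both old and new metadata layouts migrate cleanly.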
    async def extract_memories_from_chroma(self) -> List[Dict[str, Any]]:
        """Extract all memories from ChromaDB with proper error handling."""
        memories = []

        try:
            # Initialize ChromaDB storage
            logger.info("Connecting to ChromaDB...")
            self.chroma_storage = ChromaMemoryStorage(path=self.config.chroma_path)

            # Access the collection directly
            collection = self.chroma_storage.collection
            if not collection:
                raise ValueError("ChromaDB collection not initialized")

            # Get all data from collection
            logger.info("Extracting memories from ChromaDB...")
            results = collection.get(include=['documents', 'metadatas', 'embeddings'])

            if not results or not results.get('ids'):
                logger.warning("No memories found in ChromaDB")
                return memories

            total = len(results['ids'])
            logger.info(f"Found {total} memories to process")

            # Process each memory
            for i in range(total):
                try:
                    # Extract data with defaults
                    doc_id = results['ids'][i]
                    content = results['documents'][i] if i < len(results.get('documents', [])) else ""
                    metadata = results['metadatas'][i] if i < len(results.get('metadatas', [])) else {}
                    embedding = results['embeddings'][i] if i < len(results.get('embeddings', [])) else None

                    if not content:
                        logger.warning(f"Skipping memory {doc_id}: empty content")
                        continue

                    # Generate proper content hash
                    content_hash = self.generate_proper_content_hash(content)

                    # Extract and validate tags
                    raw_tags = metadata.get('tags', metadata.get('tags_str', []))
                    tags = self.validate_and_fix_tags(raw_tags)

                    # Extract timestamps
                    created_at = self.safe_timestamp_convert(
                        metadata.get('created_at', metadata.get('timestamp'))
                    )
                    updated_at = self.safe_timestamp_convert(
                        metadata.get('updated_at', created_at)
                    )

                    # Extract memory type
                    memory_type = metadata.get('memory_type', metadata.get('type', 'imported'))

                    # Clean metadata (remove special fields)
                    clean_metadata = {}
                    exclude_keys = {
                        'tags', 'tags_str', 'created_at', 'updated_at',
                        'timestamp', 'timestamp_float', 'timestamp_str',
                        'memory_type', 'type', 'content_hash',
                        'created_at_iso', 'updated_at_iso'
                    }
                    for key, value in metadata.items():
                        if key not in exclude_keys and value is not None:
                            clean_metadata[key] = value

                    # Create memory record
                    memory_data = {
                        'content': content,
                        'content_hash': content_hash,
                        'tags': tags,
                        'memory_type': memory_type,
                        'metadata': clean_metadata,
                        'embedding': embedding,
                        'created_at': created_at,
                        'updated_at': updated_at,
                        'original_id': doc_id  # Keep for reference
                    }

                    memories.append(memory_data)

                    if self.config.verbose and (i + 1) % 100 == 0:
                        logger.info(f"Processed {i + 1}/{total} memories")

                except Exception as e:
                    logger.error(f"Failed to extract memory {i}: {e}")
                    self.stats['errors'].append(f"Extract error at index {i}: {str(e)}")
                    continue

            logger.info(f"Successfully extracted {len(memories)} memories")
            return memories

        except Exception as e:
            logger.error(f"Critical error extracting from ChromaDB: {e}")
            raise
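    # Summary of the migration below: the whole run executes inside a single
    # SQLite transaction, so a mid-run failure rolls back every batch.
    # Duplicates are detected by content_hash before each insert. Note that
    # the extracted embedding is kept only for reference and is not passed to
    # Memory; the sqlite-vec backend presumably re-embeds content with its
    # configured embedding model on store().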
    async def migrate_to_sqlite(self, memories: List[Dict[str, Any]]) -> bool:
        """Migrate memories to SQLite-vec with transaction support."""
        if not memories:
            logger.warning("No memories to migrate")
            return True

        try:
            # Initialize SQLite-vec storage
            logger.info(f"Initializing SQLite-vec at {self.config.sqlite_path}")
            self.sqlite_storage = SqliteVecMemoryStorage(
                db_path=self.config.sqlite_path,
                embedding_model=os.environ.get('MCP_MEMORY_EMBEDDING_MODEL', 'all-MiniLM-L6-v2')
            )
            await self.sqlite_storage.initialize()

            # Start transaction
            conn = self.sqlite_storage.conn
            conn.execute("BEGIN TRANSACTION")

            try:
                # Migrate memories in batches
                total = len(memories)
                batch_size = self.config.batch_size

                # Use progress bar if available
                if TQDM_AVAILABLE and not self.config.dry_run:
                    progress_bar = tqdm(total=total, desc="Migrating memories")
                else:
                    progress_bar = None

                for i in range(0, total, batch_size):
                    batch = memories[i:i + batch_size]

                    if not self.config.dry_run:
                        for memory_data in batch:
                            try:
                                # Check for duplicates
                                if self.config.skip_duplicates:
                                    existing = conn.execute(
                                        "SELECT 1 FROM memories WHERE content_hash = ? LIMIT 1",
                                        (memory_data['content_hash'],)
                                    ).fetchone()
                                    if existing:
                                        self.stats['skipped'] += 1
                                        if progress_bar:
                                            progress_bar.update(1)
                                        continue

                                # Create Memory object
                                memory = Memory(
                                    content=memory_data['content'],
                                    content_hash=memory_data['content_hash'],
                                    tags=memory_data['tags'],
                                    memory_type=memory_data.get('memory_type'),
                                    metadata=memory_data.get('metadata', {}),
                                    created_at=memory_data['created_at'],
                                    updated_at=memory_data['updated_at']
                                )

                                # Store memory
                                success, message = await self.sqlite_storage.store(memory)
                                if success:
                                    self.stats['migrated'] += 1
                                else:
                                    raise Exception(f"Failed to store: {message}")

                                if progress_bar:
                                    progress_bar.update(1)

                            except Exception as e:
                                self.stats['failed'] += 1
                                self.stats['errors'].append(
                                    f"Migration error for {memory_data['content_hash'][:8]}: {str(e)}"
                                )
                                if progress_bar:
                                    progress_bar.update(1)
                    else:
                        # Dry run - just count
                        self.stats['migrated'] += len(batch)
                        if progress_bar:
                            progress_bar.update(len(batch))

                if progress_bar:
                    progress_bar.close()

                # Commit transaction
                if not self.config.dry_run:
                    conn.execute("COMMIT")
                    logger.info("Transaction committed successfully")
                else:
                    conn.execute("ROLLBACK")
                    logger.info("Dry run - transaction rolled back")

                return True

            except Exception as e:
                # Rollback on error
                conn.execute("ROLLBACK")
                logger.error(f"Migration failed, transaction rolled back: {e}")
                raise

        except Exception as e:
            logger.error(f"Critical error during migration: {e}")
            return False

        finally:
            # Clean up
            if self.sqlite_storage:
                self.sqlite_storage.close()

    async def create_backup(self, memories: List[Dict[str, Any]]):
        """Create a JSON backup of memories."""
        if not self.config.backup_path or self.config.dry_run:
            return

        logger.info(f"Creating backup at {self.config.backup_path}")

        backup_data = {
            'version': '2.0',
            'created_at': datetime.now().isoformat(),
            'source': self.config.chroma_path,
            'total_memories': len(memories),
            'memories': memories
        }

        # Remove embeddings from backup to reduce size
        for memory in backup_data['memories']:
            memory.pop('embedding', None)

        with open(self.config.backup_path, 'w') as f:
            json.dump(backup_data, f, indent=2, default=str)

        logger.info(f"Backup created: {self.config.backup_path}")

    async def validate_migration(self) -> bool:
        """Validate the migrated data."""
        logger.info("Validating migration...")

        try:
            # Connect to SQLite database
            conn = sqlite3.connect(self.config.sqlite_path)

            # Check memory count
            count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
            logger.info(f"SQLite database contains {count} memories")

            # Check for required fields
            sample = conn.execute("""
                SELECT content_hash, content, tags, created_at
                FROM memories
                LIMIT 10
            """).fetchall()

            issues = []
            for row in sample:
                if not row[0]:  # content_hash
                    issues.append("Missing content_hash")
                if not row[1]:  # content
                    issues.append("Missing content")
                if row[3] is None:  # created_at
                    issues.append("Missing created_at")

            conn.close()

            if issues:
                logger.warning(f"Validation issues found: {', '.join(set(issues))}")
                return False

            logger.info("Validation passed!")
            return True

        except Exception as e:
            logger.error(f"Validation failed: {e}")
            return False
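    # Pipeline of run() below: resolve paths -> optional validate-only exit
    # -> extract from ChromaDB -> JSON backup -> interactive confirmation
    # (unless --force) -> transactional migration -> post-migration validation.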
    async def run(self) -> bool:
        """Run the migration process."""
        try:
            # Resolve paths
            self.config.resolve_paths()

            # Print configuration
            print("\n" + "=" * 60)
            print("MCP Memory Service - Enhanced Migration Tool v2.0")
            print("=" * 60)
            print(f"ChromaDB source: {self.config.chroma_path}")
            print(f"SQLite-vec target: {self.config.sqlite_path}")
            if self.config.backup_path:
                print(f"Backup location: {self.config.backup_path}")
            print(f"Mode: {'DRY RUN' if self.config.dry_run else 'LIVE MIGRATION'}")
            print(f"Batch size: {self.config.batch_size}")
            print(f"Skip duplicates: {self.config.skip_duplicates}")
            print()

            # Check if validation only
            if self.config.validate_only:
                return await self.validate_migration()

            # Check if target exists
            if Path(self.config.sqlite_path).exists() and not self.config.force:
                if not self.config.dry_run:
                    response = input("Target database exists. Overwrite? (y/N): ")
                    if response.lower() != 'y':
                        print("Migration cancelled")
                        return False

            # Extract memories from ChromaDB
            memories = await self.extract_memories_from_chroma()
            self.stats['total'] = len(memories)

            if not memories:
                print("No memories found to migrate")
                return True

            # Create backup
            if self.config.backup_path and not self.config.dry_run:
                await self.create_backup(memories)

            # Confirm migration
            if not self.config.dry_run and not self.config.force:
                print(f"\nAbout to migrate {len(memories)} memories")
                response = input("Proceed? (y/N): ")
                if response.lower() != 'y':
                    print("Migration cancelled")
                    return False

            # Perform migration
            success = await self.migrate_to_sqlite(memories)

            # Print summary
            print("\n" + "=" * 60)
            print("MIGRATION SUMMARY")
            print("=" * 60)
            print(f"Total memories found: {self.stats['total']}")
            print(f"Successfully migrated: {self.stats['migrated']}")
            print(f"Duplicates skipped: {self.stats['skipped']}")
            print(f"Failed migrations: {self.stats['failed']}")

            if self.stats['errors'] and self.config.verbose:
                print("\nErrors encountered:")
                for i, error in enumerate(self.stats['errors'][:10], 1):
                    print(f"  {i}. {error}")
                if len(self.stats['errors']) > 10:
                    print(f"  ... and {len(self.stats['errors']) - 10} more")

            if success and not self.config.dry_run:
                # Validate migration
                if await self.validate_migration():
                    print("\n✅ Migration completed successfully!")
                    print("\nNext steps:")
                    print("1. Set environment variable:")
                    print("   export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec")
                    print("2. Set database path:")
                    print(f"   export MCP_MEMORY_SQLITE_PATH={self.config.sqlite_path}")
                    print("3. Restart MCP Memory Service")
                    print("4. Test that your memories are accessible")
                else:
                    print("\n⚠️ Migration completed with validation warnings")

            return success

        except Exception as e:
            logger.error(f"Migration failed: {e}")
            if self.config.verbose:
                import traceback
                traceback.print_exc()
            return False
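# CLI entry point below: parses flags, builds a MigrationConfig, runs the
# tool, and exits 0 on success or 1 on failure.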
def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Enhanced ChromaDB to SQLite-vec migration tool",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--chroma-path',
        help='Path to ChromaDB data directory (default: auto-detect)'
    )
    parser.add_argument(
        '--sqlite-path',
        help='Path for SQLite-vec database (default: same dir as ChromaDB)'
    )
    parser.add_argument(
        '--batch-size',
        type=int,
        default=50,
        help='Number of memories to migrate per batch (default: 50)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Simulate migration without making changes'
    )
    parser.add_argument(
        '--no-skip-duplicates',
        action='store_true',
        help='Migrate all memories including duplicates'
    )
    parser.add_argument(
        '--backup',
        help='Path for JSON backup file (default: auto-generate)'
    )
    parser.add_argument(
        '--verbose',
        action='store_true',
        help='Enable verbose logging'
    )
    parser.add_argument(
        '--validate-only',
        action='store_true',
        help='Only validate existing SQLite database'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='Skip confirmation prompts'
    )

    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Create configuration
    config = MigrationConfig.from_args(args)

    # Run migration
    tool = EnhancedMigrationTool(config)
    success = asyncio.run(tool.run())

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
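Typical invocations, as a minimal sketch: the snippet below assumes the script above is saved as scripts/migrate_v5_enhanced.py (a hypothetical filename; adjust to wherever the file lives in your checkout) and drives it through subprocess so the same pattern works from CI or another tool. The flags themselves (--dry-run, --verbose, --force) come straight from the argparse definition above.

#!/usr/bin/env python3
"""Minimal sketch: dry-run the migration first, then run it for real."""
import subprocess
import sys

SCRIPT = "scripts/migrate_v5_enhanced.py"  # hypothetical path; adjust to your checkout

# First pass: simulate the migration without writing anything.
dry_run = subprocess.run(
    [sys.executable, SCRIPT, "--dry-run", "--verbose"],
    check=False,
)

# If the dry run looked clean (exit code 0), run the real migration
# non-interactively; --force skips the confirmation prompts.
if dry_run.returncode == 0:
    subprocess.run([sys.executable, SCRIPT, "--force"], check=False)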
