Skip to main content
Glama

MCP Memory Service

#!/usr/bin/env python3 """ Migration script for moving data to Cloudflare backend. Supports migration from SQLite-vec and ChromaDB backends. """ import asyncio import json import logging import os import sys import time from pathlib import Path from typing import List, Dict, Any, Optional import argparse # Add src to path for imports sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) from mcp_memory_service.models.memory import Memory from mcp_memory_service.utils.hashing import generate_content_hash # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class DataMigrator: """Handles migration of data to Cloudflare backend.""" def __init__(self): self.source_storage = None self.cloudflare_storage = None async def export_from_sqlite_vec(self, sqlite_path: str) -> List[Dict[str, Any]]: """Export data from SQLite-vec backend.""" logger.info(f"Exporting data from SQLite-vec: {sqlite_path}") try: from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage storage = SqliteVecMemoryStorage(sqlite_path) await storage.initialize() # Get all memories memories = [] stats = await storage.get_stats() total_memories = stats.get('total_memories', 0) logger.info(f"Found {total_memories} memories to export") # Get recent memories in batches batch_size = 100 exported_count = 0 while exported_count < total_memories: batch = await storage.get_recent_memories(batch_size) if not batch: break for memory in batch: memory_data = { 'content': memory.content, 'content_hash': memory.content_hash, 'tags': memory.tags, 'memory_type': memory.memory_type, 'metadata': memory.metadata, 'created_at': memory.created_at, 'created_at_iso': memory.created_at_iso, 'updated_at': memory.updated_at, 'updated_at_iso': memory.updated_at_iso } memories.append(memory_data) exported_count += 1 logger.info(f"Exported {exported_count}/{total_memories} memories") # Break if we got fewer memories than batch size if len(batch) < batch_size: break logger.info(f"Successfully exported {len(memories)} memories from SQLite-vec") return memories except Exception as e: logger.error(f"Failed to export from SQLite-vec: {e}") raise async def export_from_chroma(self, chroma_path: str) -> List[Dict[str, Any]]: """Export data from ChromaDB backend.""" logger.info(f"Exporting data from ChromaDB: {chroma_path}") try: from mcp_memory_service.storage.chroma import ChromaMemoryStorage storage = ChromaMemoryStorage(chroma_path, preload_model=False) await storage.initialize() # Get all memories memories = [] stats = await storage.get_stats() total_memories = stats.get('total_memories', 0) logger.info(f"Found {total_memories} memories to export") # Get recent memories recent_memories = await storage.get_recent_memories(total_memories) for memory in recent_memories: memory_data = { 'content': memory.content, 'content_hash': memory.content_hash, 'tags': memory.tags, 'memory_type': memory.memory_type, 'metadata': memory.metadata, 'created_at': memory.created_at, 'created_at_iso': memory.created_at_iso, 'updated_at': memory.updated_at, 'updated_at_iso': memory.updated_at_iso } memories.append(memory_data) logger.info(f"Successfully exported {len(memories)} memories from ChromaDB") return memories except Exception as e: logger.error(f"Failed to export from ChromaDB: {e}") raise async def import_to_cloudflare(self, memories: List[Dict[str, Any]]) -> bool: """Import data to Cloudflare backend.""" logger.info(f"Importing {len(memories)} memories to Cloudflare backend") try: # Initialize Cloudflare storage from mcp_memory_service.storage.cloudflare import CloudflareStorage # Get configuration from environment api_token = os.getenv('CLOUDFLARE_API_TOKEN') account_id = os.getenv('CLOUDFLARE_ACCOUNT_ID') vectorize_index = os.getenv('CLOUDFLARE_VECTORIZE_INDEX') d1_database_id = os.getenv('CLOUDFLARE_D1_DATABASE_ID') r2_bucket = os.getenv('CLOUDFLARE_R2_BUCKET') if not all([api_token, account_id, vectorize_index, d1_database_id]): raise ValueError("Missing required Cloudflare environment variables") storage = CloudflareStorage( api_token=api_token, account_id=account_id, vectorize_index=vectorize_index, d1_database_id=d1_database_id, r2_bucket=r2_bucket ) await storage.initialize() # Import memories in batches batch_size = 10 # Smaller batches for Cloudflare API limits imported_count = 0 failed_count = 0 for i in range(0, len(memories), batch_size): batch = memories[i:i + batch_size] for memory_data in batch: try: # Convert to Memory object memory = Memory( content=memory_data['content'], content_hash=memory_data['content_hash'], tags=memory_data.get('tags', []), memory_type=memory_data.get('memory_type'), metadata=memory_data.get('metadata', {}), created_at=memory_data.get('created_at'), created_at_iso=memory_data.get('created_at_iso'), updated_at=memory_data.get('updated_at'), updated_at_iso=memory_data.get('updated_at_iso') ) # Store in Cloudflare success, message = await storage.store(memory) if success: imported_count += 1 logger.debug(f"Imported memory: {memory.content_hash[:16]}...") else: failed_count += 1 logger.warning(f"Failed to import memory {memory.content_hash[:16]}: {message}") except Exception as e: failed_count += 1 logger.error(f"Error importing memory: {e}") # Progress update processed = min(i + batch_size, len(memories)) logger.info(f"Progress: {processed}/{len(memories)} processed, {imported_count} imported, {failed_count} failed") # Rate limiting - small delay between batches await asyncio.sleep(0.5) # Final cleanup await storage.close() logger.info(f"Migration completed: {imported_count} imported, {failed_count} failed") return failed_count == 0 except Exception as e: logger.error(f"Failed to import to Cloudflare: {e}") raise async def export_to_file(self, source_backend: str, source_path: str, output_file: str) -> bool: """Export data from source backend to JSON file.""" try: if source_backend == 'sqlite_vec': memories = await self.export_from_sqlite_vec(source_path) elif source_backend == 'chroma': memories = await self.export_from_chroma(source_path) else: raise ValueError(f"Unsupported source backend: {source_backend}") # Save to JSON file export_data = { 'source_backend': source_backend, 'source_path': source_path, 'export_timestamp': time.time(), 'total_memories': len(memories), 'memories': memories } with open(output_file, 'w', encoding='utf-8') as f: json.dump(export_data, f, indent=2, ensure_ascii=False) logger.info(f"Exported {len(memories)} memories to {output_file}") return True except Exception as e: logger.error(f"Export failed: {e}") return False async def import_from_file(self, input_file: str) -> bool: """Import data from JSON file to Cloudflare backend.""" try: with open(input_file, 'r', encoding='utf-8') as f: export_data = json.load(f) memories = export_data.get('memories', []) logger.info(f"Loaded {len(memories)} memories from {input_file}") return await self.import_to_cloudflare(memories) except Exception as e: logger.error(f"Import failed: {e}") return False async def migrate_direct(self, source_backend: str, source_path: str) -> bool: """Direct migration from source backend to Cloudflare.""" try: # Export data if source_backend == 'sqlite_vec': memories = await self.export_from_sqlite_vec(source_path) elif source_backend == 'chroma': memories = await self.export_from_chroma(source_path) else: raise ValueError(f"Unsupported source backend: {source_backend}") # Import to Cloudflare return await self.import_to_cloudflare(memories) except Exception as e: logger.error(f"Direct migration failed: {e}") return False async def main(): """Main migration function.""" parser = argparse.ArgumentParser(description='Migrate data to Cloudflare backend') subparsers = parser.add_subparsers(dest='command', help='Migration commands') # Export command export_parser = subparsers.add_parser('export', help='Export data to JSON file') export_parser.add_argument('--source', choices=['sqlite_vec', 'chroma'], required=True, help='Source backend type') export_parser.add_argument('--source-path', required=True, help='Path to source database') export_parser.add_argument('--output', required=True, help='Output JSON file path') # Import command import_parser = subparsers.add_parser('import', help='Import data from JSON file') import_parser.add_argument('--input', required=True, help='Input JSON file path') # Direct migration command migrate_parser = subparsers.add_parser('migrate', help='Direct migration to Cloudflare') migrate_parser.add_argument('--source', choices=['sqlite_vec', 'chroma'], required=True, help='Source backend type') migrate_parser.add_argument('--source-path', required=True, help='Path to source database') args = parser.parse_args() if not args.command: parser.print_help() return migrator = DataMigrator() try: if args.command == 'export': success = await migrator.export_to_file( args.source, args.source_path, args.output ) elif args.command == 'import': success = await migrator.import_from_file(args.input) elif args.command == 'migrate': success = await migrator.migrate_direct(args.source, args.source_path) if success: logger.info("Migration completed successfully!") sys.exit(0) else: logger.error("Migration failed!") sys.exit(1) except KeyboardInterrupt: logger.info("Migration cancelled by user") sys.exit(1) except Exception as e: logger.error(f"Migration error: {e}") sys.exit(1) if __name__ == '__main__': asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server