sync_memory_backends.py•10.6 kB
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Bidirectional sync script for MCP Memory Service backends.
Syncs memories between Cloudflare (primary) and SQLite-vec (backup).
"""
import sys
import os
import asyncio
import logging
import argparse
import hashlib
from pathlib import Path
from typing import List, Dict, Any, Tuple
from datetime import datetime
# Add parent directory to path so we can import from the src directory
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.mcp_memory_service.storage.cloudflare import CloudflareStorage
from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from src.mcp_memory_service.config import (
    CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CLOUDFLARE_VECTORIZE_INDEX,
    CLOUDFLARE_D1_DATABASE_ID, BASE_DIR
)
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("memory_sync")
class MemorySync:
    """Handles bidirectional sync between Cloudflare and SQLite-vec backends."""
    def __init__(self, sqlite_path: str = None):
        """Initialize sync with storage backends."""
        self.sqlite_path = sqlite_path or os.path.join(BASE_DIR, 'backup_sqlite_vec.db')
        # Initialize storage backends
        self.cloudflare = CloudflareStorage(
            api_token=CLOUDFLARE_API_TOKEN,
            account_id=CLOUDFLARE_ACCOUNT_ID,
            vectorize_index=CLOUDFLARE_VECTORIZE_INDEX,
            d1_database_id=CLOUDFLARE_D1_DATABASE_ID
        )
        self.sqlite_vec = SqliteVecMemoryStorage(self.sqlite_path)
    async def get_all_memories_from_backend(self, backend_name: str) -> List[Dict[str, Any]]:
        """Get all memories from a specific backend."""
        if backend_name == 'cloudflare':
            backend = self.cloudflare
        elif backend_name == 'sqlite_vec':
            backend = self.sqlite_vec
        else:
            raise ValueError(f"Unknown backend: {backend_name}")
        try:
            # Get all memories from the backend
            memories_list = await backend.get_all_memories()
            memories = []
            for memory in memories_list:
                memory_dict = {
                    'id': memory.id,
                    'content': memory.content,
                    'metadata': memory.metadata,
                    'timestamp': memory.timestamp,
                    'hash': memory.hash
                }
                memories.append(memory_dict)
            logger.info(f"Retrieved {len(memories)} memories from {backend_name}")
            return memories
        except Exception as e:
            logger.error(f"Error retrieving memories from {backend_name}: {e}")
            return []
    def calculate_content_hash(self, content: str, metadata: Dict[str, Any]) -> str:
        """Calculate a hash for memory content to detect duplicates."""
        # Create a consistent string representation
        content_str = f"{content}_{sorted(metadata.items())}"
        return hashlib.sha256(content_str.encode()).hexdigest()[:16]
    async def _sync_between_backends(self, source_backend: str, target_backend: str, dry_run: bool = False) -> Tuple[int, int]:
        """
        Generic method to sync memories between any two backends.
        Args:
            source_backend: Backend to sync from ('cloudflare' or 'sqlite_vec')
            target_backend: Backend to sync to ('cloudflare' or 'sqlite_vec')
            dry_run: If True, only show what would be synced without making changes
        Returns:
            Tuple of (added_count, skipped_count)
        """
        logger.info(f"Starting sync from {source_backend} to {target_backend}...")
        # Get memories from both backends
        source_memories = await self.get_all_memories_from_backend(source_backend)
        target_memories = await self.get_all_memories_from_backend(target_backend)
        # Create hash sets for quick lookup
        target_hashes = {mem['hash'] for mem in target_memories if mem.get('hash')}
        target_content_hashes = {
            self.calculate_content_hash(mem['content'], mem['metadata'])
            for mem in target_memories
        }
        added_count = 0
        skipped_count = 0
        # Get target backend instance for storing memories
        target_storage = self.cloudflare if target_backend == 'cloudflare' else self.sqlite_vec
        for source_memory in source_memories:
            # Check if memory already exists (by hash or content)
            content_hash = self.calculate_content_hash(source_memory['content'], source_memory['metadata'])
            if (source_memory.get('hash') in target_hashes or
                content_hash in target_content_hashes):
                skipped_count += 1
                continue
            if not dry_run:
                try:
                    await target_storage.store_memory(
                        content=source_memory['content'],
                        metadata=source_memory['metadata']
                    )
                    added_count += 1
                    logger.debug(f"Added memory: {source_memory['id'][:8]}...")
                except Exception as e:
                    logger.error(f"Error storing memory {source_memory['id']}: {e}")
            else:
                added_count += 1
        logger.info(f"{source_backend} → {target_backend}: {added_count} added, {skipped_count} skipped")
        return added_count, skipped_count
    async def sync_cloudflare_to_sqlite(self, dry_run: bool = False) -> Tuple[int, int]:
        """Sync memories from Cloudflare to SQLite-vec."""
        return await self._sync_between_backends('cloudflare', 'sqlite_vec', dry_run)
    async def sync_sqlite_to_cloudflare(self, dry_run: bool = False) -> Tuple[int, int]:
        """Sync memories from SQLite-vec to Cloudflare."""
        return await self._sync_between_backends('sqlite_vec', 'cloudflare', dry_run)
    async def bidirectional_sync(self, dry_run: bool = False) -> Dict[str, Tuple[int, int]]:
        """Perform bidirectional sync between backends."""
        logger.info("Starting bidirectional sync...")
        results = {}
        # Sync Cloudflare → SQLite-vec
        cf_to_sqlite = await self.sync_cloudflare_to_sqlite(dry_run)
        results['cloudflare_to_sqlite'] = cf_to_sqlite
        # Sync SQLite-vec → Cloudflare
        sqlite_to_cf = await self.sync_sqlite_to_cloudflare(dry_run)
        results['sqlite_to_cloudflare'] = sqlite_to_cf
        logger.info("Bidirectional sync completed")
        return results
    async def get_sync_status(self) -> Dict[str, Any]:
        """Get sync status showing memory counts in both backends."""
        cf_memories = await self.get_all_memories_from_backend('cloudflare')
        sqlite_memories = await self.get_all_memories_from_backend('sqlite_vec')
        status = {
            'cloudflare_count': len(cf_memories),
            'sqlite_vec_count': len(sqlite_memories),
            'sync_time': datetime.now().isoformat(),
            'backends_configured': {
                'cloudflare': bool(CLOUDFLARE_API_TOKEN and CLOUDFLARE_ACCOUNT_ID),
                'sqlite_vec': os.path.exists(self.sqlite_path) if self.sqlite_path else False
            }
        }
        return status
async def main():
    """Main function to run memory sync operations."""
    parser = argparse.ArgumentParser(description='Sync memories between Cloudflare and SQLite-vec backends')
    parser.add_argument('--direction', choices=['cf-to-sqlite', 'sqlite-to-cf', 'bidirectional'],
                        default='bidirectional', help='Sync direction')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be synced without actually syncing')
    parser.add_argument('--status', action='store_true', help='Show sync status only')
    parser.add_argument('--sqlite-path', help='Path to SQLite-vec database file')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
    args = parser.parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Initialize sync
    sync = MemorySync(sqlite_path=args.sqlite_path)
    try:
        if args.status:
            status = await sync.get_sync_status()
            print(f"\n=== Memory Sync Status ===")
            print(f"Cloudflare memories: {status['cloudflare_count']}")
            print(f"SQLite-vec memories: {status['sqlite_vec_count']}")
            print(f"Cloudflare configured: {status['backends_configured']['cloudflare']}")
            print(f"SQLite-vec file exists: {status['backends_configured']['sqlite_vec']}")
            print(f"Last check: {status['sync_time']}")
            return
        logger.info(f"=== Starting memory sync ({args.direction}) ===")
        if args.dry_run:
            logger.info("DRY RUN MODE - No changes will be made")
        if args.direction == 'cf-to-sqlite':
            added, skipped = await sync.sync_cloudflare_to_sqlite(dry_run=args.dry_run)
            print(f"Cloudflare → SQLite-vec: {added} added, {skipped} skipped")
        elif args.direction == 'sqlite-to-cf':
            added, skipped = await sync.sync_sqlite_to_cloudflare(dry_run=args.dry_run)
            print(f"SQLite-vec → Cloudflare: {added} added, {skipped} skipped")
        else:  # bidirectional
            results = await sync.bidirectional_sync(dry_run=args.dry_run)
            cf_to_sqlite = results['cloudflare_to_sqlite']
            sqlite_to_cf = results['sqlite_to_cloudflare']
            print(f"Cloudflare → SQLite-vec: {cf_to_sqlite[0]} added, {cf_to_sqlite[1]} skipped")
            print(f"SQLite-vec → Cloudflare: {sqlite_to_cf[0]} added, {sqlite_to_cf[1]} skipped")
        logger.info("=== Sync completed successfully ===")
    except Exception as e:
        logger.error(f"Sync failed: {str(e)}")
        sys.exit(1)
if __name__ == "__main__":
    asyncio.run(main())