Skip to main content
Glama
cleanup_association_memories_hybrid.py18 kB
#!/usr/bin/env python3 """ Hybrid Association Memory Cleanup Script Removes association memories from BOTH local SQLite AND Cloudflare D1/Vectorize. This ensures multi-PC sync doesn't restore deleted associations. PROBLEM SOLVED: When using hybrid backend with multiple PCs, deleting association memories locally doesn't prevent them from being restored via Cloudflare drift-sync. The drift-sync mechanism (hybrid.py:632-750) pulls "missing" memories from Cloudflare back to local storage. SOLUTION: Clean Cloudflare D1 FIRST, then local SQLite. This prevents sync from restoring deleted associations. Other PCs will sync the deletion automatically. WORKFLOW: ┌─────────────┐ sync ┌─────────────┐ sync ┌─────────────┐ │ Windows PC │ ◄──────────► │ Cloudflare │ ◄──────────► │ Linux PC │ │ (run here) │ │ D1 + Vec │ │ auto-sync │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ └── 1. Delete D1 first ──────┘ │ └── 2. Delete Vectorize (optional) ───────────────────────┘ └── 3. Delete local SQLite ───────────────────────────────┘ Usage: python cleanup_association_memories_hybrid.py --dry-run # Preview python cleanup_association_memories_hybrid.py --apply # Execute python cleanup_association_memories_hybrid.py --apply --skip-vectorize # Skip Vectorize cleanup python cleanup_association_memories_hybrid.py --apply --cloudflare-only # Only clean Cloudflare python cleanup_association_memories_hybrid.py --apply --local-only # Only clean local Prerequisites: - MCP_GRAPH_STORAGE_MODE=graph_only should be set (v8.51.0+) - Graph table must exist (run backfill_graph_table.py first if needed) - Cloudflare credentials in environment (CLOUDFLARE_API_TOKEN, etc.) See also: - cleanup_association_memories.py - Local-only cleanup (SQLite backend) - docs/migration/graph-migration-guide.md - Full migration guide """ import asyncio import sqlite3 import os import sys import io import shutil from pathlib import Path from datetime import datetime from typing import List, Dict, Any # Fix Windows console encoding FIRST if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') # Now import httpx import httpx # Add project root to path PROJECT_ROOT = Path(__file__).parent.parent.parent sys.path.insert(0, str(PROJECT_ROOT / "src")) from mcp_memory_service.config import SQLITE_VEC_PATH # Database paths DB_PATH = Path(SQLITE_VEC_PATH) if SQLITE_VEC_PATH else None # Cloudflare config from environment CF_TOKEN = os.getenv('CLOUDFLARE_API_TOKEN') CF_ACCOUNT = os.getenv('CLOUDFLARE_ACCOUNT_ID') CF_DATABASE = os.getenv('CLOUDFLARE_D1_DATABASE_ID') CF_VECTORIZE = os.getenv('CLOUDFLARE_VECTORIZE_INDEX', 'mcp-memory-index') async def get_cloudflare_associations() -> List[str]: """Get all association memory hashes from Cloudflare D1.""" if not all([CF_TOKEN, CF_ACCOUNT, CF_DATABASE]): print("[ERROR] Cloudflare credentials not configured") return [] headers = { 'Authorization': f'Bearer {CF_TOKEN}', 'Content-Type': 'application/json' } url = f'https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT}/d1/database/{CF_DATABASE}/query' async with httpx.AsyncClient(timeout=60) as client: resp = await client.post(url, headers=headers, json={ 'sql': "SELECT content_hash FROM memories WHERE content LIKE 'Association between%'" }) data = resp.json() if not data.get('success'): print(f"[ERROR] Cloudflare query failed: {data}") return [] results = data['result'][0]['results'] hashes = [r['content_hash'] for r in results] print(f"[CLOUDFLARE] Found {len(hashes)} association memories") return hashes async def delete_from_cloudflare_d1(hashes: List[str], batch_size: int = 50, dry_run: bool = True) -> int: """Delete memories from Cloudflare D1 database.""" if dry_run: print(f"[DRY RUN] Would delete {len(hashes)} memories from Cloudflare D1") return len(hashes) if not all([CF_TOKEN, CF_ACCOUNT, CF_DATABASE]): print("[ERROR] Cloudflare credentials not configured") return 0 headers = { 'Authorization': f'Bearer {CF_TOKEN}', 'Content-Type': 'application/json' } url = f'https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT}/d1/database/{CF_DATABASE}/query' deleted = 0 async with httpx.AsyncClient(timeout=120) as client: for i in range(0, len(hashes), batch_size): batch = hashes[i:i + batch_size] placeholders = ','.join(['?' for _ in batch]) resp = await client.post(url, headers=headers, json={ 'sql': f"DELETE FROM memories WHERE content_hash IN ({placeholders})", 'params': batch }) data = resp.json() if data.get('success'): changes = data['result'][0].get('meta', {}).get('changes', len(batch)) deleted += changes print(f" D1: Deleted batch {i//batch_size + 1} ({changes} records)") else: print(f" [ERROR] D1 batch delete failed: {data.get('errors', data)}") await asyncio.sleep(0.5) print(f"[OK] Deleted {deleted} memories from Cloudflare D1") return deleted async def delete_from_cloudflare_vectorize(hashes: List[str], batch_size: int = 100, dry_run: bool = True) -> int: """Delete vectors from Cloudflare Vectorize. Note: This step is OPTIONAL. Orphaned vectors in Vectorize are harmless - they don't appear in search results since the D1 metadata is deleted. Use --skip-vectorize to skip this step if Vectorize API is problematic. Args: hashes: List of content hashes (vector IDs) to delete batch_size: Number of vectors to delete per API call dry_run: If True, only preview without actual deletion Returns: Number of vectors deleted (or would be deleted in dry-run mode) """ if dry_run: print(f"[DRY RUN] Would delete {len(hashes)} vectors from Cloudflare Vectorize") return len(hashes) if not all([CF_TOKEN, CF_ACCOUNT]): print("[ERROR] Cloudflare credentials not configured") return 0 headers = { 'Authorization': f'Bearer {CF_TOKEN}', 'Content-Type': 'application/json' } url = f'https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT}/vectorize/v2/indexes/{CF_VECTORIZE}/delete-by-ids' deleted = 0 errors_count = 0 async with httpx.AsyncClient(timeout=120) as client: for i in range(0, len(hashes), batch_size): batch = hashes[i:i + batch_size] try: resp = await client.post(url, headers=headers, json={ 'ids': batch }) # Handle non-JSON responses (Vectorize API can return HTML errors) try: data = resp.json() except Exception as json_err: errors_count += 1 if errors_count <= 3: # Only show first few errors print(f" [WARN] Vectorize batch {i//batch_size + 1}: Invalid JSON response (HTTP {resp.status_code})") continue if data.get('success'): count = data.get('result', {}).get('count', len(batch)) deleted += count print(f" Vectorize: Deleted batch {i//batch_size + 1} ({count} vectors)") else: errors = data.get('errors', []) # Ignore "not found" errors - vectors may already be deleted if errors and 'not found' not in str(errors).lower(): errors_count += 1 if errors_count <= 3: print(f" [WARN] Vectorize batch {i//batch_size + 1}: {errors}") except httpx.RequestError as e: errors_count += 1 if errors_count <= 3: print(f" [WARN] Vectorize batch {i//batch_size + 1}: Network error - {e}") await asyncio.sleep(0.5) if errors_count > 3: print(f" [WARN] {errors_count} total Vectorize errors (showing first 3)") print(f"[OK] Deleted {deleted} vectors from Cloudflare Vectorize") if errors_count > 0: print(f" (Note: {errors_count} batches had errors - orphaned vectors are harmless)") return deleted def get_local_associations(conn: sqlite3.Connection) -> List[str]: """Get all association memory hashes from local SQLite.""" cursor = conn.cursor() cursor.execute("SELECT content_hash FROM memories WHERE content LIKE 'Association between%'") hashes = [row[0] for row in cursor.fetchall()] print(f"[LOCAL] Found {len(hashes)} association memories") return hashes def delete_from_local(conn: sqlite3.Connection, hashes: List[str], dry_run: bool = True) -> int: """Delete memories from local SQLite database.""" if dry_run: print(f"[DRY RUN] Would delete {len(hashes)} memories from local SQLite") return len(hashes) cursor = conn.cursor() deleted = 0 try: conn.execute("BEGIN TRANSACTION") for content_hash in hashes: cursor.execute("DELETE FROM memories WHERE content_hash = ?", (content_hash,)) if cursor.rowcount > 0: deleted += 1 conn.commit() print(f"[OK] Deleted {deleted} memories from local SQLite") except Exception as e: conn.rollback() print(f"[ERROR] Local delete failed: {e}") raise return deleted def create_backup(db_path: Path) -> Path: """Create a timestamped backup of the local database.""" timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") backup_path = db_path.parent / f"{db_path.stem}.backup-{timestamp}{db_path.suffix}" shutil.copy2(db_path, backup_path) print(f"[BACKUP] Created: {backup_path}") return backup_path def verify_graph_table(conn: sqlite3.Connection) -> Dict[str, int]: """Verify graph table has the association data.""" cursor = conn.cursor() try: cursor.execute("SELECT COUNT(*) FROM memory_graph") graph_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM memories") memory_count = cursor.fetchone()[0] return { 'graph_edges': graph_count, 'total_memories': memory_count, 'graph_exists': graph_count > 0 } except sqlite3.OperationalError: return { 'graph_edges': 0, 'total_memories': 0, 'graph_exists': False, 'error': 'Graph table does not exist!' } async def main(): import argparse parser = argparse.ArgumentParser( description='Hybrid Association Memory Cleanup - Cleans BOTH Cloudflare AND local SQLite', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s --dry-run # Preview what would be deleted %(prog)s --apply # Execute full cleanup (D1 + Vectorize + local) %(prog)s --apply --skip-vectorize # Skip Vectorize (orphaned vectors are harmless) %(prog)s --apply --cloudflare-only # Only clean Cloudflare (run from any PC) %(prog)s --apply --local-only # Only clean local (if Cloudflare already done) Multi-PC Usage: For multi-PC setups, Cloudflare must be cleaned FIRST to prevent drift-sync from restoring deleted associations. Run with --cloudflare-only from one PC, then other PCs will auto-sync the deletion. """ ) parser.add_argument('--dry-run', action='store_true', help='Preview without changes') parser.add_argument('--apply', action='store_true', help='Apply changes (required for actual deletion)') parser.add_argument('--local-only', action='store_true', help='Only clean local SQLite (skip Cloudflare)') parser.add_argument('--cloudflare-only', action='store_true', help='Only clean Cloudflare (skip local SQLite)') parser.add_argument('--skip-vectorize', action='store_true', help='Skip Vectorize cleanup (orphaned vectors are harmless and cleanup can fail)') args = parser.parse_args() if not args.dry_run and not args.apply: parser.print_help() print("\nError: Specify --dry-run to preview or --apply to execute.") sys.exit(1) dry_run = args.dry_run print("=" * 70) print("HYBRID ASSOCIATION MEMORY CLEANUP") print("=" * 70) print(f"Mode: {'DRY RUN (preview)' if dry_run else 'APPLY (destructive)'}") print() # Check local database if DB_PATH is None: print("[ERROR] Local database path not configured") sys.exit(1) if not DB_PATH.exists(): print(f"[ERROR] Local database not found: {DB_PATH}") sys.exit(1) print(f"Local DB: {DB_PATH}") print(f"Cloudflare: {CF_DATABASE[:16]}... (Vectorize: {CF_VECTORIZE})") print() # Connect to local database conn = sqlite3.connect(DB_PATH) # Verify graph table exists graph_status = verify_graph_table(conn) print(f"Graph table status:") print(f" - Graph edges: {graph_status['graph_edges']}") print(f" - Total memories: {graph_status['total_memories']}") if not graph_status['graph_exists']: print("\n[ABORT] Graph table does not exist!") print(" Run backfill_graph_table.py first to migrate associations.") conn.close() sys.exit(1) # Get association counts print("\n--- Current State ---") local_hashes = get_local_associations(conn) cf_hashes = await get_cloudflare_associations() # Find unique hashes (union of both) all_hashes = list(set(local_hashes + cf_hashes)) print(f" Combined unique: {len(all_hashes)}") if len(all_hashes) == 0: print("\n[OK] No association memories to clean up!") conn.close() return # Confirmation for apply mode if not dry_run: print("\n[WARNING] This will permanently delete association memories from:") if not args.cloudflare_only: print(f" - Local SQLite: {len(local_hashes)} memories") if not args.local_only: print(f" - Cloudflare D1: {len(cf_hashes)} memories") if args.skip_vectorize: print(f" - Cloudflare Vectorize: SKIPPED (--skip-vectorize)") else: print(f" - Cloudflare Vectorize: {len(cf_hashes)} vectors") confirm = input("\nType 'yes' to proceed: ") if confirm.lower() != 'yes': print("Aborted.") conn.close() return # Create backup if not args.cloudflare_only: create_backup(DB_PATH) # Execute cleanup print("\n--- Starting cleanup ---") stats = { 'local_deleted': 0, 'cf_d1_deleted': 0, 'cf_vectorize_deleted': 0 } # Cloudflare first (so sync doesn't bring them back) if not args.local_only and cf_hashes: print("\n[CLOUDFLARE CLEANUP]") stats['cf_d1_deleted'] = await delete_from_cloudflare_d1(cf_hashes, dry_run=dry_run) if args.skip_vectorize: print(" Vectorize: Skipped (--skip-vectorize flag)") stats['cf_vectorize_deleted'] = 0 else: stats['cf_vectorize_deleted'] = await delete_from_cloudflare_vectorize(cf_hashes, dry_run=dry_run) # Then local if not args.cloudflare_only and local_hashes: print("\n[LOCAL CLEANUP]") stats['local_deleted'] = delete_from_local(conn, local_hashes, dry_run=dry_run) # VACUUM to reclaim space if not dry_run: print("\n[VACUUM] Reclaiming space...") conn.execute("VACUUM") print("[OK] VACUUM complete") # Summary print("\n" + "=" * 70) print("SUMMARY") print("=" * 70) action = "would be " if dry_run else "" print(f"Local SQLite: {stats['local_deleted']} memories {action}deleted") print(f"Cloudflare D1: {stats['cf_d1_deleted']} memories {action}deleted") if args.skip_vectorize: print(f"Cloudflare Vectorize: SKIPPED (orphaned vectors are harmless)") else: print(f"Cloudflare Vectorize: {stats['cf_vectorize_deleted']} vectors {action}deleted") if dry_run: print("\n[TIP] Run with --apply to execute these changes") else: print("\n[OK] Cleanup complete!") print(" Other PCs will sync the deletion automatically.") # Verify final state final_status = verify_graph_table(conn) print(f"\n--- Final State ---") print(f" Graph edges: {final_status['graph_edges']} (preserved)") print(f" Total memories: {final_status['total_memories']}") conn.close() if __name__ == '__main__': asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server