cleanup_low_quality.py (6.82 kB)
#!/usr/bin/env python3
"""
Safe cleanup of very low quality memories (<0.1 score).

Removes document fragments, page numbers, and corrupted PDF text
identified by DeBERTa quality scoring.
"""
import asyncio
import random
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.config import SQLITE_VEC_PATH

# Safety thresholds
QUALITY_THRESHOLD = 0.1  # Only delete memories scoring below this
MIN_LENGTH_EXCEPTION = 100  # Keep memories >100 chars even if low quality
PRESERVE_TYPES = ['test', 'troubleshooting']  # Never delete these types


async def cleanup_low_quality(dry_run=True):
    """
    Clean up very low quality memories with safety checks.

    Args:
        dry_run: If True, only report what would be deleted (default)
    """
    print("=" * 80)
    print(f"Low Quality Memory Cleanup - {'DRY RUN' if dry_run else 'LIVE MODE'}")
    print("=" * 80)
    print()

    # Connect to storage
    print("Connecting to database...")
    storage = SqliteVecMemoryStorage(SQLITE_VEC_PATH)
    await storage.initialize()
    print(f"✓ Connected to: {SQLITE_VEC_PATH}")
    print()

    # Fetch all memories
    print("Fetching all memories...")
    all_memories = await storage.get_all_memories()
    print(f"✓ Total memories: {len(all_memories)}")
    print()

    # Identify cleanup candidates
    candidates = []
    preserved = {
        'high_quality': 0,
        'long_content': 0,
        'preserved_type': 0,
        'no_quality_score': 0
    }

    for memory in all_memories:
        metadata = memory.metadata or {}
        quality_score = metadata.get('quality_score')
        quality_provider = metadata.get('quality_provider')

        # Only process DeBERTa-scored memories
        if quality_provider != 'onnx_deberta':
            preserved['no_quality_score'] += 1
            continue

        # Skip if no quality score
        if quality_score is None:
            preserved['no_quality_score'] += 1
            continue

        # Preserve high quality
        if quality_score >= QUALITY_THRESHOLD:
            preserved['high_quality'] += 1
            continue

        # Preserve longer content (might be valuable despite low score)
        if len(memory.content) > MIN_LENGTH_EXCEPTION:
            preserved['long_content'] += 1
            continue

        # Preserve specific types
        if memory.memory_type in PRESERVE_TYPES:
            preserved['preserved_type'] += 1
            continue

        # Mark for deletion
        candidates.append({
            'hash': memory.content_hash,
            'type': memory.memory_type or '(no type)',
            'score': quality_score,
            'length': len(memory.content),
            'preview': memory.content[:80].replace('\n', ' ')
        })

    # Report findings
    print("=" * 80)
    print("Analysis Results")
    print("=" * 80)
    print()
    print(f"Memories to DELETE: {len(candidates)}")
    print(f"  Quality score <{QUALITY_THRESHOLD}")
    print(f"  Length ≤{MIN_LENGTH_EXCEPTION} characters")
    print(f"  Not in preserved types: {PRESERVE_TYPES}")
    print()
    print("Memories to PRESERVE:")
    print(f"  High quality (≥{QUALITY_THRESHOLD}): {preserved['high_quality']}")
    print(f"  Long content (>{MIN_LENGTH_EXCEPTION} chars): {preserved['long_content']}")
    print(f"  Preserved types: {preserved['preserved_type']}")
    print(f"  No quality score: {preserved['no_quality_score']}")
    print()

    if len(candidates) == 0:
        print("✓ No memories to delete!")
        return

    # Show breakdown by type
    print("Deletion Breakdown by Type:")
    type_counts = Counter(c['type'] for c in candidates)
    for mem_type, count in sorted(type_counts.items(), key=lambda x: -x[1])[:15]:
        avg_score = sum(c['score'] for c in candidates if c['type'] == mem_type) / count
        print(f"  {mem_type:25s}: {count:4d} memories (avg score: {avg_score:.3f})")
    print()

    # Show sample deletions
    print("Sample Deletions (10 random):")
    samples = random.sample(candidates, min(10, len(candidates)))
    for sample in samples:
        print(f"  Score: {sample['score']:.4f} | Len: {sample['length']:4d} | Type: {sample['type']}")
        print(f"    {sample['preview'][:100]}")
    print()

    if dry_run:
        print("=" * 80)
        print("DRY RUN COMPLETE - No changes made")
        print("=" * 80)
        print()
        print("To execute cleanup, run:")
        print(f"  python {__file__} --execute")
        return

    # Execute cleanup
    print("=" * 80)
    print("EXECUTING CLEANUP")
    print("=" * 80)
    print()

    # Create deletion log
    log_path = Path.home() / 'backups/mcp-memory-service' / f'cleanup-log-{datetime.now().strftime("%Y%m%d-%H%M%S")}.txt'
    log_path.parent.mkdir(parents=True, exist_ok=True)

    deleted_count = 0
    error_count = 0

    with open(log_path, 'w') as log:
        log.write(f"Cleanup executed: {datetime.now()}\n")
        log.write(f"Threshold: quality_score < {QUALITY_THRESHOLD}\n")
        log.write(f"Max length: {MIN_LENGTH_EXCEPTION} characters\n")
        log.write(f"Candidates: {len(candidates)}\n\n")

        for i, candidate in enumerate(candidates, 1):
            try:
                success, message = await storage.delete(candidate['hash'])
                if success:
                    deleted_count += 1
                    # Log successful deletions only; failures get an ERROR line instead
                    log.write(f"{candidate['hash']}\t{candidate['score']:.4f}\t{candidate['length']}\t{candidate['type']}\n")
                else:
                    error_count += 1
                    log.write(f"ERROR: {candidate['hash']}\t{message}\n")

                if i % 100 == 0:
                    print(f"  Deleted: {deleted_count}/{len(candidates)}")
            except Exception as e:
                error_count += 1
                log.write(f"ERROR: {candidate['hash']}\t{str(e)}\n")
                print(f"  Error deleting {candidate['hash'][:16]}...: {e}")

    print()
    print("=" * 80)
    print("CLEANUP COMPLETE")
    print("=" * 80)
    print()
    print(f"✓ Deleted: {deleted_count} memories")
    if error_count > 0:
        print(f"⚠ Errors: {error_count}")
    print(f"📝 Log saved: {log_path}")
    print()

    # Verify final state
    remaining = await storage.get_all_memories()
    print("Database state:")
    print(f"  Total memories: {len(remaining)}")
    print(f"  Removed: {len(all_memories) - len(remaining)}")


if __name__ == "__main__":
    # Dry run by default; pass --execute to actually delete
    dry_run = '--execute' not in sys.argv
    asyncio.run(cleanup_low_quality(dry_run=dry_run))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'
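The same endpoint can also be queried from a script. A minimal Python sketch using only the standard library, assuming the endpoint returns JSON and requires no authentication:

import json
from urllib.request import urlopen

# Same URL as the curl example above
URL = "https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service"

with urlopen(URL) as resp:  # assumption: anonymous access, JSON body
    server_info = json.load(resp)
print(json.dumps(server_info, indent=2))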

If you have feedback or need assistance with the MCP directory API, please join our Discord server.