apply_quality_boost_retroactively.py
#!/usr/bin/env python3
"""
Retroactively apply association-based quality boosts to all existing memories.

This script runs the consolidation's relevance calculation on all memories
to apply quality boosts based on connection counts, without running the full
consolidation pipeline (no archiving, no compression, just quality updates).

Usage:
    python scripts/maintenance/apply_quality_boost_retroactively.py [--dry-run] [--batch-size 500]
"""

import argparse
import asyncio
import logging
import sys
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from mcp_memory_service.storage.factory import create_storage_instance
from mcp_memory_service.consolidation.decay import ExponentialDecayCalculator
from mcp_memory_service.consolidation.base import ConsolidationConfig
from mcp_memory_service.config import CONSOLIDATION_CONFIG, SQLITE_VEC_PATH

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def get_connection_counts(storage, memories):
    """Calculate connection counts for all memories."""
    logger.info(f"Calculating connection counts for {len(memories)} memories...")

    # Build connection graph from associations stored as memories
    connections = {}

    # Associations are themselves stored as memories with memory_type 'association';
    # filter them out of the list the caller already fetched
    association_memories = [m for m in memories if m.memory_type == 'association']
    logger.info(f"Found {len(association_memories)} existing associations")

    # Count how many associations reference each memory
    for assoc in association_memories:
        # Association metadata contains source_memory_hashes
        source_hashes = assoc.metadata.get('source_memory_hashes', [])
        for hash_val in source_hashes:
            connections[hash_val] = connections.get(hash_val, 0) + 1

    return connections


async def apply_quality_boosts(dry_run=False, batch_size=500):
    """Apply quality boosts to all memories based on connections."""
    logger.info("=" * 80)
    logger.info("Association-Based Quality Boost - Retroactive Application")
    logger.info("=" * 80)
    logger.info(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
    logger.info(f"Batch size: {batch_size}")
    logger.info("")

    # Initialize storage
    logger.info("Initializing storage backend...")
    storage = await create_storage_instance(SQLITE_VEC_PATH, server_type='script')

    try:
        # Get all memories
        logger.info("Retrieving all memories...")
        all_memories = await storage.get_all_memories()
        logger.info(f"Found {len(all_memories)} total memories")

        # Calculate connection counts
        connections = await get_connection_counts(storage, all_memories)
        logger.info(f"Connection graph built: {len(connections)} memories have connections")

        # Initialize decay calculator
        config_obj = ConsolidationConfig(**CONSOLIDATION_CONFIG)
        decay_calculator = ExponentialDecayCalculator(config_obj)

        # Process in batches
        total_boosted = 0
        total_processed = 0

        for i in range(0, len(all_memories), batch_size):
            batch = all_memories[i:i + batch_size]
            batch_num = (i // batch_size) + 1
            total_batches = (len(all_memories) + batch_size - 1) // batch_size

            logger.info("")
            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} memories)...")

            # Calculate relevance scores (includes quality boost)
            relevance_scores = await decay_calculator.process(
                batch,
                connections=connections,
                access_patterns={}
            )

            # Count boosts applied in this batch
            batch_boosted = sum(
                1 for score in relevance_scores
                if score.metadata.get('association_boost_applied', False)
            )

            # Update memories with boosted quality scores
            if not dry_run:
                for memory, score in zip(batch, relevance_scores):
                    if score.metadata.get('association_boost_applied', False):
                        updated_memory = await decay_calculator.update_memory_relevance_metadata(memory, score)
                        await storage.update_memory_metadata(
                            updated_memory.content_hash,
                            updated_memory.metadata,
                            preserve_timestamps=True
                        )

            total_boosted += batch_boosted
            total_processed += len(batch)

            logger.info(f"  Batch {batch_num}: {batch_boosted} memories received quality boost")
            logger.info(f"  Progress: {total_processed}/{len(all_memories)} memories processed ({total_boosted} boosted)")

        # Summary
        logger.info("")
        logger.info("=" * 80)
        logger.info("SUMMARY")
        logger.info("=" * 80)
        logger.info(f"Total memories processed: {total_processed}")
        logger.info(f"Total memories boosted: {total_boosted}")
        # Guard against an empty store to avoid division by zero
        boost_rate = (total_boosted / total_processed * 100) if total_processed else 0.0
        logger.info(f"Boost rate: {boost_rate:.1f}%")

        if dry_run:
            logger.info("")
            logger.info("DRY RUN MODE - No changes were written to storage")
            logger.info("Run without --dry-run to apply changes")
        else:
            logger.info("")
            logger.info("✅ Quality boosts applied successfully!")
            logger.info("")
            logger.info("Verification:")
            logger.info("  Check logs: grep 'Association quality boost' ~/.local/share/mcp-memory/logs/*.log")
            logger.info("  Check distribution: curl http://127.0.0.1:8000/api/quality/distribution")

    finally:
        # Cleanup
        if hasattr(storage, 'close'):
            await storage.close()


async def main():
    parser = argparse.ArgumentParser(
        description="Retroactively apply association-based quality boosts"
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help="Preview changes without applying them"
    )
    parser.add_argument(
        '--batch-size',
        type=int,
        default=500,
        help="Number of memories to process per batch (default: 500)"
    )

    args = parser.parse_args()

    try:
        await apply_quality_boosts(
            dry_run=args.dry_run,
            batch_size=args.batch_size
        )
        return 0
    except KeyboardInterrupt:
        logger.warning("\nInterrupted by user")
        return 130
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        return 1


if __name__ == '__main__':
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
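The script's verification output suggests a curl check of the quality distribution endpoint after a live run. The same check can be scripted in Python; the sketch below is a hypothetical companion snippet, not part of the repository. It assumes the memory service's HTTP API is listening on 127.0.0.1:8000, as in the script's own log message, and since the response schema is not shown in the script, it simply pretty-prints whatever JSON comes back.

import json
import urllib.request

# Hypothetical verification snippet (not part of the repository): query the
# quality distribution endpoint referenced in the script's summary output.
# Assumes the HTTP API is listening on 127.0.0.1:8000.
with urllib.request.urlopen("http://127.0.0.1:8000/api/quality/distribution") as resp:
    print(json.dumps(json.load(resp), indent=2))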
