Skip to main content
Glama
decay_analyzer.py8.68 kB
"""Decay Analyzer agent for memory triage (T028-T033). This agent identifies memories approaching the forget threshold and creates beads issues for triage. It's the MVP agent that prevents memory loss. From spec.md US1: As a knowledge worker, I want memories approaching forget threshold to be automatically flagged so I can decide whether to reinforce, consolidate, or let them decay. Urgency Classification: - HIGH: score < 0.10 (near-forget, needs immediate attention) - MEDIUM: score 0.10-0.25 (danger zone peak, review soon) - LOW: score 0.25-0.35 (entering decay, can wait) Action Recommendations: - REINFORCE: Touch memory to reset decay - CONSOLIDATE: Group with similar memories - PROMOTE: Move to long-term storage if high value - GC: Allow to expire (low value) """ from __future__ import annotations import logging from typing import TYPE_CHECKING from cortexgraph.agents.base import ConsolidationAgent from cortexgraph.agents.models import DecayAction, DecayResult, Urgency from cortexgraph.core.decay import calculate_score if TYPE_CHECKING: from cortexgraph.storage.jsonl_storage import JSONLStorage from cortexgraph.storage.models import Memory logger = logging.getLogger(__name__) # Default threshold for scanning (memories below this are candidates) SCAN_THRESHOLD = 0.35 # Urgency boundaries URGENCY_HIGH_THRESHOLD = 0.10 URGENCY_MEDIUM_THRESHOLD = 0.25 def get_storage() -> JSONLStorage: """Get storage instance. Separated for testability.""" from cortexgraph.context import get_db return get_db() class DecayAnalyzer(ConsolidationAgent[DecayResult]): """Agent for identifying memories needing triage due to decay (T028). Scans for memories with score below threshold and classifies them by urgency, recommending appropriate actions. Example: >>> analyzer = DecayAnalyzer(dry_run=True) >>> results = analyzer.run() >>> for result in results: ... print(f"{result.memory_id}: {result.urgency} - {result.action}") """ def __init__( self, dry_run: bool = False, rate_limit: int = 100, scan_threshold: float = SCAN_THRESHOLD, ) -> None: """Initialize DecayAnalyzer. Args: dry_run: If True, preview without making changes rate_limit: Max operations per minute scan_threshold: Score threshold for scanning (default 0.35) """ super().__init__(dry_run=dry_run, rate_limit=rate_limit) self.scan_threshold = scan_threshold self._storage: JSONLStorage | None = None @property def storage(self) -> JSONLStorage: """Get storage, initializing lazily.""" if self._storage is None: self._storage = get_storage() return self._storage def _compute_score(self, memory: Memory) -> float: """Compute current decay score for a memory. Uses the decay module's calculate_score function which considers use_count, last_used, and strength. """ return calculate_score( use_count=memory.use_count, last_used=memory.last_used, strength=memory.strength, ) def scan(self) -> list[str]: """Find memories with score below threshold (T029). Returns: List of memory IDs needing triage Contract (from contracts/agent-api.md): - MUST return list (may be empty) - MUST NOT modify any data - SHOULD complete within 5 seconds """ memory_ids: list[str] = [] # Get all memories and filter by computed score for mem_id, memory in self.storage.memories.items(): try: score = self._compute_score(memory) if score < self.scan_threshold: memory_ids.append(mem_id) except (AttributeError, TypeError): # Skip memories with missing attributes logger.debug(f"Skipping memory {mem_id} - cannot compute score") continue logger.info(f"DecayAnalyzer scan found {len(memory_ids)} memories below threshold") return memory_ids def process_item(self, memory_id: str) -> DecayResult: """Process a single memory, determining urgency and action (T030). Args: memory_id: UUID of memory to process Returns: DecayResult with urgency classification and recommended action Raises: KeyError: If memory_id not found ValueError: If memory has invalid data Contract (from contracts/agent-api.md): - MUST return DecayResult or raise exception - If dry_run=True, MUST NOT modify any data - SHOULD complete within 5 seconds """ # Get memory from storage memory = self.storage.memories.get(memory_id) if memory is None: raise KeyError(f"Memory not found: {memory_id}") # Compute score using decay calculation try: score = self._compute_score(memory) except (AttributeError, TypeError) as e: raise ValueError(f"Memory {memory_id} has invalid data: {e}") from e # Classify urgency (T025) urgency = self._classify_urgency(score) # Recommend action (T026) action = self._recommend_action(score, memory) # Create beads issue for urgent items (T032) - only if not dry_run beads_issue_id = None if not self.dry_run and urgency == Urgency.HIGH: beads_issue_id = self._create_beads_issue(memory_id, score, urgency, action) return DecayResult( memory_id=memory_id, score=score, urgency=urgency, action=action, beads_issue_id=beads_issue_id, ) def _classify_urgency(self, score: float) -> Urgency: """Classify urgency based on score (T025). Thresholds from research.md danger zone analysis: - HIGH: < 0.10 (near forget threshold) - MEDIUM: 0.10-0.25 (danger zone peak) - LOW: 0.25-0.35 (entering decay) """ if score < URGENCY_HIGH_THRESHOLD: return Urgency.HIGH elif score < URGENCY_MEDIUM_THRESHOLD: return Urgency.MEDIUM else: return Urgency.LOW def _recommend_action(self, score: float, memory: object) -> DecayAction: """Recommend action based on score and memory attributes (T026). Logic: - score < 0.05: GC (too late to save unless high value) - score < 0.10 & high use_count: REINFORCE (worth saving) - score < 0.10 & low use_count: GC (let it go) - score < 0.20: CONSOLIDATE (group with similar) - score >= 0.20 & high strength: PROMOTE (move to LTM) - otherwise: REINFORCE (default safe action) """ use_count = getattr(memory, "use_count", 0) strength = getattr(memory, "strength", 1.0) # Near forget threshold if score < 0.05: # Only save if heavily used if use_count >= 5: return DecayAction.REINFORCE return DecayAction.GC # High urgency zone if score < URGENCY_HIGH_THRESHOLD: if use_count >= 3: return DecayAction.REINFORCE return DecayAction.GC # Medium urgency - good candidates for consolidation if score < 0.20: return DecayAction.CONSOLIDATE # Low urgency with high strength - promote if strength >= 1.5: return DecayAction.PROMOTE # Default: reinforce to prevent further decay return DecayAction.REINFORCE def _create_beads_issue( self, memory_id: str, score: float, urgency: Urgency, action: DecayAction, ) -> str | None: """Create beads issue for urgent memory (T032). Only creates issues for HIGH urgency memories to avoid noise. """ try: from cortexgraph.agents.beads_integration import create_consolidation_issue issue_id = create_consolidation_issue( agent="decay", memory_ids=[memory_id], action=action.value, urgency=urgency.value, ) logger.info(f"Created beads issue {issue_id} for memory {memory_id}") return issue_id except Exception as e: logger.warning(f"Failed to create beads issue: {e}") return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prefrontalsys/mnemex'

If you have feedback or need assistance with the MCP directory API, please join our Discord server