scheduler.py
"""Hybrid scheduler for consolidation agents. This module provides the Scheduler class which coordinates: 1. Programmatic execution of consolidation agents 2. Pipeline orchestration (decay → cluster → merge → promote → relations) 3. Event-driven hooks for urgent decay detection (post_save_check) The scheduler is designed for two modes of operation: - Scheduled: External cron/launchd calls cortexgraph-consolidate CLI - Event-driven: post_save_check() hook detects urgent decay (score < 0.10) Example: # Run full pipeline scheduler = Scheduler(dry_run=True) results = scheduler.run_pipeline() # Run single agent scheduler.run_agent("decay") # Check for urgent decay after saving scheduler.post_save_check("memory-123") """ from __future__ import annotations import logging from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from pathlib import Path from cortexgraph.agents.base import ConsolidationAgent from cortexgraph.storage.jsonl_storage import JSONLStorage logger = logging.getLogger(__name__) # Pipeline execution order AGENT_ORDER = ["decay", "cluster", "merge", "promote", "relations"] # Default threshold for urgent decay detection DEFAULT_URGENT_THRESHOLD = 0.10 # Default interval between scheduled runs (1 hour) DEFAULT_INTERVAL_SECONDS = 3600 # Filename for storing last run timestamp LAST_RUN_FILENAME = ".consolidation_last_run" def post_save_hook(memory_id: str) -> dict[str, Any] | None: """Event-driven hook to check for urgent decay after save_memory. This function is called after a memory is saved to detect if it needs urgent attention (score < threshold). It's designed to be fast and fail-safe - errors are logged but don't propagate. Args: memory_id: ID of the newly saved memory Returns: None if no urgent action needed, otherwise dict with action details """ try: scheduler = Scheduler() return scheduler.post_save_check(memory_id) except Exception as e: logger.warning(f"post_save_hook error for {memory_id}: {e}") return None def calculate_score(memory_id: str) -> float: """Calculate current decay score for a memory. This is a wrapper around the core decay calculation that fetches the memory from storage first. Args: memory_id: Memory ID to check Returns: Current decay score (0.0-1.0) """ from cortexgraph.context import get_db from cortexgraph.core.decay import calculate_score as core_calculate_score storage = get_db() memory = storage.memories.get(memory_id) if memory is None: # Memory not found, return high score (no action needed) return 1.0 return core_calculate_score( use_count=memory.use_count, last_used=memory.last_used, strength=memory.strength, ) class Scheduler: """Hybrid scheduler for consolidation pipeline. Coordinates agent execution and provides event-driven hooks for urgent decay detection. Designed to work with external scheduling (cron/launchd) for periodic execution. Attributes: dry_run: If True, agents preview changes without modifying data urgent_threshold: Score threshold below which memory is urgent (default: 0.10) interval_seconds: Minimum seconds between scheduled runs (default: 3600) """ def __init__( self, dry_run: bool = False, urgent_threshold: float = DEFAULT_URGENT_THRESHOLD, interval_seconds: int | None = None, interval_hours: float | None = None, ) -> None: """Initialize the scheduler. 
class Scheduler:
    """Hybrid scheduler for the consolidation pipeline.

    Coordinates agent execution and provides event-driven hooks for urgent
    decay detection. Designed to work with external scheduling (cron/launchd)
    for periodic execution.

    Attributes:
        dry_run: If True, agents preview changes without modifying data
        urgent_threshold: Score threshold below which a memory is urgent
            (default: 0.10)
        interval_seconds: Minimum seconds between scheduled runs (default: 3600)
    """

    def __init__(
        self,
        dry_run: bool = False,
        urgent_threshold: float = DEFAULT_URGENT_THRESHOLD,
        interval_seconds: int | None = None,
        interval_hours: float | None = None,
    ) -> None:
        """Initialize the scheduler.

        Args:
            dry_run: If True, agents preview changes without modifying data
            urgent_threshold: Score threshold for urgent detection (default: 0.10)
            interval_seconds: Minimum seconds between scheduled runs
            interval_hours: Alternative way to specify the interval
                (converted to seconds)
        """
        self.dry_run = dry_run
        self.urgent_threshold = urgent_threshold

        # Handle interval; hours takes precedence if both are specified
        if interval_hours is not None:
            self.interval_seconds = int(interval_hours * 3600)
        elif interval_seconds is not None:
            self.interval_seconds = interval_seconds
        else:
            self.interval_seconds = DEFAULT_INTERVAL_SECONDS

    def get_storage(self) -> JSONLStorage:
        """Get the storage instance.

        Uses get_db() for consistent access across the application.

        Returns:
            Configured JSONLStorage instance
        """
        from cortexgraph.context import get_db

        return get_db()

    def _get_agent(self, name: str) -> ConsolidationAgent[Any]:
        """Factory method to create agent instances.

        Args:
            name: Agent name (decay, cluster, merge, promote, relations)

        Returns:
            Configured agent instance

        Raises:
            ValueError: If the agent name is unknown
        """
        # Import agents lazily to avoid circular imports
        from cortexgraph.agents.cluster_detector import ClusterDetector
        from cortexgraph.agents.decay_analyzer import DecayAnalyzer
        from cortexgraph.agents.ltm_promoter import LTMPromoter
        from cortexgraph.agents.relationship_discovery import RelationshipDiscovery
        from cortexgraph.agents.semantic_merge import SemanticMerge

        if name == "decay":
            return DecayAnalyzer(dry_run=self.dry_run)
        elif name == "cluster":
            return ClusterDetector(dry_run=self.dry_run)
        elif name == "merge":
            return SemanticMerge(dry_run=self.dry_run)
        elif name == "promote":
            return LTMPromoter(dry_run=self.dry_run)
        elif name == "relations":
            return RelationshipDiscovery(dry_run=self.dry_run)
        else:
            raise ValueError(f"Unknown agent: {name}")

    def run_agent(self, name: str) -> list[Any]:
        """Run a single consolidation agent.

        Args:
            name: Agent name (decay, cluster, merge, promote, relations)

        Returns:
            List of results from the agent's run() method

        Raises:
            ValueError: If the agent name is unknown
        """
        agent = self._get_agent(name)
        logger.info(f"Running agent: {name} (dry_run={self.dry_run})")

        results = agent.run()

        logger.info(f"Agent {name} completed: {len(results) if results else 0} items processed")
        return results if results else []

    def run_pipeline(self) -> dict[str, list[Any]]:
        """Run all consolidation agents in pipeline order.

        Executes agents in order: decay → cluster → merge → promote → relations.
        Stops immediately if any agent raises an error.

        Returns:
            Dictionary mapping agent names to their results

        Raises:
            Exception: Re-raises any exception from an agent
        """
        logger.info(f"Starting consolidation pipeline (dry_run={self.dry_run})")
        results: dict[str, list[Any]] = {}

        for agent_name in AGENT_ORDER:
            # Run each agent; let exceptions propagate
            agent_results = self.run_agent(agent_name)
            results[agent_name] = agent_results

        logger.info("Consolidation pipeline completed successfully")
        return results
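
    # Usage sketch: inspecting pipeline output. run_pipeline() returns one
    # entry per agent in AGENT_ORDER; the shape of each result item is
    # defined by the individual agent's run() method and not assumed here.
    #
    #     scheduler = Scheduler(dry_run=True)   # preview only, no writes
    #     results = scheduler.run_pipeline()
    #     for name in AGENT_ORDER:
    #         print(f"{name}: {len(results[name])} items")
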
    def post_save_check(self, memory_id: str) -> dict[str, Any] | None:
        """Check if a newly saved memory needs urgent attention.

        This is the event-driven hook that can be called after save_memory
        to detect memories that are decaying rapidly and need immediate
        processing.

        Args:
            memory_id: ID of the memory to check

        Returns:
            None if no action needed, otherwise dict with action details
        """
        score = calculate_score(memory_id)

        if score >= self.urgent_threshold:
            # Score is above the threshold; no urgent action needed
            logger.debug(f"Memory {memory_id} score {score:.3f} >= {self.urgent_threshold}")
            return None

        # Score is below the threshold - urgent!
        logger.warning(
            f"Urgent decay detected: memory {memory_id} score {score:.3f} < {self.urgent_threshold}"
        )
        return self._handle_urgent_memory(memory_id, score)

    def _handle_urgent_memory(self, memory_id: str, score: float) -> dict[str, Any]:
        """Handle a memory with an urgent decay score.

        In dry_run mode, returns what would happen without taking action.
        In live mode, flags the memory for immediate processing.

        Args:
            memory_id: ID of the urgent memory
            score: Current decay score

        Returns:
            Dictionary with action details
        """
        if self.dry_run:
            logger.info(f"[DRY RUN] Would flag memory {memory_id} as urgent (score={score:.3f})")
            return {
                "memory_id": memory_id,
                "score": score,
                "dry_run": True,
                "action": "would_flag_urgent",
            }

        # Live mode - flag the memory for urgent processing
        logger.info(f"Flagging memory {memory_id} as urgent (score={score:.3f})")

        # TODO: Integration with beads for issue creation
        # For now, just return the action taken
        return {
            "memory_id": memory_id,
            "score": score,
            "dry_run": False,
            "action": "flagged_urgent",
        }

    # =========================================================================
    # Scheduled Execution Methods (T089)
    # =========================================================================

    def _get_last_run_file(self) -> Path:
        """Get the path to the last-run timestamp file.

        Returns:
            Path to the timestamp file in the storage directory
        """
        from pathlib import Path

        from cortexgraph.config import get_config

        config = get_config()
        return Path(config.storage_path) / LAST_RUN_FILENAME

    def _get_last_run_time(self) -> int | None:
        """Get the timestamp of the last scheduled run.

        Returns:
            Unix timestamp of the last run, or None if never run
        """
        last_run_file = self._get_last_run_file()
        if not last_run_file.exists():
            return None

        try:
            content = last_run_file.read_text().strip()
            return int(content)
        except (ValueError, OSError) as e:
            logger.warning(f"Failed to read last run time: {e}")
            return None

    def _save_last_run_time(self, timestamp: int) -> None:
        """Save the timestamp of the current run.

        Args:
            timestamp: Unix timestamp to save
        """
        last_run_file = self._get_last_run_file()
        try:
            # Ensure the parent directory exists
            last_run_file.parent.mkdir(parents=True, exist_ok=True)
            last_run_file.write_text(str(timestamp))
        except OSError as e:
            logger.error(f"Failed to save last run time: {e}")

    def should_run(self, force: bool = False) -> bool:
        """Check if a scheduled run should execute.

        Args:
            force: If True, always return True regardless of interval

        Returns:
            True if the run should execute, False if the interval hasn't elapsed
        """
        if force:
            return True

        last_run = self._get_last_run_time()
        if last_run is None:
            # Never run before
            return True

        import time

        elapsed = int(time.time()) - last_run
        return elapsed >= self.interval_seconds

    def record_run(self) -> None:
        """Record the current time as the last run time."""
        import time

        self._save_last_run_time(int(time.time()))
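
    # Usage sketch: the should_run()/record_run() pair turns a frequent
    # external trigger into a rate-limited pipeline. For example, a cron job
    # firing every 10 minutes with interval_hours=1 still consolidates at
    # most once per hour:
    #
    #     s = Scheduler(interval_hours=1)
    #     if s.should_run():
    #         s.run_pipeline()
    #         s.record_run()
    #
    # run_scheduled() below bundles exactly this check-run-record sequence.
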
    def run_scheduled(self, force: bool = False) -> dict[str, Any]:
        """Run the consolidation pipeline if the interval has elapsed.

        This is the main entry point for cron/launchd scheduled execution.
        It checks if enough time has passed since the last run and either
        executes the pipeline or skips.

        Args:
            force: If True, run regardless of the elapsed interval

        Returns:
            Dictionary with execution results or skip information
        """
        if not force and not self.should_run():
            logger.info(f"Scheduled run skipped: interval ({self.interval_seconds}s) not elapsed")
            return {
                "skipped": True,
                "reason": "Interval not due - last run was too recent",
                "interval_seconds": self.interval_seconds,
            }

        # Execute the pipeline
        logger.info(f"Starting scheduled consolidation (force={force})")
        results = self.run_pipeline()

        # Record this run
        self.record_run()

        return {
            "skipped": False,
            "results": results,
            "interval_seconds": self.interval_seconds,
        }
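
# Scheduling sketch: per the module docstring, an external scheduler invokes
# the cortexgraph-consolidate CLI periodically. Assuming that CLI calls
# run_scheduled() (an assumption; the CLI's flags and internals are not shown
# here), a crontab entry like the following triggers an hourly check, with
# should_run() preventing runs that come too soon after the last one:
#
#     0 * * * * cortexgraph-consolidate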
