Skip to main content
Glama

MCP Memory Service

# Copyright 2024 Heinrich Krupp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """APScheduler integration for autonomous consolidation operations.""" import asyncio import logging from typing import Dict, Any, Optional, Callable, Awaitable from datetime import datetime, timedelta try: from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.asyncio import AsyncIOExecutor from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR APSCHEDULER_AVAILABLE = True except ImportError: APSCHEDULER_AVAILABLE = False from .consolidator import DreamInspiredConsolidator from .base import ConsolidationConfig class ConsolidationScheduler: """ Scheduler for autonomous consolidation operations. Integrates with APScheduler to run consolidation operations at specified intervals based on time horizons (daily, weekly, monthly, quarterly, yearly). """ def __init__( self, consolidator: DreamInspiredConsolidator, schedule_config: Dict[str, str], enabled: bool = True ): self.consolidator = consolidator self.schedule_config = schedule_config self.enabled = enabled self.logger = logging.getLogger(__name__) # Job execution tracking self.job_history = [] self.last_execution_times = {} self.execution_stats = { 'total_jobs': 0, 'successful_jobs': 0, 'failed_jobs': 0 } # Initialize scheduler if APScheduler is available if APSCHEDULER_AVAILABLE and enabled: self.scheduler = AsyncIOScheduler( jobstores={'default': MemoryJobStore()}, executors={'default': AsyncIOExecutor()}, job_defaults={ 'coalesce': True, # Combine multiple pending executions 'max_instances': 1, # Only one instance of each job at a time 'misfire_grace_time': 3600 # 1 hour grace period for missed jobs } ) # Add event listeners self.scheduler.add_listener(self._job_executed_listener, EVENT_JOB_EXECUTED) self.scheduler.add_listener(self._job_error_listener, EVENT_JOB_ERROR) else: self.scheduler = None if not APSCHEDULER_AVAILABLE: self.logger.warning("APScheduler not available - consolidation scheduling disabled") elif not enabled: self.logger.info("Consolidation scheduling disabled by configuration") async def start(self) -> bool: """Start the consolidation scheduler.""" if not self.scheduler: return False try: # Add consolidation jobs based on configuration self._schedule_consolidation_jobs() # Start the scheduler self.scheduler.start() self.logger.info("Consolidation scheduler started successfully") # Log scheduled jobs jobs = self.scheduler.get_jobs() for job in jobs: self.logger.info(f"Scheduled job: {job.id} - next run: {job.next_run_time}") return True except Exception as e: self.logger.error(f"Failed to start consolidation scheduler: {e}") return False async def stop(self) -> bool: """Stop the consolidation scheduler.""" if not self.scheduler: return True try: self.scheduler.shutdown(wait=True) self.logger.info("Consolidation scheduler stopped") return True except Exception as e: self.logger.error(f"Error stopping consolidation scheduler: {e}") return False def _schedule_consolidation_jobs(self): """Schedule consolidation jobs based on configuration.""" time_horizons = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly'] for horizon in time_horizons: schedule_spec = self.schedule_config.get(horizon, 'disabled') if schedule_spec == 'disabled': self.logger.debug(f"Consolidation for {horizon} horizon is disabled") continue try: trigger = self._create_trigger(horizon, schedule_spec) if trigger: job_id = f"consolidation_{horizon}" self.scheduler.add_job( func=self._run_consolidation_job, trigger=trigger, args=[horizon], id=job_id, name=f"Consolidation - {horizon.title()}", replace_existing=True ) self.logger.info(f"Scheduled {horizon} consolidation: {schedule_spec}") except Exception as e: self.logger.error(f"Error scheduling {horizon} consolidation: {e}") def _create_trigger(self, horizon: str, schedule_spec: str): """Create APScheduler trigger from schedule specification.""" try: if horizon == 'daily': # Daily format: "HH:MM" (e.g., "02:00") hour, minute = map(int, schedule_spec.split(':')) return CronTrigger(hour=hour, minute=minute) elif horizon == 'weekly': # Weekly format: "DAY HH:MM" (e.g., "SUN 03:00") day_time = schedule_spec.split(' ') if len(day_time) != 2: raise ValueError(f"Invalid weekly schedule format: {schedule_spec}") day_map = { 'MON': 0, 'TUE': 1, 'WED': 2, 'THU': 3, 'FRI': 4, 'SAT': 5, 'SUN': 6 } day = day_map.get(day_time[0].upper()) if day is None: raise ValueError(f"Invalid day: {day_time[0]}") hour, minute = map(int, day_time[1].split(':')) return CronTrigger(day_of_week=day, hour=hour, minute=minute) elif horizon == 'monthly': # Monthly format: "DD HH:MM" (e.g., "01 04:00") day_time = schedule_spec.split(' ') if len(day_time) != 2: raise ValueError(f"Invalid monthly schedule format: {schedule_spec}") day = int(day_time[0]) hour, minute = map(int, day_time[1].split(':')) return CronTrigger(day=day, hour=hour, minute=minute) elif horizon == 'quarterly': # Quarterly format: "MM-DD HH:MM" (e.g., "01-01 05:00") # Run on the first day of quarters (Jan, Apr, Jul, Oct) parts = schedule_spec.split(' ') if len(parts) != 2: raise ValueError(f"Invalid quarterly schedule format: {schedule_spec}") month_day = parts[0].split('-') if len(month_day) != 2: raise ValueError(f"Invalid quarterly date format: {parts[0]}") day = int(month_day[1]) hour, minute = map(int, parts[1].split(':')) # Quarters: Jan(1), Apr(4), Jul(7), Oct(10) return CronTrigger(month='1,4,7,10', day=day, hour=hour, minute=minute) elif horizon == 'yearly': # Yearly format: "MM-DD HH:MM" (e.g., "01-01 06:00") parts = schedule_spec.split(' ') if len(parts) != 2: raise ValueError(f"Invalid yearly schedule format: {schedule_spec}") month_day = parts[0].split('-') if len(month_day) != 2: raise ValueError(f"Invalid yearly date format: {parts[0]}") month = int(month_day[0]) day = int(month_day[1]) hour, minute = map(int, parts[1].split(':')) return CronTrigger(month=month, day=day, hour=hour, minute=minute) else: self.logger.error(f"Unknown time horizon: {horizon}") return None except Exception as e: self.logger.error(f"Error creating trigger for {horizon} with spec '{schedule_spec}': {e}") return None async def _run_consolidation_job(self, time_horizon: str): """Execute a consolidation job for the specified time horizon.""" job_start_time = datetime.now() self.logger.info(f"Starting scheduled {time_horizon} consolidation") try: # Run the consolidation report = await self.consolidator.consolidate(time_horizon) # Record successful execution self.execution_stats['successful_jobs'] += 1 self.last_execution_times[time_horizon] = job_start_time # Add to job history job_record = { 'time_horizon': time_horizon, 'start_time': job_start_time, 'end_time': datetime.now(), 'status': 'success', 'memories_processed': report.memories_processed, 'associations_discovered': report.associations_discovered, 'clusters_created': report.clusters_created, 'memories_compressed': report.memories_compressed, 'memories_archived': report.memories_archived, 'errors': report.errors } self._add_job_to_history(job_record) # Log success duration = (job_record['end_time'] - job_record['start_time']).total_seconds() self.logger.info( f"Completed {time_horizon} consolidation successfully in {duration:.2f}s: " f"{report.memories_processed} memories processed, " f"{report.associations_discovered} associations, " f"{report.clusters_created} clusters, " f"{report.memories_compressed} compressed, " f"{report.memories_archived} archived" ) except Exception as e: # Record failed execution self.execution_stats['failed_jobs'] += 1 job_record = { 'time_horizon': time_horizon, 'start_time': job_start_time, 'end_time': datetime.now(), 'status': 'failed', 'error': str(e), 'memories_processed': 0, 'associations_discovered': 0, 'clusters_created': 0, 'memories_compressed': 0, 'memories_archived': 0, 'errors': [str(e)] } self._add_job_to_history(job_record) self.logger.error(f"Failed {time_horizon} consolidation: {e}") raise def _add_job_to_history(self, job_record: Dict[str, Any]): """Add job record to history with size limit.""" self.job_history.append(job_record) # Keep only last 100 job records if len(self.job_history) > 100: self.job_history = self.job_history[-100:] def _job_executed_listener(self, event): """Handle job execution events.""" self.execution_stats['total_jobs'] += 1 self.logger.debug(f"Job executed: {event.job_id}") def _job_error_listener(self, event): """Handle job error events.""" self.logger.error(f"Job error: {event.job_id} - {event.exception}") async def trigger_consolidation(self, time_horizon: str, immediate: bool = True) -> bool: """Manually trigger a consolidation job.""" if not self.scheduler: self.logger.error("Scheduler not available") return False try: if immediate: # Run immediately await self._run_consolidation_job(time_horizon) return True else: # Schedule to run in 1 minute job_id = f"manual_consolidation_{time_horizon}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" trigger = IntervalTrigger(seconds=60) # Run once after 60 seconds self.scheduler.add_job( func=self._run_consolidation_job, trigger=trigger, args=[time_horizon], id=job_id, name=f"Manual Consolidation - {time_horizon.title()}", max_instances=1 ) self.logger.info(f"Scheduled manual {time_horizon} consolidation") return True except Exception as e: self.logger.error(f"Error triggering {time_horizon} consolidation: {e}") return False async def get_scheduler_status(self) -> Dict[str, Any]: """Get scheduler status and job information.""" if not self.scheduler: return { 'enabled': False, 'reason': 'APScheduler not available or disabled' } jobs = self.scheduler.get_jobs() job_info = [] for job in jobs: job_info.append({ 'id': job.id, 'name': job.name, 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, 'trigger': str(job.trigger) }) return { 'enabled': True, 'running': self.scheduler.running, 'jobs': job_info, 'execution_stats': self.execution_stats.copy(), 'last_execution_times': { horizon: time.isoformat() for horizon, time in self.last_execution_times.items() }, 'recent_jobs': self.job_history[-10:] # Last 10 jobs } async def update_schedule(self, new_schedule_config: Dict[str, str]) -> bool: """Update the consolidation schedule.""" if not self.scheduler: return False try: # Remove existing consolidation jobs job_ids = [f"consolidation_{horizon}" for horizon in ['daily', 'weekly', 'monthly', 'quarterly', 'yearly']] for job_id in job_ids: if self.scheduler.get_job(job_id): self.scheduler.remove_job(job_id) # Update configuration self.schedule_config = new_schedule_config # Re-schedule jobs self._schedule_consolidation_jobs() self.logger.info("Consolidation schedule updated successfully") return True except Exception as e: self.logger.error(f"Error updating consolidation schedule: {e}") return False async def pause_consolidation(self, time_horizon: Optional[str] = None) -> bool: """Pause consolidation jobs (all or specific horizon).""" if not self.scheduler: return False try: if time_horizon: job_id = f"consolidation_{time_horizon}" job = self.scheduler.get_job(job_id) if job: self.scheduler.pause_job(job_id) self.logger.info(f"Paused {time_horizon} consolidation") else: self.logger.warning(f"No job found for {time_horizon} consolidation") else: # Pause all consolidation jobs jobs = self.scheduler.get_jobs() for job in jobs: if job.id.startswith('consolidation_'): self.scheduler.pause_job(job.id) self.logger.info("Paused all consolidation jobs") return True except Exception as e: self.logger.error(f"Error pausing consolidation: {e}") return False async def resume_consolidation(self, time_horizon: Optional[str] = None) -> bool: """Resume consolidation jobs (all or specific horizon).""" if not self.scheduler: return False try: if time_horizon: job_id = f"consolidation_{time_horizon}" job = self.scheduler.get_job(job_id) if job: self.scheduler.resume_job(job_id) self.logger.info(f"Resumed {time_horizon} consolidation") else: self.logger.warning(f"No job found for {time_horizon} consolidation") else: # Resume all consolidation jobs jobs = self.scheduler.get_jobs() for job in jobs: if job.id.startswith('consolidation_'): self.scheduler.resume_job(job.id) self.logger.info("Resumed all consolidation jobs") return True except Exception as e: self.logger.error(f"Error resuming consolidation: {e}") return False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server