
Katamari MCP Server

by ciphernaut
registry_persistence.py (9.86 kB)
""" Registry persistence layer for maintaining state across sessions and calls. """ import json import asyncio from pathlib import Path from typing import Dict, Any, Optional from datetime import datetime import logging logger = logging.getLogger(__name__) class RegistryPersistence: """Manages persistence for dynamic registries.""" def __init__(self, storage_dir: Optional[str] = None): """Initialize persistence manager. Args: storage_dir: Directory for storing registry data. Defaults to .katamari/registry/ """ if storage_dir is None: # Default to .katamari/registry/ in project root project_root = Path(__file__).parent.parent.parent storage_path = project_root / ".katamari" / "registry" else: storage_path = Path(storage_dir) self.storage_dir = storage_path self.storage_dir.mkdir(parents=True, exist_ok=True) # File paths for different registries self.capabilities_file = self.storage_dir / "capabilities.json" self.patterns_file = self.storage_dir / "patterns.json" self.system_context_file = self.storage_dir / "system_context.json" self.execution_history_file = self.storage_dir / "execution_history.json" # Lock for concurrent access self._lock = asyncio.Lock() async def save_capabilities(self, capabilities: Dict[str, Any]) -> bool: """Save capabilities registry to disk.""" try: async with self._lock: data = { "timestamp": datetime.now().isoformat(), "capabilities": capabilities } with open(self.capabilities_file, 'w') as f: json.dump(data, f, indent=2, default=str) logger.debug(f"Saved {len(capabilities)} capabilities to {self.capabilities_file}") return True except Exception as e: logger.error(f"Failed to save capabilities: {e}") return False async def load_capabilities(self) -> Optional[Dict[str, Any]]: """Load capabilities registry from disk.""" try: if not self.capabilities_file.exists(): return None async with self._lock: with open(self.capabilities_file, 'r') as f: data = json.load(f) capabilities = data.get("capabilities", {}) logger.debug(f"Loaded {len(capabilities)} capabilities from {self.capabilities_file}") return capabilities except Exception as e: logger.error(f"Failed to load capabilities: {e}") return None async def save_patterns(self, patterns: Dict[str, Any]) -> bool: """Save execution patterns registry to disk.""" try: async with self._lock: data = { "timestamp": datetime.now().isoformat(), "patterns": patterns } with open(self.patterns_file, 'w') as f: json.dump(data, f, indent=2, default=str) logger.debug(f"Saved {len(patterns)} patterns to {self.patterns_file}") return True except Exception as e: logger.error(f"Failed to save patterns: {e}") return False async def load_patterns(self) -> Optional[Dict[str, Any]]: """Load execution patterns registry from disk.""" try: if not self.patterns_file.exists(): return None async with self._lock: with open(self.patterns_file, 'r') as f: data = json.load(f) patterns = data.get("patterns", {}) logger.debug(f"Loaded {len(patterns)} patterns from {self.patterns_file}") return patterns except Exception as e: logger.error(f"Failed to load patterns: {e}") return None async def save_system_context(self, context: Dict[str, Any]) -> bool: """Save system context to disk.""" try: async with self._lock: data = { "timestamp": datetime.now().isoformat(), "context": context } with open(self.system_context_file, 'w') as f: json.dump(data, f, indent=2, default=str) logger.debug(f"Saved system context to {self.system_context_file}") return True except Exception as e: logger.error(f"Failed to save system context: {e}") return False async def 
load_system_context(self) -> Optional[Dict[str, Any]]: """Load system context from disk.""" try: if not self.system_context_file.exists(): return None async with self._lock: with open(self.system_context_file, 'r') as f: data = json.load(f) context = data.get("context", {}) logger.debug(f"Loaded system context from {self.system_context_file}") return context except Exception as e: logger.error(f"Failed to load system context: {e}") return None async def append_execution_history(self, execution_record: Dict[str, Any]) -> bool: """Append execution record to history log.""" try: async with self._lock: # Load existing history history = [] if self.execution_history_file.exists(): with open(self.execution_history_file, 'r') as f: data = json.load(f) history = data.get("executions", []) # Add new record execution_record["timestamp"] = datetime.now().isoformat() history.append(execution_record) # Keep only last 1000 records to prevent file from growing too large if len(history) > 1000: history = history[-1000:] # Save updated history data = { "last_updated": datetime.now().isoformat(), "total_executions": len(history), "executions": history } with open(self.execution_history_file, 'w') as f: json.dump(data, f, indent=2, default=str) logger.debug(f"Added execution record to history (total: {len(history)})") return True except Exception as e: logger.error(f"Failed to append execution history: {e}") return False async def get_execution_history(self, limit: int = 100) -> list: """Get recent execution history.""" try: if not self.execution_history_file.exists(): return [] async with self._lock: with open(self.execution_history_file, 'r') as f: data = json.load(f) executions = data.get("executions", []) # Return most recent executions first return executions[-limit:] if len(executions) > limit else executions except Exception as e: logger.error(f"Failed to get execution history: {e}") return [] async def clear_all_data(self) -> bool: """Clear all persisted data (for testing/reset).""" try: async with self._lock: for file_path in [ self.capabilities_file, self.patterns_file, self.system_context_file, self.execution_history_file ]: if file_path.exists(): file_path.unlink() logger.info("Cleared all registry persistence data") return True except Exception as e: logger.error(f"Failed to clear persistence data: {e}") return False def get_storage_info(self) -> Dict[str, Any]: """Get information about storage usage.""" info = { "storage_dir": str(self.storage_dir), "files": {} } for file_path, name in [ (self.capabilities_file, "capabilities"), (self.patterns_file, "patterns"), (self.system_context_file, "system_context"), (self.execution_history_file, "execution_history") ]: if file_path.exists(): stat = file_path.stat() info["files"][name] = { "exists": True, "size_bytes": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat() } else: info["files"][name] = {"exists": False} return info # Global persistence instance _persistence_instance = None def get_persistence() -> RegistryPersistence: """Get global persistence instance.""" global _persistence_instance if _persistence_instance is None: _persistence_instance = RegistryPersistence() return _persistence_instance

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.