registry_persistence.py•9.86 kB
"""
Registry persistence layer for maintaining state across sessions and calls.
"""
import json
import asyncio
from pathlib import Path
from typing import Dict, Any, Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class RegistryPersistence:
"""Manages persistence for dynamic registries."""
def __init__(self, storage_dir: Optional[str] = None):
"""Initialize persistence manager.
Args:
storage_dir: Directory for storing registry data. Defaults to .katamari/registry/
"""
if storage_dir is None:
# Default to .katamari/registry/ in project root
project_root = Path(__file__).parent.parent.parent
storage_path = project_root / ".katamari" / "registry"
else:
storage_path = Path(storage_dir)
self.storage_dir = storage_path
self.storage_dir.mkdir(parents=True, exist_ok=True)
# File paths for different registries
self.capabilities_file = self.storage_dir / "capabilities.json"
self.patterns_file = self.storage_dir / "patterns.json"
self.system_context_file = self.storage_dir / "system_context.json"
self.execution_history_file = self.storage_dir / "execution_history.json"
# Lock for concurrent access
self._lock = asyncio.Lock()
async def save_capabilities(self, capabilities: Dict[str, Any]) -> bool:
"""Save capabilities registry to disk."""
try:
async with self._lock:
data = {
"timestamp": datetime.now().isoformat(),
"capabilities": capabilities
}
with open(self.capabilities_file, 'w') as f:
json.dump(data, f, indent=2, default=str)
logger.debug(f"Saved {len(capabilities)} capabilities to {self.capabilities_file}")
return True
except Exception as e:
logger.error(f"Failed to save capabilities: {e}")
return False
async def load_capabilities(self) -> Optional[Dict[str, Any]]:
"""Load capabilities registry from disk."""
try:
if not self.capabilities_file.exists():
return None
async with self._lock:
with open(self.capabilities_file, 'r') as f:
data = json.load(f)
capabilities = data.get("capabilities", {})
logger.debug(f"Loaded {len(capabilities)} capabilities from {self.capabilities_file}")
return capabilities
except Exception as e:
logger.error(f"Failed to load capabilities: {e}")
return None
async def save_patterns(self, patterns: Dict[str, Any]) -> bool:
"""Save execution patterns registry to disk."""
try:
async with self._lock:
data = {
"timestamp": datetime.now().isoformat(),
"patterns": patterns
}
with open(self.patterns_file, 'w') as f:
json.dump(data, f, indent=2, default=str)
logger.debug(f"Saved {len(patterns)} patterns to {self.patterns_file}")
return True
except Exception as e:
logger.error(f"Failed to save patterns: {e}")
return False
async def load_patterns(self) -> Optional[Dict[str, Any]]:
"""Load execution patterns registry from disk."""
try:
if not self.patterns_file.exists():
return None
async with self._lock:
with open(self.patterns_file, 'r') as f:
data = json.load(f)
patterns = data.get("patterns", {})
logger.debug(f"Loaded {len(patterns)} patterns from {self.patterns_file}")
return patterns
except Exception as e:
logger.error(f"Failed to load patterns: {e}")
return None
async def save_system_context(self, context: Dict[str, Any]) -> bool:
"""Save system context to disk."""
try:
async with self._lock:
data = {
"timestamp": datetime.now().isoformat(),
"context": context
}
with open(self.system_context_file, 'w') as f:
json.dump(data, f, indent=2, default=str)
logger.debug(f"Saved system context to {self.system_context_file}")
return True
except Exception as e:
logger.error(f"Failed to save system context: {e}")
return False
async def load_system_context(self) -> Optional[Dict[str, Any]]:
"""Load system context from disk."""
try:
if not self.system_context_file.exists():
return None
async with self._lock:
with open(self.system_context_file, 'r') as f:
data = json.load(f)
context = data.get("context", {})
logger.debug(f"Loaded system context from {self.system_context_file}")
return context
except Exception as e:
logger.error(f"Failed to load system context: {e}")
return None
async def append_execution_history(self, execution_record: Dict[str, Any]) -> bool:
"""Append execution record to history log."""
try:
async with self._lock:
# Load existing history
history = []
if self.execution_history_file.exists():
with open(self.execution_history_file, 'r') as f:
data = json.load(f)
history = data.get("executions", [])
# Add new record
execution_record["timestamp"] = datetime.now().isoformat()
history.append(execution_record)
# Keep only last 1000 records to prevent file from growing too large
if len(history) > 1000:
history = history[-1000:]
# Save updated history
data = {
"last_updated": datetime.now().isoformat(),
"total_executions": len(history),
"executions": history
}
with open(self.execution_history_file, 'w') as f:
json.dump(data, f, indent=2, default=str)
logger.debug(f"Added execution record to history (total: {len(history)})")
return True
except Exception as e:
logger.error(f"Failed to append execution history: {e}")
return False
async def get_execution_history(self, limit: int = 100) -> list:
"""Get recent execution history."""
try:
if not self.execution_history_file.exists():
return []
async with self._lock:
with open(self.execution_history_file, 'r') as f:
data = json.load(f)
executions = data.get("executions", [])
# Return most recent executions first
return executions[-limit:] if len(executions) > limit else executions
except Exception as e:
logger.error(f"Failed to get execution history: {e}")
return []
async def clear_all_data(self) -> bool:
"""Clear all persisted data (for testing/reset)."""
try:
async with self._lock:
for file_path in [
self.capabilities_file,
self.patterns_file,
self.system_context_file,
self.execution_history_file
]:
if file_path.exists():
file_path.unlink()
logger.info("Cleared all registry persistence data")
return True
except Exception as e:
logger.error(f"Failed to clear persistence data: {e}")
return False
def get_storage_info(self) -> Dict[str, Any]:
"""Get information about storage usage."""
info = {
"storage_dir": str(self.storage_dir),
"files": {}
}
for file_path, name in [
(self.capabilities_file, "capabilities"),
(self.patterns_file, "patterns"),
(self.system_context_file, "system_context"),
(self.execution_history_file, "execution_history")
]:
if file_path.exists():
stat = file_path.stat()
info["files"][name] = {
"exists": True,
"size_bytes": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
}
else:
info["files"][name] = {"exists": False}
return info
# Global persistence instance
_persistence_instance = None
def get_persistence() -> RegistryPersistence:
"""Get global persistence instance."""
global _persistence_instance
if _persistence_instance is None:
_persistence_instance = RegistryPersistence()
return _persistence_instance