state_store.py•2.04 kB
import json
from typing import Dict, Any, Optional
from datetime import datetime
import aioredis
class StateStore:
"""Multi-tier state management system."""
def __init__(self, redis_url: str = "redis://localhost"):
self.redis_url = redis_url
self.redis_client = None
self.local_cache: Dict[str, Any] = {}
async def initialize(self):
"""Initialize connections."""
self.redis_client = await aioredis.from_url(self.redis_url)
async def save_state(self, key: str, state: Dict[str, Any], ttl: int = None):
"""Save state with optional TTL."""
# Save to local cache
self.local_cache[key] = state
# Save to Redis
if self.redis_client:
serialized = json.dumps(state)
if ttl:
await self.redis_client.setex(key, ttl, serialized)
else:
await self.redis_client.set(key, serialized)
return True
async def load_state(self, key: str) -> Optional[Dict[str, Any]]:
"""Load state from storage."""
# Check local cache first
if key in self.local_cache:
return self.local_cache[key]
# Check Redis
if self.redis_client:
data = await self.redis_client.get(key)
if data:
state = json.loads(data)
self.local_cache[key] = state
return state
return None
async def delete_state(self, key: str):
"""Delete state from all stores."""
if key in self.local_cache:
del self.local_cache[key]
if self.redis_client:
await self.redis_client.delete(key)
async def create_snapshot(self, snapshot_id: str) -> bool:
"""Create a system-wide snapshot."""
snapshot = {
"id": snapshot_id,
"timestamp": datetime.utcnow().isoformat(),
"cache": self.local_cache,
}
await self.save_state(f"snapshot:{snapshot_id}", snapshot)
return True