Skip to main content
Glama

Agentic AI System MCP Server

by saikumarjash
state_store.py2.04 kB
import json from typing import Dict, Any, Optional from datetime import datetime import aioredis class StateStore: """Multi-tier state management system.""" def __init__(self, redis_url: str = "redis://localhost"): self.redis_url = redis_url self.redis_client = None self.local_cache: Dict[str, Any] = {} async def initialize(self): """Initialize connections.""" self.redis_client = await aioredis.from_url(self.redis_url) async def save_state(self, key: str, state: Dict[str, Any], ttl: int = None): """Save state with optional TTL.""" # Save to local cache self.local_cache[key] = state # Save to Redis if self.redis_client: serialized = json.dumps(state) if ttl: await self.redis_client.setex(key, ttl, serialized) else: await self.redis_client.set(key, serialized) return True async def load_state(self, key: str) -> Optional[Dict[str, Any]]: """Load state from storage.""" # Check local cache first if key in self.local_cache: return self.local_cache[key] # Check Redis if self.redis_client: data = await self.redis_client.get(key) if data: state = json.loads(data) self.local_cache[key] = state return state return None async def delete_state(self, key: str): """Delete state from all stores.""" if key in self.local_cache: del self.local_cache[key] if self.redis_client: await self.redis_client.delete(key) async def create_snapshot(self, snapshot_id: str) -> bool: """Create a system-wide snapshot.""" snapshot = { "id": snapshot_id, "timestamp": datetime.utcnow().isoformat(), "cache": self.local_cache, } await self.save_state(f"snapshot:{snapshot_id}", snapshot) return True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/saikumarjash/mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server