Skip to main content
Glama
redis.py4.72 kB
import json import os from typing import Any, Dict, List, Optional import redis.asyncio as redis class RedisStorage: """Класс для работы с Redis""" def __init__(self): self.redis = redis.from_url( os.getenv("REDIS_URL", "redis://redis:6379"), encoding="utf-8", decode_responses=True, ) self.default_ttl = 3600 # 1 час async def close(self): """Закрытие соединения""" await self.redis.close() # Методы для работы с промптами async def cache_prompt( self, prompt_id: str, data: Dict[str, Any], ttl: int = None ) -> None: """Кэширование промпта""" key = f"prompt:{prompt_id}" await self.redis.set(key, json.dumps(data), ex=ttl or self.default_ttl) # Добавляем в список последних промптов await self.redis.lpush("prompt:recent", prompt_id) await self.redis.ltrim("prompt:recent", 0, 99) # Храним только 100 последних async def get_cached_prompt(self, prompt_id: str) -> Optional[Dict[str, Any]]: """Получение промпта из кэша""" key = f"prompt:{prompt_id}" data = await self.redis.get(key) return json.loads(data) if data else None async def get_recent_prompts(self, limit: int = 10) -> List[str]: """Получение списка последних промптов""" return await self.redis.lrange("prompt:recent", 0, limit - 1) # Методы для работы с ресурсами async def cache_resource( self, uri: str, data: Dict[str, Any], ttl: int = None ) -> None: """Кэширование ресурса""" key = f"resource:{uri}" await self.redis.set(key, json.dumps(data), ex=ttl or self.default_ttl) async def get_cached_resource(self, uri: str) -> Optional[Dict[str, Any]]: """Получение ресурса из кэша""" key = f"resource:{uri}" data = await self.redis.get(key) return json.loads(data) if data else None async def increment_resource_usage(self, uri: str) -> int: """Увеличение счетчика использования ресурса""" key = f"resource:usage:{uri}" count = await self.redis.incr(key) # Добавляем в сортированное множество популярных ресурсов await self.redis.zadd("resource:popular", {uri: count}) return count async def get_popular_resources(self, limit: int = 10) -> List[str]: """Получение списка популярных ресурсов""" return await self.redis.zrevrange("resource:popular", 0, limit - 1) # Методы для работы с сессиями async def create_session( self, session_id: str, data: Dict[str, Any], ttl: int = 3600 ) -> None: """Создание сессии""" key = f"session:{session_id}" await self.redis.set(key, json.dumps(data), ex=ttl) async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Получение сессии""" key = f"session:{session_id}" data = await self.redis.get(key) return json.loads(data) if data else None async def update_session( self, session_id: str, data: Dict[str, Any], ttl: int = 3600 ) -> None: """Обновление сессии""" await self.create_session(session_id, data, ttl) async def delete_session(self, session_id: str) -> None: """Удаление сессии""" key = f"session:{session_id}" await self.redis.delete(key) # Методы для работы с rate limiting async def check_rate_limit(self, key: str, limit: int, window: int) -> bool: """Проверка rate limit Args: key: Ключ для rate limit (например, "ip:123.123.123.123") limit: Максимальное количество запросов window: Временное окно в секундах """ current = await self.redis.get(f"ratelimit:{key}") if not current: await self.redis.set(f"ratelimit:{key}", 1, ex=window) return True count = int(current) if count >= limit: return False await self.redis.incr(f"ratelimit:{key}") return True # Создаем глобальный экземпляр redis_storage = RedisStorage()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eagurin/myaiserv'

If you have feedback or need assistance with the MCP directory API, please join our Discord server