Reaper MCP Server

import json import os from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Union import aioredis class RedisStorage: """Класс для работы с Redis""" def __init__(self): self.redis = aioredis.from_url( os.getenv("REDIS_URL", "redis://redis:6379"), encoding="utf-8", decode_responses=True, ) self.default_ttl = 3600 # 1 час async def close(self): """Закрытие соединения""" await self.redis.close() # Методы для работы с промптами async def cache_prompt( self, prompt_id: str, data: Dict[str, Any], ttl: int = None ) -> None: """Кэширование промпта""" key = f"prompt:{prompt_id}" await self.redis.set(key, json.dumps(data), ex=ttl or self.default_ttl) # Добавляем в список последних промптов await self.redis.lpush("prompt:recent", prompt_id) await self.redis.ltrim( "prompt:recent", 0, 99 ) # Храним только 100 последних async def get_cached_prompt( self, prompt_id: str ) -> Optional[Dict[str, Any]]: """Получение промпта из кэша""" key = f"prompt:{prompt_id}" data = await self.redis.get(key) return json.loads(data) if data else None async def get_recent_prompts(self, limit: int = 10) -> List[str]: """Получение списка последних промптов""" return await self.redis.lrange("prompt:recent", 0, limit - 1) # Методы для работы с ресурсами async def cache_resource( self, uri: str, data: Dict[str, Any], ttl: int = None ) -> None: """Кэширование ресурса""" key = f"resource:{uri}" await self.redis.set(key, json.dumps(data), ex=ttl or self.default_ttl) async def get_cached_resource(self, uri: str) -> Optional[Dict[str, Any]]: """Получение ресурса из кэша""" key = f"resource:{uri}" data = await self.redis.get(key) return json.loads(data) if data else None async def increment_resource_usage(self, uri: str) -> int: """Увеличение счетчика использования ресурса""" key = f"resource:usage:{uri}" count = await self.redis.incr(key) # Добавляем в сортированное множество популярных ресурсов await self.redis.zadd("resource:popular", {uri: count}) return count async def get_popular_resources(self, limit: int = 10) -> List[str]: """Получение списка популярных ресурсов""" return await self.redis.zrevrange("resource:popular", 0, limit - 1) # Методы для работы с сессиями async def create_session( self, session_id: str, data: Dict[str, Any], ttl: int = 3600 ) -> None: """Создание сессии""" key = f"session:{session_id}" await self.redis.set(key, json.dumps(data), ex=ttl) async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Получение сессии""" key = f"session:{session_id}" data = await self.redis.get(key) return json.loads(data) if data else None async def update_session( self, session_id: str, data: Dict[str, Any], ttl: int = 3600 ) -> None: """Обновление сессии""" await self.create_session(session_id, data, ttl) async def delete_session(self, session_id: str) -> None: """Удаление сессии""" key = f"session:{session_id}" await self.redis.delete(key) # Методы для работы с rate limiting async def check_rate_limit( self, key: str, limit: int, window: int ) -> bool: """Проверка rate limit Args: key: Ключ для rate limit (например, "ip:123.123.123.123") limit: Максимальное количество запросов window: Временное окно в секундах """ current = await self.redis.get(f"ratelimit:{key}") if not current: await self.redis.set(f"ratelimit:{key}", 1, ex=window) return True count = int(current) if count >= limit: return False await self.redis.incr(f"ratelimit:{key}") return True # Создаем глобальный экземпляр redis_storage = RedisStorage()