Reaper MCP Server
by dschuler36
- scripts
#!/usr/bin/env python3
import os
import sys
import asyncio
import importlib.util
from pathlib import Path
from typing import List
import click
from elasticsearch import AsyncElasticsearch
import aioredis
import logging
# Настройка логирования
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class StorageManager:
def __init__(self):
self.es = AsyncElasticsearch(
[os.getenv("ELASTICSEARCH_URL", "http://elasticsearch:9200")]
)
self.redis = aioredis.from_url(
os.getenv("REDIS_URL", "redis://redis:6379"),
encoding="utf-8",
decode_responses=True,
)
async def close(self):
await self.es.close()
await self.redis.close()
async def check_connections(self) -> bool:
"""Проверка подключений к ES и Redis"""
try:
# Проверяем ES
es_health = await self.es.cluster.health()
logger.info(f"Elasticsearch status: {es_health['status']}")
# Проверяем Redis
redis_info = await self.redis.info()
logger.info(f"Redis version: {redis_info['redis_version']}")
return True
except Exception as e:
logger.error(f"Connection check failed: {str(e)}")
return False
async def get_migrations(self) -> List[str]:
"""Получение списка выполненных миграций"""
try:
result = await self.es.search(
index=".migrations", body={"sort": [{"executed_at": "desc"}]}
)
return [hit["_id"] for hit in result["hits"]["hits"]]
except:
return []
async def run_migration(self, migration_file: Path, direction: str = "up") -> bool:
"""Запуск миграции"""
try:
# Импортируем модуль миграции
spec = importlib.util.spec_from_file_location(
migration_file.stem, migration_file
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Выполняем миграцию
if direction == "up":
await module.migrate_up()
else:
await module.migrate_down()
return True
except Exception as e:
logger.error(f"Migration failed: {str(e)}")
return False
async def clear_redis(self) -> None:
"""Очистка Redis"""
await self.redis.flushall()
logger.info("Redis cleared")
@click.group()
def cli():
"""Утилита для управления хранилищем данных"""
pass
@cli.command()
def check():
"""Проверка подключений к ES и Redis"""
async def run():
manager = StorageManager()
try:
result = await manager.check_connections()
if not result:
sys.exit(1)
finally:
await manager.close()
asyncio.run(run())
@cli.command()
@click.option("--direction", type=click.Choice(["up", "down"]), default="up")
def migrate(direction):
"""Запуск миграций"""
async def run():
manager = StorageManager()
try:
# Получаем список миграций
migrations_dir = (
Path(__file__).parent.parent / "migrations" / "elasticsearch"
)
migration_files = sorted(migrations_dir.glob("*.py"))
if direction == "up":
# Выполняем миграции вперед
executed = await manager.get_migrations()
for migration_file in migration_files:
if migration_file.stem not in executed:
logger.info(f"Running migration: {migration_file.name}")
if await manager.run_migration(migration_file, "up"):
logger.info("Migration successful")
else:
logger.error("Migration failed")
sys.exit(1)
else:
# Откатываем миграции
for migration_file in reversed(migration_files):
logger.info(f"Rolling back migration: {migration_file.name}")
if await manager.run_migration(migration_file, "down"):
logger.info("Rollback successful")
else:
logger.error("Rollback failed")
sys.exit(1)
finally:
await manager.close()
asyncio.run(run())
@cli.command()
def clear_cache():
"""Очистка кэша Redis"""
async def run():
manager = StorageManager()
try:
await manager.clear_redis()
finally:
await manager.close()
asyncio.run(run())
if __name__ == "__main__":
cli()