Reaper MCP Server

import os from datetime import datetime from typing import Any, Dict, List, Optional from elasticsearch import AsyncElasticsearch class ElasticsearchStorage: """Класс для работы с Elasticsearch""" def __init__(self): self.es = AsyncElasticsearch( [os.getenv("ELASTICSEARCH_URL", "http://elasticsearch:9200")] ) self.indices = {"prompts": "mcp_prompts", "resources": "mcp_resources"} async def initialize(self): """Инициализация индексов""" # Маппинг для промптов prompt_mapping = { "mappings": { "properties": { "name": {"type": "keyword"}, "description": {"type": "text"}, "content": {"type": "text"}, "arguments": {"type": "nested"}, "vector": {"type": "dense_vector", "dims": 384}, "created_at": {"type": "date"}, "updated_at": {"type": "date"}, } } } # Маппинг для ресурсов resource_mapping = { "mappings": { "properties": { "uri": {"type": "keyword"}, "name": {"type": "text"}, "content": {"type": "text"}, "mime_type": {"type": "keyword"}, "vector": {"type": "dense_vector", "dims": 384}, "metadata": {"type": "object"}, } } } # Создаем индексы если их нет for index_name in self.indices.values(): if not await self.es.indices.exists(index=index_name): mapping = ( prompt_mapping if "prompts" in index_name else resource_mapping ) await self.es.indices.create(index=index_name, body=mapping) async def close(self): """Закрытие соединения""" await self.es.close() # Методы для работы с промптами async def index_prompt(self, prompt_data: Dict[str, Any]) -> str: """Индексация промпта""" prompt_data["created_at"] = datetime.utcnow() prompt_data["updated_at"] = datetime.utcnow() result = await self.es.index( index=self.indices["prompts"], document=prompt_data ) return result["_id"] async def get_prompt(self, prompt_id: str) -> Optional[Dict[str, Any]]: """Получение промпта по ID""" try: result = await self.es.get( index=self.indices["prompts"], id=prompt_id ) return result["_source"] except: return None async def search_prompts( self, query: str, size: int = 10 ) -> List[Dict[str, Any]]: """Поиск промптов""" body = { "query": { "multi_match": { "query": query, "fields": ["name^2", "description", "content"], } }, "size": size, } result = await self.es.search(index=self.indices["prompts"], body=body) return [hit["_source"] for hit in result["hits"]["hits"]] # Методы для работы с ресурсами async def index_resource(self, resource_data: Dict[str, Any]) -> str: """Индексация ресурса""" result = await self.es.index( index=self.indices["resources"], document=resource_data ) return result["_id"] async def get_resource( self, resource_uri: str ) -> Optional[Dict[str, Any]]: """Получение ресурса по URI""" try: result = await self.es.search( index=self.indices["resources"], body={"query": {"term": {"uri.keyword": resource_uri}}}, ) hits = result["hits"]["hits"] return hits[0]["_source"] if hits else None except: return None async def search_resources( self, query: str, mime_type: Optional[str] = None, size: int = 10 ) -> List[Dict[str, Any]]: """Поиск ресурсов""" should_queries = [ {"multi_match": {"query": query, "fields": ["name^2", "content"]}} ] if mime_type: should_queries.append({"term": {"mime_type.keyword": mime_type}}) body = { "query": { "bool": {"should": should_queries, "minimum_should_match": 1} }, "size": size, } result = await self.es.search( index=self.indices["resources"], body=body ) return [hit["_source"] for hit in result["hits"]["hits"]] async def delete_resource(self, resource_uri: str) -> bool: """Удаление ресурса""" try: await self.es.delete_by_query( index=self.indices["resources"], body={"query": {"term": {"uri.keyword": resource_uri}}}, ) return True except: return False # Создаем глобальный экземпляр es_storage = ElasticsearchStorage()