Reaper MCP Server

import asyncio from contextlib import asynccontextmanager from typing import Any, AsyncGenerator, Dict, List, Optional import jsonschema from fastapi import HTTPException from graphql import GraphQLSchema, graphql from app.core.base_sampling import BaseSampler, KeywordSampler, MLSampler from app.models.mcp import (ContentType, MCPError, Message, MessageContent, MessageRole, ModelPreferences, Prompt, Resource, SamplingRequest, SamplingResponse, Tool, mcp_schema) from app.utils.prompt_loader import prompt_loader class MCPService: """Сервис для обработки MCP запросов Provides: - Resource management - Tool execution - Prompt handling - Sampling support - GraphQL API """ def __init__(self): self.tools: Dict[str, Tool] = {} self.resources: Dict[str, Resource] = {} self.prompts: Dict[str, Prompt] = {} self.subscriptions: Dict[str, List[AsyncGenerator]] = {} self.samplers: Dict[str, BaseSampler] = {} # Resource Management async def register_resource(self, resource: Resource) -> None: """Регистрация нового ресурса""" self.resources[resource.uri] = resource async def get_resource(self, uri: str) -> Optional[Resource]: """Получение ресурса по URI""" return self.resources.get(uri) async def list_resources(self) -> List[Resource]: """Список всех доступных ресурсов""" return list(self.resources.values()) @asynccontextmanager async def subscribe_to_resource(self, uri: str): """Подписка на обновления ресурса""" if uri not in self.subscriptions: self.subscriptions[uri] = [] queue = asyncio.Queue() self.subscriptions[uri].append(queue) try: yield queue finally: self.subscriptions[uri].remove(queue) # Tool Management async def register_tool(self, tool: Tool) -> None: """Регистрация нового инструмента""" self.tools[tool.name] = tool async def get_tool(self, name: str) -> Optional[Tool]: """Получение инструмента по имени""" return self.tools.get(name) async def list_tools(self) -> Dict[str, Tool]: """Список всех доступных инструментов""" return self.tools async def execute_tool( self, name: str, parameters: Dict[str, Any] ) -> Dict[str, Any]: """Выполнение инструмента""" tool = await self.get_tool(name) if not tool: raise HTTPException( status_code=404, detail=f"Tool '{name}' not found" ) # Validate parameters against schema try: jsonschema.validate(parameters, tool.input_schema) except jsonschema.exceptions.ValidationError as e: raise HTTPException(status_code=400, detail=str(e)) # Execute tool logic try: result = await self._execute_tool_logic(tool, parameters) return {"content": [{"type": "text", "text": str(result)}]} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Prompt Management async def register_prompt(self, prompt: Prompt) -> None: """Регистрация нового промпта""" self.prompts[prompt.name] = prompt async def get_prompt(self, name: str) -> Optional[Prompt]: """Получение промпта по имени""" return self.prompts.get(name) async def list_prompts(self) -> List[Prompt]: """Список всех доступных промптов""" return list(self.prompts.values()) async def execute_prompt( self, name: str, arguments: Dict[str, Any] ) -> List[Message]: """Выполнение промпта""" prompt = await self.get_prompt(name) if not prompt: raise HTTPException( status_code=404, detail=f"Prompt '{name}' not found" ) # Validate required arguments if prompt.arguments: required = [arg.name for arg in prompt.arguments if arg.required] missing = [arg for arg in required if arg not in arguments] if missing: raise HTTPException( status_code=400, detail=f"Missing required arguments: {', '.join(missing)}", ) # Generate messages return await self._generate_prompt_messages(prompt, arguments) # Sampling Support async def register_sampler(self, sampler: BaseSampler) -> None: """Регистрация нового сэмплера""" self.samplers[sampler.name] = sampler async def get_sampler(self, name: str) -> Optional[BaseSampler]: """Получение сэмплера по имени""" return self.samplers.get(name) async def list_samplers(self) -> List[BaseSampler]: """Список всех доступных сэмплеров""" return list(self.samplers.values()) async def create_sampling( self, request: SamplingRequest ) -> SamplingResponse: """Создание сэмплинга для LLM""" sampler = await self.get_sampler("keyword_sampler") if not sampler: raise HTTPException(status_code=500, detail="No sampler available") prepared_request = await sampler.execute(request.dict()) # Implement sampling logic here raise NotImplementedError("Sampling not implemented") # GraphQL API async def handle_graphql( self, query: str, variables: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Обработка GraphQL запроса""" from app.models.mcp import mcp_schema result = await graphql( mcp_schema, query, variable_values=variables ) if result.errors: raise HTTPException(status_code=400, detail=str(result.errors[0])) return result.data # Private methods async def _execute_tool_logic( self, tool: Tool, parameters: Dict[str, Any] ) -> Any: """Execute tool logic""" try: result = await tool.execute(parameters) return result except Exception as e: raise HTTPException( status_code=500, detail=f"Tool execution failed: {str(e)}" ) async def _generate_prompt_messages( self, prompt: Prompt, arguments: Dict[str, Any] ) -> List[Message]: """Генерация сообщений для промпта""" raise NotImplementedError( f"Prompt '{prompt.name}' generation not implemented" ) async def _notify_resource_subscribers( self, uri: str, content: Any ) -> None: """Оповещение подписчиков об обновлении ресурса""" if uri in self.subscriptions: for queue in self.subscriptions[uri]: await queue.put(content) # Создаем глобальный экземпляр сервиса mcp_service = MCPService() # Регистрируем сэмплеры async def register_samplers(): samplers = [KeywordSampler(), MLSampler()] for sampler in samplers: await sampler.initialize() await mcp_service.register_sampler(sampler) # Запускаем регистрацию сэмплеров asyncio.create_task(register_samplers())