Reaper MCP Server
by dschuler36
- app
- tools
import json
import os
import platform
from datetime import datetime
import psutil
from app.core.base_resources import (APIResource, FileResource, JSONResource,
MemoryResource)
from app.services.mcp_service import mcp_service
class SystemInfoResource(MemoryResource):
"""Ресурс с системной информацией"""
async def initialize(self) -> None:
await super().initialize()
# Обновляем системную информацию при инициализации
await self.update_system_info()
async def update_system_info(self) -> None:
"""Обновление системной информации"""
info = {
"os": platform.system(),
"python_version": platform.python_version(),
"cpu_count": psutil.cpu_count(),
"memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available,
"disk_usage": psutil.disk_usage("/").percent,
"timestamp": datetime.now().isoformat(),
}
await self.write(info)
class ApplicationLogsResource(FileResource):
"""Ресурс с логами приложения"""
async def initialize(self) -> None:
"""Создаем директорию для логов при необходимости"""
await super().initialize()
if not os.path.exists(self.path):
await self.write("Log file initialized")
class APIDocsResource(JSONResource):
"""Ресурс с OpenAPI документацией"""
async def initialize(self) -> None:
await super().initialize()
# Инициализируем базовую OpenAPI документацию
api_docs = {
"openapi": "3.0.0",
"info": {"title": "MCP Server API", "version": "1.0.0"},
"paths": {
"/tools": {
"get": {
"summary": "List available tools",
"responses": {"200": {"description": "List of tools"}},
}
}
},
}
await self.write(api_docs)
class DBSchemaResource(JSONResource):
"""Ресурс со схемой базы данных"""
async def initialize(self) -> None:
await super().initialize()
# Инициализируем базовую схему
schema = {
"tables": [
{
"name": "users",
"columns": [
{"name": "id", "type": "integer", "primary_key": True},
{"name": "username", "type": "varchar(255)"},
{"name": "email", "type": "varchar(255)"},
],
}
]
}
await self.write(schema)
async def register_resources():
"""Регистрация всех ресурсов"""
# Создаем экземпляры ресурсов
resources = [
SystemInfoResource(
name="System Information", mime_type="application/json"
),
ApplicationLogsResource(
name="Application Logs",
path=os.path.join(os.getcwd(), "logs", "app.log"),
mime_type="text/plain",
),
APIDocsResource(
name="API Documentation",
path=os.path.join(os.getcwd(), "docs", "api.json"),
),
DBSchemaResource(
name="Database Schema",
path=os.path.join(os.getcwd(), "docs", "schema.json"),
),
]
# Инициализируем и регистрируем ресурсы
for resource in resources:
await resource.initialize()
await mcp_service.register_resource(resource)
# Регистрируем обработчик чтения ресурсов
async def resource_reader(uri: str) -> dict:
"""Обработчик чтения ресурсов"""
resource = next((r for r in resources if r.uri == uri), None)
if not resource:
return {
"uri": uri,
"error": "Resource not found",
"timestamp": datetime.now().isoformat(),
}
try:
content = await resource.read()
return {
"uri": uri,
"content": content,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
return {
"uri": uri,
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
# Устанавливаем обработчик чтения ресурсов
mcp_service._read_resource = resource_reader
# Регистрируем ресурсы при импорте
import asyncio
asyncio.create_task(register_resources())