Skip to main content
Glama
main.py17.2 kB
from fastapi import FastAPI, Request, Response, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.responses import JSONResponse from contextlib import asynccontextmanager import structlog import time import uuid from typing import Dict, Any from app.config import settings from app.exceptions import MCPException, ValidationException, RateLimitException from app.core.database import init_db, close_db from app.core.redis_client import redis_client # ------------------------ ЛОГИРОВАНИЕ ------------------------ # structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer(), ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) logger = structlog.get_logger() # ------------------------ LIFESPAN ------------------------ # @asynccontextmanager async def lifespan(app: FastAPI): logger.info("Starting MCP Business AI Server", version=settings.app_version) # DB и Redis считаем опциональными – если их нет локально, просто логируем и идём дальше try: await init_db() logger.info("Database initialized") except Exception as e: logger.warning("Failed to initialize database, continuing without DB", error=str(e)) try: await redis_client.connect() logger.info("Redis client connected") except Exception as e: logger.warning("Failed to connect Redis, continuing without Redis", error=str(e)) yield try: await redis_client.disconnect() logger.info("Redis client disconnected") except Exception as e: logger.warning("Failed to disconnect Redis", error=str(e)) try: await close_db() logger.info("Database connections closed") except Exception as e: logger.warning("Failed to close database", error=str(e)) logger.info("Shutting down MCP Business AI Server") # ------------------------ APP ------------------------ # app = FastAPI( title=settings.app_name, version=settings.app_version, description="MCP server for business AI transformation", docs_url="/docs" if settings.debug else None, redoc_url="/redoc" if settings.debug else None, lifespan=lifespan, ) # CORS / Trusted hosts app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.add_middleware( TrustedHostMiddleware, allowed_hosts=["*"], ) # ------------------------ SAFE MIDDLEWARES ------------------------ # def _safe_add_middleware(mw_name: str, import_path: str, cls_name: str): """ Пытаемся подключить middleware. Если нет зависимостей (jwt, opentelemetry и т.п.) — не валим всё приложение, а просто логируем предупреждение. """ try: module = __import__(import_path, fromlist=[cls_name]) cls = getattr(module, cls_name) app.add_middleware(cls) logger.info("Middleware enabled", middleware=mw_name) except Exception as e: logger.warning("Middleware disabled", middleware=mw_name, error=str(e)) # Эти middlewares зависят от jwt, opentelemetry и прочего. Подключаем по возможности. _safe_add_middleware("auth", "app.middleware.auth", "AuthMiddleware") _safe_add_middleware("rate_limiting", "app.middleware.rate_limiting", "RateLimitingMiddleware") _safe_add_middleware("correlation", "app.middleware.correlation", "CorrelationMiddleware") _safe_add_middleware("metrics", "app.middleware.metrics", "MetricsMiddleware") # ------------------------ ROUTERS ------------------------ # # Обязательный роутер – tools (инструменты MCP) from app.api.v1 import tools app.include_router(tools.router, prefix="/api/v1/tools", tags=["tools"]) def _safe_include_router(module_path: str, tag: str, prefix: str): """ Подключаем роутер, если модуль есть. Если нет (resource_manager, agent_system и т.п.) — логируем и идём дальше. """ try: module = __import__(module_path, fromlist=["router"]) router = getattr(module, "router") app.include_router(router, prefix=prefix, tags=[tag]) logger.info("Router enabled", module=module_path, prefix=prefix) except Exception as e: logger.warning("Router disabled", module=module_path, error=str(e)) # Необязательные роутеры – падают, если нет resource_manager / agent_system. _safe_include_router("app.api.v1.resources", "resources", "/api/v1/resources") _safe_include_router("app.api.v1.admin", "admin", "/api/v1/admin") # ------------------------ ЛОГИРОВАНИЕ ЗАПРОСОВ ------------------------ # @app.middleware("http") async def log_requests(request: Request, call_next): start_time = time.time() correlation_id = getattr(request.state, "correlation_id", None) or str(uuid.uuid4()) logger.info( "Request started", method=request.method, path=request.url.path, correlation_id=correlation_id, ) try: response: Response = await call_next(request) except Exception as e: logger.error( "Unhandled exception during request", method=request.method, path=request.url.path, error=str(e), correlation_id=correlation_id, ) raise process_time = time.time() - start_time logger.info( "Request completed", method=request.method, path=request.url.path, status_code=response.status_code, duration_ms=round(process_time * 1000, 2), correlation_id=correlation_id, ) return response # ------------------------ HANDLERS EXCEPTIONS ------------------------ # @app.exception_handler(MCPException) async def mcp_exception_handler(request: Request, exc: MCPException): logger.error( "MCP exception occurred", error_code=exc.error_code, message=exc.message, correlation_id=getattr(request.state, "correlation_id", None), ) return JSONResponse( status_code=exc.status_code, content={ "error": { "code": exc.error_code, "message": exc.message, "details": exc.details, } }, ) @app.exception_handler(ValidationException) async def validation_exception_handler(request: Request, exc: ValidationException): logger.warning( "Validation error", field=exc.field, message=exc.message, correlation_id=getattr(request.state, "correlation_id", None), ) return JSONResponse( status_code=400, content={ "error": { "code": "VALIDATION_ERROR", "message": exc.message, "field": exc.field, } }, ) @app.exception_handler(RateLimitException) async def rate_limit_exception_handler(request: Request, exc: RateLimitException): logger.warning( "Rate limit exceeded", limit=exc.limit, window=exc.window, retry_after=exc.retry_after, correlation_id=getattr(request.state, "correlation_id", None), ) return JSONResponse( status_code=429, content={ "error": { "code": "RATE_LIMIT_EXCEEDED", "message": "Too many requests", "details": { "limit": exc.limit, "window": exc.window, "retry_after": exc.retry_after, }, } }, headers={"Retry-After": str(exc.retry_after)}, ) @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): logger.warning( "HTTP exception", status_code=exc.status_code, detail=exc.detail, correlation_id=getattr(request.state, "correlation_id", None), ) return JSONResponse( status_code=exc.status_code, content={ "error": { "code": "HTTP_ERROR", "message": exc.detail, } }, ) # ------------------------ HEALTH + MCP ------------------------ # @app.get("/health", tags=["health"]) @app.get("/api/v1/health", tags=["health"]) async def health_check(): """Простой health-эндпоинт без обращения к БД/Redis.""" return { "status": "ok", "version": settings.app_version, "app": settings.app_name, } @app.post("/mcp") async def mcp_endpoint(request: Request): """ Единая точка входа MCP (JSON-RPC). Поддерживает методы: - initialize - tools/list - tools/call - resources/list - resources/read """ try: message = await request.json() except Exception as e: logger.error("Failed to parse MCP request", error=str(e)) raise MCPException( "Invalid JSON-RPC request", error_code="INVALID_REQUEST", status_code=400, ) correlation_id = getattr(request.state, "correlation_id", None) or str(uuid.uuid4()) method = message.get("method") try: if not method: raise MCPException( "Missing method in MCP request", error_code="INVALID_REQUEST", status_code=400, ) if method == "initialize": return await handle_initialize(message, correlation_id) elif method == "tools/list": return await handle_tools_list(message, correlation_id) elif method == "tools/call": return await handle_tools_call(message, correlation_id) elif method == "resources/list": return await handle_resources_list(message, correlation_id) elif method == "resources/read": return await handle_resources_read(message, correlation_id) else: raise MCPException( error_code="METHOD_NOT_FOUND", message=f"Method '{method}' not found", status_code=404, ) except MCPException as e: logger.error( "Error processing MCP request", method=method, error=e.message, correlation_id=correlation_id, ) return { "jsonrpc": "2.0", "id": message.get("id"), "error": { "code": -32000, "message": e.message, }, } except Exception as e: logger.error( "Error processing MCP request", method=method, error=str(e), correlation_id=correlation_id, ) return { "jsonrpc": "2.0", "id": message.get("id"), "error": { "code": -32603, "message": "Internal error", }, } async def handle_initialize(message: Dict[str, Any], correlation_id: str): """Инициализация MCP-клиента.""" logger.info("MCP initialization", correlation_id=correlation_id) return { "jsonrpc": "2.0", "id": message.get("id"), "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {}, "resources": {}, }, "serverInfo": { "name": settings.app_name, "version": settings.app_version, }, }, } async def handle_tools_list(message: Dict[str, Any], correlation_id: str): """Обработка MCP-запроса tools/list.""" from app.api.v1 import tools # локальный импорт, чтобы не ловить циклы try: tools_map = await tools.tool_registry.get_all_tools() tool_items = [] for tool_name, tool_obj in tools_map.items(): tool_items.append( { "name": tool_obj.name, "description": tool_obj.description, "inputSchema": tool_obj.input_schema, } ) logger.info( "MCP tools/list handled", count=len(tool_items), correlation_id=correlation_id, ) return { "jsonrpc": "2.0", "id": message.get("id"), "result": { "tools": tool_items, }, } except Exception as e: logger.error( "Failed to handle tools/list", error=str(e), correlation_id=correlation_id, ) return { "jsonrpc": "2.0", "id": message.get("id"), "error": { "code": -32603, "message": "Internal error while listing tools", }, } async def handle_tools_call(message: Dict[str, Any], correlation_id: str): """Обработка MCP-запроса tools/call.""" from app.api.v1 import tools params = message.get("params") or {} tool_name = params.get("name") arguments = params.get("arguments") or {} agent_id = params.get("agent_id") or "mcp-client" if not tool_name: return { "jsonrpc": "2.0", "id": message.get("id"), "error": { "code": -32602, "message": "Invalid params: 'name' is required in tools/call", }, } try: logger.info( "MCP tools/call started", tool_name=tool_name, agent_id=agent_id, correlation_id=correlation_id, ) execution = await tools.tool_registry.execute_tool( tool_name=tool_name, parameters=arguments, agent_id=agent_id, ) is_error = execution.error is not None content_items = [] if execution.result is not None: content_items.append( { "type": "json", "json": execution.result, } ) result_payload: Dict[str, Any] = { "content": content_items, "isError": is_error, "toolName": execution.tool_name, "executionTime": execution.execution_time, } if is_error: result_payload["error"] = execution.error logger.info( "MCP tools/call completed", tool_name=tool_name, agent_id=agent_id, is_error=is_error, execution_time=execution.execution_time, correlation_id=correlation_id, ) return { "jsonrpc": "2.0", "id": message.get("id"), "result": result_payload, } except Exception as e: logger.error( "MCP tools/call failed", tool_name=tool_name, error=str(e), correlation_id=correlation_id, ) return { "jsonrpc": "2.0", "id": message.get("id"), "error": { "code": -32000, "message": "Tool execution failed", "data": { "toolName": tool_name, "details": str(e), }, }, } async def handle_resources_list(message: Dict[str, Any], correlation_id: str): """Пока заглушка для resources/list.""" logger.info("MCP resources/list called", correlation_id=correlation_id) return { "jsonrpc": "2.0", "id": message.get("id"), "result": { "resources": [], }, } async def handle_resources_read(message: Dict[str, Any], correlation_id: str): """Пока заглушка для resources/read.""" logger.info("MCP resources/read called", correlation_id=correlation_id) return { "jsonrpc": "2.0", "id": message.get("id"), "result": { "contents": [], }, } if __name__ == "__main__": import uvicorn uvicorn.run( "app.main:app", host=settings.host, port=settings.port, reload=settings.debug, workers=1 if settings.debug else settings.workers, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Vovchansky6/mcp_OVS'

If you have feedback or need assistance with the MCP directory API, please join our Discord server