Skip to main content
Glama
main.py (13.7 kB)
"""HTTP and WebSocket entry point of the MCP server.

Exposes REST endpoints for resources, tools, prompts and sampling, mounts the
GraphQL router, and serves a JSON-message WebSocket endpoint at ``/ws``. All
domain work is delegated to ``mcp_service``.
"""

import json
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
from pydantic import BaseModel

from app.models.graphql import graphql_router  # GraphQL router import
from app.services.mcp_service import mcp_service

app = FastAPI(title="MCP Server", docs_url="/docs", redoc_url="/redoc")
Instrumentator().instrument(app).expose(app)


@app.on_event("startup")
async def startup_event() -> None:
    """Register the example tools and print the resulting tool list."""
    # Imported lazily, as in the original code (kept local to the hook).
    from app.tools.example_tool import register_tools

    await register_tools()
    print("MCP Server started with the following tools:")
    tools = await mcp_service.list_tools()
    for tool_name, tool in tools.items():
        print(f"- {tool_name}: {tool.description}")


# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site issue credentialed requests; restrict allow_origins before exposing
# this server beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class ConnectionManager:
    """Track accepted WebSocket connections and broadcast text to them."""

    def __init__(self) -> None:
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        """Stop tracking the connection; a no-op if it is not tracked."""
        # list.remove raises ValueError for a missing item, which would turn
        # a double disconnect into a crash.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str) -> None:
        """Send ``message`` to every tracked connection.

        Connections that fail to accept the message are dropped so that one
        dead peer does not prevent delivery to the others.
        """
        # Iterate over a snapshot: the list may be mutated while we await.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                self.disconnect(connection)


manager = ConnectionManager()


@app.get("/")
async def root():
    """Return a static welcome message."""
    return {"message": "Welcome to MCP Server"}


@app.get("/health")
async def health_check():
    """Return a static health status."""
    return {"status": "healthy"}


# Mount the GraphQL router
app.include_router(graphql_router, prefix="/graphql")


# Resources API
@app.get("/resources")
async def list_resources():
    """Get the list of all resources."""
    resources = await mcp_service.list_resources()
    return {"resources": resources}


# NOTE(review): a plain ``{uri}`` path parameter does not match values that
# contain "/", so URIs such as "file:///a/b" cannot be fetched through this
# route. Consider ``{uri:path}`` or a query parameter.
@app.get("/resources/{uri}")
async def get_resource(uri: str):
    """Get a resource by URI; respond with 404 when the service has none."""
    resource = await mcp_service.get_resource(uri)
    if not resource:
        raise HTTPException(
            status_code=404,
            detail=f"Resource '{uri}' not found",
        )
    return resource


class ResourceCreate(BaseModel):
    """Request model for creating a resource."""

    uri: str
    name: str
    description: Optional[str] = None
    mime_type: Optional[str] = None
    content: Optional[str] = None


@app.post("/resources")
async def create_resource(resource: ResourceCreate):
    """Create a new resource and register it with the service."""
    from app.models.mcp import Resource as MCPResource

    # NOTE(review): ``resource.content`` is accepted but never forwarded to
    # the MCP resource; confirm whether that is intentional.
    new_resource = MCPResource(
        uri=resource.uri,
        name=resource.name,
        description=resource.description,
        mimeType=resource.mime_type,
    )
    await mcp_service.register_resource(new_resource)
    return {
        "message": f"Resource '{resource.uri}' created",
        "resource": new_resource,
    }


# Tools API
@app.get("/tools")
async def list_tools():
    """Get the list of all registered tools."""
    tools = await mcp_service.list_tools()
    return {"tools": list(tools.values())}


class ToolParameters(BaseModel):
    """Request model for tool parameters; unknown fields are kept as well."""

    # Common parameters
    operation: Optional[str] = None
    path: Optional[str] = None
    content: Optional[str] = None
    # Parameters for text_analysis
    text: Optional[str] = None
    analysis_type: Optional[str] = None
    # Parameters for weather
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    # Parameters for image_processing
    image_data: Optional[str] = None
    params: Optional[Dict[str, Any]] = None

    model_config = {"extra": "allow"}  # Allow additional fields


@app.post("/tools/{tool_name}")
async def execute_tool(tool_name: str, parameters: ToolParameters):
    """Execute a tool with the supplied parameters.

    Raises:
        HTTPException: re-raised unchanged when the service raises one;
            any other error is reported as a 500 with the error text.
    """
    try:
        # Convert to dict and remove None values
        params_dict = {
            k: v for k, v in parameters.model_dump().items() if v is not None
        }
        result = await mcp_service.execute_tool(tool_name, params_dict)
        return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Prompts API
@app.get("/prompts")
async def list_prompts():
    """Get the list of all prompts."""
    prompts = await mcp_service.list_prompts()
    return {"prompts": prompts}


@app.get("/prompts/{name}")
async def get_prompt(name: str):
    """Get a prompt by name; respond with 404 when the service has none."""
    prompt = await mcp_service.get_prompt(name)
    if not prompt:
        raise HTTPException(
            status_code=404,
            detail=f"Prompt '{name}' not found",
        )
    return prompt


class PromptArguments(BaseModel):
    """Request model for prompt arguments."""

    model_config = {"extra": "allow"}  # Allow additional fields


@app.post("/prompts/{name}")
async def execute_prompt(name: str, arguments: PromptArguments):
    """Execute a prompt with the supplied arguments.

    Raises:
        HTTPException: re-raised unchanged when the service raises one;
            any other error is reported as a 500 with the error text.
    """
    try:
        # Convert to a dict and drop None values
        args_dict = {
            k: v for k, v in arguments.model_dump().items() if v is not None
        }
        messages = await mcp_service.execute_prompt(name, args_dict)
        return {"messages": messages}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Sampling API
class SamplingRequest(BaseModel):
    """Request model for a sampling request."""

    messages: List[Dict[str, Any]]
    model_preferences: Optional[Dict[str, Any]] = None
    system_prompt: Optional[str] = None
    include_context: Optional[str] = "none"
    temperature: Optional[float] = None
    max_tokens: Optional[int] = 1024
    stop_sequences: Optional[List[str]] = None
    metadata: Optional[Dict[str, Any]] = None


@app.post("/sampling")
async def create_sampling(request: SamplingRequest):
    """Create an LLM sampling request.

    Raises:
        HTTPException: 501 when the service raises ``NotImplementedError``;
            re-raised unchanged when the service raises an HTTPException;
            500 with the error text for any other error.
    """
    try:
        from app.models.mcp import SamplingRequest as MCPSamplingRequest

        # Convert the request to the MCP format
        mcp_request = MCPSamplingRequest(
            messages=request.messages,
            modelPreferences=request.model_preferences,
            systemPrompt=request.system_prompt,
            includeContext=request.include_context,
            temperature=request.temperature,
            maxTokens=request.max_tokens or 1024,
            stopSequences=request.stop_sequences,
            metadata=request.metadata,
        )
        # Execute the sampling request
        result = await mcp_service.create_sampling(mcp_request)
        return result
    except NotImplementedError:
        raise HTTPException(
            status_code=501,
            detail="Sampling functionality is not implemented yet",
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


async def _send_ws_message(
    websocket: WebSocket, message_type: str, data: Dict[str, Any]
) -> None:
    """Send one ``{"type": ..., "data": ...}`` frame over the WebSocket."""
    # send_json uses plain json.dumps; run the payload through FastAPI's
    # encoder first so service results that are not JSON-native (e.g.
    # Pydantic models) serialise the same way they do on the REST routes.
    await websocket.send_json(
        jsonable_encoder({"type": message_type, "data": data})
    )


async def _handle_ws_message(
    websocket: WebSocket, message: Dict[str, Any]
) -> None:
    """Dispatch one decoded WebSocket message by its ``type`` field."""
    message_type = message.get("type")
    # ``"data": null`` is treated like a missing payload.
    message_data = message.get("data") or {}

    if message_type == "tool_request":
        response = await mcp_service.execute_tool(
            message_data.get("name", ""), message_data.get("parameters", {})
        )
        await _send_ws_message(websocket, "tool_response", response)

    elif message_type == "register_tool":
        from app.models.mcp import Tool

        tool = Tool(
            name=message_data.get("name"),
            description=message_data.get("description"),
            input_schema=message_data.get("input_schema", {}),
        )
        await mcp_service.register_tool(tool)
        await _send_ws_message(
            websocket,
            "registration_response",
            {
                "status": "success",
                "message": f"Tool '{tool.name}' registered",
            },
        )

    elif message_type == "resource_request":
        resource_uri = message_data.get("uri")
        resource = await mcp_service.get_resource(resource_uri)
        if resource:
            payload = {"resource": resource, "status": "success"}
        else:
            payload = {
                "status": "error",
                "message": f"Resource '{resource_uri}' not found",
            }
        await _send_ws_message(websocket, "resource_response", payload)

    elif message_type == "prompt_request":
        prompt_name = message_data.get("name")
        prompt_args = message_data.get("arguments", {})
        messages = await mcp_service.execute_prompt(prompt_name, prompt_args)
        await _send_ws_message(
            websocket,
            "prompt_response",
            {"messages": messages, "status": "success"},
        )

    elif message_type == "sampling_request":
        # Convert the request to the MCP format
        from app.models.mcp import SamplingRequest as MCPSamplingRequest

        try:
            mcp_request = MCPSamplingRequest(
                messages=message_data.get("messages", []),
                modelPreferences=message_data.get("model_preferences"),
                systemPrompt=message_data.get("system_prompt"),
                includeContext=message_data.get("include_context", "none"),
                temperature=message_data.get("temperature"),
                maxTokens=message_data.get("max_tokens", 1024),
                stopSequences=message_data.get("stop_sequences"),
                metadata=message_data.get("metadata"),
            )
            result = await mcp_service.create_sampling(mcp_request)
        except NotImplementedError:
            payload = {
                "status": "error",
                "message": "Sampling functionality is not implemented yet",
            }
        except Exception as e:
            payload = {"status": "error", "message": str(e)}
        else:
            payload = {"result": result, "status": "success"}
        # Sent outside the try so a dropped connection is not mistaken for a
        # sampling failure.
        await _send_ws_message(websocket, "sampling_response", payload)

    else:
        await _send_ws_message(
            websocket, "error", {"message": "Unknown message type"}
        )


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """Serve JSON messages over a WebSocket until the client disconnects.

    Each text frame must be a JSON object with a ``type`` and an optional
    ``data`` object. Failures while handling a message are reported back as
    an ``error`` frame and the loop continues.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                await _send_ws_message(
                    websocket, "error", {"message": "Invalid JSON"}
                )
                continue

            if not isinstance(message, dict):
                await _send_ws_message(
                    websocket,
                    "error",
                    {"message": "Message must be a JSON object"},
                )
                continue

            try:
                await _handle_ws_message(websocket, message)
            except WebSocketDisconnect:
                # The peer is gone; do not try to report this to it.
                raise
            except Exception as e:
                await _send_ws_message(
                    websocket, "error", {"message": str(e)}
                )
    except WebSocketDisconnect:
        pass
    finally:
        # Always untrack the socket, including when an unexpected error
        # (e.g. sending on an already-closed socket) ends the loop.
        manager.disconnect(websocket)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eagurin/myaiserv'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.