Skip to main content
Glama
mcp_registry.py2.76 kB
""" MCP Registry module for managing server registrations. """ import threading from typing import List, Optional try: from fastapi import FastAPI, HTTPException from pydantic import BaseModel HAS_FASTAPI = True except ImportError: HAS_FASTAPI = False # Dummy classes to prevent NameError if module is imported but not used class FastAPI: def post(self, *args, **kwargs): return lambda x: x def delete(self, *args, **kwargs): return lambda x: x def get(self, *args, **kwargs): return lambda x: x class HTTPException(Exception): def __init__(self, status_code, detail): super().__init__(detail) class BaseModel: def dict(self): return {} app = FastAPI() class Server: """Represents a server in the MCP Registry.""" def __init__(self, server_id: str, name: str, url: str, metadata: Optional[dict] = None): self.id = server_id self.name = name self.url = url self.metadata = metadata class MCPRegistry: """In-memory MCP Registry to manage server registrations.""" def __init__(self): self.servers = {} self.lock = threading.Lock() def register_server(self, server: Server): """Registers a new server in the registry.""" with self.lock: if server.id in self.servers: raise HTTPException(status_code=400, detail="Server ID already registered.") self.servers[server.id] = server def unregister_server(self, server_id: str): """Unregisters a server from the registry by ID.""" with self.lock: if server_id not in self.servers: raise HTTPException(status_code=404, detail="Server not found.") del self.servers[server_id] def list_servers(self) -> List[Server]: """Lists all registered servers.""" with self.lock: return list(self.servers.values()) registry = MCPRegistry() class ServerCreate(BaseModel): """Schema for creating a new server registration.""" id: str name: str url: str metadata: Optional[dict] = None @app.post("/registry/register") def register_server(server: ServerCreate): """Register a new server.""" new_server = Server(**server.dict()) registry.register_server(new_server) return {"message": "Server registered successfully."} @app.delete("/registry/unregister/{server_id}") def unregister_server(server_id: str): """Unregister a server.""" registry.unregister_server(server_id) return {"message": "Server unregistered successfully."} @app.get("/registry/list", response_model=List[Server]) def list_servers(): """List all registered servers.""" return registry.list_servers()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/4R9UN/mcp-kql-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server