Skip to main content
Glama
registry.py3.58 kB
"""Resource registry for managing MCP resources.""" from typing import Any, Callable import mcp.types as types from pydantic import AnyUrl from .base import BaseResource class ResourceRegistry: """Registry for managing MCP resources.""" def __init__(self) -> None: """Initialize the resource registry.""" self._resources: dict[str, BaseResource | Callable[..., Any]] = {} self._names: dict[str, str] = {} self._descriptions: dict[str, str] = {} self._mime_types: dict[str, str] = {} def register( self, uri: str, resource_func: Any, name: str = "", description: str = "", mime_type: str = "text/plain" ) -> None: """Register a resource. Args: uri: Resource URI resource_func: Resource function or BaseResource instance name: Resource name description: Resource description mime_type: MIME type of the resource """ if isinstance(resource_func, BaseResource): self._resources[uri] = resource_func else: self._resources[uri] = resource_func self._names[uri] = name or self._extract_name_from_uri(uri) self._descriptions[uri] = description or self._extract_description(resource_func) self._mime_types[uri] = mime_type def _extract_name_from_uri(self, uri: str) -> str: """Extract name from URI. Args: uri: Resource URI Returns: Extracted name """ return uri.split("/")[-1] or "resource" def _extract_description(self, func: Callable[..., Any]) -> str: """Extract description from function docstring. Args: func: Function to extract description from Returns: Function description """ return (func.__doc__ or "").strip() async def list_resources(self) -> list[types.Resource]: """List all registered resources. Returns: List of MCP resources """ resources = [] for uri, resource in self._resources.items(): if isinstance(resource, BaseResource): resources.append(resource.to_mcp_resource()) else: resources.append( types.Resource( uri=AnyUrl(uri), name=self._names.get(uri, ""), description=self._descriptions.get(uri, ""), mimeType=self._mime_types.get(uri, "text/plain"), ) ) return resources async def read_resource(self, uri: str) -> str: """Read a resource by URI. Args: uri: Resource URI Returns: Resource content Raises: ValueError: If resource is not found """ if uri not in self._resources: raise ValueError(f"Unknown resource: {uri}") resource = self._resources[uri] if isinstance(resource, BaseResource): return await resource.read() else: # Call regular function try: if hasattr(resource, "__call__"): result = resource() if hasattr(result, "__await__"): result = await result return str(result) else: return str(resource) except Exception as e: return f"Error reading resource {uri}: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/namnd00/mcp-server-hero'

If you have feedback or need assistance with the MCP directory API, please join our Discord server