Skip to main content
Glama

propublica-mcp

resource_manager.py•19.8 kB
"""Resource manager functionality.""" from __future__ import annotations import inspect import warnings from collections.abc import Callable from typing import TYPE_CHECKING, Any from pydantic import AnyUrl from fastmcp import settings from fastmcp.exceptions import NotFoundError, ResourceError from fastmcp.resources.resource import Resource from fastmcp.resources.template import ( ResourceTemplate, match_uri_template, ) from fastmcp.settings import DuplicateBehavior from fastmcp.utilities.logging import get_logger if TYPE_CHECKING: from fastmcp.server.server import MountedServer logger = get_logger(__name__) class ResourceManager: """Manages FastMCP resources.""" def __init__( self, duplicate_behavior: DuplicateBehavior | None = None, mask_error_details: bool | None = None, ): """Initialize the ResourceManager. Args: duplicate_behavior: How to handle duplicate resources (warn, error, replace, ignore) mask_error_details: Whether to mask error details from exceptions other than ResourceError """ self._resources: dict[str, Resource] = {} self._templates: dict[str, ResourceTemplate] = {} self._mounted_servers: list[MountedServer] = [] self.mask_error_details = mask_error_details or settings.mask_error_details # Default to "warn" if None is provided if duplicate_behavior is None: duplicate_behavior = "warn" if duplicate_behavior not in DuplicateBehavior.__args__: raise ValueError( f"Invalid duplicate_behavior: {duplicate_behavior}. " f"Must be one of: {', '.join(DuplicateBehavior.__args__)}" ) self.duplicate_behavior = duplicate_behavior def mount(self, server: MountedServer) -> None: """Adds a mounted server as a source for resources and templates.""" self._mounted_servers.append(server) async def get_resources(self) -> dict[str, Resource]: """Get all registered resources, keyed by URI.""" return await self._load_resources(via_server=False) async def get_resource_templates(self) -> dict[str, ResourceTemplate]: """Get all registered templates, keyed by URI template.""" return await self._load_resource_templates(via_server=False) async def _load_resources(self, *, via_server: bool = False) -> dict[str, Resource]: """ The single, consolidated recursive method for fetching resources. The 'via_server' parameter determines the communication path. - via_server=False: Manager-to-manager path for complete, unfiltered inventory - via_server=True: Server-to-server path for filtered MCP requests """ all_resources: dict[str, Resource] = {} for mounted in self._mounted_servers: try: if via_server: # Use the server-to-server filtered path child_resources_list = await mounted.server._list_resources() child_resources = { resource.key: resource for resource in child_resources_list } else: # Use the manager-to-manager unfiltered path child_resources = ( await mounted.server._resource_manager.get_resources() ) # Apply prefix if needed if mounted.prefix: from fastmcp.server.server import add_resource_prefix for uri, resource in child_resources.items(): prefixed_uri = add_resource_prefix( uri, mounted.prefix, mounted.resource_prefix_format ) # Create a copy of the resource with the prefixed key prefixed_resource = resource.with_key(prefixed_uri) all_resources[prefixed_uri] = prefixed_resource else: all_resources.update(child_resources) except Exception as e: # Skip failed mounts silently, matches existing behavior logger.warning( f"Failed to get resources from mounted server '{mounted.prefix}': {e}" ) continue # Finally, add local resources, which always take precedence all_resources.update(self._resources) return all_resources async def _load_resource_templates( self, *, via_server: bool = False ) -> dict[str, ResourceTemplate]: """ The single, consolidated recursive method for fetching templates. The 'via_server' parameter determines the communication path. - via_server=False: Manager-to-manager path for complete, unfiltered inventory - via_server=True: Server-to-server path for filtered MCP requests """ all_templates: dict[str, ResourceTemplate] = {} for mounted in self._mounted_servers: try: if via_server: # Use the server-to-server filtered path child_templates = await mounted.server._list_resource_templates() else: # Use the manager-to-manager unfiltered path child_templates = ( await mounted.server._resource_manager.list_resource_templates() ) child_dict = {template.key: template for template in child_templates} # Apply prefix if needed if mounted.prefix: from fastmcp.server.server import add_resource_prefix for uri_template, template in child_dict.items(): prefixed_uri_template = add_resource_prefix( uri_template, mounted.prefix, mounted.resource_prefix_format ) # Create a copy of the template with the prefixed key prefixed_template = template.with_key(prefixed_uri_template) all_templates[prefixed_uri_template] = prefixed_template else: all_templates.update(child_dict) except Exception as e: # Skip failed mounts silently, matches existing behavior logger.warning( f"Failed to get templates from mounted server '{mounted.prefix}': {e}" ) continue # Finally, add local templates, which always take precedence all_templates.update(self._templates) return all_templates async def list_resources(self) -> list[Resource]: """ Lists all resources, applying protocol filtering. """ resources_dict = await self._load_resources(via_server=True) return list(resources_dict.values()) async def list_resource_templates(self) -> list[ResourceTemplate]: """ Lists all templates, applying protocol filtering. """ templates_dict = await self._load_resource_templates(via_server=True) return list(templates_dict.values()) def add_resource_or_template_from_fn( self, fn: Callable[..., Any], uri: str, name: str | None = None, description: str | None = None, mime_type: str | None = None, tags: set[str] | None = None, ) -> Resource | ResourceTemplate: """Add a resource or template to the manager from a function. Args: fn: The function to register as a resource or template uri: The URI for the resource or template name: Optional name for the resource or template description: Optional description of the resource or template mime_type: Optional MIME type for the resource or template tags: Optional set of tags for categorizing the resource or template Returns: The added resource or template. If a resource or template with the same URI already exists, returns the existing resource or template. """ from fastmcp.server.context import Context # Check if this should be a template has_uri_params = "{" in uri and "}" in uri # check if the function has any parameters (other than injected context) has_func_params = any( p for p in inspect.signature(fn).parameters.values() if p.annotation is not Context ) if has_uri_params or has_func_params: return self.add_template_from_fn( fn, uri, name, description, mime_type, tags ) elif not has_uri_params and not has_func_params: return self.add_resource_from_fn( fn, uri, name, description, mime_type, tags ) else: raise ValueError( "Invalid resource or template definition due to a " "mismatch between URI parameters and function parameters." ) def add_resource_from_fn( self, fn: Callable[..., Any], uri: str, name: str | None = None, description: str | None = None, mime_type: str | None = None, tags: set[str] | None = None, ) -> Resource: """Add a resource to the manager from a function. Args: fn: The function to register as a resource uri: The URI for the resource name: Optional name for the resource description: Optional description of the resource mime_type: Optional MIME type for the resource tags: Optional set of tags for categorizing the resource Returns: The added resource. If a resource with the same URI already exists, returns the existing resource. """ # deprecated in 2.7.0 if settings.deprecation_warnings: warnings.warn( "add_resource_from_fn is deprecated. Use Resource.from_function() and call add_resource() instead.", DeprecationWarning, stacklevel=2, ) resource = Resource.from_function( fn=fn, uri=uri, name=name, description=description, mime_type=mime_type, tags=tags, ) return self.add_resource(resource) def add_resource(self, resource: Resource) -> Resource: """Add a resource to the manager. Args: resource: A Resource instance to add. The resource's .key attribute will be used as the storage key. To overwrite it, call Resource.with_key() before calling this method. """ existing = self._resources.get(resource.key) if existing: if self.duplicate_behavior == "warn": logger.warning(f"Resource already exists: {resource.key}") self._resources[resource.key] = resource elif self.duplicate_behavior == "replace": self._resources[resource.key] = resource elif self.duplicate_behavior == "error": raise ValueError(f"Resource already exists: {resource.key}") elif self.duplicate_behavior == "ignore": return existing self._resources[resource.key] = resource return resource def add_template_from_fn( self, fn: Callable[..., Any], uri_template: str, name: str | None = None, description: str | None = None, mime_type: str | None = None, tags: set[str] | None = None, ) -> ResourceTemplate: """Create a template from a function.""" # deprecated in 2.7.0 if settings.deprecation_warnings: warnings.warn( "add_template_from_fn is deprecated. Use ResourceTemplate.from_function() and call add_template() instead.", DeprecationWarning, stacklevel=2, ) template = ResourceTemplate.from_function( fn, uri_template=uri_template, name=name, description=description, mime_type=mime_type, tags=tags, ) return self.add_template(template) def add_template(self, template: ResourceTemplate) -> ResourceTemplate: """Add a template to the manager. Args: template: A ResourceTemplate instance to add. The template's .key attribute will be used as the storage key. To overwrite it, call ResourceTemplate.with_key() before calling this method. Returns: The added template. If a template with the same URI already exists, returns the existing template. """ existing = self._templates.get(template.key) if existing: if self.duplicate_behavior == "warn": logger.warning(f"Template already exists: {template.key}") self._templates[template.key] = template elif self.duplicate_behavior == "replace": self._templates[template.key] = template elif self.duplicate_behavior == "error": raise ValueError(f"Template already exists: {template.key}") elif self.duplicate_behavior == "ignore": return existing self._templates[template.key] = template return template async def has_resource(self, uri: AnyUrl | str) -> bool: """Check if a resource exists.""" uri_str = str(uri) # First check concrete resources (local and mounted) resources = await self.get_resources() if uri_str in resources: return True # Then check templates (local and mounted) only if not found in concrete resources templates = await self.get_resource_templates() for template_key in templates.keys(): if match_uri_template(uri_str, template_key): return True return False async def get_resource(self, uri: AnyUrl | str) -> Resource: """Get resource by URI, checking concrete resources first, then templates. Args: uri: The URI of the resource to get Raises: NotFoundError: If no resource or template matching the URI is found. """ uri_str = str(uri) logger.debug("Getting resource", extra={"uri": uri_str}) # First check concrete resources (local and mounted) resources = await self.get_resources() if resource := resources.get(uri_str): return resource # Then check templates (local and mounted) - use the utility function to match against storage keys templates = await self.get_resource_templates() for storage_key, template in templates.items(): # Try to match against the storage key (which might be a custom key) if params := match_uri_template(uri_str, storage_key): try: return await template.create_resource( uri_str, params=params, ) # Pass through ResourceErrors as-is except ResourceError as e: logger.error(f"Error creating resource from template: {e}") raise e # Handle other exceptions except Exception as e: logger.error(f"Error creating resource from template: {e}") if self.mask_error_details: # Mask internal details raise ValueError("Error creating resource from template") from e else: # Include original error details raise ValueError( f"Error creating resource from template: {e}" ) from e raise NotFoundError(f"Unknown resource: {uri_str}") async def read_resource(self, uri: AnyUrl | str) -> str | bytes: """ Internal API for servers: Finds and reads a resource, respecting the filtered protocol path. """ uri_str = str(uri) # 1. Check local resources first. The server will have already applied its filter. if uri_str in self._resources: resource = await self.get_resource(uri_str) if not resource: raise NotFoundError(f"Resource {uri_str!r} not found") try: return await resource.read() # raise ResourceErrors as-is except ResourceError as e: logger.exception(f"Error reading resource {uri_str!r}") raise e # Handle other exceptions except Exception as e: logger.exception(f"Error reading resource {uri_str!r}") if self.mask_error_details: # Mask internal details raise ResourceError(f"Error reading resource {uri_str!r}") from e else: # Include original error details raise ResourceError( f"Error reading resource {uri_str!r}: {e}" ) from e # 1b. Check local templates if not found in concrete resources for key, template in self._templates.items(): if params := match_uri_template(uri_str, key): try: resource = await template.create_resource(uri_str, params=params) return await resource.read() except ResourceError as e: logger.exception( f"Error reading resource from template {uri_str!r}" ) raise e except Exception as e: logger.exception( f"Error reading resource from template {uri_str!r}" ) if self.mask_error_details: raise ResourceError( f"Error reading resource from template {uri_str!r}" ) from e else: raise ResourceError( f"Error reading resource from template {uri_str!r}: {e}" ) from e # 2. Check mounted servers using the filtered protocol path. from fastmcp.server.server import has_resource_prefix, remove_resource_prefix for mounted in reversed(self._mounted_servers): key = uri_str try: if mounted.prefix: if has_resource_prefix( key, mounted.prefix, mounted.resource_prefix_format, ): key = remove_resource_prefix( key, mounted.prefix, mounted.resource_prefix_format, ) else: continue try: result = await mounted.server._read_resource(key) return result[0].content except NotFoundError: continue except NotFoundError: continue raise NotFoundError(f"Resource {uri_str!r} not found.")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/asachs01/propublica-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server