Skip to main content
Glama
jezweb

Australian Postcodes MCP Server

auth.py (11.1 kB)
from __future__ import annotations

from typing import Any

from mcp.server.auth.provider import (
    AccessToken as _SDKAccessToken,
)
from mcp.server.auth.provider import (
    AuthorizationCode,
    OAuthAuthorizationServerProvider,
    RefreshToken,
)
from mcp.server.auth.provider import (
    TokenVerifier as TokenVerifierProtocol,
)
from mcp.server.auth.routes import (
    create_auth_routes,
    create_protected_resource_routes,
)
from mcp.server.auth.settings import (
    ClientRegistrationOptions,
    RevocationOptions,
)
from pydantic import AnyHttpUrl
from starlette.routing import Route


class AccessToken(_SDKAccessToken):
    """SDK access token extended with a ``claims`` mapping for JWT claims."""

    # Defaults to an empty mapping when no claims are supplied.
    claims: dict[str, Any] = {}


class AuthProvider(TokenVerifierProtocol):
    """Common base for every FastMCP authentication provider.

    Simple token verifiers and full OAuth authorization servers share this
    interface: each one must be able to verify a token, and each one may
    contribute its own authentication routes.
    """

    def __init__(self, resource_server_url: AnyHttpUrl | str | None = None):
        """Store the resource server URL, coercing plain strings.

        Args:
            resource_server_url: The URL of this resource server. This is
                used for RFC 8707 resource indicators, including creating
                the WWW-Authenticate header. Strings are converted to
                ``AnyHttpUrl``; ``None`` is kept as ``None``.
        """
        self.resource_server_url = (
            AnyHttpUrl(resource_server_url)
            if isinstance(resource_server_url, str)
            else resource_server_url
        )

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify a bearer token and return access info if valid.

        Every provider has to supply its own implementation.

        Args:
            token: The token string to validate.

        Returns:
            AccessToken object if valid, None if invalid or expired.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement verify_token")

    def get_routes(self) -> list[Route]:
        """Return the routes this provider wants mounted.

        Each provider decides which routes it needs:

        - TokenVerifier: typically no routes (this default implementation)
        - RemoteAuthProvider: protected resource metadata routes
        - OAuthProvider: full OAuth authorization server routes
        - Custom providers: whatever routes they need

        Returns:
            List of routes for this provider; empty in the base class.
        """
        no_routes: list[Route] = []
        return no_routes

    def get_resource_metadata_url(self) -> AnyHttpUrl | None:
        """Build the RFC 9728 protected-resource metadata URL.

        Returns:
            The resource server URL with any trailing slashes removed and
            the well-known protected-resource path appended, or ``None``
            when no resource server URL is configured.
        """
        resource_url = self.resource_server_url
        if resource_url is None:
            return None

        # Strip trailing slashes first so the joined URL has a single "/".
        trimmed = str(resource_url).rstrip("/")
        return AnyHttpUrl(trimmed + "/.well-known/oauth-protected-resource")


class TokenVerifier(AuthProvider):
    """Base class for token verifiers (Resource Servers).

    Offers token verification without any OAuth server functionality and
    inherits the default (empty) route list from ``AuthProvider``.
    """

    def __init__(
        self,
        resource_server_url: AnyHttpUrl | str | None = None,
        required_scopes: list[str] | None = None,
    ):
        """Initialize the token verifier.

        Args:
            resource_server_url: The URL of this resource server. This is
                used for RFC 8707 resource indicators, including creating
                the WWW-Authenticate header.
            required_scopes: Scopes that are required for all requests.
                A missing or empty value is stored as an empty list.
        """
        super().__init__(resource_server_url=resource_server_url)
        self.required_scopes = required_scopes if required_scopes else []

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify a bearer token and return access info if valid.

        Raises:
            NotImplementedError: Always; concrete verifiers override this.
        """
        raise NotImplementedError("Subclasses must implement verify_token")


class RemoteAuthProvider(AuthProvider):
    """Provider for resource servers whose tokens come from known issuers.

    Combines a ``TokenVerifier`` with a list of authorization servers to
    expose standardized OAuth 2.0 Protected Resource endpoints (RFC 9728).

    Suited to:

    - JWT verification with known issuers
    - Remote token introspection services
    - Any resource server that knows where its tokens come from

    Use it when token verification logic already exists and the
    authorization servers that issue valid tokens should be advertised.
    """

    def __init__(
        self,
        token_verifier: TokenVerifier,
        authorization_servers: list[AnyHttpUrl],
        resource_server_url: AnyHttpUrl | str,
        resource_name: str | None = None,
        resource_documentation: AnyHttpUrl | None = None,
    ):
        """Initialize the remote auth provider.

        Args:
            token_verifier: TokenVerifier instance for token validation.
            authorization_servers: List of authorization servers that
                issue valid tokens.
            resource_server_url: URL of this resource server. This is used
                for RFC 8707 resource indicators, including creating the
                WWW-Authenticate header.
            resource_name: Optional name passed through to the protected
                resource routes.
            resource_documentation: Optional documentation URL passed
                through to the protected resource routes.
        """
        super().__init__(resource_server_url=resource_server_url)
        self.token_verifier = token_verifier
        self.authorization_servers = authorization_servers
        self.resource_name = resource_name
        self.resource_documentation = resource_documentation

    async def verify_token(self, token: str) -> AccessToken | None:
        """Delegate verification to the configured token verifier."""
        verifier = self.token_verifier
        return await verifier.verify_token(token)

    def get_routes(self) -> list[Route]:
        """Return the OAuth 2.0 Protected Resource routes for this provider.

        Only the standardized protected resource routes are produced here.
        Subclasses that need more can call ``super().get_routes()`` and
        extend the returned list.
        """
        resource_url = self.resource_server_url
        assert resource_url is not None

        # Advertise the verifier's required scopes as the supported scopes.
        advertised_scopes = self.token_verifier.required_scopes
        return create_protected_resource_routes(
            resource_url=resource_url,
            authorization_servers=self.authorization_servers,
            scopes_supported=advertised_scopes,
            resource_name=self.resource_name,
            resource_documentation=self.resource_documentation,
        )


class OAuthProvider(
    AuthProvider,
    OAuthAuthorizationServerProvider[AuthorizationCode, RefreshToken, AccessToken],
):
    """OAuth Authorization Server provider.

    Provides full OAuth server functionality including client
    registration, authorization flows, token issuance, and token
    verification.
    """

    def __init__(
        self,
        *,
        base_url: AnyHttpUrl | str,
        issuer_url: AnyHttpUrl | str | None = None,
        service_documentation_url: AnyHttpUrl | str | None = None,
        client_registration_options: ClientRegistrationOptions | None = None,
        revocation_options: RevocationOptions | None = None,
        required_scopes: list[str] | None = None,
        resource_server_url: AnyHttpUrl | str | None = None,
    ):
        """Initialize the OAuth provider.

        Args:
            base_url: The public URL of this FastMCP server.
            issuer_url: The issuer URL for OAuth metadata (defaults to
                base_url).
            service_documentation_url: The URL of the service
                documentation.
            client_registration_options: The client registration options.
            revocation_options: The revocation options.
            required_scopes: Scopes that are required for all requests.
            resource_server_url: The URL of this resource server (for
                RFC 8707 resource indicators, defaults to base_url).
        """
        super().__init__()

        # Normalise every URL argument to AnyHttpUrl before storing it.
        public_url = AnyHttpUrl(base_url) if isinstance(base_url, str) else base_url
        self.base_url = public_url

        if issuer_url is None:
            issuer = public_url
        else:
            issuer = (
                AnyHttpUrl(issuer_url) if isinstance(issuer_url, str) else issuer_url
            )
        self.issuer_url = issuer

        # The resource server URL and scopes are handled here rather than
        # through the base initializer so both can default from base_url.
        if resource_server_url is None:
            resource_url = public_url
        else:
            resource_url = (
                AnyHttpUrl(resource_server_url)
                if isinstance(resource_server_url, str)
                else resource_server_url
            )
        self.resource_server_url = resource_url
        self.required_scopes = required_scopes if required_scopes else []

        # Initialize OAuth Authorization Server Provider
        OAuthAuthorizationServerProvider.__init__(self)

        self.service_documentation_url = (
            AnyHttpUrl(service_documentation_url)
            if isinstance(service_documentation_url, str)
            else service_documentation_url
        )
        self.client_registration_options = client_registration_options
        self.revocation_options = revocation_options

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify a bearer token and return access info if valid.

        Implements the TokenVerifier protocol by delegating to this
        provider's ``load_access_token`` method.

        Args:
            token: The token string to validate.

        Returns:
            AccessToken object if valid, None if invalid or expired.
        """
        access_token = await self.load_access_token(token)
        return access_token

    def get_routes(self) -> list[Route]:
        """Return authorization server routes plus protected resource routes.

        The result contains:

        - Standard OAuth authorization server routes
          (/.well-known/oauth-authorization-server, /authorize, /token, etc.)
        - Protected resource routes, when ``resource_server_url`` is set

        Returns:
            List of OAuth routes.
        """
        routes = create_auth_routes(
            provider=self,
            issuer_url=self.issuer_url,
            service_documentation_url=self.service_documentation_url,
            client_registration_options=self.client_registration_options,
            revocation_options=self.revocation_options,
        )

        resource_url = self.resource_server_url
        if not resource_url:
            return routes

        # This server also acts as a resource server, with itself as the
        # only advertised authorization server.
        routes.extend(
            create_protected_resource_routes(
                resource_url=resource_url,
                authorization_servers=[self.issuer_url],
                scopes_supported=self.required_scopes,
            )
        )
        return routes

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jezweb/australian-postcodes-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.