base.pyā¢5.28 kB
from authlib.consts import default_json_headers
from ..errors import InvalidRequestError
from ..hooks import Hookable
from ..hooks import hooked
from ..requests import OAuth2Request
class BaseGrant(Hookable):
#: Allowed client auth methods for token endpoint
TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic"]
#: Designed for which "grant_type"
GRANT_TYPE = None
# NOTE: there is no charset for application/json, since
# application/json should always in UTF-8.
# The example on RFC is incorrect.
# https://tools.ietf.org/html/rfc4627
TOKEN_RESPONSE_HEADER = default_json_headers
def __init__(self, request: OAuth2Request, server):
super().__init__()
self.prompt = None
self.redirect_uri = None
self.request = request
self.server = server
@property
def client(self):
return self.request.client
def generate_token(
self,
user=None,
scope=None,
grant_type=None,
expires_in=None,
include_refresh_token=True,
):
if grant_type is None:
grant_type = self.GRANT_TYPE
return self.server.generate_token(
client=self.request.client,
grant_type=grant_type,
user=user,
scope=scope,
expires_in=expires_in,
include_refresh_token=include_refresh_token,
)
def authenticate_token_endpoint_client(self):
"""Authenticate client with the given methods for token endpoint.
For example, the client makes the following HTTP request using TLS:
.. code-block:: http
POST /token HTTP/1.1
Host: server.example.com
Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW
Content-Type: application/x-www-form-urlencoded
grant_type=authorization_code&code=SplxlOBeZQQYbYS6WxSbIA
&redirect_uri=https%3A%2F%2Fclient%2Eexample%2Ecom%2Fcb
Default available methods are: "none", "client_secret_basic" and
"client_secret_post".
:return: client
"""
client = self.server.authenticate_client(
self.request, self.TOKEN_ENDPOINT_AUTH_METHODS
)
self.server.send_signal("after_authenticate_client", client=client, grant=self)
return client
def save_token(self, token):
"""A method to save token into database."""
return self.server.save_token(token, self.request)
def validate_requested_scope(self):
"""Validate if requested scope is supported by Authorization Server."""
scope = self.request.payload.scope
return self.server.validate_requested_scope(scope)
class TokenEndpointMixin:
#: Allowed HTTP methods of this token endpoint
TOKEN_ENDPOINT_HTTP_METHODS = ["POST"]
#: Designed for which "grant_type"
GRANT_TYPE = None
@classmethod
def check_token_endpoint(cls, request: OAuth2Request):
return (
request.payload.grant_type == cls.GRANT_TYPE
and request.method in cls.TOKEN_ENDPOINT_HTTP_METHODS
)
def validate_token_request(self):
raise NotImplementedError()
def create_token_response(self):
raise NotImplementedError()
class AuthorizationEndpointMixin:
RESPONSE_TYPES = set()
ERROR_RESPONSE_FRAGMENT = False
@classmethod
def check_authorization_endpoint(cls, request: OAuth2Request):
return request.payload.response_type in cls.RESPONSE_TYPES
@staticmethod
def validate_authorization_redirect_uri(request: OAuth2Request, client):
if request.payload.redirect_uri:
if not client.check_redirect_uri(request.payload.redirect_uri):
raise InvalidRequestError(
f"Redirect URI {request.payload.redirect_uri} is not supported by client.",
)
return request.payload.redirect_uri
else:
redirect_uri = client.get_default_redirect_uri()
if not redirect_uri:
raise InvalidRequestError(
"Missing 'redirect_uri' in request.", state=request.payload.state
)
return redirect_uri
@staticmethod
def validate_no_multiple_request_parameter(request: OAuth2Request):
"""For the Authorization Endpoint, request and response parameters MUST NOT be included
more than once. Per `Section 3.1`_.
.. _`Section 3.1`: https://tools.ietf.org/html/rfc6749#section-3.1
"""
datalist = request.payload.datalist
parameters = ["response_type", "client_id", "redirect_uri", "scope", "state"]
for param in parameters:
if len(datalist.get(param, [])) > 1:
raise InvalidRequestError(
f"Multiple '{param}' in request.", state=request.payload.state
)
@hooked
def validate_consent_request(self):
redirect_uri = self.validate_authorization_request()
self.redirect_uri = redirect_uri
return redirect_uri
def validate_authorization_request(self):
raise NotImplementedError()
def create_authorization_response(self, redirect_uri: str, grant_user):
raise NotImplementedError()