"""
OAuth 2.0 Passthrough for Zintlr MCP Server.
This module implements OAuth endpoints that MCP clients expect, but instead of
issuing our own tokens, we redirect to Zintlr's login and passthrough their JWT.
Flow:
1. LLM client calls /.well-known/oauth-authorization-server → get OAuth metadata
2. LLM client redirects user to /oauth/authorize → we redirect to auth.zintlr.com/login
3. User logs in at Zintlr → Zintlr redirects to /oauth/callback with tokens
4. We store tokens in Redis, redirect to LLM client with session_id as "code"
5. LLM client calls /oauth/token with code → we return session_id as "access_token"
6. LLM client uses session_id in Authorization header for MCP requests
"""
import hashlib
import base64
import secrets
from urllib.parse import urlencode, quote
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import RedirectResponse, JSONResponse
from pydantic import BaseModel
from app.config import settings
from app.session import session_manager
router = APIRouter(tags=["oauth"])
# ============ OAuth Metadata ============
@router.get("/.well-known/oauth-protected-resource")
async def oauth_protected_resource():
"""
OAuth 2.0 Protected Resource Metadata.
MCP clients check this endpoint to discover how to access the protected resource.
See: RFC 9728
"""
return {
"resource": settings.mcp_server_url,
"authorization_servers": [settings.mcp_server_url],
"bearer_methods_supported": ["header"],
}
@router.get("/.well-known/oauth-authorization-server")
async def oauth_metadata():
"""
OAuth 2.0 Authorization Server Metadata.
MCP clients check this endpoint to discover OAuth endpoints.
See: RFC 8414
"""
return {
"issuer": settings.mcp_server_url,
"authorization_endpoint": f"{settings.mcp_server_url}/oauth/authorize",
"token_endpoint": f"{settings.mcp_server_url}/oauth/token",
"registration_endpoint": f"{settings.mcp_server_url}/oauth/register",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code"],
"code_challenge_methods_supported": ["S256"],
"token_endpoint_auth_methods_supported": ["none"],
"scopes_supported": ["openid", "profile"],
}
# ============ Dynamic Client Registration ============
@router.post("/oauth/register")
async def register_client(request: Request):
"""
OAuth 2.0 Dynamic Client Registration Endpoint.
MCP clients use this to register themselves and obtain a client_id.
See: RFC 7591
Since we're using Zintlr's auth, we accept any registration and return
a dummy client_id. The actual auth happens via Zintlr's login.
"""
try:
body = await request.json()
except Exception:
body = {}
# Extract client metadata from request
client_name = body.get("client_name", "MCP Client")
redirect_uris = body.get("redirect_uris", [])
# Generate a simple client_id (we don't actually need to track these
# since auth is handled by Zintlr, but MCP clients expect a client_id)
client_id = f"mcp_client_{secrets.token_hex(16)}"
# Return registration response per RFC 7591
return {
"client_id": client_id,
"client_name": client_name,
"redirect_uris": redirect_uris,
"grant_types": ["authorization_code"],
"response_types": ["code"],
"token_endpoint_auth_method": "none",
}
# ============ Authorization Endpoint ============
@router.get("/oauth/authorize")
async def authorize(
request: Request,
redirect_uri: str,
state: str,
response_type: str = "code",
client_id: str | None = None,
code_challenge: str | None = None,
code_challenge_method: str | None = None,
):
"""
OAuth Authorization Endpoint.
MCP clients redirect users here to authenticate. We then redirect to
Zintlr's login page with our callback URL.
"""
# Build our callback URL that Zintlr will redirect to after login
# Include code_challenge for PKCE verification during token exchange
callback_params = {
"redirect_uri": redirect_uri,
"state": state,
}
if code_challenge:
callback_params["code_challenge"] = code_challenge
callback_params["code_challenge_method"] = code_challenge_method or "S256"
our_callback = f"{settings.mcp_server_url}/oauth/callback?{urlencode(callback_params)}"
# Redirect to Zintlr login
# Zintlr login accepts redirect_url parameter and will redirect back with tokens
zintlr_login_url = f"{settings.zintlr_frontend_url}/login?redirect_url={quote(our_callback)}"
return RedirectResponse(url=zintlr_login_url, status_code=302)
# ============ Callback Endpoint ============
@router.get("/oauth/callback")
async def oauth_callback(
request: Request,
redirect_uri: str,
state: str,
access_token: str | None = None,
key: str | None = None,
visitor_id: str | None = None,
error: str | None = None,
code_challenge: str | None = None,
code_challenge_method: str | None = None,
):
"""
OAuth Callback Endpoint.
Zintlr redirects here after user logs in, with JWT tokens as query params.
We store the tokens in Redis and redirect back to LLM client with a "code".
"""
# Check for error from Zintlr
if error:
error_params = urlencode({"error": error, "state": state})
return RedirectResponse(url=f"{redirect_uri}?{error_params}", status_code=302)
# Validate required tokens
if not all([access_token, key, visitor_id]):
missing = []
if not access_token:
missing.append("access_token")
if not key:
missing.append("key")
if not visitor_id:
missing.append("visitor_id")
error_params = urlencode({
"error": "invalid_request",
"error_description": f"Missing required parameters: {', '.join(missing)}",
"state": state,
})
return RedirectResponse(url=f"{redirect_uri}?{error_params}", status_code=302)
# Store tokens in Redis and get session ID
# Include code_challenge for PKCE verification during token exchange
session_id = await session_manager.create_session(
access_token=access_token,
key=key,
visitor_id=visitor_id,
code_challenge=code_challenge,
code_challenge_method=code_challenge_method,
)
# Redirect back to LLM client with session_id as the authorization "code"
success_params = urlencode({
"code": session_id,
"state": state,
})
return RedirectResponse(url=f"{redirect_uri}?{success_params}", status_code=302)
# ============ Token Endpoint ============
class TokenRequest(BaseModel):
"""Token exchange request body."""
grant_type: str = "authorization_code"
code: str
redirect_uri: str | None = None
client_id: str | None = None
client_secret: str | None = None
code_verifier: str | None = None
def verify_pkce(code_verifier: str, code_challenge: str, method: str = "S256") -> bool:
"""
Verify PKCE code_verifier against stored code_challenge.
Args:
code_verifier: The code verifier from the token request
code_challenge: The stored code challenge from authorization
method: The challenge method (S256 or plain)
Returns:
True if verification passes, False otherwise
"""
if method == "plain":
return code_verifier == code_challenge
# S256: BASE64URL(SHA256(code_verifier)) == code_challenge
digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
computed_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
return computed_challenge == code_challenge
@router.post("/oauth/token")
async def token_exchange(request: Request):
"""
OAuth Token Endpoint.
LLM client exchanges the authorization code for an access token.
We return the session_id as the access_token (it's used to lookup
the real Zintlr tokens from Redis).
"""
# Parse form data or JSON
content_type = request.headers.get("content-type", "")
if "application/x-www-form-urlencoded" in content_type:
form_data = await request.form()
code = form_data.get("code", "")
grant_type = form_data.get("grant_type", "authorization_code")
code_verifier = form_data.get("code_verifier", "")
elif "application/json" in content_type:
json_data = await request.json()
code = json_data.get("code", "")
grant_type = json_data.get("grant_type", "authorization_code")
code_verifier = json_data.get("code_verifier", "")
else:
# Try form data as default
form_data = await request.form()
code = form_data.get("code", "")
grant_type = form_data.get("grant_type", "authorization_code")
code_verifier = form_data.get("code_verifier", "")
# Validate grant type
if grant_type != "authorization_code":
return JSONResponse(
status_code=400,
content={
"error": "unsupported_grant_type",
"error_description": "Only authorization_code grant is supported",
},
)
# Validate code exists in Redis
session = await session_manager.get_session(code)
if not session:
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Invalid or expired authorization code",
},
)
# Verify PKCE if code_challenge was provided during authorization
stored_challenge = session.get("code_challenge")
if stored_challenge:
if not code_verifier:
return JSONResponse(
status_code=400,
content={
"error": "invalid_request",
"error_description": "code_verifier is required",
},
)
challenge_method = session.get("code_challenge_method", "S256")
if not verify_pkce(code_verifier, stored_challenge, challenge_method):
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "PKCE verification failed",
},
)
# Refresh session TTL
await session_manager.refresh_session(code)
# Return the session_id as the access_token
# LLM client will include this in Authorization: Bearer <session_id> header
return {
"access_token": code,
"token_type": "Bearer",
"expires_in": settings.session_expire_seconds,
}
# ============ Token Revocation (Optional) ============
@router.post("/oauth/revoke")
async def revoke_token(request: Request):
"""
OAuth Token Revocation Endpoint.
Allows LLM clients or users to revoke/logout.
"""
content_type = request.headers.get("content-type", "")
if "application/x-www-form-urlencoded" in content_type:
form_data = await request.form()
token = form_data.get("token", "")
else:
json_data = await request.json()
token = json_data.get("token", "")
if token:
await session_manager.delete_session(token)
# Always return 200 per RFC 7009
return {"status": "ok"}