"""
OAuth 2.0 Passthrough for Zintlr MCP Server.
This module implements OAuth endpoints that MCP clients expect, but instead of
issuing our own tokens, we redirect to Zintlr's login and passthrough their JWT.
Flow:
1. LLM client calls /.well-known/oauth-authorization-server → get OAuth metadata
2. LLM client redirects user to /oauth/authorize → we redirect to auth.zintlr.com/login
3. User logs in at Zintlr → Zintlr redirects to /oauth/callback with tokens
4. We store tokens in Redis, redirect to LLM client with session_id as "code"
5. LLM client calls /oauth/token with code → we return session_id as "access_token"
6. LLM client uses session_id in Authorization header for MCP requests
"""
from urllib.parse import urlencode, quote
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import RedirectResponse, JSONResponse
from pydantic import BaseModel
from app.config import settings
from app.session import session_manager
router = APIRouter(tags=["oauth"])
# ============ OAuth Metadata ============
@router.get("/.well-known/oauth-authorization-server")
async def oauth_metadata():
"""
OAuth 2.0 Authorization Server Metadata.
MCP clients check this endpoint to discover OAuth endpoints.
See: RFC 8414
"""
return {
"issuer": settings.mcp_server_url,
"authorization_endpoint": f"{settings.mcp_server_url}/oauth/authorize",
"token_endpoint": f"{settings.mcp_server_url}/oauth/token",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code"],
"code_challenge_methods_supported": ["S256"],
"token_endpoint_auth_methods_supported": ["client_secret_post", "none"],
}
# ============ Authorization Endpoint ============
@router.get("/oauth/authorize")
async def authorize(
request: Request,
redirect_uri: str,
state: str,
response_type: str = "code",
client_id: str | None = None,
code_challenge: str | None = None,
code_challenge_method: str | None = None,
):
"""
OAuth Authorization Endpoint.
MCP clients redirect users here to authenticate. We then redirect to
Zintlr's login page with our callback URL.
"""
# Build our callback URL that Zintlr will redirect to after login
callback_params = urlencode({
"redirect_uri": redirect_uri,
"state": state,
})
our_callback = f"{settings.mcp_server_url}/oauth/callback?{callback_params}"
# Redirect to Zintlr login
# Zintlr login accepts redirect_url parameter and will redirect back with tokens
zintlr_login_url = f"{settings.zintlr_frontend_url}/login?redirect_url={quote(our_callback)}"
return RedirectResponse(url=zintlr_login_url, status_code=302)
# ============ Callback Endpoint ============
@router.get("/oauth/callback")
async def oauth_callback(
request: Request,
redirect_uri: str,
state: str,
access_token: str | None = None,
key: str | None = None,
visitor_id: str | None = None,
error: str | None = None,
):
"""
OAuth Callback Endpoint.
Zintlr redirects here after user logs in, with JWT tokens as query params.
We store the tokens in Redis and redirect back to LLM client with a "code".
"""
# Check for error from Zintlr
if error:
error_params = urlencode({"error": error, "state": state})
return RedirectResponse(url=f"{redirect_uri}?{error_params}", status_code=302)
# Validate required tokens
if not all([access_token, key, visitor_id]):
missing = []
if not access_token:
missing.append("access_token")
if not key:
missing.append("key")
if not visitor_id:
missing.append("visitor_id")
error_params = urlencode({
"error": "invalid_request",
"error_description": f"Missing required parameters: {', '.join(missing)}",
"state": state,
})
return RedirectResponse(url=f"{redirect_uri}?{error_params}", status_code=302)
# Store tokens in Redis and get session ID
session_id = await session_manager.create_session(
access_token=access_token,
key=key,
visitor_id=visitor_id,
)
# Redirect back to LLM client with session_id as the authorization "code"
success_params = urlencode({
"code": session_id,
"state": state,
})
return RedirectResponse(url=f"{redirect_uri}?{success_params}", status_code=302)
# ============ Token Endpoint ============
class TokenRequest(BaseModel):
"""Token exchange request body."""
grant_type: str = "authorization_code"
code: str
redirect_uri: str | None = None
client_id: str | None = None
client_secret: str | None = None
code_verifier: str | None = None
@router.post("/oauth/token")
async def token_exchange(request: Request):
"""
OAuth Token Endpoint.
LLM client exchanges the authorization code for an access token.
We return the session_id as the access_token (it's used to lookup
the real Zintlr tokens from Redis).
"""
# Parse form data or JSON
content_type = request.headers.get("content-type", "")
if "application/x-www-form-urlencoded" in content_type:
form_data = await request.form()
code = form_data.get("code", "")
grant_type = form_data.get("grant_type", "authorization_code")
elif "application/json" in content_type:
json_data = await request.json()
code = json_data.get("code", "")
grant_type = json_data.get("grant_type", "authorization_code")
else:
# Try form data as default
form_data = await request.form()
code = form_data.get("code", "")
grant_type = form_data.get("grant_type", "authorization_code")
# Validate grant type
if grant_type != "authorization_code":
return JSONResponse(
status_code=400,
content={
"error": "unsupported_grant_type",
"error_description": "Only authorization_code grant is supported",
},
)
# Validate code exists in Redis
session = await session_manager.get_session(code)
if not session:
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Invalid or expired authorization code",
},
)
# Refresh session TTL
await session_manager.refresh_session(code)
# Return the session_id as the access_token
# LLM client will include this in Authorization: Bearer <session_id> header
return {
"access_token": code,
"token_type": "Bearer",
"expires_in": settings.session_expire_seconds,
}
# ============ Token Revocation (Optional) ============
@router.post("/oauth/revoke")
async def revoke_token(request: Request):
"""
OAuth Token Revocation Endpoint.
Allows LLM clients or users to revoke/logout.
"""
content_type = request.headers.get("content-type", "")
if "application/x-www-form-urlencoded" in content_type:
form_data = await request.form()
token = form_data.get("token", "")
else:
json_data = await request.json()
token = json_data.get("token", "")
if token:
await session_manager.delete_session(token)
# Always return 200 per RFC 7009
return {"status": "ok"}