# Implementation Plan: OAuth 2.1 Resource Server Mode
**Branch**: `002-oauth21-resource-server` | **Date**: 2025-12-12 | **Parent**: `001-mcp-sso-checklist`
**Input**: MCP Specification June 2025 updates, OAuth 2.1 requirements
**Status**: IMPLEMENTED (2025-12-12) | **Version**: 0.2.0
## Summary
Add OAuth 2.1 Resource Server support for cloud/multi-tenant deployment while preserving the
existing local authentication mode. This enables the MCP server to validate incoming Bearer
tokens per the June 2025 MCP specification, implement Protected Resource Metadata discovery,
and support Resource Indicators (RFC 8707) for token audience binding.
## Background
### Current State (Local Mode)
The existing implementation acts as an **OAuth 2.0 Client**:
- Acquires tokens from Azure Entra ID on behalf of the user
- Uses MSAL PKCE flow with interactive browser authentication
- Persists tokens locally in encrypted cache
- Appropriate for local/desktop deployments
### New Requirement (Cloud Mode)
The June 2025 MCP specification formally classifies MCP servers as **OAuth 2.1 Resource Servers**:
- Must validate incoming Bearer tokens (not acquire them)
- Must serve Protected Resource Metadata at `/.well-known/oauth-protected-resource`
- Must validate token audience (Resource Indicators - RFC 8707)
- Must return proper WWW-Authenticate headers on 401 responses
## Technical Context
**New Dependencies**:
- `PyJWT>=2.8.0` - JWT decoding and validation
- `cryptography>=42.0.0` - For JWKS key handling
- `httpx>=0.27.0` - Async HTTP client for JWKS fetching (already transitive)
**Standards Compliance**:
- OAuth 2.1 (draft-ietf-oauth-v2-1-13)
- RFC 9728 - Protected Resource Metadata
- RFC 8707 - Resource Indicators
- RFC 6750 - Bearer Token Usage
**Configuration**:
- `AUTH_MODE`: `local` (default) | `cloud` | `auto`
- `RESOURCE_IDENTIFIER`: URL identifying this MCP server (required for cloud mode)
- `ALLOWED_ISSUERS`: Comma-separated list of allowed token issuers
## Architecture
### Dual-Mode Authentication Flow
```
┌─────────────────────────────────────────────────────────────────────┐
│ MCP Tool Request │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ Auth Mode Router (middleware.py) │
│ Checks AUTH_MODE setting │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────┴────────────┐
│ │
[AUTH_MODE=local] [AUTH_MODE=cloud]
│ │
┌───────▼───────┐ ┌───────▼───────┐
│ Local Mode │ │ Cloud Mode │
│ (OAuth Client)│ │ (Resource Server)│
│ │ │ │
│ • Silent auth │ │ • Extract Bearer│
│ • Browser SSO │ │ • Validate JWT │
│ • Token cache │ │ • Check audience│
└───────────────┘ └───────────────┘
```
### Cloud Mode Token Validation Flow
```
┌──────────────────────────────────────────────────────────────────────┐
│ 1. Extract Bearer Token from Authorization Header │
│ Authorization: Bearer eyJhbGciOi... │
└────────────────────┬─────────────────────────────────────────────────┘
│
┌────────────────────▼─────────────────────────────────────────────────┐
│ 2. Decode JWT Header (without verification) │
│ Get 'kid' (key ID) and 'iss' (issuer) │
└────────────────────┬─────────────────────────────────────────────────┘
│
┌────────────────────▼─────────────────────────────────────────────────┐
│ 3. Fetch JWKS from issuer's well-known endpoint │
│ GET {issuer}/.well-known/openid-configuration → jwks_uri │
│ GET {jwks_uri} → keys[] │
│ (Cached for performance) │
└────────────────────┬─────────────────────────────────────────────────┘
│
┌────────────────────▼─────────────────────────────────────────────────┐
│ 4. Verify JWT Signature │
│ • Match 'kid' to JWKS key │
│ • Verify signature with public key │
│ • Validate exp, nbf, iat claims │
└────────────────────┬─────────────────────────────────────────────────┘
│
┌────────────────────▼─────────────────────────────────────────────────┐
│ 5. Validate Audience (Resource Indicator) │
│ • 'aud' claim MUST contain RESOURCE_IDENTIFIER │
│ • Prevents token mis-redemption attacks │
└────────────────────┬─────────────────────────────────────────────────┘
│
┌────────────────────▼─────────────────────────────────────────────────┐
│ 6. Validate Issuer │
│ • 'iss' claim MUST be in ALLOWED_ISSUERS │
│ • Prevents accepting tokens from untrusted IdPs │
└────────────────────┬─────────────────────────────────────────────────┘
│
┌────────────────────▼─────────────────────────────────────────────────┐
│ 7. Optional: Validate Scopes │
│ • Check 'scp' or 'scope' claim │
│ • Return 403 if insufficient scope │
└────────────────────┬─────────────────────────────────────────────────┘
│
┌────────────────────▼─────────────────────────────────────────────────┐
│ ✓ Token Valid - Proceed with Tool Execution │
└──────────────────────────────────────────────────────────────────────┘
```
## Project Structure Changes
```text
src/sso_mcp_server/
├── auth/
│ ├── __init__.py # Updated exports
│ ├── manager.py # Existing - Local mode orchestration
│ ├── browser.py # Existing - OAuth PKCE flow
│ ├── token_store.py # Existing - Local token persistence
│ ├── middleware.py # UPDATED - Dual-mode routing
│ ├── exceptions.py # UPDATED - New exception types
│ │
│ └── cloud/ # NEW - Cloud mode components
│ ├── __init__.py
│ ├── validator.py # JWT validation logic
│ ├── jwks_client.py # JWKS fetching with cache
│ └── claims.py # Token claims extraction
│
├── metadata/ # NEW - Protected Resource Metadata
│ ├── __init__.py
│ └── resource_metadata.py # /.well-known endpoint handler
│
└── config/
└── settings.py # UPDATED - New auth mode settings
```
## Implementation Phases
### Phase 1: Configuration & Exceptions
**Tasks**:
1. Update `config/settings.py`:
- Add `AUTH_MODE` enum: `local`, `cloud`, `auto`
- Add `RESOURCE_IDENTIFIER` (URL)
- Add `ALLOWED_ISSUERS` (list)
- Add `JWKS_CACHE_TTL` (seconds, default 3600)
- Add `SCOPES_SUPPORTED` (list, optional)
2. Update `auth/exceptions.py`:
- Add `InvalidTokenError` (401)
- Add `TokenExpiredError` (401)
- Add `InvalidAudienceError` (401)
- Add `InvalidIssuerError` (401)
- Add `InsufficientScopeError` (403)
- Add `MissingAuthorizationError` (401)
**Output**: Configuration ready for dual-mode operation
### Phase 2: JWKS Client
**Tasks**:
1. Create `auth/cloud/jwks_client.py`:
- Async JWKS fetching from issuer
- In-memory cache with TTL
- Automatic key rotation handling
- Error handling for unreachable JWKS endpoints
2. Unit tests for JWKS client:
- Cache hit/miss scenarios
- Key rotation handling
- Network error handling
**Key Implementation**:
```python
class JWKSClient:
"""Fetches and caches JWKS from OAuth issuers."""
async def get_signing_key(self, token: str) -> RSAPublicKey:
"""Get the signing key for a JWT token."""
# 1. Decode header to get 'kid' and 'iss'
# 2. Check cache for JWKS
# 3. Fetch from {iss}/.well-known/openid-configuration if needed
# 4. Return matching key
```
**Dependencies**: Phase 1
**Output**: JWKS client with caching
### Phase 3: Token Validator
**Tasks**:
1. Create `auth/cloud/validator.py`:
- JWT signature verification
- Standard claims validation (exp, nbf, iat)
- Audience validation (RESOURCE_IDENTIFIER)
- Issuer validation (ALLOWED_ISSUERS)
- Scope extraction
2. Create `auth/cloud/claims.py`:
- Token claims dataclass
- Scope parsing (space-separated or array)
- User identity extraction
3. Unit tests:
- Valid token scenarios
- Expired token handling
- Invalid signature handling
- Wrong audience handling
- Wrong issuer handling
**Key Implementation**:
```python
class TokenValidator:
"""Validates OAuth 2.1 Bearer tokens."""
def __init__(
self,
resource_identifier: str,
allowed_issuers: list[str],
jwks_client: JWKSClient,
):
...
async def validate(self, token: str) -> TokenClaims:
"""Validate token and return claims."""
# 1. Get signing key from JWKS
# 2. Verify signature
# 3. Validate standard claims
# 4. Validate audience
# 5. Validate issuer
# 6. Return TokenClaims
```
**Dependencies**: Phase 2
**Output**: Token validation with full JWT support
### Phase 4: Protected Resource Metadata
**Tasks**:
1. Create `metadata/resource_metadata.py`:
- Generate RFC 9728 compliant metadata
- Expose via `/.well-known/oauth-protected-resource`
2. Integrate with FastMCP server:
- Add custom route for well-known endpoint
- Return JSON metadata document
**Metadata Format**:
```json
{
"resource": "https://mcp.example.com",
"authorization_servers": [
"https://login.microsoftonline.com/{tenant}/v2.0"
],
"scopes_supported": [
"checklist:read",
"checklist:list"
],
"bearer_methods_supported": ["header"]
}
```
**Dependencies**: Phase 1
**Output**: Protected Resource Metadata endpoint
### Phase 5: Middleware Integration
**Tasks**:
1. Update `auth/middleware.py`:
- Add auth mode routing logic
- Implement Bearer token extraction
- Integrate with TokenValidator for cloud mode
- Preserve existing local mode behavior
2. Add WWW-Authenticate header handling:
- Return proper headers on 401 responses
- Include `resource_metadata` URL
- Include `scope` when insufficient
**Key Implementation**:
```python
def require_auth(func: F) -> F:
"""Decorator to require authentication for a tool function."""
@wraps(func)
async def wrapper(*args, **kwargs):
settings = get_settings()
if settings.auth_mode == AuthMode.LOCAL:
# Existing local mode logic
return await _local_auth_flow(func, *args, **kwargs)
elif settings.auth_mode == AuthMode.CLOUD:
# New cloud mode logic
return await _cloud_auth_flow(func, *args, **kwargs)
else: # AUTO mode
# Try to detect from request context
...
return wrapper
```
**Dependencies**: Phase 3, Phase 4
**Output**: Dual-mode authentication middleware
### Phase 6: HTTP Error Responses
**Tasks**:
1. Implement proper 401 responses:
```http
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="sso-mcp-server",
resource_metadata="https://mcp.example.com/.well-known/oauth-protected-resource",
error="invalid_token",
error_description="Token has expired"
```
2. Implement proper 403 responses:
```http
HTTP/1.1 403 Forbidden
WWW-Authenticate: Bearer realm="sso-mcp-server",
scope="checklist:read checklist:write",
error="insufficient_scope",
error_description="Requires checklist:write scope"
```
3. Integration tests for error responses
**Dependencies**: Phase 5
**Output**: RFC 6750 compliant error responses
### Phase 7: Integration & Testing
**Tasks**:
1. End-to-end tests:
- Cloud mode with mock JWT tokens
- Local mode preserved functionality
- Auto mode detection
2. Security tests:
- Token replay attack prevention
- Audience mismatch rejection
- Issuer mismatch rejection
- Expired token rejection
3. Documentation:
- Update quickstart.md with cloud mode setup
- Add deployment guide for cloud scenarios
- Azure App Registration configuration
**Dependencies**: Phase 5, Phase 6
**Output**: Production-ready dual-mode server
---
## Configuration Reference
### Environment Variables
| Variable | Local Mode | Cloud Mode | Description |
|----------|------------|------------|-------------|
| `AUTH_MODE` | `local` (default) | `cloud` | Authentication mode |
| `AZURE_CLIENT_ID` | Required | Not used | Azure app client ID |
| `AZURE_TENANT_ID` | Required | Not used | Azure tenant ID |
| `RESOURCE_IDENTIFIER` | Not used | Required | This server's resource URL |
| `ALLOWED_ISSUERS` | Not used | Required | Comma-separated issuer URLs |
| `JWKS_CACHE_TTL` | Not used | Optional (3600) | JWKS cache TTL in seconds |
| `SCOPES_SUPPORTED` | Not used | Optional | Advertised scopes |
### Example Configurations
**Local Mode** (existing behavior):
```env
AUTH_MODE=local
AZURE_CLIENT_ID=your-client-id
AZURE_TENANT_ID=your-tenant-id
CHECKLIST_DIR=./checklists
```
**Cloud Mode** (new):
```env
AUTH_MODE=cloud
RESOURCE_IDENTIFIER=https://mcp.example.com
ALLOWED_ISSUERS=https://login.microsoftonline.com/your-tenant/v2.0,https://login.microsoftonline.com/common/v2.0
SCOPES_SUPPORTED=checklist:read,checklist:list
CHECKLIST_DIR=./checklists
```
---
## Security Considerations
### Token Validation Checklist
| Check | Implementation | Criticality |
|-------|----------------|-------------|
| Signature verification | PyJWT + JWKS | Critical |
| Expiration (exp) | PyJWT standard | Critical |
| Not Before (nbf) | PyJWT standard | High |
| Issued At (iat) | Optional validation | Medium |
| Audience (aud) | Custom validation | Critical |
| Issuer (iss) | Custom validation | Critical |
| Scope (scp/scope) | Custom extraction | High |
### Attack Mitigations
| Attack | Mitigation |
|--------|------------|
| Token replay | Short-lived tokens (IdP responsibility) |
| Token theft | HTTPS required, no token logging |
| Confused deputy | Audience validation (RFC 8707) |
| Issuer spoofing | Allowlist validation |
| JWKS poisoning | Fetch only from known issuers |
---
## Success Criteria
| Criteria | Validation |
|----------|------------|
| Cloud mode validates Bearer tokens | Integration test with mock JWT |
| Local mode unchanged | Existing tests pass |
| Protected Resource Metadata served | HTTP GET test |
| Invalid tokens return 401 | Security test suite |
| Missing scope returns 403 | Security test suite |
| JWKS caching works | Performance test |
| Configuration validation | Unit tests |
---
## Dependencies
### New Python Packages
```toml
# pyproject.toml additions
dependencies = [
# ... existing ...
"PyJWT>=2.8.0",
"cryptography>=42.0.0",
]
```
### Azure Configuration (Cloud Mode)
1. **App Registration** in target tenant:
- Expose an API with Application ID URI
- Define scopes: `checklist:read`, `checklist:list`
- Configure allowed client applications
2. **Token Configuration**:
- Tokens must include `aud` claim matching `RESOURCE_IDENTIFIER`
- Tokens must include `scp` or `scope` claim
---
## Related Documents
- **Parent Feature**: [001-mcp-sso-checklist/design.md](../001-mcp-sso-checklist/design.md)
- **MCP Spec**: [Authorization](https://modelcontextprotocol.io/specification/draft/basic/authorization)
- **MCP Spec**: [Security Best Practices](https://modelcontextprotocol.io/specification/draft/basic/security_best_practices)
- **RFC 9728**: Protected Resource Metadata
- **RFC 8707**: Resource Indicators
- **RFC 6750**: Bearer Token Usage
- **OAuth 2.1**: draft-ietf-oauth-v2-1-13