"""
OAuth-compliant OpenAPI server for ChatGPT Enterprise integration.
Uses OAuth 2.0 Client Credentials flow for service-to-service authentication.
Includes well-known discovery endpoints required by ChatGPT Enterprise.
"""
import os
import httpx
import secrets
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
from fastapi import FastAPI, HTTPException, Depends, Request, status, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse, HTMLResponse
from fastapi.security import OAuth2
from starlette.middleware.sessions import SessionMiddleware
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration
LANGGRAPH_BASE_URL = os.getenv("LANGGRAPH_BASE_URL", "http://localhost:2024")
OAUTH_ENABLED = os.getenv("OAUTH_ENABLED", "true").lower() == "true"
OAUTH_PROVIDER = os.getenv("OAUTH_PROVIDER", "okta")
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32))
API_KEYS = os.getenv("API_KEYS", "").split(",") if os.getenv("API_KEYS") \
else []
SERVER_BASE_URL = os.getenv("SERVER_BASE_URL", "http://localhost:8001")
# Okta Configuration
OKTA_DOMAIN = os.getenv("OKTA_DOMAIN", "")
OKTA_CLIENT_ID = os.getenv("OKTA_CLIENT_ID", "")
OKTA_CLIENT_SECRET = os.getenv("OKTA_CLIENT_SECRET", "")
# Google Configuration
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "")
# Create FastAPI app with OpenAPI metadata
app = FastAPI(
title="LangGraph Agent API",
description="OAuth-secured API for LangGraph agents",
version="1.0.0",
servers=[
{
"url": SERVER_BASE_URL,
"description": "API Server"
}
],
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
# Customize OpenAPI schema to include OAuth2 Client Credentials
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
from fastapi.openapi.utils import get_openapi
openapi_schema = get_openapi(
title=app.title,
version=app.version,
description=app.description,
routes=app.routes,
)
# Add OAuth2 Client Credentials security scheme (pointing to our token endpoint)
openapi_schema["components"]["securitySchemes"] = {
"OAuth2ClientCredentials": {
"type": "oauth2",
"flows": {
"clientCredentials": {
"tokenUrl": f"{SERVER_BASE_URL}/oauth/token",
"scopes": {}
}
}
},
"ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key"
}
}
# Apply security globally to all endpoints
openapi_schema["security"] = [
{"OAuth2ClientCredentials": []},
{"ApiKeyAuth": []}
]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# Add session middleware
app.add_middleware(
SessionMiddleware,
secret_key=SECRET_KEY,
max_age=3600
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# OAuth2 scheme for Client Credentials flow (ChatGPT Enterprise)
class OAuth2ClientCredentials(OAuth2):
"""OAuth2 Client Credentials flow for ChatGPT Enterprise."""
def __init__(self, tokenUrl: str, auto_error: bool = True):
flows = {
"clientCredentials": {
"tokenUrl": tokenUrl,
"scopes": {}
}
}
super().__init__(flows=flows, auto_error=auto_error)
oauth2_scheme = OAuth2ClientCredentials(
tokenUrl=f"{SERVER_BASE_URL}/oauth/token",
auto_error=False
)
# ==========================================
# Pydantic Models
# ==========================================
class InvokeRequest(BaseModel):
"""Request model for invoking the agent."""
prompt: str = Field(
...,
description="The user prompt/query to send to the agent",
example="What is the weather like today?"
)
assistant_id: str = Field(
default="agent",
description="The assistant/agent ID to invoke",
example="agent"
)
thread_id: Optional[str] = Field(
default=None,
description="Optional thread ID for conversation continuity",
example="thread-abc-123"
)
class InvokeResponse(BaseModel):
"""Response model for agent invocation."""
run_id: str = Field(..., description="The run ID")
thread_id: str = Field(..., description="The thread ID")
output: Dict[str, Any] = Field(..., description="Agent response")
status: str = Field(..., description="Status", example="success")
class StreamRequest(BaseModel):
"""Request model for streaming agent responses."""
prompt: str = Field(
...,
description="The user prompt/query",
example="Explain quantum computing"
)
assistant_id: str = Field(
default="agent",
description="The assistant/agent ID",
example="agent"
)
thread_id: Optional[str] = Field(
default=None,
description="Optional thread ID",
example="thread-xyz-789"
)
class StreamResponse(BaseModel):
"""Response model for streaming."""
output: str = Field(..., description="The streamed content")
chunks_received: int = Field(..., description="Chunks received")
status: str = Field(..., description="Status", example="success")
class HealthResponse(BaseModel):
"""Health check response."""
status: str = Field(..., example="healthy")
service: str = Field(..., example="LangGraph Agent API")
version: str = Field(..., example="1.0.0")
auth_enabled: bool = Field(..., example=True)
class OAuthConfigResponse(BaseModel):
"""OAuth configuration response."""
issuer: str
authorization_endpoint: str
token_endpoint: str
userinfo_endpoint: Optional[str] = None
jwks_uri: Optional[str] = None
response_types_supported: list = ["code", "token"]
grant_types_supported: list = ["authorization_code", "client_credentials"] # noqa: E501
subject_types_supported: list = ["public"]
id_token_signing_alg_values_supported: list = ["RS256"]
scopes_supported: list = ["openid", "profile", "email"]
# ==========================================
# Authentication
# ==========================================
async def verify_token(request: Request):
"""Verify OAuth token (from Okta) or API key."""
# Check for API key in header
api_key = request.headers.get("X-API-Key")
if api_key and api_key in API_KEYS:
return {"authenticated": True, "method": "api_key"}
# Check for Bearer token
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
# Validate token with Okta if OAuth is enabled
if OAUTH_ENABLED and OAUTH_PROVIDER == "okta":
try:
import httpx
introspect_url = (
f"https://{OKTA_DOMAIN}/oauth2/default/v1/introspect"
)
async with httpx.AsyncClient() as client:
response = await client.post(
introspect_url,
data={
"token": token,
"token_type_hint": "access_token"
},
auth=(OKTA_CLIENT_ID, OKTA_CLIENT_SECRET),
timeout=10.0
)
if response.status_code == 200:
token_info = response.json()
if token_info.get("active"):
return {
"authenticated": True,
"method": "oauth",
"token": token,
"token_info": token_info
}
except Exception as e:
print(f"Token validation error: {e}")
raise HTTPException(
status_code=401,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"}
)
# Fallback for other OAuth providers or if validation disabled
if OAUTH_ENABLED:
return {"authenticated": True, "method": "oauth", "token": token}
# Check session
if request.session.get("user"):
return {"authenticated": True, "method": "session",
"user": request.session.get("user")}
# If OAuth not enabled, allow unauthenticated access
if not OAUTH_ENABLED:
return {"authenticated": True, "method": "none"}
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
# ==========================================
# Well-Known Endpoints (Required by ChatGPT)
# ==========================================
@app.get("/.well-known/openid-configuration")
async def openid_configuration():
"""
OpenID Connect discovery endpoint.
Points to Okta as the OAuth provider.
"""
okta_issuer = f"https://{OKTA_DOMAIN}/oauth2/default"
return {
"issuer": okta_issuer,
"authorization_endpoint": f"{okta_issuer}/v1/authorize",
"token_endpoint": f"{okta_issuer}/v1/token",
"userinfo_endpoint": f"{okta_issuer}/v1/userinfo",
"jwks_uri": f"{okta_issuer}/v1/keys",
"response_types_supported": ["code", "token", "id_token"],
"grant_types_supported": ["client_credentials", "authorization_code"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"scopes_supported": ["openid", "profile", "email"],
"token_endpoint_auth_methods_supported": [
"client_secret_basic",
"client_secret_post"
],
"claims_supported": ["sub", "name", "email", "email_verified"]
}
@app.get("/.well-known/oauth-authorization-server")
async def oauth_authorization_server():
"""
OAuth 2.0 authorization server metadata.
Points to Okta as the OAuth provider.
"""
okta_issuer = f"https://{OKTA_DOMAIN}/oauth2/default"
return {
"issuer": okta_issuer,
"token_endpoint": f"{okta_issuer}/v1/token",
"jwks_uri": f"{okta_issuer}/v1/keys",
"grant_types_supported": ["client_credentials"],
"token_endpoint_auth_methods_supported": [
"client_secret_basic",
"client_secret_post"
],
"scopes_supported": ["openid", "profile", "email"]
}
@app.get("/.well-known/jwks.json")
async def jwks():
"""JSON Web Key Set endpoint."""
# In production, return actual public keys
return {
"keys": [
{
"kty": "RSA",
"use": "sig",
"kid": "1",
"alg": "RS256",
"n": "example_modulus",
"e": "AQAB"
}
]
}
@app.get("/.well-known/oauth-protected-resource")
async def oauth_protected_resource():
"""OAuth 2.0 protected resource metadata."""
return {
"resource": SERVER_BASE_URL,
"authorization_servers": [SERVER_BASE_URL],
"scopes_supported": ["openid", "profile", "email"],
"bearer_methods_supported": ["header", "query"],
"resource_documentation": f"{SERVER_BASE_URL}/docs"
}
@app.get("/.well-known/oauth-authorization-server/openapi.json")
async def oauth_server_openapi():
"""OpenAPI spec for OAuth authorization server."""
# Return a reference to the main OpenAPI spec
return RedirectResponse(url="/openapi.json", status_code=302)
@app.get("/.well-known/openid-configuration/openapi.json")
async def openid_config_openapi():
"""OpenAPI spec for OpenID configuration."""
# Return a reference to the main OpenAPI spec
return RedirectResponse(url="/openapi.json", status_code=302)
@app.get("/.well-known/oauth-protected-resource/openapi.json")
async def oauth_protected_resource_openapi():
"""OpenAPI spec for OAuth protected resource."""
# Return a reference to the main OpenAPI spec
return RedirectResponse(url="/openapi.json", status_code=302)
@app.get("/openapi.json/.well-known/openid-configuration")
async def openapi_openid_config():
"""OpenID configuration at alternate path (for some OAuth clients)."""
# Return the same OpenID configuration
return await openid_configuration()
# ==========================================
# OAuth Token Endpoint (Client Credentials)
# ==========================================
@app.post("/oauth/token")
async def oauth_token(
grant_type: str = Form(...),
client_id: str = Form(...),
client_secret: str = Form(...),
scope: Optional[str] = Form(None)
):
"""
OAuth 2.0 token endpoint for Client Credentials flow.
Validates client credentials and issues access tokens.
For ChatGPT Enterprise integration.
"""
# Validate grant type
if grant_type != "client_credentials":
raise HTTPException(
status_code=400,
detail="unsupported_grant_type"
)
# Validate client credentials against Okta config
if client_id != OKTA_CLIENT_ID or client_secret != OKTA_CLIENT_SECRET:
raise HTTPException(
status_code=401,
detail="invalid_client"
)
# Generate access token
access_token = secrets.token_urlsafe(32)
# Return token response
return {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": scope or ""
}
# ==========================================
# OAuth Userinfo Endpoint
# ==========================================
@app.get("/oauth/userinfo")
async def oauth_userinfo(auth: dict = Depends(verify_token)):
"""OAuth userinfo endpoint - returns user info from validated token."""
if not auth.get("authenticated"):
raise HTTPException(status_code=401, detail="Not authenticated")
# If token was already validated by verify_token, use that info
if auth.get("method") == "oauth" and auth.get("token_info"):
token_info = auth["token_info"]
return {
"sub": token_info.get("sub", "unknown"),
"name": token_info.get("username", "ChatGPT Enterprise"),
"email": token_info.get("username", "chatgpt@openai.com"),
"email_verified": True
}
# Fallback for API key auth
return {
"sub": "api-key-user",
"name": "API Key User",
"email": "api@example.com",
"email_verified": False
}
# ==========================================
# Root Endpoint
# ==========================================
@app.get("/", response_class=HTMLResponse)
async def root():
"""Root endpoint with API information."""
return """
<!DOCTYPE html>
<html>
<head>
<title>LangGraph Agent API</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 50px auto;
padding: 20px;
}
h1 { color: #333; }
.endpoint {
background: #f5f5f5;
padding: 10px;
margin: 10px 0;
border-radius: 5px;
}
a { color: #0066cc; }
</style>
</head>
<body>
<h1>🚀 LangGraph Agent API</h1>
<p>OAuth-secured API for ChatGPT Enterprise integration</p>
<h2>📚 Documentation</h2>
<div class="endpoint">
<a href="/docs">Interactive API Docs (Swagger UI)</a>
</div>
<div class="endpoint">
<a href="/redoc">API Documentation (ReDoc)</a>
</div>
<div class="endpoint">
<a href="/openapi.json">OpenAPI Specification</a>
</div>
<h2>🔐 OAuth Endpoints</h2>
<div class="endpoint">
<a href="/.well-known/openid-configuration">
OpenID Configuration
</a>
</div>
<div class="endpoint">
<a href="/.well-known/oauth-authorization-server">
OAuth Authorization Server
</a>
</div>
<h2>🏥 System</h2>
<div class="endpoint">
<a href="/health">Health Check</a>
</div>
</body>
</html>
"""
# ==========================================
# API Endpoints
# ==========================================
@app.get(
"/health",
response_model=HealthResponse,
summary="Health Check",
tags=["System"]
)
async def health_check():
"""Check API health status."""
return {
"status": "healthy",
"service": "LangGraph Agent API",
"version": "1.0.0",
"auth_enabled": OAUTH_ENABLED
}
@app.post(
"/invoke",
response_model=InvokeResponse,
summary="Invoke Agent",
description="Invoke the LangGraph agent with a prompt",
tags=["Agent"]
)
async def invoke_agent(
request: InvokeRequest,
auth: dict = Depends(verify_token)
):
"""Invoke the LangGraph agent with a prompt."""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
payload = {
"assistant_id": request.assistant_id,
"input": {
"messages": [
{
"type": "human",
"content": request.prompt
}
]
}
}
if request.thread_id:
payload["thread_id"] = request.thread_id
# Create the run
response = await client.post(
f"{LANGGRAPH_BASE_URL}/runs",
json=payload
)
response.raise_for_status()
run_data = response.json()
run_id = run_data.get("run_id")
# Wait for completion
result_response = await client.get(
f"{LANGGRAPH_BASE_URL}/runs/{run_id}/wait",
timeout=120.0
)
result_response.raise_for_status()
result = result_response.json()
return {
"run_id": run_id,
"thread_id": run_data.get("thread_id"),
"output": result,
"status": "success"
}
except httpx.HTTPError as e:
raise HTTPException(
status_code=500,
detail=f"HTTP error invoking agent: {str(e)}"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error invoking agent: {str(e)}"
)
@app.post(
"/stream",
response_model=StreamResponse,
summary="Stream Agent Response",
description="Stream responses from the LangGraph agent",
tags=["Agent"]
)
async def stream_agent(
request: StreamRequest,
auth: dict = Depends(verify_token)
):
"""Stream responses from the LangGraph agent."""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
payload = {
"assistant_id": request.assistant_id,
"input": {
"messages": [
{
"type": "human",
"content": request.prompt
}
]
},
"stream_mode": ["messages"]
}
if request.thread_id:
payload["thread_id"] = request.thread_id
async with client.stream(
"POST",
f"{LANGGRAPH_BASE_URL}/runs/stream",
json=payload
) as response:
response.raise_for_status()
chunks = []
async for chunk in response.aiter_text():
if chunk.strip():
chunks.append(chunk)
return {
"output": "".join(chunks),
"chunks_received": len(chunks),
"status": "success"
}
except httpx.HTTPError as e:
raise HTTPException(
status_code=500,
detail=f"HTTP error streaming from agent: {str(e)}"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error streaming from agent: {str(e)}"
)
@app.get(
"/agents",
summary="List Agents",
description="Get available agents",
tags=["System"]
)
async def list_agents(auth: dict = Depends(verify_token)):
"""List available agents."""
return {
"agents": [
{
"id": "agent",
"name": "General Agent",
"description": "General purpose LangGraph agent"
},
{
"id": "health",
"name": "Health Agent",
"description": "System health monitoring agent"
}
]
}
if __name__ == "__main__":
import uvicorn
print("=" * 70)
print("OAuth-Compliant OpenAPI Server for ChatGPT Enterprise")
print("=" * 70)
print(f"Server URL: {SERVER_BASE_URL}")
print(f"OpenAPI Spec: {SERVER_BASE_URL}/openapi.json")
print(f"API Docs: {SERVER_BASE_URL}/docs")
print(f"OAuth Enabled: {OAUTH_ENABLED}")
print(f"OAuth Provider: {OAUTH_PROVIDER}")
print()
print("Well-Known Endpoints:")
print(f" OpenID Config: {SERVER_BASE_URL}/.well-known/openid-configuration") # noqa: E501
print(f" OAuth Server: {SERVER_BASE_URL}/.well-known/oauth-authorization-server") # noqa: E501
print(f" JWKS: {SERVER_BASE_URL}/.well-known/jwks.json")
print("=" * 70)
uvicorn.run(app, host="0.0.0.0", port=8001)