Skip to main content
Glama
openapi_oauth_server.py.backup22.5 kB
""" OAuth-compliant OpenAPI server for ChatGPT Enterprise integration. Uses OAuth 2.0 Client Credentials flow for service-to-service authentication. Includes well-known discovery endpoints required by ChatGPT Enterprise. """ import os import httpx import secrets from typing import Optional, Dict, Any from pydantic import BaseModel, Field from fastapi import FastAPI, HTTPException, Depends, Request, status, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse, HTMLResponse from fastapi.security import OAuth2 from starlette.middleware.sessions import SessionMiddleware from dotenv import load_dotenv # Load environment variables load_dotenv() # Configuration LANGGRAPH_BASE_URL = os.getenv("LANGGRAPH_BASE_URL", "http://localhost:2024") OAUTH_ENABLED = os.getenv("OAUTH_ENABLED", "true").lower() == "true" OAUTH_PROVIDER = os.getenv("OAUTH_PROVIDER", "okta") SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32)) API_KEYS = os.getenv("API_KEYS", "").split(",") if os.getenv("API_KEYS") \ else [] SERVER_BASE_URL = os.getenv("SERVER_BASE_URL", "http://localhost:8001") # Okta Configuration OKTA_DOMAIN = os.getenv("OKTA_DOMAIN", "") OKTA_CLIENT_ID = os.getenv("OKTA_CLIENT_ID", "") OKTA_CLIENT_SECRET = os.getenv("OKTA_CLIENT_SECRET", "") # Google Configuration GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "") GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "") # Create FastAPI app with OpenAPI metadata app = FastAPI( title="LangGraph Agent API", description="OAuth-secured API for LangGraph agents", version="1.0.0", servers=[ { "url": SERVER_BASE_URL, "description": "API Server" } ], docs_url="/docs", redoc_url="/redoc", openapi_url="/openapi.json" ) # Customize OpenAPI schema to include OAuth2 Client Credentials def custom_openapi(): if app.openapi_schema: return app.openapi_schema from fastapi.openapi.utils import get_openapi openapi_schema = get_openapi( title=app.title, version=app.version, 
description=app.description, routes=app.routes, ) # Add OAuth2 Client Credentials security scheme (pointing to our token endpoint) openapi_schema["components"]["securitySchemes"] = { "OAuth2ClientCredentials": { "type": "oauth2", "flows": { "clientCredentials": { "tokenUrl": f"{SERVER_BASE_URL}/oauth/token", "scopes": {} } } }, "ApiKeyAuth": { "type": "apiKey", "in": "header", "name": "X-API-Key" } } # Apply security globally to all endpoints openapi_schema["security"] = [ {"OAuth2ClientCredentials": []}, {"ApiKeyAuth": []} ] app.openapi_schema = openapi_schema return app.openapi_schema app.openapi = custom_openapi # Add session middleware app.add_middleware( SessionMiddleware, secret_key=SECRET_KEY, max_age=3600 ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # OAuth2 scheme for Client Credentials flow (ChatGPT Enterprise) class OAuth2ClientCredentials(OAuth2): """OAuth2 Client Credentials flow for ChatGPT Enterprise.""" def __init__(self, tokenUrl: str, auto_error: bool = True): flows = { "clientCredentials": { "tokenUrl": tokenUrl, "scopes": {} } } super().__init__(flows=flows, auto_error=auto_error) oauth2_scheme = OAuth2ClientCredentials( tokenUrl=f"{SERVER_BASE_URL}/oauth/token", auto_error=False ) # ========================================== # Pydantic Models # ========================================== class InvokeRequest(BaseModel): """Request model for invoking the agent.""" prompt: str = Field( ..., description="The user prompt/query to send to the agent", example="What is the weather like today?" 
) assistant_id: str = Field( default="agent", description="The assistant/agent ID to invoke", example="agent" ) thread_id: Optional[str] = Field( default=None, description="Optional thread ID for conversation continuity", example="thread-abc-123" ) class InvokeResponse(BaseModel): """Response model for agent invocation.""" run_id: str = Field(..., description="The run ID") thread_id: str = Field(..., description="The thread ID") output: Dict[str, Any] = Field(..., description="Agent response") status: str = Field(..., description="Status", example="success") class StreamRequest(BaseModel): """Request model for streaming agent responses.""" prompt: str = Field( ..., description="The user prompt/query", example="Explain quantum computing" ) assistant_id: str = Field( default="agent", description="The assistant/agent ID", example="agent" ) thread_id: Optional[str] = Field( default=None, description="Optional thread ID", example="thread-xyz-789" ) class StreamResponse(BaseModel): """Response model for streaming.""" output: str = Field(..., description="The streamed content") chunks_received: int = Field(..., description="Chunks received") status: str = Field(..., description="Status", example="success") class HealthResponse(BaseModel): """Health check response.""" status: str = Field(..., example="healthy") service: str = Field(..., example="LangGraph Agent API") version: str = Field(..., example="1.0.0") auth_enabled: bool = Field(..., example=True) class OAuthConfigResponse(BaseModel): """OAuth configuration response.""" issuer: str authorization_endpoint: str token_endpoint: str userinfo_endpoint: Optional[str] = None jwks_uri: Optional[str] = None response_types_supported: list = ["code", "token"] grant_types_supported: list = ["authorization_code", "client_credentials"] # noqa: E501 subject_types_supported: list = ["public"] id_token_signing_alg_values_supported: list = ["RS256"] scopes_supported: list = ["openid", "profile", "email"] # 
========================================== # Authentication # ========================================== async def verify_token(request: Request): """Verify OAuth token (from Okta) or API key.""" # Check for API key in header api_key = request.headers.get("X-API-Key") if api_key and api_key in API_KEYS: return {"authenticated": True, "method": "api_key"} # Check for Bearer token auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): token = auth_header.split(" ")[1] # Validate token with Okta if OAuth is enabled if OAUTH_ENABLED and OAUTH_PROVIDER == "okta": try: import httpx introspect_url = ( f"https://{OKTA_DOMAIN}/oauth2/default/v1/introspect" ) async with httpx.AsyncClient() as client: response = await client.post( introspect_url, data={ "token": token, "token_type_hint": "access_token" }, auth=(OKTA_CLIENT_ID, OKTA_CLIENT_SECRET), timeout=10.0 ) if response.status_code == 200: token_info = response.json() if token_info.get("active"): return { "authenticated": True, "method": "oauth", "token": token, "token_info": token_info } except Exception as e: print(f"Token validation error: {e}") raise HTTPException( status_code=401, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"} ) # Fallback for other OAuth providers or if validation disabled if OAUTH_ENABLED: return {"authenticated": True, "method": "oauth", "token": token} # Check session if request.session.get("user"): return {"authenticated": True, "method": "session", "user": request.session.get("user")} # If OAuth not enabled, allow unauthenticated access if not OAUTH_ENABLED: return {"authenticated": True, "method": "none"} raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required", headers={"WWW-Authenticate": "Bearer"}, ) # ========================================== # Well-Known Endpoints (Required by ChatGPT) # ========================================== 
@app.get("/.well-known/openid-configuration") async def openid_configuration(): """ OpenID Connect discovery endpoint. Points to Okta as the OAuth provider. """ okta_issuer = f"https://{OKTA_DOMAIN}/oauth2/default" return { "issuer": okta_issuer, "authorization_endpoint": f"{okta_issuer}/v1/authorize", "token_endpoint": f"{okta_issuer}/v1/token", "userinfo_endpoint": f"{okta_issuer}/v1/userinfo", "jwks_uri": f"{okta_issuer}/v1/keys", "response_types_supported": ["code", "token", "id_token"], "grant_types_supported": ["client_credentials", "authorization_code"], "subject_types_supported": ["public"], "id_token_signing_alg_values_supported": ["RS256"], "scopes_supported": ["openid", "profile", "email"], "token_endpoint_auth_methods_supported": [ "client_secret_basic", "client_secret_post" ], "claims_supported": ["sub", "name", "email", "email_verified"] } @app.get("/.well-known/oauth-authorization-server") async def oauth_authorization_server(): """ OAuth 2.0 authorization server metadata. Points to Okta as the OAuth provider. 
""" okta_issuer = f"https://{OKTA_DOMAIN}/oauth2/default" return { "issuer": okta_issuer, "token_endpoint": f"{okta_issuer}/v1/token", "jwks_uri": f"{okta_issuer}/v1/keys", "grant_types_supported": ["client_credentials"], "token_endpoint_auth_methods_supported": [ "client_secret_basic", "client_secret_post" ], "scopes_supported": ["openid", "profile", "email"] } @app.get("/.well-known/jwks.json") async def jwks(): """JSON Web Key Set endpoint.""" # In production, return actual public keys return { "keys": [ { "kty": "RSA", "use": "sig", "kid": "1", "alg": "RS256", "n": "example_modulus", "e": "AQAB" } ] } @app.get("/.well-known/oauth-protected-resource") async def oauth_protected_resource(): """OAuth 2.0 protected resource metadata.""" return { "resource": SERVER_BASE_URL, "authorization_servers": [SERVER_BASE_URL], "scopes_supported": ["openid", "profile", "email"], "bearer_methods_supported": ["header", "query"], "resource_documentation": f"{SERVER_BASE_URL}/docs" } @app.get("/.well-known/oauth-authorization-server/openapi.json") async def oauth_server_openapi(): """OpenAPI spec for OAuth authorization server.""" # Return a reference to the main OpenAPI spec return RedirectResponse(url="/openapi.json", status_code=302) @app.get("/.well-known/openid-configuration/openapi.json") async def openid_config_openapi(): """OpenAPI spec for OpenID configuration.""" # Return a reference to the main OpenAPI spec return RedirectResponse(url="/openapi.json", status_code=302) @app.get("/.well-known/oauth-protected-resource/openapi.json") async def oauth_protected_resource_openapi(): """OpenAPI spec for OAuth protected resource.""" # Return a reference to the main OpenAPI spec return RedirectResponse(url="/openapi.json", status_code=302) @app.get("/openapi.json/.well-known/openid-configuration") async def openapi_openid_config(): """OpenID configuration at alternate path (for some OAuth clients).""" # Return the same OpenID configuration return await openid_configuration() # 
# ==========================================
# OAuth Token Endpoint (Client Credentials)
# ==========================================


@app.post("/oauth/token")
async def oauth_token(
    grant_type: str = Form(...),
    client_id: str = Form(...),
    client_secret: str = Form(...),
    scope: Optional[str] = Form(None)
):
    """
    OAuth 2.0 token endpoint for Client Credentials flow.
    Validates client credentials and issues access tokens.
    For ChatGPT Enterprise integration.

    Raises:
        HTTPException: 400 for a grant type other than
            client_credentials; 401 for invalid client credentials.
    """
    # Validate grant type
    if grant_type != "client_credentials":
        raise HTTPException(
            status_code=400,
            detail="unsupported_grant_type"
        )

    # Validate client credentials against Okta config. Use constant-time
    # comparison so timing does not leak how much of a secret matched.
    id_ok = secrets.compare_digest(
        client_id.encode("utf-8"), OKTA_CLIENT_ID.encode("utf-8")
    )
    secret_ok = secrets.compare_digest(
        client_secret.encode("utf-8"), OKTA_CLIENT_SECRET.encode("utf-8")
    )
    if not (id_ok and secret_ok):
        raise HTTPException(
            status_code=401,
            detail="invalid_client"
        )

    # Generate access token.
    # NOTE(review): the token is never persisted, so verify_token cannot
    # introspect it later; it is only honored by the permissive non-Okta
    # fallback path. Confirm this is intended.
    access_token = secrets.token_urlsafe(32)

    # Return token response
    return {
        "access_token": access_token,
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": scope or ""
    }


# ==========================================
# OAuth Userinfo Endpoint
# ==========================================


@app.get("/oauth/userinfo")
async def oauth_userinfo(auth: dict = Depends(verify_token)):
    """OAuth userinfo endpoint - returns user info from validated token."""
    if not auth.get("authenticated"):
        raise HTTPException(status_code=401, detail="Not authenticated")

    # If token was already validated by verify_token, use that info
    if auth.get("method") == "oauth" and auth.get("token_info"):
        token_info = auth["token_info"]
        return {
            "sub": token_info.get("sub", "unknown"),
            "name": token_info.get("username", "ChatGPT Enterprise"),
            "email": token_info.get("username", "chatgpt@openai.com"),
            "email_verified": True
        }

    # Fallback for API key auth
    return {
        "sub": "api-key-user",
        "name": "API Key User",
        "email": "api@example.com",
        "email_verified": False
    }


# ==========================================
# Root Endpoint
# ==========================================


@app.get("/", response_class=HTMLResponse)
async def root():
    """Root endpoint with API information."""
    return """
    <!DOCTYPE html>
    <html>
    <head>
        <title>LangGraph Agent API</title>
        <style>
            body { font-family: Arial, sans-serif; max-width: 800px;
                   margin: 50px auto; padding: 20px; }
            h1 { color: #333; }
            .endpoint { background: #f5f5f5; padding: 10px;
                        margin: 10px 0; border-radius: 5px; }
            a { color: #0066cc; }
        </style>
    </head>
    <body>
        <h1>🚀 LangGraph Agent API</h1>
        <p>OAuth-secured API for ChatGPT Enterprise integration</p>
        <h2>📚 Documentation</h2>
        <div class="endpoint">
            <a href="/docs">Interactive API Docs (Swagger UI)</a>
        </div>
        <div class="endpoint">
            <a href="/redoc">API Documentation (ReDoc)</a>
        </div>
        <div class="endpoint">
            <a href="/openapi.json">OpenAPI Specification</a>
        </div>
        <h2>🔐 OAuth Endpoints</h2>
        <div class="endpoint">
            <a href="/.well-known/openid-configuration">
                OpenID Configuration
            </a>
        </div>
        <div class="endpoint">
            <a href="/.well-known/oauth-authorization-server">
                OAuth Authorization Server
            </a>
        </div>
        <h2>🏥 System</h2>
        <div class="endpoint">
            <a href="/health">Health Check</a>
        </div>
    </body>
    </html>
    """


# ==========================================
# API Endpoints
# ==========================================


@app.get(
    "/health",
    response_model=HealthResponse,
    summary="Health Check",
    tags=["System"]
)
async def health_check():
    """Check API health status."""
    return {
        "status": "healthy",
        "service": "LangGraph Agent API",
        "version": "1.0.0",
        "auth_enabled": OAUTH_ENABLED
    }


@app.post(
    "/invoke",
    response_model=InvokeResponse,
    summary="Invoke Agent",
    description="Invoke the LangGraph agent with a prompt",
    tags=["Agent"]
)
async def invoke_agent(
    request: InvokeRequest,
    auth: dict = Depends(verify_token)
):
    """Invoke the LangGraph agent with a prompt.

    Creates a run on the LangGraph server, then blocks (up to 120s) on
    the /wait endpoint for its result.

    Raises:
        HTTPException: 500 when the LangGraph server call fails.
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            payload = {
                "assistant_id": request.assistant_id,
                "input": {
                    "messages": [
                        {
                            "type": "human",
                            "content": request.prompt
                        }
                    ]
                }
            }
            if request.thread_id:
                payload["thread_id"] = request.thread_id

            # Create the run
            response = await client.post(
                f"{LANGGRAPH_BASE_URL}/runs",
                json=payload
            )
            response.raise_for_status()
            run_data = response.json()
            run_id = run_data.get("run_id")

            # Wait for completion
            result_response = await client.get(
                f"{LANGGRAPH_BASE_URL}/runs/{run_id}/wait",
                timeout=120.0
            )
            result_response.raise_for_status()
            result = result_response.json()

            return {
                "run_id": run_id,
                "thread_id": run_data.get("thread_id"),
                "output": result,
                "status": "success"
            }
    except httpx.HTTPError as e:
        # Chain the cause so the original failure survives in tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"HTTP error invoking agent: {str(e)}"
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error invoking agent: {str(e)}"
        ) from e


@app.post(
    "/stream",
    response_model=StreamResponse,
    summary="Stream Agent Response",
    description="Stream responses from the LangGraph agent",
    tags=["Agent"]
)
async def stream_agent(
    request: StreamRequest,
    auth: dict = Depends(verify_token)
):
    """Stream responses from the LangGraph agent.

    Collects all streamed chunks and returns them joined as one body
    (the response itself is not streamed to the caller).

    Raises:
        HTTPException: 500 when the LangGraph server call fails.
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            payload = {
                "assistant_id": request.assistant_id,
                "input": {
                    "messages": [
                        {
                            "type": "human",
                            "content": request.prompt
                        }
                    ]
                },
                "stream_mode": ["messages"]
            }
            if request.thread_id:
                payload["thread_id"] = request.thread_id

            async with client.stream(
                "POST",
                f"{LANGGRAPH_BASE_URL}/runs/stream",
                json=payload
            ) as response:
                response.raise_for_status()
                chunks = []
                async for chunk in response.aiter_text():
                    if chunk.strip():
                        chunks.append(chunk)

                return {
                    "output": "".join(chunks),
                    "chunks_received": len(chunks),
                    "status": "success"
                }
    except httpx.HTTPError as e:
        # Chain the cause so the original failure survives in tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"HTTP error streaming from agent: {str(e)}"
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error streaming from agent: {str(e)}"
        ) from e


@app.get(
    "/agents",
    summary="List Agents",
    description="Get available agents",
    tags=["System"]
)
async def list_agents(auth: dict = Depends(verify_token)):
    """List available agents (static catalog, not queried from LangGraph)."""
    return {
        "agents": [
            {
                "id": "agent",
                "name": "General Agent",
                "description": "General purpose LangGraph agent"
            },
            {
                "id": "health",
                "name": "Health Agent",
                "description": "System health monitoring agent"
            }
        ]
    }


if __name__ == "__main__":
    import uvicorn
    print("=" * 70)
    print("OAuth-Compliant OpenAPI Server for ChatGPT Enterprise")
    print("=" * 70)
    print(f"Server URL: {SERVER_BASE_URL}")
    print(f"OpenAPI Spec: {SERVER_BASE_URL}/openapi.json")
    print(f"API Docs: {SERVER_BASE_URL}/docs")
    print(f"OAuth Enabled: {OAUTH_ENABLED}")
    print(f"OAuth Provider: {OAUTH_PROVIDER}")
    print()
    print("Well-Known Endpoints:")
    print(f"  OpenID Config: {SERVER_BASE_URL}/.well-known/openid-configuration")  # noqa: E501
    print(f"  OAuth Server: {SERVER_BASE_URL}/.well-known/oauth-authorization-server")  # noqa: E501
    print(f"  JWKS: {SERVER_BASE_URL}/.well-known/jwks.json")
    print("=" * 70)
    uvicorn.run(app, host="0.0.0.0", port=8001)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bmaranan75/mcp-shopping-assistant-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.