Skip to main content
Glama
openapi_oauth_server.py65.6 kB
""" OAuth-compliant OpenAPI server for ChatGPT Enterprise integration. Uses OAuth 2.0 Client Credentials flow for service-to-service authentication. Includes well-known discovery endpoints required by ChatGPT Enterprise. """ import json import os import httpx import secrets import copy from typing import Optional, Dict, Any from pydantic import BaseModel, Field from fastapi import FastAPI, HTTPException, Depends, Request, status, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse, HTMLResponse from fastapi.security import OAuth2 from starlette.middleware.sessions import SessionMiddleware from dotenv import load_dotenv # Load environment variables load_dotenv() # Configuration LANGGRAPH_BASE_URL = os.getenv("LANGGRAPH_BASE_URL", "http://localhost:2024").rstrip("/") OAUTH_ENABLED = os.getenv("OAUTH_ENABLED", "true").lower() == "true" OAUTH_PROVIDER = os.getenv("OAUTH_PROVIDER", "okta") SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32)) API_KEYS = os.getenv("API_KEYS", "").split(",") if os.getenv("API_KEYS") \ else [] SERVER_BASE_URL = os.getenv("SERVER_BASE_URL", "http://localhost:8001") # Plugin metadata CONTACT_EMAIL = os.getenv("CONTACT_EMAIL", "support@example.com") PLUGIN_NAME = os.getenv("PLUGIN_NAME", "LangGraph Agent") # Okta Configuration OKTA_DOMAIN = os.getenv("OKTA_DOMAIN", "") OKTA_AUTHORIZE_URL = os.getenv( "OKTA_AUTHORIZE_URL", f"https://{os.getenv('OKTA_DOMAIN', '')}/oauth2/default/v1/authorize" ) OKTA_INTROSPECT_URL = os.getenv( "OKTA_INTROSPECT_URL", f"https://{os.getenv('OKTA_DOMAIN', '')}/oauth2/default/v1/introspect" ) OKTA_CLIENT_ID = os.getenv("OKTA_CLIENT_ID", "") OKTA_CLIENT_SECRET = os.getenv("OKTA_CLIENT_SECRET", "") # Google Configuration GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "") GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "") # Public endpoints that don't require authentication (ChatGPT Enterprise compatibility) PUBLIC_ENDPOINTS = { "/health", "/.well-known/ai-plugin.json", "/.well-known/openid-configuration", "/.well-known/oauth-authorization-server", "/.well-known/jwks.json", "/.well-known/oauth-protected-resource", "/.well-known/oauth-authorization-server/openapi.json", "/.well-known/openid-configuration/openapi.json", "/.well-known/oauth-protected-resource/openapi.json", "/openapi.json/.well-known/openid-configuration", "/docs", "/redoc", "/openapi.json", "/openapi.actions.json", "/openapi.test.json", "/favicon.ico", "/logo.png", "/test", "/telephony", "/" } # Create FastAPI app with OpenAPI metadata app = FastAPI( title="LangGraph Agent API", description="OAuth-secured API for LangGraph agents", version="1.0.0", servers=[ { "url": SERVER_BASE_URL, "description": "API Server" } ], docs_url="/docs", redoc_url="/redoc", openapi_url="/openapi.json" ) # Customize OpenAPI schema to include OAuth2 Client Credentials def custom_openapi(): if app.openapi_schema: return app.openapi_schema from fastapi.openapi.utils import get_openapi openapi_schema = get_openapi( title=app.title, version=app.version, description=app.description, routes=app.routes, ) # Add servers configuration (CRITICAL for ChatGPT) openapi_schema["servers"] = [ { "url": SERVER_BASE_URL, "description": "API Server" } ] # Add API Key security scheme # ChatGPT GPTs only support ONE security scheme, so we use API Key # (OAuth is still supported in backend, just not advertised in OpenAPI) openapi_schema["components"]["securitySchemes"] = { "ApiKeyAuth": { "type": "apiKey", "in": "header", "name": "X-API-Key" } } # Apply API Key security globally to all endpoints openapi_schema["security"] = [ {"ApiKeyAuth": []} ] # IMPORTANT: ChatGPT recommends explicitly declaring security # on each endpoint in addition to the global security declaration if "paths" in openapi_schema: for path, path_item in openapi_schema["paths"].items(): for method, operation in path_item.items(): if method in ["get", "post", "put", "delete", "patch"]: # Add security to each operation unless public endpoint # Skip security for well-known endpoints and docs is_public = ( path.startswith("/.well-known") or path in ["/openapi.json", "/docs", "/redoc"] ) if not is_public and isinstance(operation, dict): operation["security"] = [{"ApiKeyAuth": []}] # Add custom ChatGPT hints for agent endpoints if isinstance(operation, dict): if path == "/invoke" and method == "post": operation["x-openai-isConsequential"] = False operation["x-chatgpt-display"] = { "format": "html", "content_field": "output.content", "render_html": True, "preserve_styling": True, "show_metadata": False } elif path == "/stream" and method == "post": operation["x-openai-isConsequential"] = False operation["x-chatgpt-display"] = { "format": "html", "content_field": "output", "render_html": True, "preserve_styling": True, "progressive_display": True } # Add global extensions for ChatGPT openapi_schema["x-chatgpt-plugin"] = { "response_format": "html", "render_mode": "rich", "preserve_html_styling": True, "capabilities": [ "product_recommendations", "shopping_assistance", "comparative_analysis", "html_formatted_responses" ] } app.openapi_schema = openapi_schema return app.openapi_schema app.openapi = custom_openapi def _get_actions_openapi_schema() -> Dict[str, Any]: """Return a cleaned OpenAPI schema containing only the action endpoints. This helps ChatGPT Enterprise import only the relevant actions and avoid picking up metadata/well-known endpoints which may confuse the Actions UI. """ # Use the full generated schema and deep-copy it before filtering schema = app.openapi() schema_copy = copy.deepcopy(schema) allowed_paths = {"/health", "/invoke", "/stream", "/agents"} paths = schema_copy.get("paths", {}) filtered_paths = {p: v for p, v in paths.items() if p in allowed_paths} schema_copy["paths"] = filtered_paths # Keep components (security schemes) and servers as-is so auth is preserved # If other components are present that's fine — ChatGPT only needs security return schema_copy @app.get("/openapi.actions.json") async def openapi_actions(): """Serve a trimmed OpenAPI schema with only the action endpoints. Use this URL when importing actions into ChatGPT Enterprise to avoid unrelated endpoints (like /.well-known) being included. """ return _get_actions_openapi_schema() @app.get("/openapi.test.json") async def openapi_test(): """Minimal OpenAPI schema for testing connectivity.""" return { "openapi": "3.1.0", "info": { "title": "LangGraph Agent API - Test", "description": "Minimal test schema for verifying " "ChatGPT Enterprise connectivity", "version": "1.0.0" }, "servers": [ { "url": SERVER_BASE_URL, "description": "API Server" } ], "paths": { "/health": { "get": { "operationId": "health_check", "summary": "Health Check", "description": "Check if the API server is healthy", "tags": ["System"], "responses": { "200": { "description": "Server is healthy", "content": { "application/json": { "schema": { "type": "object", "properties": { "status": { "type": "string", "example": "healthy" }, "service": { "type": "string" }, "version": { "type": "string" } } } } } } } } }, "/test": { "get": { "operationId": "test_connection", "summary": "Test Connection", "description": "Test API connectivity", "tags": ["Testing"], "responses": { "200": { "description": "Connection successful", "content": { "application/json": { "schema": { "type": "object", "properties": { "status": { "type": "string" }, "message": { "type": "string" } } } } } } } } } }, "components": { "securitySchemes": { "ApiKeyAuth": { "type": "apiKey", "in": "header", "name": "X-API-Key" } } } } # Add session middleware app.add_middleware( SessionMiddleware, secret_key=SECRET_KEY, max_age=3600 ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Request logging middleware for debugging Enterprise integration @app.middleware("http") async def log_requests(request: Request, call_next): """Log all requests for debugging Enterprise integration.""" import time from datetime import datetime start_time = time.time() # Log incoming request print(f"\\n{'='*70}") print(f"📥 REQUEST - {datetime.now().isoformat()}") print(f"{'='*70}") print(f"Method: {request.method}") print(f"Path: {request.url.path}") print(f"Client: {request.client.host if request.client else 'Unknown'}") # Check if from ChatGPT user_agent = request.headers.get("user-agent", "") if "ChatGPT" in user_agent or "OpenAI" in user_agent: print("🤖 REQUEST FROM CHATGPT/OPENAI DETECTED!") # Process request try: response = await call_next(request) duration = time.time() - start_time # Log response print(f"\\n📤 RESPONSE: {response.status_code} " f"({duration:.3f}s)") if response.status_code >= 400: print(f"⚠️ ERROR") else: print(f"✅ SUCCESS") print(f"{'='*70}\\n") return response except Exception as e: print(f"\\n❌ EXCEPTION: {str(e)}") print(f"{'='*70}\\n") raise # OAuth2 scheme for Client Credentials flow (ChatGPT Enterprise) class OAuth2ClientCredentials(OAuth2): """OAuth2 Client Credentials flow for ChatGPT Enterprise.""" def __init__(self, tokenUrl: str, auto_error: bool = True): flows = { "clientCredentials": { "tokenUrl": tokenUrl, "scopes": {} } } super().__init__(flows=flows, auto_error=auto_error) oauth2_scheme = OAuth2ClientCredentials( tokenUrl=f"{SERVER_BASE_URL}/oauth/token", auto_error=False ) # ========================================== # Pydantic Models # ========================================== class InvokeRequest(BaseModel): """Request model for invoking the agent. ChatGPT: Send user queries to the LangGraph AI agent for complex reasoning, product recommendations, shopping assistance, and detailed analysis. """ prompt: str = Field( ..., description="The user's natural language query or request. Be specific and include all relevant context.", examples=[ "What are the best winter hiking boots under $200?", "Compare iPhone 15 Pro and Samsung Galaxy S24 for photography", "Help me plan a shopping list for a weekend camping trip" ] ) assistant_id: str = Field( default="supervisor", description="The AI assistant to invoke. Use 'supervisor' for general queries and intelligent task routing.", examples=["supervisor", "shopping", "analysis"] ) thread_id: Optional[str] = Field( default=None, description="Conversation thread ID for multi-turn dialogs. IMPORTANT: Preserve this value across related follow-up questions to maintain context.", examples=["thread-abc-123", "thread-user-session-456"] ) conversationId: Optional[str] = Field( default=None, description="ChatGPT conversation identifier for tracking user sessions and privacy compliance.", examples=["conv-abc-123", "chatgpt-conv-xyz-789"] ) class InvokeResponse(BaseModel): """Response model for agent invocation. ChatGPT DISPLAY INSTRUCTIONS: 1. Extract the 'content' field from 'output' object 2. If content contains HTML tags, render it as formatted HTML 3. Present the response in a visually appealing format with proper styling 4. Preserve the thread_id for follow-up questions 5. Do NOT show run_id or internal metadata to users unless debugging """ run_id: str = Field( ..., description="Internal execution identifier - used for debugging and tracking. Do not display to users." ) thread_id: str = Field( ..., description="Conversation thread ID - CRITICAL: Save and reuse this for follow-up questions to maintain conversation context." ) output: Dict[str, Any] = Field( ..., description="""The agent's response object. CHATGPT INSTRUCTIONS FOR DISPLAYING THIS FIELD: - Extract output['content'] or output['all_messages'][-1]['content'] - If the content contains HTML markup (<div>, <table>, <ul>, etc.), render it as formatted HTML - Display product listings, comparisons, and structured data in rich HTML format - Use proper styling for tables, lists, and formatting elements - Present the information naturally and conversationally - For multi-part responses, organize sections with headers The response may include: - HTML formatted product recommendations - Comparison tables with styling - Structured lists and bullet points - Rich text with emphasis and links """ ) status: str = Field( ..., description="Execution status: 'success' means the response is ready to display, 'failed' means an error occurred.", examples=["success", "failed"] ) class StreamRequest(BaseModel): """Request model for streaming agent responses. ChatGPT: Use this for longer queries that benefit from incremental display, such as detailed explanations, step-by-step guides, or lengthy analyses. """ prompt: str = Field( ..., description="The user's detailed query or complex request requiring streaming response.", examples=[ "Explain the complete process of selecting winter camping gear step by step", "Provide a comprehensive comparison of top 5 smartphones in 2025" ] ) assistant_id: str = Field( default="supervisor", description="The AI assistant to invoke. Default 'supervisor' routes to appropriate specialized agents.", examples=["supervisor", "shopping", "analysis"] ) thread_id: Optional[str] = Field( default=None, description="Conversation thread ID for maintaining context across multiple exchanges. Preserve this for related follow-ups.", examples=["thread-xyz-789"] ) conversationId: Optional[str] = Field( default=None, description="ChatGPT conversation identifier for session tracking and compliance.", examples=["conv-xyz-789"] ) class StreamResponse(BaseModel): """Response model for streaming. ChatGPT DISPLAY INSTRUCTIONS: - Parse the 'output' field which may contain HTML - Render HTML elements properly (tables, lists, formatting) - Display incrementally if the content supports it - chunks_received is metadata - don't show to users """ output: str = Field( ..., description="""The complete streamed response content. CHATGPT: This field may contain HTML-formatted content. Render it as rich HTML with proper styling. Look for: - Product tables and comparison grids - Formatted lists and recommendations - Styled sections and headers Present the information in the most user-friendly format possible. """ ) chunks_received: int = Field( ..., description="Number of data chunks received during streaming - internal metric, do not display to users." ) status: str = Field( ..., description="Execution status indicator", examples=["success", "failed"] ) class HealthResponse(BaseModel): """Health check response.""" status: str = Field(..., example="healthy") service: str = Field(..., example="LangGraph Agent API") version: str = Field(..., example="1.0.0") auth_enabled: bool = Field(..., example=True) class OAuthConfigResponse(BaseModel): """OAuth configuration response.""" issuer: str authorization_endpoint: str token_endpoint: str userinfo_endpoint: Optional[str] = None jwks_uri: Optional[str] = None response_types_supported: list = ["code", "token"] grant_types_supported: list = ["authorization_code", "client_credentials"] # noqa: E501 subject_types_supported: list = ["public"] id_token_signing_alg_values_supported: list = ["RS256"] scopes_supported: list = ["openid", "profile", "email"] # ========================================== # Authentication # ========================================== async def verify_token(request: Request): """Verify OAuth token (from Okta) or API key.""" # Allow public endpoints without authentication (Enterprise compatibility) if request.url.path in PUBLIC_ENDPOINTS: print(f"DEBUG: ✅ Public endpoint accessed: {request.url.path}") return {"authenticated": True, "method": "public", "public": True} # DEBUG: Log all headers received print("=" * 70) print("DEBUG: Authentication Request") print("=" * 70) print(f"Path: {request.url.path}") print(f"Method: {request.method}") print("Headers:") for header_name, header_value in request.headers.items(): # Mask sensitive values for security if header_name.lower() in ["authorization", "x-api-key"]: if header_value: masked = header_value[:10] + "..." if len(header_value) > 10 else "***" print(f" {header_name}: {masked}") else: print(f" {header_name}: {header_value}") print() # Check for API key in header api_key = request.headers.get("X-API-Key") print(f"DEBUG: X-API-Key header present: {api_key is not None}") if api_key: print(f"DEBUG: API Key (first 10 chars): {api_key[:10]}...") print(f"DEBUG: Configured API_KEYS: {len(API_KEYS)} keys") if API_KEYS: print(f"DEBUG: First configured key (first 10 chars): {API_KEYS[0][:10]}...") if api_key and api_key in API_KEYS: print("DEBUG: ✅ API Key authentication SUCCESSFUL") print("=" * 70) return {"authenticated": True, "method": "api_key"} elif api_key: print("DEBUG: ❌ API Key NOT FOUND in configured keys") print("=" * 70) # Check for Bearer token auth_header = request.headers.get("Authorization") print(f"DEBUG: Authorization header present: {auth_header is not None}") if auth_header and auth_header.startswith("Bearer "): token = auth_header.split(" ")[1] print(f"DEBUG: Bearer token found (first 10 chars): {token[:10]}...") # Validate token with Okta if OAuth is enabled if OAUTH_ENABLED and OAUTH_PROVIDER == "okta": print(f"DEBUG: Validating token with Okta (OAUTH_ENABLED={OAUTH_ENABLED})") print(f"DEBUG: Using introspect URL: {OKTA_INTROSPECT_URL}") try: import httpx introspect_url = OKTA_INTROSPECT_URL async with httpx.AsyncClient() as client: response = await client.post( introspect_url, data={ "token": token, "token_type_hint": "access_token" }, auth=(OKTA_CLIENT_ID, OKTA_CLIENT_SECRET), timeout=10.0 ) if response.status_code == 200: token_info = response.json() if token_info.get("active"): print("DEBUG: ✅ Okta token validation SUCCESSFUL") claims = list(token_info.keys()) print(f"DEBUG: Token claims: {claims}") print("=" * 70) return { "authenticated": True, "method": "oauth", "token": token, "token_info": token_info } else: print("DEBUG: ❌ Token is not active") print("=" * 70) # Build re-authentication URL reauth_url = ( f"{OKTA_AUTHORIZE_URL}?" f"client_id={OKTA_CLIENT_ID}&" f"response_type=code&" f"scope=openid%20profile%20email&" f"redirect_uri={SERVER_BASE_URL}/oauth/callback" ) # Return OAuth-compliant error with re-auth link raise HTTPException( status_code=401, detail=( f"invalid_token: The access token expired. " f"Please re-authenticate: {reauth_url}" ), headers={ "WWW-Authenticate": ( 'Bearer realm="ChatGPT", ' 'error="invalid_token", ' 'error_description="The access token expired"' ), "X-Reauth-URL": reauth_url } ) else: print( f"DEBUG: ❌ Okta returned status " f"{response.status_code}" ) print("=" * 70) # Build re-authentication URL reauth_url = ( f"{OKTA_AUTHORIZE_URL}?" f"client_id={OKTA_CLIENT_ID}&" f"response_type=code&" f"scope=openid%20profile%20email&" f"redirect_uri={SERVER_BASE_URL}/oauth/callback" ) raise HTTPException( status_code=401, detail=( f"invalid_token: Token validation failed. " f"Please re-authenticate: {reauth_url}" ), headers={ "WWW-Authenticate": ( 'Bearer realm="ChatGPT", ' 'error="invalid_token", ' 'error_description="Token validation failed"' ), "X-Reauth-URL": reauth_url } ) except HTTPException: # Re-raise HTTP exceptions as-is raise except Exception as e: print(f"DEBUG: ❌ Token validation error: {e}") print("=" * 70) # Build re-authentication URL reauth_url = ( f"{OKTA_AUTHORIZE_URL}?" f"client_id={OKTA_CLIENT_ID}&" f"response_type=code&" f"scope=openid%20profile%20email&" f"redirect_uri={SERVER_BASE_URL}/oauth/callback" ) raise HTTPException( status_code=401, detail=( f"invalid_token: The access token is invalid. " f"Please re-authenticate: {reauth_url}" ), headers={ "WWW-Authenticate": ( 'Bearer realm="ChatGPT", ' 'error="invalid_token", ' 'error_description="The access token is invalid"' ), "X-Reauth-URL": reauth_url } ) # Check session if request.session.get("user"): print("DEBUG: ✅ Session authentication") print("=" * 70) return {"authenticated": True, "method": "session", "user": request.session.get("user")} # If OAuth not enabled, allow unauthenticated access if not OAUTH_ENABLED: print("DEBUG: ✅ Unauthenticated access (OAUTH_ENABLED=false)") print("=" * 70) return {"authenticated": True, "method": "none"} print("DEBUG: ❌ AUTHENTICATION FAILED - No valid credentials found") print("=" * 70) # Build re-authentication URL reauth_url = ( f"{OKTA_AUTHORIZE_URL}?" f"client_id={OKTA_CLIENT_ID}&" f"response_type=code&" f"scope=openid%20profile%20email&" f"redirect_uri={SERVER_BASE_URL}/oauth/callback" ) # Return OAuth-compliant error with re-auth link raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=( f"invalid_token: Authentication required. " f"Please authenticate here: {reauth_url}" ), headers={ "WWW-Authenticate": ( 'Bearer realm="ChatGPT", ' 'error="invalid_token", ' 'error_description="Authentication required"' ), "X-Reauth-URL": reauth_url }, ) # ========================================== # Well-Known Endpoints (Required by ChatGPT) # ========================================== @app.get("/.well-known/ai-plugin.json") async def ai_plugin_manifest(): """ AI Plugin manifest for ChatGPT Enterprise. This is REQUIRED for ChatGPT to discover your actions. Reference: https://platform.openai.com/docs/plugins """ return { "schema_version": "v1", "name_for_human": PLUGIN_NAME, "name_for_model": "langgraph_agent", "description_for_human": ( "Access to LangGraph AI agents for complex reasoning, " "analysis, and content generation with rich HTML formatting." ), "description_for_model": ( "You can invoke LangGraph agents to help with complex queries, " "detailed analysis, reasoning tasks, and content generation. " "Use the invoke endpoint for complete responses or stream " "endpoint for longer queries. " "\\n\\n**IMPORTANT DISPLAY INSTRUCTIONS:**\\n" "- Responses contain HTML-formatted content\\n" "- Render all HTML tags properly (tables, lists, divs, styling)\\n" "- Extract content from output.content field\\n" "- Display product recommendations, comparisons, and structured data in rich HTML format\\n" "- Preserve colors, emphasis, and formatting\\n" "- Make responses visually appealing and easy to scan\\n" "- Use thread_id for multi-turn conversations\\n" "- Do not show internal metadata like run_id to users" ), "auth": { "type": "service_http", "authorization_type": "bearer", "verification_tokens": { "openai": OKTA_CLIENT_ID or "your-verification-token-here" } }, "api": { "type": "openapi", "url": f"{SERVER_BASE_URL}/openapi.actions.json", "is_user_authenticated": False, "response_format": "html", "supports_html_rendering": True }, "capabilities": { "html_responses": True, "structured_data": True, "conversation_memory": True, "product_recommendations": True, "comparative_analysis": True }, "logo_url": f"{SERVER_BASE_URL}/logo.png", "contact_email": CONTACT_EMAIL, "legal_info_url": f"{SERVER_BASE_URL}/legal", "privacy_policy_url": f"{SERVER_BASE_URL}/privacy" } @app.get("/.well-known/openid-configuration") async def openid_configuration(): """ OpenID Connect discovery endpoint. Points to Okta as the OAuth provider. """ okta_issuer = f"https://{OKTA_DOMAIN}/oauth2/default" return { "issuer": okta_issuer, "authorization_endpoint": f"{okta_issuer}/v1/authorize", "token_endpoint": f"{okta_issuer}/v1/token", "userinfo_endpoint": f"{okta_issuer}/v1/userinfo", "jwks_uri": f"{okta_issuer}/v1/keys", "response_types_supported": ["code", "token", "id_token"], "grant_types_supported": ["client_credentials", "authorization_code"], "subject_types_supported": ["public"], "id_token_signing_alg_values_supported": ["RS256"], "scopes_supported": ["openid", "profile", "email"], "token_endpoint_auth_methods_supported": [ "client_secret_basic", "client_secret_post" ], "claims_supported": ["sub", "name", "email", "email_verified"] } @app.get("/.well-known/oauth-authorization-server") async def oauth_authorization_server(): """ OAuth 2.0 authorization server metadata. Points to Okta as the OAuth provider. """ okta_issuer = f"https://{OKTA_DOMAIN}/oauth2/default" return { "issuer": okta_issuer, "token_endpoint": f"{okta_issuer}/v1/token", "jwks_uri": f"{okta_issuer}/v1/keys", "grant_types_supported": ["client_credentials"], "token_endpoint_auth_methods_supported": [ "client_secret_basic", "client_secret_post" ], "scopes_supported": ["openid", "profile", "email"] } @app.get("/.well-known/jwks.json") async def jwks(): """JSON Web Key Set endpoint.""" # In production, return actual public keys return { "keys": [ { "kty": "RSA", "use": "sig", "kid": "1", "alg": "RS256", "n": "example_modulus", "e": "AQAB" } ] } @app.get("/.well-known/oauth-protected-resource") async def oauth_protected_resource(): """OAuth 2.0 protected resource metadata.""" return { "resource": SERVER_BASE_URL, "authorization_servers": [SERVER_BASE_URL], "scopes_supported": ["openid", "profile", "email"], "bearer_methods_supported": ["header", "query"], "resource_documentation": f"{SERVER_BASE_URL}/docs" } @app.get("/.well-known/oauth-authorization-server/openapi.json") async def oauth_server_openapi(): """OpenAPI spec for OAuth authorization server.""" # Return a reference to the main OpenAPI spec return RedirectResponse(url="/openapi.json", status_code=302) @app.get("/.well-known/openid-configuration/openapi.json") async def openid_config_openapi(): """OpenAPI spec for OpenID configuration.""" # Return a reference to the main OpenAPI spec return RedirectResponse(url="/openapi.json", status_code=302) @app.get("/.well-known/oauth-protected-resource/openapi.json") async def oauth_protected_resource_openapi(): """OpenAPI spec for OAuth protected resource.""" # Return a reference to the main OpenAPI spec return RedirectResponse(url="/openapi.json", status_code=302) @app.get("/openapi.json/.well-known/openid-configuration") async def openapi_openid_config(): """OpenID configuration at alternate path (for some OAuth clients).""" # Return the same OpenID configuration return await openid_configuration() # ========================================== # OAuth Token Endpoint (Client Credentials) # ========================================== @app.post("/oauth/token") async def oauth_token( grant_type: str = Form(...), client_id: str = Form(...), client_secret: str = Form(...), scope: Optional[str] = Form(None) ): """ OAuth 2.0 token endpoint for Client Credentials flow. Validates client credentials and issues access tokens. For ChatGPT Enterprise integration. """ # Validate grant type if grant_type != "client_credentials": raise HTTPException( status_code=400, detail="unsupported_grant_type" ) # Validate client credentials against Okta config if client_id != OKTA_CLIENT_ID or client_secret != OKTA_CLIENT_SECRET: raise HTTPException( status_code=401, detail="invalid_client" ) # Generate access token access_token = secrets.token_urlsafe(32) # Return token response return { "access_token": access_token, "token_type": "Bearer", "expires_in": 3600, "scope": scope or "" } # ========================================== # OAuth Userinfo Endpoint # ========================================== @app.get("/oauth/userinfo") async def oauth_userinfo(auth: dict = Depends(verify_token)): """OAuth userinfo endpoint - returns user info from validated token.""" if not auth.get("authenticated"): raise HTTPException(status_code=401, detail="Not authenticated") # If token was already validated by verify_token, use that info if auth.get("method") == "oauth" and auth.get("token_info"): token_info = auth["token_info"] return { "sub": token_info.get("sub", "unknown"), "name": token_info.get("username", "ChatGPT Enterprise"), "email": token_info.get("username", "chatgpt@openai.com"), "email_verified": True } # Fallback for API key auth return { "sub": "api-key-user", "name": "API Key User", "email": "api@example.com", "email_verified": False } # ========================================== # Telephony Endpoint (No Authentication Required) # ========================================== @app.post("/telephony") async def telephony_webhook(request: Request): """ Simple telephony webhook endpoint that logs headers and body. No authentication required. """ # Get all headers headers = dict(request.headers) # Get the raw body body = await request.body() # Try to parse as JSON, fallback to raw text try: body_json = await request.json() body_content = body_json except: body_content = body.decode('utf-8') if body else "" # Log everything print("=" * 70) print("TELEPHONY WEBHOOK RECEIVED") print("=" * 70) print("\n📞 HEADERS:") for header_name, header_value in headers.items(): print(f" {header_name}: {header_value}") print("\n📄 BODY:") if isinstance(body_content, dict): print(json.dumps(body_content, indent=2)) else: print(body_content) print("=" * 70) print() # Return a simple success response return { "status": "received", "message": "Telephony webhook processed successfully", "headers_received": len(headers), "body_size": len(body) } # ========================================== # Root Endpoint # ========================================== @app.get("/", response_class=HTMLResponse) async def root(): """Root endpoint with API information.""" return """ <!DOCTYPE html> <html> <head> <title>LangGraph Agent API</title> <style> body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; } h1 { color: #333; } .endpoint { background: #f5f5f5; padding: 10px; margin: 10px 0; border-radius: 5px; } a { color: #0066cc; } </style> </head> <body> <h1>🚀 LangGraph Agent API</h1> <p>OAuth-secured API for ChatGPT Enterprise integration</p> <h2>📚 Documentation</h2> <div class="endpoint"> <a href="/docs">Interactive API Docs (Swagger UI)</a> </div> <div class="endpoint"> <a href="/redoc">API Documentation (ReDoc)</a> </div> <div class="endpoint"> <a href="/openapi.json">OpenAPI Specification</a> </div> <h2>🔐 OAuth Endpoints</h2> <div class="endpoint"> <a href="/.well-known/openid-configuration"> OpenID Configuration </a> </div> <div class="endpoint"> <a href="/.well-known/oauth-authorization-server"> OAuth Authorization Server </a> </div> <h2>🏥 System</h2> <div class="endpoint"> <a href="/health">Health Check</a> </div> </body> </html> """ @app.get("/logo.png") async def logo(): """Serve a placeholder logo for ChatGPT plugin manifest.""" # Return a redirect to a default image or serve a simple SVG # For now, return a simple text response return HTMLResponse( content='<svg xmlns="http://www.w3.org/2000/svg" width="100" height="100"><rect width="100" height="100" fill="#4A90E2"/><text x="50" y="55" font-size="40" text-anchor="middle" fill="white">LG</text></svg>', # noqa: E501 media_type="image/svg+xml" ) @app.get("/legal", response_class=HTMLResponse) async def legal_info(): """Legal information endpoint for ChatGPT plugin manifest.""" return f""" <!DOCTYPE html> <html> <head> <title>Legal Information - {PLUGIN_NAME} API</title> <style> body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }} </style> </head> <body> <h1>Legal Information</h1> <h2>Terms of Service</h2> <p>This API is provided as-is for authorized users only.</p> <h2>Privacy Policy</h2> <p>We do not store or share user data without consent.</p> <h2>Contact</h2> <p>Email: {CONTACT_EMAIL}</p> </body> </html> """ @app.get("/privacy", response_class=HTMLResponse) async def privacy_policy(): """ Privacy policy endpoint for ChatGPT plugin compliance. Required by ChatGPT to inform users about data handling. """ return f""" <!DOCTYPE html> <html> <head> <title>Privacy Policy - {PLUGIN_NAME} API</title> <style> body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; line-height: 1.6; }} h1 {{ color: #333; }} h2 {{ color: #555; margin-top: 30px; }} .section {{ margin-bottom: 20px; }} .highlight {{ background-color: #f0f8ff; padding: 15px; border-left: 4px solid #0066cc; margin: 20px 0; }} </style> </head> <body> <h1>Privacy Policy</h1> <p><strong>Last Updated:</strong> November 15, 2025</p> <div class="highlight"> <strong>Summary:</strong> We prioritize your privacy. This API processes requests in real-time and does not store conversation data or personal information. </div> <div class="section"> <h2>1. Information We Collect</h2> <p>When you use this API through ChatGPT or directly:</p> <ul> <li><strong>Query Data:</strong> The prompts and requests you send to the LangGraph agent</li> <li><strong>Technical Data:</strong> API usage logs, timestamps, and error logs for service reliability</li> <li><strong>Authentication Data:</strong> API keys or OAuth tokens for access control</li> </ul> </div> <div class="section"> <h2>2. How We Use Your Information</h2> <p>Your information is used solely to:</p> <ul> <li>Process your requests and return agent responses</li> <li>Authenticate and authorize API access</li> <li>Monitor service health and performance</li> <li>Debug and improve the service</li> </ul> </div> <div class="section"> <h2>3. Data Storage and Retention</h2> <ul> <li><strong>Conversation Data:</strong> Not stored permanently. Processed in real-time only.</li> <li><strong>Log Data:</strong> Technical logs retained for 30 days for debugging purposes</li> <li><strong>Authentication Tokens:</strong> Session tokens expire after 1 hour</li> </ul> </div> <div class="section"> <h2>4. Data Sharing</h2> <p>We do not sell, trade, or share your data with third parties, except:</p> <ul> <li><strong>LangGraph Service:</strong> Requests are forwarded to the LangGraph agent backend for processing</li> <li><strong>OAuth Provider:</strong> Authentication credentials validated with Okta when using OAuth</li> <li><strong>Legal Requirements:</strong> If required by law or to protect our rights</li> </ul> </div> <div class="section"> <h2>5. Security</h2> <p>We implement security measures including:</p> <ul> <li>HTTPS encryption for all API communications</li> <li>OAuth 2.0 and API Key authentication</li> <li>Regular security updates and monitoring</li> <li>Access control and rate limiting</li> </ul> </div> <div class="section"> <h2>6. Your Rights</h2> <p>You have the right to:</p> <ul> <li>Request information about data we process</li> <li>Request deletion of your API keys or access tokens</li> <li>Opt-out of using the service at any time</li> </ul> </div> <div class="section"> <h2>7. ChatGPT Integration</h2> <p>When using this API through ChatGPT:</p> <ul> <li>ChatGPT's own privacy policy also applies</li> <li>OpenAI may process and store conversation data according to their policies</li> <li>We only receive the specific requests sent to our API</li> </ul> </div> <div class="section"> <h2>8. Children's Privacy</h2> <p>This service is not intended for users under 13 years of age. We do not knowingly collect information from children.</p> </div> <div class="section"> <h2>9. Changes to Privacy Policy</h2> <p>We may update this privacy policy from time to time. Changes will be posted on this page with an updated revision date.</p> </div> <div class="section"> <h2>10. Contact Us</h2> <p>If you have questions about this privacy policy or our data practices:</p> <p><strong>Email:</strong> {CONTACT_EMAIL}</p> <p><strong>API Documentation:</strong> <a href="{SERVER_BASE_URL}/docs">{SERVER_BASE_URL}/docs</a></p> </div> <hr style="margin: 40px 0;"> <p style="text-align: center; color: #666;"> <a href="/">Home</a> | <a href="/legal">Legal Information</a> | <a href="/docs">API Documentation</a> </p> </body> </html> """ # ========================================== # API Endpoints # ========================================== @app.get( "/health", response_model=HealthResponse, summary="Health Check", tags=["System"], description="Check API health status." ) async def health_check(): """Check API health status.""" return { "status": "healthy", "service": "LangGraph Agent API", "version": "1.0.0", "auth_enabled": OAUTH_ENABLED } @app.get( "/test", summary="Test Connection", description="Simple test endpoint that returns a success message. " "Use this to verify connectivity between ChatGPT " "Enterprise and your API.", tags=["Testing"], operation_id="test_connection" ) async def test_endpoint(): """Test endpoint for verifying connectivity.""" from datetime import datetime return { "status": "success", "message": "Connection to LangGraph Agent API is working!", "timestamp": datetime.now().isoformat(), "server": SERVER_BASE_URL, "version": "1.0.0" } @app.post( "/invoke", response_model=InvokeResponse, summary="Ask LangGraph AI Agent", description="""Invoke the LangGraph AI agent for intelligent responses to user queries. **ChatGPT Usage Guide:** 🎯 **When to use this endpoint:** - Product recommendations and shopping assistance - Detailed comparisons and analysis - Complex multi-step reasoning - Information synthesis and research 📋 **How to handle the response:** 1. Extract `output.content` from the response 2. **IMPORTANT**: If content contains HTML tags, render it as formatted HTML 3. Display tables, lists, and styled content with proper formatting 4. Present product recommendations in visually appealing format 5. Save `thread_id` for follow-up questions to maintain context 🎨 **HTML Rendering:** - The response may include rich HTML formatting - Render `<table>`, `<ul>`, `<div>`, and styling elements properly - Preserve colors, emphasis, and structured layouts - Make the output user-friendly and visually organized 💬 **Conversation Context:** - Use `thread_id` from response for related follow-ups - Include `conversationId` from ChatGPT for tracking - This enables multi-turn conversations with memory ⚡ **Example Query Flow:** User: "What are the best camping tents under $300?" → Send to /invoke with prompt → Receive HTML-formatted product comparison → Display as rich, formatted content → Save thread_id for "Tell me more about the first option" """, tags=["Agent"], operation_id="invoke_langgraph_agent" ) async def invoke_agent( request: InvokeRequest, auth: dict = Depends(verify_token) ): """Invoke the LangGraph agent with a prompt.""" try: # Extract userId from token if available user_id = None if auth.get("token_info"): # Try common claims for user identification token_info = auth["token_info"] user_id = ( token_info.get("sub") or token_info.get("uid") or token_info.get("username") or token_info.get("email") or token_info.get("client_id") ) print(f"DEBUG: Extracted userId from token: {user_id}") elif auth.get("method") == "api_key": print("DEBUG: API key auth - no userId available") else: print("DEBUG: No token_info available for userId extraction") async with httpx.AsyncClient(timeout=120.0) as client: # Build input with messages, userId, and conversationId payload = { "assistant_id": request.assistant_id, "input": { "messages": [ { "role": "user", "content": request.prompt } ] } } # Add userId to input if available if user_id: payload["input"]["userId"] = user_id # Add conversationId to input if available if request.conversationId: payload["input"]["conversationId"] = request.conversationId # Add thread_id to config for conversation persistence if request.thread_id: payload["config"] = { "configurable": { "thread_id": request.thread_id } } print( f"DEBUG: Payload to supervisor - " f"userId: {user_id}, " f"conversationId: {request.conversationId}, " f"thread_id: {request.thread_id}" ) # Use /runs/stream endpoint which returns SSE format stream_url = f"{LANGGRAPH_BASE_URL}/runs/stream" print(f"DEBUG: Calling {stream_url} with payload: {payload}") async with client.stream( "POST", stream_url, json=payload ) as response: print(f"DEBUG: Response status: {response.status_code}") # Check for error before parsing if response.status_code >= 400: error_text = b"" async for chunk in response.aiter_bytes(): error_text += chunk print( f"ERROR: LangGraph returned " f"{response.status_code}" ) print(f"ERROR: Response: {error_text.decode()}") raise httpx.HTTPStatusError( f"LangGraph error: {response.status_code}", request=response.request, response=response ) response.raise_for_status() # Parse Server-Sent Events (SSE) format run_id = None final_messages = [] current_event = None current_data = [] async for line in response.aiter_lines(): line = line.strip() if line.startswith("event:"): # Save previous event data if exists if current_event and current_data: data_str = "\n".join(current_data) try: data_obj = json.loads(data_str) if current_event == "metadata" and "run_id" in data_obj: run_id = data_obj["run_id"] elif current_event == "values" and "messages" in data_obj: final_messages = data_obj["messages"] except json.JSONDecodeError: pass # Start new event current_event = line.split(":", 1)[1].strip() current_data = [] elif line.startswith("data:"): # Accumulate data lines data_content = line.split(":", 1)[1].strip() current_data.append(data_content) elif line == "": # Empty line marks end of event if current_event and current_data: data_str = "\n".join(current_data) try: data_obj = json.loads(data_str) if current_event == "metadata" and "run_id" in data_obj: run_id = data_obj["run_id"] elif current_event == "values" and "messages" in data_obj: final_messages = data_obj["messages"] except json.JSONDecodeError: pass current_event = None current_data = [] # Debug: Print final messages structure messages_json = json.dumps(final_messages, indent=2) print(f"DEBUG: Final messages: {messages_json}") # Extract the final assistant response output_text = "Task completed" html_content = None for msg in reversed(final_messages): # Try different message structures is_assistant = ( msg.get("role") == "assistant" or msg.get("type") == "ai" ) if is_assistant: # Check for content directly or in message field content = None if "content" in msg: content = msg["content"] elif "message" in msg and isinstance( msg["message"], dict ): content = msg["message"].get("content", "") if content and not str(content).startswith("{"): output_text = content # Generate HTML-formatted version html_content = self._format_as_html(output_text) break return { "run_id": run_id or "unknown", "thread_id": request.thread_id or "auto-generated", "output": { "html_content": html_content or f"<div class='agent-response'><p>{output_text}</p></div>", "content": output_text, "all_messages": final_messages }, "status": "success" } except httpx.HTTPError as e: import traceback error_detail = f"HTTP error invoking agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") raise HTTPException( status_code=500, detail=error_detail ) except Exception as e: import traceback error_detail = f"Error invoking agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") raise HTTPException( status_code=500, detail=error_detail ) @app.post( "/stream", response_model=StreamResponse, summary="Stream AI Agent Response", description="""Stream responses from the LangGraph agent for longer, detailed outputs. **ChatGPT Usage Guide:** 🎯 **When to use streaming:** - Long-form content and detailed explanations - Step-by-step guides and tutorials - Comprehensive product analyses - Multi-section responses 📋 **Response Handling:** 1. Parse the complete `output` field 2. **CRITICAL**: Render HTML content with proper formatting 3. Display tables, styled lists, and rich formatting elements 4. Organize sections with clear visual hierarchy 🎨 **HTML Display Requirements:** - Render all HTML tags properly (tables, lists, divs, spans) - Preserve styling attributes and CSS classes - Format product grids and comparison tables attractively - Use proper spacing and visual organization - Make structured data easy to scan and understand 💡 **Best Practices:** - Display content progressively if possible - Maintain conversation context with thread_id - Present information in user-friendly, scannable format - Highlight key information and action items """, tags=["Agent"], operation_id="stream_langgraph_agent" ) async def stream_agent( request: StreamRequest, auth: dict = Depends(verify_token) ): """Stream responses from the LangGraph agent.""" try: # Extract userId from token if available user_id = None if auth.get("token_info"): token_info = auth["token_info"] user_id = ( token_info.get("sub") or token_info.get("uid") or token_info.get("username") or token_info.get("email") or token_info.get("client_id") ) print(f"DEBUG: Extracted userId from token: {user_id}") elif auth.get("method") == "api_key": print("DEBUG: API key auth - no userId available") else: print("DEBUG: No token_info available for userId extraction") async with httpx.AsyncClient(timeout=120.0) as client: # Build input with messages, userId, and conversationId payload = { "assistant_id": request.assistant_id, "input": { "messages": [ { "type": "human", "content": request.prompt } ] }, "stream_mode": ["messages"] } # Add userId to input if available if user_id: payload["input"]["userId"] = user_id # Add conversationId to input if available if request.conversationId: payload["input"]["conversationId"] = request.conversationId # Add thread_id to config for conversation persistence if request.thread_id: payload["config"] = { "configurable": { "thread_id": request.thread_id } } print( f"DEBUG: Payload to supervisor - " f"userId: {user_id}, " f"conversationId: {request.conversationId}, " f"thread_id: {request.thread_id}" ) async with client.stream( "POST", f"{LANGGRAPH_BASE_URL}/runs/stream", json=payload ) as response: response.raise_for_status() chunks = [] async for chunk in response.aiter_text(): if chunk.strip(): chunks.append(chunk) return { "output": "".join(chunks), "chunks_received": len(chunks), "status": "success" } except httpx.HTTPError as e: import traceback error_detail = f"HTTP error streaming from agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") raise HTTPException( status_code=500, detail=error_detail ) except Exception as e: import traceback error_detail = f"Error streaming from agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") raise HTTPException( status_code=500, detail=error_detail ) @app.get( "/agents", summary="List Agents", description="Get available agents", tags=["System"] ) async def list_agents(auth: dict = Depends(verify_token)): """List available agents.""" return { "agents": [ { "id": "agent", "name": "General Agent", "description": "General purpose LangGraph agent" }, { "id": "health", "name": "Health Agent", "description": "System health monitoring agent" } ] } if __name__ == "__main__": import uvicorn print("=" * 70) print("OAuth-Compliant OpenAPI Server for ChatGPT Enterprise") print("=" * 70) print(f"Server URL: {SERVER_BASE_URL}") print(f"OpenAPI Spec: {SERVER_BASE_URL}/openapi.json") print(f"API Docs: {SERVER_BASE_URL}/docs") print(f"OAuth Enabled: {OAUTH_ENABLED}") print(f"OAuth Provider: {OAUTH_PROVIDER}") print() print("ChatGPT Enterprise Endpoints:") print(f" AI Plugin Manifest: {SERVER_BASE_URL}/.well-known/ai-plugin.json") # noqa: E501 print(f" OpenID Config: {SERVER_BASE_URL}/.well-known/openid-configuration") # noqa: E501 print(f" OAuth Server: {SERVER_BASE_URL}/.well-known/oauth-authorization-server") # noqa: E501 print(f" JWKS: {SERVER_BASE_URL}/.well-known/jwks.json") print() print("⚠️ IMPORTANT: Use this URL in ChatGPT Enterprise:") print(f" {SERVER_BASE_URL}/.well-known/ai-plugin.json") print("=" * 70) uvicorn.run(app, host="0.0.0.0", port=8001)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bmaranan75/mcp-shopping-assistant-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server