Skip to main content
Glama
chatgpt_fastapi_server.py66 kB
""" ChatGPT Enterprise Server - FastAPI Stateless Wrapper This server provides a stateless HTTP API for ChatGPT Enterprise Apps & Connectors. Unlike the FastMCP streamable-http transport which requires session management, this implementation uses FastAPI to provide simple, stateless REST endpoints. Key features: - Stateless HTTP requests (no session management) - JSON-RPC 2.0 compatible /mcp endpoint - Direct REST API endpoints for each tool - Works seamlessly with ChatGPT Actions - Simple deployment and testing Usage: python -m src.agent_mcp.chatgpt_fastapi_server Or use the startup script: ./start_chatgpt_mcp.sh """ import httpx import json import os import secrets import time from typing import Any, Dict, Optional from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.responses import JSONResponse, Response, HTMLResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel, Field from dotenv import load_dotenv import uvicorn # Load environment variables load_dotenv() # Import html_formatter try: from .html_formatter import format_response_as_html, format_json_as_html except ImportError: from html_formatter import format_response_as_html, format_json_as_html # Configuration LANGGRAPH_BASE_URL = os.getenv("LANGGRAPH_BASE_URL", "http://localhost:2024") CHATGPT_MCP_PORT = int(os.getenv("CHATGPT_MCP_PORT", "8001")) API_KEYS = os.getenv("API_KEYS", "").split(",") if os.getenv("API_KEYS") else [] # Server URL configuration SERVER_URL = os.getenv("SERVER_URL", f"http://localhost:{CHATGPT_MCP_PORT}") # OAuth 2.0 Configuration OAUTH_ENABLED = os.getenv("CHATGPT_OAUTH_ENABLED", "false").lower() == "true" OAUTH_CLIENT_ID = os.getenv("CHATGPT_OAUTH_CLIENT_ID", "") OAUTH_CLIENT_SECRET = os.getenv("CHATGPT_OAUTH_CLIENT_SECRET", "") OAUTH_TOKEN_EXPIRY = int(os.getenv("CHATGPT_OAUTH_TOKEN_EXPIRY", "3600")) OAUTH_ISSUER = os.getenv("OAUTH_ISSUER", SERVER_URL) OAUTH_TOKEN_ENDPOINT = os.getenv("OAUTH_TOKEN_ENDPOINT", f"{SERVER_URL}/oauth/token") OAUTH_AUTHORIZE_URL = os.getenv( "OAUTH_AUTHORIZE_URL", f"{SERVER_URL}/oauth/authorize" ) # Okta Configuration (for external token validation) OKTA_DOMAIN = os.getenv("OKTA_DOMAIN", "") OKTA_AUTHORIZE_URL = os.getenv( "OKTA_AUTHORIZE_URL", f"https://{OKTA_DOMAIN}/oauth2/default/v1/authorize" if OKTA_DOMAIN else "" ) OKTA_INTROSPECT_URL = os.getenv("OKTA_INTROSPECT_URL", "") # In-memory token storage (use Redis in production) active_tokens: Dict[str, Dict[str, Any]] = {} # In-memory response storage for iframe display response_storage: Dict[str, Dict[str, Any]] = {} # Initialize FastAPI app = FastAPI( title="LangGraph Agent MCP Server", description="""ChatGPT Enterprise compatible MCP server for LangGraph agents. **For ChatGPT Integration:** This API provides intelligent agent responses with HTML formatting support. When you call the `/mcp` endpoint with `invoke_agent`, the response will include: - Formatted HTML content for rich display - Plain text fallback - Resource URIs for iframe rendering **Response Handling Instructions:** 1. Extract the response from `result.content[0].text` 2. If HTML tags are present, render as formatted HTML 3. Preserve formatting, tables, and lists 4. For iframe resources, use the URI in `result.content[1].resource.uri` **Conversation Context:** - Include `thread_id` in follow-up requests to maintain context - The agent remembers previous queries within the same thread """, version="1.0.0", servers=[ { "url": SERVER_URL, "description": "ChatGPT MCP Server" } ] ) # OAuth 2.0 Models class TokenRequest(BaseModel): grant_type: str client_id: str client_secret: str scope: Optional[str] = "mcp:access" class TokenResponse(BaseModel): access_token: str token_type: str = "Bearer" expires_in: int scope: str # Security security = HTTPBearer(auto_error=False) async def validate_okta_token(token: str) -> Optional[Dict[str, Any]]: """Validate token with Okta introspection endpoint. Returns: Token info dict if valid, None if invalid """ if not OKTA_INTROSPECT_URL: print("⚠️ Okta introspection URL not configured") return None try: print(f"🔍 Validating token with Okta: {OKTA_INTROSPECT_URL}") async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post( OKTA_INTROSPECT_URL, auth=(OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET), data={"token": token, "token_type_hint": "access_token"} ) print(f"Okta response status: {response.status_code}") if response.status_code == 200: data = response.json() print(f"Okta response: {json.dumps(data, indent=2)}") is_active = data.get("active", False) if is_active: print("✓ Okta token is active and valid") return data else: print("✗ Okta token is inactive or invalid") return None else: print(f"✗ Okta introspection failed: {response.status_code}") print(f"Response: {response.text}") return None except Exception as e: print(f"✗ Error validating token with Okta: {e}") return None async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """Verify OAuth token or API key.""" print("\n" + "=" * 80) print("AUTHENTICATION CHECK") print("=" * 80) print(f"OAuth Enabled: {OAUTH_ENABLED}") print(f"Credentials Provided: {credentials is not None}") if not OAUTH_ENABLED: print("✓ OAuth disabled, allowing request") print("=" * 80 + "\n") return True if not credentials: print("✗ No credentials provided (missing Authorization header)") print(f"Active tokens in memory: {len(active_tokens)}") print(f"API keys configured: {len(API_KEYS)}") print("=" * 80 + "\n") # Build re-authentication URL reauth_url = ( f"{OKTA_AUTHORIZE_URL or OAUTH_AUTHORIZE_URL}?" f"client_id={OAUTH_CLIENT_ID}&" f"response_type=code&" f"scope=openid%20profile%20email&" f"redirect_uri={SERVER_URL}/oauth/callback" ) # Return OAuth-compliant error with re-auth link raise HTTPException( status_code=401, detail=( f"invalid_token: Authentication required. " f"Please authenticate here: {reauth_url}" ), headers={ "WWW-Authenticate": ( 'Bearer realm="ChatGPT", ' 'error="invalid_token", ' 'error_description="Authentication required"' ), "X-Reauth-URL": reauth_url } ) token = credentials.credentials print(f"Token received: {token[:20]}..." if len(token) > 20 else f"Token: {token}") print(f"Token length: {len(token)}") # Check if it's a valid OAuth token print(f"\nChecking against {len(active_tokens)} active tokens...") if token in active_tokens: token_data = active_tokens[token] expires_at = token_data["expires_at"] current_time = time.time() time_remaining = expires_at - current_time print(f"✓ Token found in active tokens") print(f" Expires at: {expires_at}") print(f" Current time: {current_time}") print(f" Time remaining: {time_remaining:.0f} seconds") if token_data["expires_at"] > time.time(): print("✓ Token is valid and not expired") print("=" * 80 + "\n") return token_data else: # Token expired - return OAuth-compliant error for re-auth del active_tokens[token] print("✗ Token expired, removing from active tokens") print("=" * 80 + "\n") # Build re-authentication URL reauth_url = ( f"{OKTA_AUTHORIZE_URL or OAUTH_AUTHORIZE_URL}?" f"client_id={OAUTH_CLIENT_ID}&" f"response_type=code&" f"scope=openid%20profile%20email&" f"redirect_uri={SERVER_URL}/oauth/callback" ) raise HTTPException( status_code=401, detail=( f"invalid_token: The access token expired. " f"Please re-authenticate: {reauth_url}" ), headers={ "WWW-Authenticate": ( 'Bearer realm="ChatGPT", ' 'error="invalid_token", ' 'error_description="The access token expired"' ), "X-Reauth-URL": reauth_url } ) # Check if it's a valid API key print(f"\nToken not in active tokens, checking API keys...") print(f"Configured API keys: {len(API_KEYS)}") if API_KEYS: for i, key in enumerate(API_KEYS): print(f" API Key {i+1}: {key[:10]}..." if len(key) > 10 else f" API Key {i+1}: {key}") if API_KEYS and token in API_KEYS: print("✓ Valid API key") print("=" * 80 + "\n") return {"type": "api_key", "valid": True} # Try validating with Okta (external OAuth provider) print(f"\nToken not in API keys, trying Okta validation...") print(f"Okta introspection URL configured: {bool(OKTA_INTROSPECT_URL)}") if OKTA_INTROSPECT_URL: token_info = await validate_okta_token(token) if token_info: print("✓ Token validated by Okta") print("=" * 80 + "\n") return { "type": "okta_token", "valid": True, "token": token, "token_info": token_info } print("✗ Invalid token - not found in active tokens, API keys, or Okta") print(f"Token to validate: {token[:30]}...") print(f"Active tokens: {list(active_tokens.keys())[:3]}..." if active_tokens else "Active tokens: []") print("=" * 80 + "\n") # Build re-authentication URL reauth_url = ( f"{OKTA_AUTHORIZE_URL or OAUTH_AUTHORIZE_URL}?" f"client_id={OAUTH_CLIENT_ID}&" f"response_type=code&" f"scope=openid%20profile%20email&" f"redirect_uri={SERVER_URL}/oauth/callback" ) # Return OAuth-compliant error with re-auth link raise HTTPException( status_code=401, detail=( f"invalid_token: The access token is invalid. " f"Please re-authenticate: {reauth_url}" ), headers={ "WWW-Authenticate": ( 'Bearer realm="ChatGPT", ' 'error="invalid_token", ' 'error_description="The access token is invalid"' ), "X-Reauth-URL": reauth_url } ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Customize OpenAPI schema to add ChatGPT-specific hints def custom_openapi(): """Enhanced OpenAPI schema with ChatGPT display instructions.""" if app.openapi_schema: return app.openapi_schema from fastapi.openapi.utils import get_openapi openapi_schema = get_openapi( title=app.title, version=app.version, description=app.description, routes=app.routes, servers=[{"url": SERVER_URL, "description": "ChatGPT MCP Server"}] ) # Add ChatGPT-specific plugin metadata openapi_schema["x-chatgpt-plugin"] = { "response_format": "html", "render_mode": "rich", "preserve_html_styling": True, "mcp_protocol_version": "2024-11-05", "capabilities": [ "html_formatted_responses", "iframe_resources", "conversation_threads", "multi_turn_context" ] } # Add display hints to /mcp endpoint if "/mcp" in openapi_schema.get("paths", {}): mcp_post = openapi_schema["paths"]["/mcp"].get("post", {}) mcp_post["x-chatgpt-display"] = { "format": "html", "content_extraction": "result.content[0].text", "resource_uri": "result.content[1].resource.uri", "render_html": True, "show_iframe": True, "preserve_styling": True } mcp_post["description"] = """JSON-RPC 2.0 endpoint for MCP protocol. **For ChatGPT - Response Format:** The `tools/call` method with `invoke_agent` returns: ```json { "jsonrpc": "2.0", "id": "...", "result": { "content": [ { "type": "text", "text": "HTML formatted response here" }, { "type": "resource", "resource": { "uri": "https://server/response/abc123", "mimeType": "text/html", "text": "same content" } } ] } } ``` **Display Instructions:** 1. Extract `result.content[0].text` 2. If it contains HTML tags (`<`, `>`), render as formatted HTML 3. Display tables, lists, headings with proper formatting 4. Optionally: Load `result.content[1].resource.uri` in iframe for enhanced display 5. Preserve thread_id for follow-up queries **Available Methods:** - `initialize`: Initialize MCP session - `tools/list`: List available tools - `tools/call`: Execute tools (use `invoke_agent` for queries) """ app.openapi_schema = openapi_schema return app.openapi_schema app.openapi = custom_openapi # ============================================================================ # Request Logging Middleware # ============================================================================ @app.middleware("http") async def log_requests(request: Request, call_next): """Log all incoming requests with full details.""" print("\n" + "🔵" * 40) print("INCOMING HTTP REQUEST") print("🔵" * 40) print(f"Method: {request.method}") print(f"URL: {request.url}") print(f"Path: {request.url.path}") print(f"Client: {request.client.host if request.client else 'Unknown'}:{request.client.port if request.client else 'Unknown'}") print("\nHeaders:") for header, value in request.headers.items(): print(f" {header}: {value}") # For POST/PUT requests, try to read body if request.method in ["POST", "PUT", "PATCH"]: try: body = await request.body() if body: print(f"\nBody ({len(body)} bytes):") try: body_str = body.decode('utf-8') print(body_str) # Re-create request with body for downstream processing async def receive(): return {"type": "http.request", "body": body} request._receive = receive except Exception as e: print(f"[Could not decode body: {e}]") else: print("\nBody: (empty)") except Exception as e: print(f"\n[Error reading body: {e}]") print("🔵" * 40 + "\n") # Process the request response = await call_next(request) # Log response status print("\n" + "🟢" * 40) print("HTTP RESPONSE") print("🟢" * 40) print(f"Status: {response.status_code}") print("🟢" * 40 + "\n") return response # ============================================================================ # Request/Response Models # ============================================================================ class JSONRPCRequest(BaseModel): jsonrpc: str = "2.0" id: int | str method: str params: Dict[str, Any] = Field(default_factory=dict) class JSONRPCResponse(BaseModel): jsonrpc: str = "2.0" id: int | str result: Optional[Dict[str, Any]] = None error: Optional[Dict[str, Any]] = None class ToolCallParams(BaseModel): name: str arguments: Dict[str, Any] = Field(default_factory=dict) # ============================================================================ # Helper Functions # ============================================================================ async def invoke_langgraph_agent( prompt: str, assistant_id: str = "supervisor", thread_id: Optional[str] = None, user_id: Optional[str] = None, conversation_id: Optional[str] = None ) -> dict: """ Invoke the LangGraph agent with a prompt. Args: prompt: The user prompt/query assistant_id: The assistant/agent ID (default: "supervisor") thread_id: Optional thread ID for conversation continuity user_id: Optional user ID from authentication conversation_id: Optional conversation ID from ChatGPT Returns: Agent response with output and metadata """ try: # Generate thread_id if not provided if not thread_id: thread_id = f"thread-{secrets.token_urlsafe(16)}" print(f"DEBUG: Generated thread_id: {thread_id}") # Generate conversation_id if not provided if not conversation_id: conversation_id = f"conv-{secrets.token_urlsafe(16)}" print(f"DEBUG: Generated conversation_id: {conversation_id}") async with httpx.AsyncClient(timeout=120.0) as client: # Build input with messages payload = { "assistant_id": assistant_id, "input": { "messages": [ { "role": "user", "content": prompt } ] } } # Add userId to input if available if user_id: payload["input"]["userId"] = user_id # Add conversationId to input payload["input"]["conversationId"] = conversation_id # Add thread_id to config for conversation persistence payload["config"] = { "configurable": { "thread_id": thread_id } } print( f"DEBUG: Payload to supervisor - " f"userId: {user_id}, " f"conversationId: {conversation_id}, " f"thread_id: {thread_id}" ) # Use /runs/stream endpoint which returns SSE format stream_url = f"{LANGGRAPH_BASE_URL}/runs/stream" print(f"DEBUG: Calling {stream_url}") async with client.stream( "POST", stream_url, json=payload ) as response: print(f"DEBUG: Response status: {response.status_code}") # Check for error before parsing if response.status_code >= 400: error_text = b"" async for chunk in response.aiter_bytes(): error_text += chunk print( f"ERROR: LangGraph returned " f"{response.status_code}" ) print(f"ERROR: Response: {error_text.decode()}") raise httpx.HTTPStatusError( f"LangGraph error: {response.status_code}", request=response.request, response=response ) response.raise_for_status() # Parse Server-Sent Events (SSE) format run_id = None final_messages = [] all_message_snapshots = [] # Track all snapshots current_event = None current_data = [] async for line in response.aiter_lines(): line = line.strip() if line.startswith("event:"): # Save previous event data if exists if current_event and current_data: data_str = "\n".join(current_data) try: data_obj = json.loads(data_str) if current_event == "metadata" and "run_id" in data_obj: run_id = data_obj["run_id"] elif current_event == "values" and "messages" in data_obj: # Keep ALL values events, last one wins final_messages = data_obj["messages"] all_message_snapshots.append( data_obj["messages"] ) print(f"DEBUG: Captured values event with " f"{len(final_messages)} messages") except json.JSONDecodeError: pass # Start new event current_event = line.split(":", 1)[1].strip() current_data = [] elif line.startswith("data:"): # Accumulate data lines data_content = line.split(":", 1)[1].strip() current_data.append(data_content) elif line == "": # Empty line marks end of event if current_event and current_data: data_str = "\n".join(current_data) try: data_obj = json.loads(data_str) if current_event == "metadata" and "run_id" in data_obj: run_id = data_obj["run_id"] elif current_event == "values" and "messages" in data_obj: # Keep ALL values events, last one wins final_messages = data_obj["messages"] all_message_snapshots.append( data_obj["messages"] ) print(f"DEBUG: Captured values event with " f"{len(final_messages)} messages") except json.JSONDecodeError: pass current_event = None current_data = [] print(f"DEBUG: Total value snapshots received: " f"{len(all_message_snapshots)}") # Debug: Print final messages structure print(f"DEBUG: Received {len(final_messages)} messages") if final_messages: messages_json = json.dumps( final_messages[:3], indent=2 ) # First 3 for brevity print(f"DEBUG: First messages: {messages_json}") # Extract the final assistant response # SKIP ephemeral progress messages, return substantial content only output_text = None for msg in reversed(final_messages): # Skip ephemeral progress messages entirely if msg.get("progress", {}).get("ephemeral"): print(f"DEBUG: Skipping ephemeral progress message") continue print(f"DEBUG: Checking message: {msg.get('type')} / " f"{msg.get('role')}") # Try different message structures is_assistant = ( msg.get("role") == "assistant" or msg.get("type") == "ai" or msg.get("type") == "AIMessage" ) if is_assistant: # Check for content in nested message object first content = None if "message" in msg and isinstance( msg["message"], dict ): content = msg["message"].get("content", "") elif "content" in msg: content = msg["content"] # Skip empty content or JSON objects if content and not str(content).startswith("{"): output_text = content print(f"DEBUG: Found substantial content from " f"{msg.get('agent', 'unknown')}: " f"{str(content)[:100]}") break # Final fallback only if no substantial content found if not output_text: output_text = "Agent is processing your request. Please try again in a moment." print("DEBUG: No substantial content found, using fallback message") # Store response as HTML resource resource_url = store_response_for_iframe( content=output_text, title="Agent Response", format_type="text" ) # Return both text and resource types for maximum compatibility # ChatGPT needs the text type to display, resource type provides enhanced formatting response = { "content": [ { "type": "text", "text": output_text }, { "type": "resource", "resource": { "uri": resource_url, "mimeType": "text/html", "text": output_text } } ], "isError": False } print(f"DEBUG: Returning response with {len(str(output_text))} chars") print(f"DEBUG: Resource URI: {resource_url}") print(f"DEBUG: Response preview: {str(output_text)[:200]}...") return response except httpx.HTTPError as e: import traceback error_detail = f"HTTP error invoking agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") return { "content": [ { "type": "text", "text": error_detail } ], "isError": True } except Exception as e: import traceback error_detail = f"Error invoking agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") return { "content": [ { "type": "text", "text": error_detail } ], "isError": True } async def stream_langgraph_agent( prompt: str, assistant_id: str = "supervisor", thread_id: Optional[str] = None, user_id: Optional[str] = None, conversation_id: Optional[str] = None ) -> dict: """Stream responses from the LangGraph agent.""" try: # Generate thread_id if not provided if not thread_id: thread_id = f"thread-{secrets.token_urlsafe(16)}" print(f"DEBUG: Generated thread_id: {thread_id}") # Generate conversation_id if not provided if not conversation_id: conversation_id = f"conv-{secrets.token_urlsafe(16)}" print(f"DEBUG: Generated conversation_id: {conversation_id}") async with httpx.AsyncClient(timeout=120.0) as client: # Build input with messages payload = { "assistant_id": assistant_id, "input": { "messages": [ { "type": "human", "content": prompt } ] }, "stream_mode": ["messages"] } # Add userId to input if available if user_id: payload["input"]["userId"] = user_id # Add conversationId to input payload["input"]["conversationId"] = conversation_id # Add thread_id to config for conversation persistence payload["config"] = { "configurable": { "thread_id": thread_id } } print( f"DEBUG: Stream payload - " f"userId: {user_id}, " f"conversationId: {conversation_id}, " f"thread_id: {thread_id}" ) async with client.stream( "POST", f"{LANGGRAPH_BASE_URL}/runs/stream", json=payload ) as response: response.raise_for_status() chunks = [] async for chunk in response.aiter_text(): if chunk.strip(): chunks.append(chunk) return { "output": "".join(chunks), "chunks_received": len(chunks), "status": "success" } except httpx.HTTPError as e: import traceback error_detail = f"HTTP error streaming from agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") return { "error": error_detail, "status": "failed" } except Exception as e: import traceback error_detail = f"Error streaming from agent: {str(e)}" print(f"ERROR: {error_detail}") print(f"Traceback:\n{traceback.format_exc()}") return { "error": error_detail, "status": "failed" } async def check_system_health_tool(assistant_id: str = "supervisor") -> dict: """Check comprehensive system health.""" try: async with httpx.AsyncClient(timeout=30.0) as client: payload = { "assistant_id": assistant_id, "input": { "messages": [ { "type": "human", "content": "Check system health" } ] }, "stream_mode": ["values"] } async with client.stream( "POST", f"{LANGGRAPH_BASE_URL}/runs/stream", json=payload ) as response: response.raise_for_status() chunks = [] run_id = None final_output = None async for chunk in response.aiter_text(): if chunk.strip(): chunks.append(chunk) try: data = json.loads(chunk) if isinstance(data, list) and len(data) > 0: if "run_id" in data[0]: run_id = data[0]["run_id"] # Store as string to avoid parsing issues final_output = chunk except json.JSONDecodeError: pass return { "health_check": final_output or "".join(chunks), "run_id": str(run_id or "unknown"), "status": "success" } except Exception as e: return { "error": str(e), "status": "failed" } async def check_agent_status_tool(agent_name: str, assistant_id: str = "supervisor") -> dict: """Check the status of a specific agent.""" try: async with httpx.AsyncClient(timeout=30.0) as client: payload = { "assistant_id": assistant_id, "input": { "messages": [ { "type": "human", "content": f"Check {agent_name} agent status" } ] }, "stream_mode": ["values"] } async with client.stream( "POST", f"{LANGGRAPH_BASE_URL}/runs/stream", json=payload ) as response: response.raise_for_status() chunks = [] run_id = None final_output = None async for chunk in response.aiter_text(): if chunk.strip(): chunks.append(chunk) try: data = json.loads(chunk) if isinstance(data, list) and len(data) > 0: if "run_id" in data[0]: run_id = data[0]["run_id"] # Store as string to avoid parsing issues final_output = chunk except json.JSONDecodeError: pass return { "agent": str(agent_name), "status_check": final_output or "".join(chunks), "run_id": str(run_id or "unknown"), "status": "success" } except Exception as e: return { "error": str(e), "status": "failed" } async def get_thread_state_tool(thread_id: str) -> dict: """Get the current state of a conversation thread.""" try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{LANGGRAPH_BASE_URL}/threads/{thread_id}/state" ) response.raise_for_status() # Convert to string to ensure JSON serializability state_data = response.text return { "state": state_data, "thread_id": str(thread_id), "status": "success" } except Exception as e: return { "error": str(e), "status": "failed" } async def list_threads_tool(limit: int = 10) -> dict: """List available conversation threads.""" try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{LANGGRAPH_BASE_URL}/threads", params={"limit": limit} ) response.raise_for_status() # Convert to string to ensure JSON serializability threads_data = response.text return { "threads": threads_data, "count": limit, "status": "success" } except Exception as e: return { "error": str(e), "status": "failed" } # ============================================================================ # Tool Router # ============================================================================ async def execute_tool( tool_name: str, arguments: dict, auth: dict = None ) -> dict: """ Execute a tool by name with given arguments. Args: tool_name: Name of the tool to execute arguments: Tool arguments auth: Authentication context (contains token_info for userId) Returns: Tool execution result """ # Extract userId from auth token if available user_id = None if auth and auth.get("token_info"): token_info = auth["token_info"] user_id = ( token_info.get("sub") or token_info.get("uid") or token_info.get("username") or token_info.get("email") or token_info.get("client_id") ) print(f"DEBUG: Extracted userId from token: {user_id}") elif auth and auth.get("method") == "api_key": print("DEBUG: API key auth - no userId available") # Get conversationId from arguments if passed by ChatGPT conversation_id = arguments.get("conversationId") # Echo tool if tool_name == "echo": text = arguments.get("text", "") return { "content": [ { "type": "text", "text": str(text) } ], "isError": False } # Get server info elif tool_name == "get_server_info": return { "name": "LangGraph Agent MCP Server", "version": "1.0.0", "transport": "fastapi-stateless", "endpoint": "/mcp", "langgraph_base_url": LANGGRAPH_BASE_URL, "port": CHATGPT_MCP_PORT, "chatgpt_compatible": True, "tools": [ "echo", "get_server_info", "invoke_agent", "stream_agent", "check_system_health", "check_agent_status", "get_thread_state", "list_threads" ] } # Invoke agent elif tool_name == "invoke_agent": prompt = arguments.get("prompt", "") # Always use supervisor assistant assistant_id = "supervisor" thread_id = arguments.get("thread_id") return await invoke_langgraph_agent( prompt, assistant_id, thread_id, user_id, conversation_id ) # Stream agent elif tool_name == "stream_agent": prompt = arguments.get("prompt", "") # Always use supervisor assistant assistant_id = "supervisor" thread_id = arguments.get("thread_id") return await stream_langgraph_agent( prompt, assistant_id, thread_id, user_id, conversation_id ) # Check system health elif tool_name == "check_system_health": # Always use supervisor assistant assistant_id = "supervisor" return await check_system_health_tool(assistant_id) # Check agent status elif tool_name == "check_agent_status": agent_name = arguments.get("agent_name", "") # Always use supervisor assistant assistant_id = "supervisor" if not agent_name: return {"error": "agent_name is required", "status": "failed"} return await check_agent_status_tool(agent_name, assistant_id) # Get thread state elif tool_name == "get_thread_state": thread_id = arguments.get("thread_id", "") if not thread_id: return {"error": "thread_id is required", "status": "failed"} return await get_thread_state_tool(thread_id) # List threads elif tool_name == "list_threads": limit = arguments.get("limit", 10) return await list_threads_tool(limit) # Unknown tool else: return { "error": f"Unknown tool: {tool_name}", "status": "failed" } # ============================================================================ # OAuth 2.0 Endpoints # ============================================================================ @app.post("/oauth/token") async def oauth_token(request: Request): """OAuth 2.0 token endpoint for Client Credentials flow.""" if not OAUTH_ENABLED: return JSONResponse( status_code=400, content={ "error": "unsupported_grant_type", "error_description": "OAuth is not enabled" } ) try: body = await request.json() except: return JSONResponse( status_code=400, content={ "error": "invalid_request", "error_description": "Invalid JSON body" } ) grant_type = body.get("grant_type") client_id = body.get("client_id") client_secret = body.get("client_secret") scope = body.get("scope", "mcp:access") # Validate grant type if grant_type != "client_credentials": return JSONResponse( status_code=400, content={ "error": "unsupported_grant_type", "error_description": "Only client_credentials supported" } ) # Validate client credentials if not client_id or not client_secret: return JSONResponse( status_code=400, content={ "error": "invalid_request", "error_description": "client_id and client_secret required" } ) if client_id != OAUTH_CLIENT_ID or client_secret != OAUTH_CLIENT_SECRET: return JSONResponse( status_code=401, content={ "error": "invalid_client", "error_description": "Invalid client credentials" } ) # Generate access token access_token = secrets.token_urlsafe(32) expires_at = time.time() + OAUTH_TOKEN_EXPIRY # Store token active_tokens[access_token] = { "client_id": client_id, "scope": scope, "expires_at": expires_at, "created_at": time.time() } print(f"[OAuth] Token issued for client: {client_id}, scope: {scope}") return JSONResponse({ "access_token": access_token, "token_type": "Bearer", "expires_in": OAUTH_TOKEN_EXPIRY, "scope": scope }) @app.get("/oauth/info") async def oauth_info(): """OAuth 2.0 configuration information.""" if not OAUTH_ENABLED: return JSONResponse({ "enabled": False, "message": "OAuth is not enabled" }) return JSONResponse({ "enabled": True, "issuer": OAUTH_ISSUER, "token_endpoint": OAUTH_TOKEN_ENDPOINT, "grant_types_supported": ["client_credentials"], "scopes_supported": ["mcp:access"], "token_expiry_seconds": OAUTH_TOKEN_EXPIRY }) # ============================================================================ # OAuth 2.0 Discovery Endpoints (RFC 8414) # ============================================================================ @app.get("/.well-known/oauth-authorization-server") async def oauth_authorization_server_metadata(): """OAuth 2.0 Authorization Server Metadata (RFC 8414).""" if not OAUTH_ENABLED: return JSONResponse( status_code=404, content={"error": "OAuth not enabled"} ) return JSONResponse({ "issuer": OAUTH_ISSUER, "token_endpoint": OAUTH_TOKEN_ENDPOINT, "token_endpoint_auth_methods_supported": [ "client_secret_post", "client_secret_basic" ], "grant_types_supported": ["client_credentials"], "response_types_supported": [], "scopes_supported": ["mcp:access"], "service_documentation": f"{SERVER_URL}/docs", "revocation_endpoint_auth_methods_supported": [ "client_secret_post", "client_secret_basic" ] }) @app.get("/.well-known/oauth-authorization-server/mcp") async def oauth_authorization_server_mcp(): """OAuth 2.0 Authorization Server Metadata for MCP resource.""" return await oauth_authorization_server_metadata() @app.get("/.well-known/oauth-protected-resource") async def oauth_protected_resource_metadata(): """OAuth 2.0 Protected Resource Metadata.""" if not OAUTH_ENABLED: return JSONResponse( status_code=404, content={"error": "OAuth not enabled"} ) return JSONResponse({ "resource": SERVER_URL, "authorization_servers": [OAUTH_ISSUER], "scopes_supported": ["mcp:access"], "bearer_methods_supported": ["header"], "resource_documentation": f"{SERVER_URL}/docs" }) @app.get("/.well-known/oauth-protected-resource/mcp") async def oauth_protected_resource_mcp(): """OAuth 2.0 Protected Resource Metadata for MCP.""" return await oauth_protected_resource_metadata() @app.get("/.well-known/openid-configuration") async def openid_configuration(): """OpenID Connect Discovery metadata.""" if not OAUTH_ENABLED: return JSONResponse( status_code=404, content={"error": "OAuth not enabled"} ) return JSONResponse({ "issuer": OAUTH_ISSUER, "token_endpoint": OAUTH_TOKEN_ENDPOINT, "token_endpoint_auth_methods_supported": [ "client_secret_post", "client_secret_basic" ], "grant_types_supported": ["client_credentials"], "response_types_supported": [], "scopes_supported": ["mcp:access"], "subject_types_supported": ["public"], "id_token_signing_alg_values_supported": [], "claims_supported": ["sub", "iat", "exp"] }) @app.get("/.well-known/openid-configuration/mcp") async def openid_configuration_mcp(): """OpenID Connect Discovery for MCP resource.""" return await openid_configuration() @app.get("/mcp/.well-known/openid-configuration") async def mcp_openid_configuration(): """OpenID Connect Discovery under /mcp path.""" return await openid_configuration() # ============================================================================ # HTML Response Endpoints (for iframe display) # ============================================================================ @app.get("/response/{response_id}", response_class=HTMLResponse) async def get_formatted_response(response_id: str): """ Get a formatted HTML response for iframe display. Args: response_id: The unique response ID Returns: Formatted HTML response """ if response_id not in response_storage: return HTMLResponse( content=format_response_as_html( "Response not found or expired", "Error" ), status_code=404 ) stored_data = response_storage[response_id] content = stored_data.get("content", "") title = stored_data.get("title", "Agent Response") format_type = stored_data.get("format", "text") if format_type == "json": html_content = format_json_as_html(content, title) else: html_content = format_response_as_html(str(content), title) return HTMLResponse(content=html_content) def store_response_for_iframe(content: Any, title: str = "Agent Response", format_type: str = "text") -> str: """ Store a response and return an iframe-compatible URL. Args: content: The content to store (string or dict) title: The title for the HTML page format_type: The format type ('text' or 'json') Returns: The URL to access the formatted response """ response_id = secrets.token_urlsafe(16) response_storage[response_id] = { "content": content, "title": title, "format": format_type, "timestamp": time.time() } # Clean up old responses (older than 1 hour) current_time = time.time() expired_ids = [ rid for rid, data in response_storage.items() if current_time - data.get("timestamp", 0) > 3600 ] for rid in expired_ids: del response_storage[rid] return f"{SERVER_URL}/response/{response_id}" # ============================================================================ # MCP Endpoints # ============================================================================ @app.get("/") async def root(): """Root endpoint with server information.""" return { "name": "LangGraph Agent MCP Server", "version": "1.0.0", "transport": "fastapi-stateless", "endpoints": { "mcp": "/mcp (JSON-RPC 2.0)", "health": "/health", "tools": "/tools (list available tools)" }, "chatgpt_compatible": True } @app.get("/health") async def health_check(): """Health check endpoint.""" try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{LANGGRAPH_BASE_URL}/ok") response.raise_for_status() langgraph_ok = response.json().get("ok", False) except: langgraph_ok = False return { "status": "ok", "server": "running", "langgraph_backend": "connected" if langgraph_ok else "disconnected", "langgraph_url": LANGGRAPH_BASE_URL } @app.get("/tools") async def list_tools(): """List all available tools in MCP-compliant format.""" return { "tools": [ { "name": "echo", "description": "Echo text back - test connectivity", "inputSchema": { "type": "object", "properties": { "text": { "type": "string", "description": "Text to echo back" } }, "required": ["text"] } }, { "name": "get_server_info", "description": "Get server information and capabilities", "inputSchema": { "type": "object", "properties": {} } }, { "name": "invoke_agent", "description": "Execute the LangGraph agent with a prompt", "inputSchema": { "type": "object", "properties": { "prompt": { "type": "string", "description": "The prompt to send to the agent" }, "assistant_id": { "type": "string", "description": "Assistant ID to use", "default": "agent" }, "thread_id": { "type": "string", "description": "Thread ID for conversation context" } }, "required": ["prompt"] } }, { "name": "stream_agent", "description": "Stream responses from the LangGraph agent", "inputSchema": { "type": "object", "properties": { "prompt": { "type": "string", "description": "The prompt to send to the agent" }, "assistant_id": { "type": "string", "description": "Assistant ID to use", "default": "supervisor" }, "thread_id": { "type": "string", "description": "Thread ID for conversation context" } }, "required": ["prompt"] } }, { "name": "check_system_health", "description": "Check comprehensive system health", "inputSchema": { "type": "object", "properties": { "assistant_id": { "type": "string", "description": "Assistant ID to check", "default": "supervisor" } } } }, { "name": "check_agent_status", "description": "Check status of a specific agent", "inputSchema": { "type": "object", "properties": { "agent_name": { "type": "string", "description": "Name of the agent to check" }, "assistant_id": { "type": "string", "description": "Assistant ID to use", "default": "supervisor" } }, "required": ["agent_name"] } }, { "name": "get_thread_state", "description": "Get current conversation thread state", "inputSchema": { "type": "object", "properties": { "thread_id": { "type": "string", "description": "Thread ID to retrieve state for" } }, "required": ["thread_id"] } }, { "name": "list_threads", "description": "List available conversation threads", "inputSchema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Maximum number of threads to return", "default": 10 } } } } ] } @app.get("/mcp") async def mcp_info(): """GET endpoint for /mcp - provides information about the MCP endpoint.""" return JSONResponse({ "name": "LangGraph Agent MCP Server", "version": "1.0.0", "protocol": "JSON-RPC 2.0", "transport": "HTTP POST", "authentication": "OAuth 2.0 Bearer Token" if OAUTH_ENABLED else "None", "endpoints": { "mcp": "POST /mcp (JSON-RPC 2.0 requests)", "tools": "GET /tools (list available tools)", "health": "GET /health (server health check)", "oauth_token": "POST /oauth/token (get access token)" if OAUTH_ENABLED else None, "oauth_info": "GET /oauth/info (OAuth configuration)" if OAUTH_ENABLED else None }, "documentation": f"{SERVER_URL}/docs", "message": "This endpoint accepts POST requests with JSON-RPC 2.0 format. Use POST /mcp with proper authentication." }) @app.post("/mcp") async def mcp_endpoint( request: Request, auth: dict = Depends(verify_token) ): """ Main MCP endpoint - JSON-RPC 2.0 compatible. This endpoint handles stateless JSON-RPC requests from ChatGPT Enterprise. Requires OAuth token or API key authentication if OAuth is enabled. """ # Log incoming request for debugging print("\n" + "=" * 80) print("INCOMING REQUEST TO /mcp") print("=" * 80) print(f"Method: {request.method}") print(f"URL: {request.url}") print(f"Client: {request.client.host if request.client else 'Unknown'}:{request.client.port if request.client else 'Unknown'}") print("\nHeaders:") for header, value in request.headers.items(): print(f" {header}: {value}") try: # Read and log raw body body_bytes = await request.body() print(f"\nRaw Body ({len(body_bytes)} bytes):") print(body_bytes.decode('utf-8')) # Parse JSON try: body = json.loads(body_bytes) print("\nParsed JSON:") print(json.dumps(body, indent=2)) except json.JSONDecodeError as e: print(f"\nJSON Parse Error: {e}") print("=" * 80 + "\n") return JSONResponse( status_code=400, content={ "jsonrpc": "2.0", "id": "error", "error": { "code": -32700, "message": f"Parse error: Invalid JSON - {str(e)}" } } ) # Validate JSON-RPC format if not isinstance(body, dict): error_response = { "jsonrpc": "2.0", "id": "error", "error": { "code": -32600, "message": "Invalid Request: body must be an object" } } print("\nResponse (400 Bad Request):") print(json.dumps(error_response, indent=2)) print("=" * 80 + "\n") return JSONResponse( status_code=400, content=error_response ) req_id = body.get("id", "unknown") method = body.get("method", "") params = body.get("params", {}) print(f"\nExtracted:") print(f" ID: {req_id}") print(f" Method: {method}") print(f" Params: {json.dumps(params, indent=4)}") # Handle MCP notifications (no response required) if method.startswith("notifications/"): print(f"\nHandling notification: {method}") print("Response (204 No Content - Notification acknowledged)") print("=" * 80 + "\n") return Response(status_code=204) # Handle initialize method (for MCP protocol compliance) if method == "initialize": response_data = { "jsonrpc": "2.0", "id": req_id, "result": { "protocolVersion": "2024-11-05", "serverInfo": { "name": "LangGraph Agent MCP Server", "version": "1.0.0" }, "capabilities": { "tools": {}, "resources": {} } } } print("\nResponse (200 OK):") print(json.dumps(response_data, indent=2)) print("=" * 80 + "\n") return JSONResponse(response_data) # Handle tools/call method elif method == "tools/call": tool_name = params.get("name", "") arguments = params.get("arguments", {}) if not tool_name: error_response = { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32602, "message": "Invalid params: 'name' is required" } } print("\nResponse (400 Bad Request):") print(json.dumps(error_response, indent=2)) print("=" * 80 + "\n") return JSONResponse( status_code=400, content=error_response ) # Execute the tool with auth context result = await execute_tool(tool_name, arguments, auth) # Check if tool execution failed if result.get("status") == "failed": error_msg = result.get('error', 'Unknown error') error_response = { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32603, "message": f"Tool execution failed: {error_msg}" } } print("\nResponse (500 Internal Server Error):") print(json.dumps(error_response, indent=2)) print("=" * 80 + "\n") return JSONResponse( status_code=500, content=error_response ) # Return successful result success_response = { "jsonrpc": "2.0", "id": req_id, "result": result } print("\nResponse (200 OK - Tool executed):") print(json.dumps(success_response, indent=2)) print("=" * 80 + "\n") return JSONResponse(success_response) # Handle tools/list method elif method == "tools/list": tools_data = await list_tools() list_response = { "jsonrpc": "2.0", "id": req_id, "result": tools_data } print("\nResponse (200 OK - Tools list):") print(json.dumps(list_response, indent=2)) print("=" * 80 + "\n") return JSONResponse(list_response) # Unknown method else: unknown_response = { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32601, "message": f"Method not found: {method}" } } print("\nResponse (400 Bad Request - Unknown method):") print(json.dumps(unknown_response, indent=2)) print("=" * 80 + "\n") return JSONResponse( status_code=400, content=unknown_response ) except json.JSONDecodeError: decode_error = { "jsonrpc": "2.0", "id": "error", "error": { "code": -32700, "message": "Parse error: Invalid JSON" } } print("\nResponse (400 Bad Request - JSON Parse Error):") print(json.dumps(decode_error, indent=2)) print("=" * 80 + "\n") return JSONResponse( status_code=400, content=decode_error ) except Exception as e: req_id = "error" if isinstance(body, dict): req_id = body.get("id", "error") server_error = { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } print("\nResponse (500 Internal Server Error):") print(json.dumps(server_error, indent=2)) print("=" * 80 + "\n") return JSONResponse( status_code=500, content=server_error ) # ============================================================================ # Main Entry Point # ============================================================================ if __name__ == "__main__": print("=" * 80) print("ChatGPT Enterprise MCP Server (FastAPI Stateless)") print("=" * 80) print(f"Server Name: LangGraph Agent MCP Server") print(f"Transport: FastAPI (stateless HTTP)") print(f"Port: {CHATGPT_MCP_PORT}") print(f"MCP Endpoint: http://0.0.0.0:{CHATGPT_MCP_PORT}/mcp") print(f"Health: http://0.0.0.0:{CHATGPT_MCP_PORT}/health") print(f"Tools: http://0.0.0.0:{CHATGPT_MCP_PORT}/tools") print(f"LangGraph Backend: {LANGGRAPH_BASE_URL}") print() print("ChatGPT Enterprise Setup:") print(f" 1. Use URL: http://your-server:{CHATGPT_MCP_PORT}/mcp") print(f" 2. Add to ChatGPT Apps & Connectors") print(f" 3. Test with: python test_chatgpt_mcp.py") print() print("Available Tools:") print(" - echo: Test connectivity") print(" - get_server_info: Server information") print(" - invoke_agent: Execute LangGraph agent") print(" - stream_agent: Stream agent responses") print(" - check_system_health: System diagnostics") print(" - check_agent_status: Check specific agent") print(" - get_thread_state: Get conversation state") print(" - list_threads: List conversations") print("=" * 80) print() uvicorn.run( app, host="0.0.0.0", port=CHATGPT_MCP_PORT, log_level="info" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bmaranan75/mcp-shopping-assistant-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server