Skip to main content
Glama
test_full_flow.py28.7 kB
# tests/test_full_flow.py import httpx import asyncio import json import logging import os from dotenv import load_dotenv from urllib.parse import urlparse, parse_qs from uuid import uuid4 from typing import Any, Dict, Optional from pathlib import Path import secrets # Add project root to sys.path for local imports import sys project_root_path = Path(__file__).parent.parent.resolve() if str(project_root_path) not in sys.path: sys.path.insert(0, str(project_root_path)) from mcp_plexus.oauth.pkce import generate_pkce_code_verifier, generate_pkce_code_challenge load_dotenv(dotenv_path=project_root_path / '.env', override=True) # Configuration constants BASE_URL = os.getenv("PLEXUS_BASE_URL", "http://127.0.0.1:8080") E2E_ENTITY_ID = os.getenv("E2E_FULL_FLOW_TEST_ENTITY_ID", "test_tenant_001") # OAuth client configuration for internal authentication flow INTERNAL_OAUTH_CLIENT_ID = "plexus-test-client" INTERNAL_OAUTH_REDIRECT_URI = "http://localhost:8080/callback" INTERNAL_OAUTH_SCOPES = "openid profile mcp_tool:get_entity_info" GITHUB_PROVIDER_NAME = "github" MCP_PROTOCOL_VERSION = "2025-03-26" HOST_APP_SECRET = os.getenv("HOST_APP_REGISTRATION_SECRET") # Setup logging configuration global_test_log_level = os.getenv("E2E_LOG_LEVEL", "DEBUG").upper() logging.basicConfig( level=global_test_log_level, format="%(asctime)s - %(name)s - [%(levelname)s] - %(message)s", handlers=[logging.StreamHandler()], force=True ) logger = logging.getLogger("FullE2ETest") logger.setLevel(global_test_log_level) class MCPTestHelperClient: """ Helper client for testing MCP (Model Context Protocol) interactions. Manages MCP sessions, authentication tokens, and provides methods for testing OAuth flows and tool calls within the MCP Plexus ecosystem. """ def __init__(self, base_url: str, entity_id: str): self.base_url = base_url self.entity_id = entity_id self.mcp_endpoint = f"{self.base_url}/{self.entity_id}/mcp/" self.plexus_auth_base = f"{self.base_url}/{self.entity_id}/plexus-auth" self.internal_oauth_base = f"{self.base_url}/{self.entity_id}/oauth" # Session and authentication state self.mcp_session_id: Optional[str] = None self.plexus_user_auth_token: Optional[str] = None self.persistent_user_id: Optional[str] = None self.internal_access_token: Optional[str] = None self.request_counter = 0 self.client_logger = logging.getLogger(f"MCPTestHelperClient.{self.entity_id}") self.client_logger.setLevel(logging.DEBUG if global_test_log_level == "DEBUG" else global_test_log_level) def _generate_mcp_request_id(self, prefix: str = "e2e-full") -> str: """Generate unique request IDs for MCP protocol messages.""" self.request_counter += 1 return f"{prefix}-{self.entity_id}-{self.request_counter}" async def _make_plexus_http_request( self, http_client: httpx.AsyncClient, method: str, url: str, json_payload: Optional[Dict[str, Any]] = None, data_payload: Optional[Dict[str, Any]] = None, expected_status: int = 200, extra_headers: Optional[Dict[str, str]] = None ) -> Dict[str, Any]: """ Make HTTP requests to Plexus endpoints with proper error handling. Handles both JSON and redirect responses, validates status codes, and provides detailed error information for debugging. """ self.client_logger.debug(f"HTTP {method} to {url}") final_headers = extra_headers or {} try: if method.upper() == "POST": response = await http_client.post(url, json=json_payload, data=data_payload, headers=final_headers) elif method.upper() == "GET": response = await http_client.get(url, params=data_payload, headers=final_headers) else: raise ValueError(f"Unsupported HTTP method: {method}") self.client_logger.debug(f"HTTP Response Status: {response.status_code}") raw_text_for_debug = response.text if response.content else "" assert response.status_code == expected_status, \ f"Expected status {expected_status}, got {response.status_code}. Response: {raw_text_for_debug}" # Handle JSON responses if "application/json" in response.headers.get("content-type", "").lower(): if response.content: return response.json() else: self.client_logger.warning(f"Empty JSON response for {url}") return {"_status_code": response.status_code, "_raw_text": "", "_headers": dict(response.headers)} # Handle redirect responses (e.g., OAuth flows) elif response.status_code == 302 and "location" in response.headers: return {"location": response.headers["location"], "_status_code": response.status_code, "_headers": dict(response.headers)} return {"_status_code": response.status_code, "_raw_text": raw_text_for_debug, "_headers": dict(response.headers)} except httpx.HTTPStatusError as e_http: self.client_logger.error(f"HTTP Error ({method} {url}): {e_http.response.status_code}") self.client_logger.error(f"Error Response Body: {e_http.response.text}") raise except Exception as e: self.client_logger.error(f"Unexpected error ({method} {url}): {e}", exc_info=True) raise async def _make_mcp_protocol_request( self, http_client: httpx.AsyncClient, mcp_method: str, mcp_params: Optional[Dict[str, Any]] = None, is_notification: bool = False, skip_default_bearer: bool = False ) -> Optional[Dict[str, Any]]: """ Send MCP protocol messages to the server. Handles both regular JSON-RPC requests and notifications, manages session IDs, and parses responses from both JSON and Server-Sent Events formats. """ request_id = self._generate_mcp_request_id() if not is_notification else None payload: Dict[str, Any] = {"jsonrpc": "2.0", "method": mcp_method} if mcp_params is not None: payload["params"] = mcp_params if request_id: payload["id"] = request_id # Prepare headers with session ID and authentication headers = {"Content-Type": "application/json", "Accept": "application/json, text/event-stream"} if self.mcp_session_id: headers["Mcp-Session-Id"] = self.mcp_session_id # Add Bearer token for authenticated requests if self.plexus_user_auth_token and not skip_default_bearer: headers["Authorization"] = f"Bearer {self.plexus_user_auth_token}" self.client_logger.debug(f"MCP Request: {mcp_method} (ID: {request_id})") try: response = await http_client.post(self.mcp_endpoint, json=payload, headers=headers) raw_response_text = response.text response_status_code = response.status_code response_headers = dict(response.headers) content_type = response_headers.get("content-type", "").lower() # Handle notification acceptance if response_status_code == 202 and is_notification: self.client_logger.info(f"Notification '{mcp_method}' accepted") return {"_mcp_status": "accepted_notification"} # Handle HTTP errors if response_status_code >= 400: self.client_logger.error(f"MCP request {mcp_method} failed with HTTP {response_status_code}") try: parsed_error = response.json() except json.JSONDecodeError: parsed_error = None return {"_http_error": True, "status_code": response_status_code, "detail": parsed_error or raw_response_text} # Handle empty responses if not response.content: self.client_logger.warning(f"Empty response for {mcp_method}") # Special case for initialize method with session ID in headers if mcp_method == "initialize" and response_status_code == 200 and "mcp-session-id" in response_headers: self.mcp_session_id = response_headers["mcp-session-id"] self.client_logger.info(f"Initialize completed with session ID: {self.mcp_session_id}") return {"_mcp_status": "initialize_empty_body_with_session_id", "id": request_id, "mcp_session_id": self.mcp_session_id} return None parsed_mcp_json_data = None # Parse JSON responses if "application/json" in content_type: try: parsed_mcp_json_data = response.json() self.client_logger.debug(f"Parsed JSON response for {mcp_method}") except json.JSONDecodeError as e: self.client_logger.error(f"JSON decode error for {mcp_method}: {e}") return {"_json_decode_error": True, "raw_text": raw_response_text, "error_str": str(e)} # Parse Server-Sent Events responses elif "text/event-stream" in content_type: self.client_logger.debug(f"Parsing SSE response for {mcp_method}") for line in response.text.splitlines(): if line.startswith("data:"): json_str = line[len("data:"):].strip() try: parsed_mcp_json_data = json.loads(json_str) break except json.JSONDecodeError as e_json_sse: self.client_logger.error(f"SSE JSON decode error: {e_json_sse}") return {"_sse_json_decode_error": True, "line": json_str, "raw_full_sse": raw_response_text, "error_str": str(e_json_sse)} if not parsed_mcp_json_data and not is_notification: self.client_logger.warning(f"No parsable data in SSE response for {mcp_method}") else: self.client_logger.warning(f"Unexpected content type '{content_type}' for {mcp_method}") # Validate response ID matches request ID if parsed_mcp_json_data and not is_notification: response_id = parsed_mcp_json_data.get("id") if response_id != request_id and not str(response_id).startswith("server-error"): self.client_logger.warning(f"Response ID mismatch: expected {request_id}, got {response_id}") # Update session ID if provided in headers if "mcp-session-id" in response_headers: new_sid = response_headers["mcp-session-id"] if self.mcp_session_id != new_sid: self.mcp_session_id = new_sid self.client_logger.info(f"Session ID updated to {new_sid}") return parsed_mcp_json_data except httpx.HTTPStatusError as e_http: self.client_logger.error(f"HTTP error for {mcp_method}: {e_http.response.status_code}") try: return e_http.response.json() if e_http.response.content else {"_http_error_status": True, "status_code": e_http.response.status_code} except: return {"_http_error_status": True, "status_code": e_http.response.status_code, "detail": e_http.response.text} except Exception as exc: self.client_logger.error(f"Unexpected error in MCP request {mcp_method}: {exc}", exc_info=True) return None def _extract_tool_call_payload(self, mcp_response_json: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ Extract and parse the actual payload from MCP tool call responses. Handles both error responses and successful tool outputs, parsing JSON content from the tool's text response. """ if not mcp_response_json: return None # Check for JSON-RPC protocol errors json_rpc_error = mcp_response_json.get("error") if json_rpc_error: self.client_logger.error(f"MCP protocol error: {json_rpc_error}") return {"_mcp_protocol_error": True, "detail": json_rpc_error} result = mcp_response_json.get("result") if isinstance(result, dict): # Handle tool execution errors if result.get("isError"): self.client_logger.warning("MCP tool execution resulted in error") raw_error_content = result.get('content', []) parsed_error_detail = None if raw_error_content and isinstance(raw_error_content[0], dict) and isinstance(raw_error_content[0].get("text"), str): try: parsed_error_detail = json.loads(raw_error_content[0]["text"]) except: pass return {"_mcp_tool_error": True, "detail": parsed_error_detail or raw_error_content} # Extract successful tool response content content_list = result.get("content") if content_list and isinstance(content_list[0], dict): # Parse JSON from text content if isinstance(content_list[0].get("text"), str): try: return json.loads(content_list[0]["text"]) except json.JSONDecodeError as e: self.client_logger.error(f"Failed to parse tool response text: {e}") return {"_mcp_tool_payload_parse_error": True, "raw_text": content_list[0]['text']} # Handle structured content elif isinstance(content_list[0].get("structuredContent"), dict): return content_list[0].get("structuredContent") self.client_logger.warning(f"Could not extract payload from tool result: {content_list}") else: self.client_logger.warning(f"Unexpected result format: {result}") return None async def register_plexus_user(self, http_client: httpx.AsyncClient, host_app_user_id: str) -> bool: """ Register a new user with the Plexus authentication system. Creates a persistent user ID and auth token for the given host app user ID. """ self.client_logger.info(f"Registering Plexus user '{host_app_user_id}'") if not HOST_APP_SECRET: self.client_logger.error("HOST_APP_SECRET not configured") return False payload = {"user_id_from_host_app": host_app_user_id} url = f"{self.plexus_auth_base}/register-user" headers = {"X-Host-App-Secret": HOST_APP_SECRET} response_data = await self._make_plexus_http_request( http_client, "POST", url, json_payload=payload, extra_headers=headers ) if response_data and response_data.get("plexus_user_auth_token"): self.plexus_user_auth_token = response_data.get("plexus_user_auth_token") self.persistent_user_id = response_data.get("persistent_user_id") if self.plexus_user_auth_token and self.persistent_user_id: self.client_logger.info(f"User registered with ID: {self.persistent_user_id}") return True self.client_logger.error(f"Registration failed: {response_data}") return False async def initialize_mcp_session(self, http_client: httpx.AsyncClient) -> bool: """ Initialize an MCP session with the server. Sends the initialize message and notifications/initialized to establish a working MCP session with authentication context. """ self.client_logger.info("Initializing MCP session") client_info_dict = {"name": "FullE2ETestClient", "version": "1.0.0"} params = { "protocolVersion": MCP_PROTOCOL_VERSION, "capabilities": {}, "clientInfo": client_info_dict } # Initialize the session init_response = await self._make_mcp_protocol_request( http_client, "initialize", mcp_params=params, skip_default_bearer=False ) # Check if initialization was successful if (init_response and not init_response.get("error") and not init_response.get("_http_error") and not init_response.get("_json_decode_error") and not init_response.get("_sse_json_decode_error") and self.mcp_session_id): self.client_logger.info(f"MCP session initialized: {self.mcp_session_id}") # Send initialized notification notif_response = await self._make_mcp_protocol_request( http_client, "notifications/initialized", mcp_params={}, is_notification=True, skip_default_bearer=False ) return bool(notif_response and notif_response.get("_mcp_status") == "accepted_notification") self.client_logger.error(f"MCP initialization failed: {init_response}") return False async def call_tool(self, http_client: httpx.AsyncClient, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Call an MCP tool with the given arguments. Returns the parsed tool response payload or None if the call failed. """ if not self.mcp_session_id: self.client_logger.error(f"Cannot call tool '{tool_name}': No MCP session") return None mcp_response = await self._make_mcp_protocol_request( http_client, "tools/call", mcp_params={"name": tool_name, "arguments": arguments} ) return self._extract_tool_call_payload(mcp_response) async def perform_internal_oauth_flow(self, http_client: httpx.AsyncClient) -> bool: """ Execute the internal OAuth authorization code flow with PKCE. Performs the complete OAuth flow including authorization request, code exchange, and token retrieval for internal OAuth clients. """ self.client_logger.info("Starting internal OAuth flow") if not self.mcp_session_id: self.client_logger.error("Cannot perform OAuth flow: No MCP session") return False # Generate PKCE parameters for security code_verifier = generate_pkce_code_verifier() code_challenge = generate_pkce_code_challenge(code_verifier) auth_state = secrets.token_urlsafe(16) # Prepare authorization request auth_params = { "response_type": "code", "client_id": INTERNAL_OAUTH_CLIENT_ID, "redirect_uri": INTERNAL_OAUTH_REDIRECT_URI, "scope": INTERNAL_OAUTH_SCOPES, "state": auth_state, "code_challenge": code_challenge, "code_challenge_method": "S256" } auth_url = f"{self.internal_oauth_base}/authorize" auth_headers = {"Mcp-Session-Id": self.mcp_session_id} # Get authorization code via redirect auth_response = await self._make_plexus_http_request( http_client, "GET", auth_url, data_payload=auth_params, expected_status=302, extra_headers=auth_headers ) redirect_loc = auth_response.get("location") if not redirect_loc: self.client_logger.error(f"Authorization did not redirect: {auth_response}") return False # Parse authorization code from redirect parsed_redirect = urlparse(redirect_loc) query_params = parse_qs(parsed_redirect.query) auth_code = query_params.get("code", [None])[0] received_state = query_params.get("state", [None])[0] if not auth_code: self.client_logger.error(f"No authorization code received: {query_params}") return False if received_state != auth_state: self.client_logger.error(f"State mismatch: expected {auth_state}, got {received_state}") return False # Exchange authorization code for access token token_url = f"{self.internal_oauth_base}/token" token_payload = { "grant_type": "authorization_code", "code": auth_code, "redirect_uri": INTERNAL_OAUTH_REDIRECT_URI, "client_id": INTERNAL_OAUTH_CLIENT_ID, "code_verifier": code_verifier } token_response = await self._make_plexus_http_request( http_client, "POST", token_url, data_payload=token_payload ) self.internal_access_token = token_response.get("access_token") if self.internal_access_token: self.client_logger.info("Internal OAuth flow completed successfully") return True self.client_logger.error(f"Token exchange failed: {token_response}") return False async def test_scenario_full_flow(): """ Execute the complete end-to-end test scenario. Tests user registration, MCP session initialization, tool calls, internal OAuth flow, external GitHub OAuth, and token persistence. """ test_host_app_user_id = f"fullflow_user_{secrets.token_hex(8)}" logger.info(f"Starting full flow test for user '{test_host_app_user_id}'") client_helper = MCPTestHelperClient(base_url=BASE_URL, entity_id=E2E_ENTITY_ID) async with httpx.AsyncClient(timeout=90.0) as http_client: # Step 1: Register Plexus user assert await client_helper.register_plexus_user(http_client, test_host_app_user_id), \ "Plexus user registration failed" # Store tokens for later session testing temp_plexus_token = client_helper.plexus_user_auth_token temp_persistent_user_id = client_helper.persistent_user_id # Step 2: Initialize MCP session assert await client_helper.initialize_mcp_session(http_client), \ "MCP session initialization failed" # Step 3: Verify user context in MCP session logger.info("Verifying user context in MCP session") entity_info = await client_helper.call_tool(http_client, "get_entity_info", {}) assert entity_info and not entity_info.get("_mcp_tool_error") and not entity_info.get("_mcp_protocol_error"), \ f"get_entity_info failed: {entity_info}" if entity_info.get("_mcp_tool_payload_parse_error"): logger.error(f"Tool response parsing error: {entity_info.get('raw_text')}") assert False, "Failed to parse get_entity_info tool response" # Verify persistent user ID is correctly associated with session session_info_str = entity_info.get("final_plexus_session_info", "") assert f"persistent_user_id={client_helper.persistent_user_id}" in session_info_str, \ f"User ID mismatch in session. Expected '{client_helper.persistent_user_id}', got: '{session_info_str}'" logger.info(f"User context verified: {client_helper.persistent_user_id}") # Step 4: Test internal OAuth flow logger.info("Testing internal OAuth flow") internal_oauth_ok = await client_helper.perform_internal_oauth_flow(http_client) assert internal_oauth_ok, "Internal OAuth flow failed" logger.info("Internal OAuth flow test PASSED") # Step 5: Test external GitHub OAuth flow logger.info("Testing external GitHub OAuth flow") github_tool_name = "fetch_secure_external_data" github_item_id = f"gh_item_{uuid4().hex[:6]}" # First call may trigger OAuth flow or use existing token gh_tool_resp1 = await client_helper.call_tool(http_client, github_tool_name, {"item_id": github_item_id}) assert gh_tool_resp1, "GitHub tool call received no response" if gh_tool_resp1.get("_mcp_tool_error"): # OAuth authorization required error_detail = gh_tool_resp1.get("detail", {}) logger.info("GitHub tool requires authorization") assert error_detail.get("error") == "external_auth_required", "Expected external_auth_required error" assert error_detail.get("provider_name") == GITHUB_PROVIDER_NAME, "Provider name mismatch" github_auth_url = error_detail.get("authorization_url") assert github_auth_url and "github.com/login/oauth/authorize" in github_auth_url, \ "Invalid GitHub auth URL" # Manual intervention required for GitHub OAuth logger.info("!!! MANUAL ACTION REQUIRED FOR GITHUB TEST !!!") logger.info("Please open the following URL, authenticate with GitHub, and grant consent:") logger.info(f" {github_auth_url}") logger.info("After successful authentication, press Enter to continue.") await asyncio.to_thread(input, "PRESS ENTER TO CONTINUE... ") # Retry tool call after OAuth completion gh_tool_resp2 = await client_helper.call_tool(http_client, github_tool_name, {"item_id": github_item_id}) assert gh_tool_resp2 and not gh_tool_resp2.get("_mcp_tool_error"), \ f"GitHub tool call failed after auth: {gh_tool_resp2}" assert gh_tool_resp2.get("status") == "success_placeholder", \ f"Unexpected GitHub tool response: {gh_tool_resp2}" logger.info("GitHub tool call post-auth SUCCESSFUL") else: # Token already available from previous runs logger.info("GitHub tool call succeeded with existing token") assert gh_tool_resp1.get("status") == "success_placeholder", \ f"Unexpected GitHub tool response: {gh_tool_resp1}" logger.info("External GitHub OAuth flow test PASSED") # Step 6: Test token persistence across MCP sessions logger.info("Testing GitHub token persistence with new MCP session") client_helper_new_session = MCPTestHelperClient(base_url=BASE_URL, entity_id=E2E_ENTITY_ID) client_helper_new_session.plexus_user_auth_token = temp_plexus_token client_helper_new_session.persistent_user_id = temp_persistent_user_id assert await client_helper_new_session.initialize_mcp_session(http_client), \ "New MCP session initialization failed" assert client_helper_new_session.mcp_session_id != client_helper.mcp_session_id, \ "New session should have different MCP session ID" # Test that GitHub token persists across sessions gh_tool_resp3 = await client_helper_new_session.call_tool( http_client, github_tool_name, {"item_id": github_item_id + "_newsess"} ) assert gh_tool_resp3 and not gh_tool_resp3.get("_mcp_tool_error"), \ f"GitHub tool call in new session failed: {gh_tool_resp3}" assert gh_tool_resp3.get("status") == "success_placeholder", \ f"Unexpected response in new session: {gh_tool_resp3}" assert gh_tool_resp3.get("item_id") == github_item_id + "_newsess", \ "Tool item_id mismatch in new session" logger.info("GitHub token persistence test PASSED") logger.info(f"Full E2E test completed successfully for user '{test_host_app_user_id}'") if __name__ == "__main__": logger.info("Starting MCP Plexus E2E test - ensure server is running and configured") logger.info(f"Using BASE_URL: {BASE_URL}, E2E_ENTITY_ID: {E2E_ENTITY_ID}") # Validate required configuration if not HOST_APP_SECRET or HOST_APP_SECRET == "default_secret_if_env_missing": logger.warning("HOST_APP_REGISTRATION_SECRET not properly configured - check .env file") if E2E_ENTITY_ID != "test_tenant_001": logger.warning(f"E2E_ENTITY_ID is '{E2E_ENTITY_ID}' - GitHub OAuth may fail if not configured for this entity") if not os.getenv("GITHUB_CLIENT_ID") or not os.getenv("GITHUB_CLIENT_SECRET"): logger.warning("GitHub OAuth credentials not configured - OAuth flow may fail") asyncio.run(test_scenario_full_flow())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Super-I-Tech/mcp_plexus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server