Skip to main content
Glama
test_tool_requires_api_key.py25.1 kB
# tests/test_tool_requires_api_key.py import httpx import asyncio import json import logging import os from dotenv import load_dotenv from uuid import uuid4 from typing import Any, Dict, Optional from pathlib import Path # Load environment variables from .env file if available try: project_root = Path(__file__).resolve().parent.parent dotenv_path = project_root / '.env' if dotenv_path.exists(): load_dotenv(dotenv_path=dotenv_path, override=True) else: load_dotenv(override=True) except Exception: load_dotenv(override=True) # Configuration constants from environment variables BASE_URL = os.getenv("PLEXUS_BASE_URL", "http://127.0.0.1:8080") ENTITY_ID_FOR_TOOL_KEY_TEST = os.getenv("TEST_TOOL_API_KEY_ENTITY_ID", "test_tenant_tool_api_key") HOST_APP_SECRET = os.getenv("HOST_APP_REGISTRATION_SECRET") MCP_PROTOCOL_VERSION = "2025-03-26" # Test target configuration TARGET_TOOL_NAME = "use_alpha_service_key_tool" TARGET_PROVIDER_NAME = "my_test_service_alpha" TARGET_KEY_DISPLAY_NAME = "My Alpha Test Service Key" SERVER_ENCRYPTION_KEY_SET = bool(os.getenv("PLEXUS_ENCRYPTION_KEY")) # Configure logging logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO").upper(), format="%(asctime)s - %(name)s - [%(levelname)s] - %(message)s", handlers=[logging.StreamHandler()], ) logger = logging.getLogger("ToolRequiresApiKeyTest") class FullFlowTestClient: """ Test client that handles the complete flow of: 1. User registration with Plexus 2. MCP session initialization 3. API key submission 4. Tool execution that requires API keys """ def __init__(self, base_url: str, entity_id: str): self.base_url = base_url self.entity_id = entity_id self.mcp_endpoint = f"{self.base_url}/{self.entity_id}/mcp/" self.services_endpoint_base = f"{self.base_url}/{self.entity_id}/plexus-services" self.plexus_auth_base = f"{self.base_url}/{self.entity_id}/plexus-auth" # Session state self.mcp_session_id: Optional[str] = None self.plexus_user_auth_token: Optional[str] = None self.persistent_user_id: Optional[str] = None self.request_counter = 0 self.client_logger = logging.getLogger(f"ToolRequiresApiKeyTest.Client.{self.entity_id}") async def _make_generic_http_request( self, http_client: httpx.AsyncClient, method: str, url: str, json_payload: Optional[Dict[str, Any]] = None, data_payload: Optional[Dict[str, Any]] = None, expected_status: int = 200, extra_headers: Optional[Dict[str,str]] = None ) -> Optional[Dict[str, Any]]: """Generic HTTP request handler with error handling and response parsing""" final_headers = extra_headers or {} if 'Content-Type' not in final_headers and json_payload: final_headers.setdefault("Content-Type", "application/json") self.client_logger.debug(f"HTTP {method} to {url}. Headers: {final_headers}") try: response = await http_client.request( method, url, json=json_payload, data=data_payload, headers=final_headers ) self.client_logger.debug(f"HTTP Response Status: {response.status_code}") assert response.status_code == expected_status, \ f"Expected {expected_status}, got {response.status_code}. Response: {response.text}" if response.content: try: return response.json() except json.JSONDecodeError: self.client_logger.warning(f"Response for {url} not JSON, returning raw text container.") return { "_status_code": response.status_code, "_raw_text": response.text, "_headers": dict(response.headers) } return { "_status_code": response.status_code, "_raw_text": "", "_headers": dict(response.headers) } except httpx.HTTPStatusError as e_http: self.client_logger.error( f"HTTP Error ({method} {url}): {e_http.response.status_code} - {e_http.response.text}" ) try: return e_http.response.json() except: return { "error": "http_status_error_no_json_body", "status_code": e_http.response.status_code, "detail": e_http.response.text } except Exception as e: self.client_logger.error(f"Unexpected error ({method} {url}): {e}", exc_info=True) return {"error": "unexpected_exception", "detail": str(e)} async def register_plexus_user(self, http_client: httpx.AsyncClient, host_app_user_id: str) -> bool: """Register a new user with the Plexus authentication system""" self.client_logger.info(f"Registering Plexus user '{host_app_user_id}' for entity '{self.entity_id}'...") payload = {"user_id_from_host_app": host_app_user_id} url = f"{self.plexus_auth_base}/register-user" headers = {"Content-Type": "application/json"} if HOST_APP_SECRET: headers["X-Host-App-Secret"] = HOST_APP_SECRET else: self.client_logger.error("HOST_APP_SECRET not set for user registration.") return False response_data = await self._make_generic_http_request( http_client, "POST", url, json_payload=payload, extra_headers=headers ) if response_data and response_data.get("plexus_user_auth_token"): self.plexus_user_auth_token = response_data.get("plexus_user_auth_token") self.persistent_user_id = response_data.get("persistent_user_id") if self.plexus_user_auth_token and self.persistent_user_id: self.client_logger.info( f"Plexus user '{self.persistent_user_id}' registered. Token: {self.plexus_user_auth_token[:10]}..." ) return True self.client_logger.error(f"Plexus registration failed. Response: {response_data}") return False async def _make_mcp_protocol_request( self, http_client: httpx.AsyncClient, mcp_method: str, mcp_params: Optional[Dict[str, Any]] = None, is_notification: bool = False, skip_default_bearer: bool = False ) -> Optional[Dict[str, Any]]: """ Handle MCP protocol requests with support for both JSON and SSE responses. Manages session IDs and authentication tokens automatically. """ self.request_counter += 1 req_id = f"mcp-fullflow-{self.entity_id}-{self.request_counter}" if not is_notification else None # Build MCP request payload according to JSON-RPC 2.0 specification payload: Dict[str, Any] = {"jsonrpc": "2.0", "method": mcp_method} if mcp_params: payload["params"] = mcp_params if req_id: payload["id"] = req_id # Set up headers with session and auth info headers = {"Content-Type": "application/json", "Accept": "application/json, text/event-stream"} if self.mcp_session_id: headers["Mcp-Session-Id"] = self.mcp_session_id if self.plexus_user_auth_token and not skip_default_bearer: headers["Authorization"] = f"Bearer {self.plexus_user_auth_token}" self.client_logger.info(f"MCP Request: ID={req_id}, Method={mcp_method}") try: response = await http_client.post(self.mcp_endpoint, json=payload, headers=headers) raw_response_text = response.text response_status_code = response.status_code response_headers = dict(response.headers) self.client_logger.info( f"MCP Response for {mcp_method}(ID:{req_id}) - Status: {response_status_code}" ) content_type = response_headers.get("content-type", "").lower() # Handle notification acceptance (HTTP 202) if response_status_code == 202 and is_notification: self.client_logger.info(f"Received HTTP 202 for notification '{mcp_method}'.") return {"_mcp_status": "accepted_notification"} # Handle HTTP errors if response_status_code >= 400: self.client_logger.error( f"MCP request {mcp_method} (ID:{req_id}) failed with HTTP status: {response_status_code}" ) try: parsed_mcp_json_data = response.json() except json.JSONDecodeError: parsed_mcp_json_data = None return { "_http_error": True, "status_code": response_status_code, "detail": parsed_mcp_json_data or raw_response_text } # Handle empty responses (special case for initialize) if not response.content: if mcp_method == "initialize" and response_status_code == 200 and "mcp-session-id" in response_headers: self.mcp_session_id = response_headers["mcp-session-id"] self.client_logger.info( f"initialize call had empty body but Mcp-Session-Id '{self.mcp_session_id}' was received." ) return { "_mcp_status": "initialize_empty_body_with_session_id", "id": req_id, "mcp_session_id": self.mcp_session_id } return None # Parse response based on content type parsed_mcp_json_data = None if "application/json" in content_type: try: parsed_mcp_json_data = response.json() self.client_logger.debug(f"Parsed application/json for {mcp_method} (ID:{req_id})") except json.JSONDecodeError as e: self.client_logger.error( f"Failed to decode application/json response for {mcp_method} (ID:{req_id}). Err: {e}" ) return { "_json_decode_error": True, "raw_text": raw_response_text, "error_str": str(e) } elif "text/event-stream" in content_type: # Parse SSE format looking for 'data:' field self.client_logger.debug(f"Received text/event-stream for {mcp_method} (ID:{req_id})") for line in response.text.splitlines(): if line.startswith("data:"): json_str = line[len("data:"):].strip() try: parsed_mcp_json_data = json.loads(json_str) self.client_logger.debug( f"Successfully parsed JSON from SSE 'data:' for {mcp_method} (ID:{req_id})" ) break except json.JSONDecodeError as e_json_sse: self.client_logger.error( f"Could not decode JSON from SSE 'data:' line for {mcp_method} (ID:{req_id}). Error: {e_json_sse}" ) return { "_sse_json_decode_error": True, "line": json_str, "raw_full_sse": raw_response_text, "error_str": str(e_json_sse) } # Update session ID if provided in response headers if response_headers and "mcp-session-id" in response_headers: new_sid = response_headers["mcp-session-id"] if self.mcp_session_id != new_sid: self.mcp_session_id = new_sid self.client_logger.info(f"MCP Session ID ('{mcp_method}') updated to {new_sid}") return parsed_mcp_json_data except httpx.HTTPStatusError as e_http: self.client_logger.error( f"HTTPStatusError ({mcp_method} ID:{req_id}): {e_http.response.status_code}" ) try: return e_http.response.json() if e_http.response.content else { "_http_error_status": True, "status_code": e_http.response.status_code, "detail": e_http.response.text } except: return { "_http_error_status": True, "status_code": e_http.response.status_code, "detail": e_http.response.text } except Exception as exc: self.client_logger.error( f"Unexpected exception in _make_mcp_protocol_request for {mcp_method} (ID:{req_id}): {exc}", exc_info=True ) return None async def initialize_mcp_session(self, http_client: httpx.AsyncClient) -> bool: """Initialize MCP session with the server and send required notification""" if not self.plexus_user_auth_token: self.client_logger.error("Cannot init MCP: Plexus user auth token missing on client instance.") return False client_info: Dict[str, Any] = {"name": "FullFlowTestClient", "version": "1.0.1"} params = { "protocolVersion": MCP_PROTOCOL_VERSION, "capabilities": {}, "clientInfo": client_info } # Initialize session init_response_json = await self._make_mcp_protocol_request( http_client, "initialize", mcp_params=params, skip_default_bearer=False ) if (init_response_json and not init_response_json.get("error") and not init_response_json.get("_http_error") and not init_response_json.get("_json_decode_error") and not init_response_json.get("_sse_json_decode_error") and self.mcp_session_id): self.client_logger.info(f"MCP session initialized with server. Session ID: {self.mcp_session_id}") # Send required initialized notification per MCP protocol notif_response = await self._make_mcp_protocol_request( http_client, "notifications/initialized", mcp_params={}, is_notification=True, skip_default_bearer=False ) return bool(notif_response and notif_response.get("_mcp_status") == "accepted_notification") self.client_logger.error(f"Failed to initialize MCP session. Current Session ID: {self.mcp_session_id}") return False async def submit_api_key(self, http_client: httpx.AsyncClient, provider: str, key_value: str) -> bool: """Submit an API key for a specific provider to the Plexus services""" if not self.plexus_user_auth_token: self.client_logger.error("Cannot submit API key: Plexus user not authenticated on client instance.") return False url = f"{self.services_endpoint_base}/api-keys" payload = {"provider_name": provider, "api_key_value": key_value} headers = {"Authorization": f"Bearer {self.plexus_user_auth_token}"} response_data = await self._make_generic_http_request( http_client, "POST", url, json_payload=payload, extra_headers=headers ) return bool(response_data and "message" in response_data and provider in response_data["message"]) async def call_mcp_tool(self, http_client: httpx.AsyncClient, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Call an MCP tool and extract the result payload""" if not self.mcp_session_id: self.client_logger.error(f"Tool '{tool_name}': No MCP session ID.") return None full_mcp_response_json = await self._make_mcp_protocol_request( http_client, "tools/call", mcp_params={"name": tool_name, "arguments": arguments}, skip_default_bearer=False ) return self._extract_tool_result_payload(full_mcp_response_json) def _extract_tool_result_payload(self, mcp_response_json: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ Extract the actual tool result payload from MCP response format. Handles both successful results and error conditions. """ if not mcp_response_json: return None # Check for request/parsing errors if (mcp_response_json.get("_http_error") or mcp_response_json.get("_json_decode_error") or mcp_response_json.get("_sse_json_decode_error")): return mcp_response_json # Check for MCP protocol errors if mcp_response_json.get("error"): self.client_logger.error(f"MCP Protocol Error received: {mcp_response_json['error']}") return {"_mcp_protocol_error": True, "detail": mcp_response_json['error']} result = mcp_response_json.get("result") # Handle tool execution errors (isError=True) if isinstance(result, dict) and result.get("isError"): error_content_list = result.get('content', []) if (error_content_list and isinstance(error_content_list[0], dict) and isinstance(error_content_list[0].get("text"), str)): try: # Parse JSON from error text (expected for plexus_api_key_required) return json.loads(error_content_list[0]["text"]) except json.JSONDecodeError as e: self.client_logger.error( f"Could not parse JSON from 'isError' tool error text. Error: {e}" ) return { "_mcp_tool_error_unparsed_text": error_content_list[0]["text"], "detail": str(e) } return {"_mcp_tool_error_raw_content": error_content_list} # Handle successful tool results if (isinstance(result, dict) and result.get("content") and isinstance(result["content"], list) and result["content"]): first_content = result["content"][0] if (isinstance(first_content, dict) and "text" in first_content and isinstance(first_content["text"], str)): try: return json.loads(first_content["text"]) except json.JSONDecodeError as e: self.client_logger.error(f"Failed to parse successful tool result text. Error: {e}") return {"_payload_is_raw_text": True, "text": first_content['text']} elif isinstance(first_content, dict) and "structuredContent" in first_content: return first_content["structuredContent"] self.client_logger.warning(f"Could not extract expected tool payload from result: {result}") return None async def run_api_key_tool_test_flow(): """ Execute the complete test flow: 1. Register user and initialize MCP session 2. Call tool without API key (expect error) 3. Submit API key 4. Call tool again (expect success) """ logger.info(f"--- Test: Tool requiring API Key for Entity: {ENTITY_ID_FOR_TOOL_KEY_TEST} ---") test_user_id = f"fullflow_user_{uuid4().hex[:6]}" dummy_api_key_value = f"test_key_val_for_{test_user_id}_{uuid4().hex[:8]}" client = FullFlowTestClient(base_url=BASE_URL, entity_id=ENTITY_ID_FOR_TOOL_KEY_TEST) async with httpx.AsyncClient(timeout=30.0) as http_client: # Setup: Register user and initialize MCP session assert await client.register_plexus_user(http_client, test_user_id), "User registration failed" assert client.plexus_user_auth_token, "Plexus Auth Token not set after registration" assert await client.initialize_mcp_session(http_client), "MCP session initialization failed" assert client.mcp_session_id, "MCP Session ID not set after initialization" # Test 1: Call tool without API key (should fail with specific error) logger.info(f"Attempting to call '{TARGET_TOOL_NAME}' (expecting API key required error)...") tool_args = {"some_other_param": f"run_value_1_{uuid4().hex[:4]}"} extracted_payload_1 = await client.call_mcp_tool(http_client, TARGET_TOOL_NAME, tool_args) assert extracted_payload_1 is not None, "First tool call did not return an extractable payload" assert isinstance(extracted_payload_1, dict), \ f"extracted_payload_1 is not a dict. Got type: {type(extracted_payload_1)}" # Verify we get the expected API key required error assert extracted_payload_1.get("error") == "plexus_api_key_required", \ f"Expected 'plexus_api_key_required' error. Got: {json.dumps(extracted_payload_1, indent=2)}" assert extracted_payload_1.get("provider_name") == TARGET_PROVIDER_NAME assert extracted_payload_1.get("key_name_display") == TARGET_KEY_DISPLAY_NAME assert "instructions" in extracted_payload_1 logger.info(f"Correctly received PlexusApiKeyRequiredError for provider '{TARGET_PROVIDER_NAME}'.") # Submit the required API key logger.info(f"Submitting API key for '{TARGET_PROVIDER_NAME}'...") submit_ok = await client.submit_api_key(http_client, TARGET_PROVIDER_NAME, dummy_api_key_value) assert submit_ok, "API Key submission failed" logger.info(f"API Key for '{TARGET_PROVIDER_NAME}' submitted successfully.") # Brief pause to allow key processing await asyncio.sleep(0.3) # Test 2: Call tool with API key (should succeed) logger.info(f"Attempting to call '{TARGET_TOOL_NAME}' again (expecting success this time)...") tool_args_2 = {"some_other_param": f"run_value_2_{uuid4().hex[:4]}"} extracted_payload_2 = await client.call_mcp_tool(http_client, TARGET_TOOL_NAME, tool_args_2) assert extracted_payload_2 is not None, "Second tool call did not return an extractable payload" if isinstance(extracted_payload_2, dict): assert extracted_payload_2.get("error") != "plexus_api_key_required", \ f"Second tool call STILL resulted in 'plexus_api_key_required' error: {json.dumps(extracted_payload_2, indent=2)}" assert "_mcp_protocol_error" not in extracted_payload_2, \ f"Second tool call resulted in an MCP protocol error: {extracted_payload_2.get('detail')}" # Verify successful tool execution results assert isinstance(extracted_payload_2, dict), \ f"Expected dict payload from successful second call, got {type(extracted_payload_2)}" assert "retrieved_api_key_fragment" in extracted_payload_2, \ f"Tool response missing 'retrieved_api_key_fragment'" assert extracted_payload_2["retrieved_api_key_fragment"].startswith(dummy_api_key_value[:10]), \ f"Retrieved API key fragment does not match submitted key" assert extracted_payload_2.get("provider_name_used") == TARGET_PROVIDER_NAME, \ f"provider_name_used mismatch. Expected '{TARGET_PROVIDER_NAME}'" assert extracted_payload_2.get("received_other_param") == tool_args_2["some_other_param"], \ f"received_other_param mismatch" logger.info(f"Tool '{TARGET_TOOL_NAME}' successfully called with API key for '{TARGET_PROVIDER_NAME}'.") logger.info(f"--- Test: Tool requiring API Key for Entity '{ENTITY_ID_FOR_TOOL_KEY_TEST}' PASSED ---") async def main(): """Main test execution function""" logger.info(f"Starting Full API Key Test Flow for entity: {ENTITY_ID_FOR_TOOL_KEY_TEST}") if not HOST_APP_SECRET: logger.error("CRITICAL: HOST_APP_REGISTRATION_SECRET must be set in .env for registration.") return if not SERVER_ENCRYPTION_KEY_SET: logger.warning("PLEXUS_ENCRYPTION_KEY check: Not set in client's env. Ensure it's set on SERVER SIDE.") await run_api_key_tool_test_flow() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Super-I-Tech/mcp_plexus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server