Skip to main content
Glama

Perplexica MCP Server

test_transports.py19 kB
#!/usr/bin/env python3 """ Test script for verifying all transport modes of the Perplexica MCP server. """ import asyncio import json import os import subprocess import sys import time import httpx async def test_http_transport(host: str = "localhost", port: int = 3002) -> bool: """Test HTTP transport by checking MCP endpoint.""" print(f"Testing HTTP transport on {host}:{port}...") async with httpx.AsyncClient() as client: try: # Test MCP endpoint - make a proper MCP initialize request mcp_request = { "jsonrpc": "2.0", "id": 0, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {"roots": {"listChanged": True}, "sampling": {}}, "clientInfo": {"name": "test-client", "version": "1.0.0"}, }, } headers = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", "User-Agent": "test-client/1.0.0", } mcp_response = await client.post( f"http://{host}:{port}/mcp", json=mcp_request, headers=headers, timeout=5, ) if mcp_response.status_code == 200: try: # Check if it's a regular JSON response response_data = mcp_response.json() if response_data.get("result", {}).get("capabilities"): print("✓ HTTP: MCP initialize request successful (JSON)") return True else: print(f"✗ HTTP: Invalid initialize response: {response_data}") return False except ValueError: # Might be SSE-formatted response - check for event-stream data response_text = mcp_response.text if "data:" in response_text and "{" in response_text: try: # Extract JSON from SSE format - find the data line for line in response_text.split("\n"): if line.startswith("data: "): json_part = line[6:] # Remove 'data: ' prefix response_data = json.loads(json_part) if response_data.get("result", {}).get( "capabilities" ): print( "✓ HTTP: MCP initialize request successful (SSE format)" ) return True else: print( f"✗ HTTP: Invalid initialize response in SSE: {response_data}" ) return False print("✗ HTTP: No data line found in SSE response") return False except (IndexError, json.JSONDecodeError) as e: print(f"✗ HTTP: Invalid SSE JSON response: {e}") print(f"Raw response: {response_text[:200]}...") return False else: print( f"✗ HTTP: Invalid response format: {response_text[:200]}..." ) return False else: print(f"✗ HTTP: MCP endpoint failed: {mcp_response.status_code}") print(f"Response: {mcp_response.text}") return False except httpx.RequestError as e: print(f"✗ HTTP transport test failed: {e}") return False def _read_line_with_timeout(pipe, timeout=10): """Reads a line from a pipe with a timeout.""" import queue import threading result_queue = queue.Queue() def read_line(): try: line = pipe.readline() result_queue.put(("success", line)) except Exception as e: result_queue.put(("error", str(e))) thread = threading.Thread(target=read_line) thread.daemon = True thread.start() try: result_type, result = result_queue.get(timeout=timeout) if result_type == "success": return result else: raise RuntimeError(f"Error reading line: {result}") except queue.Empty: raise TimeoutError(f"Timed out after {timeout} seconds waiting for line.") def test_stdio_transport() -> bool: """Test stdio transport by running the server in stdio mode and performing a handshake.""" print("Testing stdio transport...") process = None try: # Use proper module execution instead of server.py directly cmd = [sys.executable, "-m", "perplexica_mcp", "stdio"] env = os.environ.copy() env["PYTHONUNBUFFERED"] = "1" # Ensure Python's output is unbuffered process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env, ) # 1. Send initialize request (FastMCP format) init_request = { "jsonrpc": "2.0", "id": 0, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {"roots": {"listChanged": True}, "sampling": {}}, "clientInfo": {"name": "test-client", "version": "1.0.0"}, "initializationOptions": {}, }, } process.stdin.write(json.dumps(init_request) + "\n") process.stdin.flush() # 2. Read initialize response init_response_line = _read_line_with_timeout( process.stdout, timeout=15 ) # Increased timeout for init try: init_response = json.loads(init_response_line) if init_response.get("result", {}).get("capabilities"): print("✓ Stdio: Server initialized successfully") else: print(f"✗ Stdio: Initialization failed. Response: {init_response_line}") return False except json.JSONDecodeError as e: print(f"✗ Stdio: Invalid JSON in initialization response: {e}") print(f"Raw response: {init_response_line}") return False # 3. Send initialized notification (required by MCP protocol) initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {}, } process.stdin.write(json.dumps(initialized_notification) + "\n") process.stdin.flush() # Small delay to ensure initialization is complete time.sleep(0.1) # 4. Send tools/list request tools_list_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}, } process.stdin.write(json.dumps(tools_list_request) + "\n") process.stdin.flush() # 5. Read tools/list response tools_list_response_line = _read_line_with_timeout(process.stdout, timeout=10) try: tools_list_response = json.loads(tools_list_response_line) # FastMCP returns tools directly in result, not nested under "tools" if "result" in tools_list_response: tools = tools_list_response["result"] # Handle both FastMCP format (direct list) and standard MCP format (nested under "tools") if isinstance(tools, dict) and "tools" in tools: tools = tools["tools"] if isinstance(tools, list) and any( "search" in tool.get("name", "") for tool in tools ): print("✓ Stdio: Tools listed successfully (found 'search' tool)") return True else: print(f"✗ Stdio: No 'search' tool found in tools list: {tools}") return False else: print( f"✗ Stdio: Tools list failed. Response: {tools_list_response_line}" ) return False except json.JSONDecodeError as e: print(f"✗ Stdio: Invalid JSON in tools list response: {e}") print(f"Raw response: {tools_list_response_line}") return False except ( subprocess.TimeoutExpired, TimeoutError, json.JSONDecodeError, ValueError, ) as e: print(f"✗ Stdio transport test failed: {e}") if process and process.poll() is None: # Check if process is still running process.kill() # Print stderr if available for debugging if process and process.stderr: try: # Use communicate with timeout to avoid blocking _, stderr_output = process.communicate(timeout=2) if stderr_output: print(f"STDERR: {stderr_output}") except subprocess.TimeoutExpired: print("STDERR: Could not read stderr (timeout)") except Exception as stderr_e: print(f"STDERR: Error reading stderr: {stderr_e}") return False finally: if process: try: if process.poll() is None: # Process is still running process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: print("Process didn't terminate gracefully, force killing...") process.kill() process.wait(timeout=2) # Give it a moment to die except Exception as cleanup_e: print(f"Error during process cleanup: {cleanup_e}") try: process.kill() except (OSError, ProcessLookupError): pass # Process might already be dead async def test_sse_transport(host: str = "localhost", port: int = 3001) -> bool: """Test SSE transport by checking the SSE endpoint.""" print(f"Testing SSE transport on {host}:{port}...") max_retries = 5 retry_delay = 2 # Increased delay for SSE server startup async with httpx.AsyncClient() as client: for i in range(max_retries): try: # Check the SSE endpoint with streaming async with client.stream( "GET", f"http://{host}:{port}/sse", timeout=5 ) as response: if response.status_code == 200: # For SSE, we expect a streaming response # Try to read the first few bytes to confirm it's working try: # Read a small chunk to see if we get SSE data async for chunk in response.aiter_text(chunk_size=100): if chunk and ( "ping" in chunk or "data:" in chunk or "event:" in chunk ): print("✓ SSE endpoint accessible and streaming") return True else: print("✓ SSE endpoint accessible") return True break # Only check first chunk except Exception: # No data received, but 200 status means endpoint exists print("✓ SSE endpoint accessible") return True else: print(f"✗ SSE endpoint failed: {response.status_code}") if i == max_retries - 1: # Last attempt return False except httpx.RequestError as e: print(f"Attempt {i+1}/{max_retries}: SSE transport test failed: {e}") if i < max_retries - 1: print(f"Waiting {retry_delay} seconds before retry...") await asyncio.sleep(retry_delay) else: return False return False # All retries exhausted def run_server_background(transport: str, **kwargs) -> subprocess.Popen: """Start a server in the background for testing.""" # Use proper module execution instead of server.py directly cmd = [sys.executable, "-m", "perplexica_mcp", transport] if transport in ["sse", "http", "all"]: cmd.extend([kwargs.get("host", "localhost")]) if transport in ["sse", "all"]: cmd.extend([str(kwargs.get("sse_port", 3001))]) if transport in ["http", "all"]: cmd.extend([str(kwargs.get("http_port", 3002))]) print(f"Starting server with command: {' '.join(cmd)}") # Pass current environment variables to subprocess so it can access .env file env = os.environ.copy() env["PYTHONUNBUFFERED"] = "1" # Ensure Python's output is unbuffered return subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, text=True ) async def wait_for_server_ready( host: str, port: int, max_retries: int = 10, retry_delay: float = 0.5 ) -> bool: """Wait for server to be ready by checking if it responds to any request.""" async with httpx.AsyncClient() as client: for i in range(max_retries): try: # Try a simple HEAD request first to see if the server is responding at all response = await client.head(f"http://{host}:{port}/", timeout=2) # Any response code (even 4xx) means the server is running and responding print( f"✓ Server ready on {host}:{port} after {i+1} attempts (status: {response.status_code})" ) return True except httpx.RequestError: # Server not ready yet, continue polling pass if i < max_retries - 1: # Don't sleep on the last attempt await asyncio.sleep(retry_delay) print(f"✗ Server on {host}:{port} not ready after {max_retries} attempts") return False async def main(): """Run all transport tests.""" print("Perplexica MCP Server Transport Tests") print("=" * 40) results = {} # Test stdio transport results["stdio"] = test_stdio_transport() # Test HTTP transport print("\nStarting HTTP server for testing...") http_server = run_server_background("http", host="localhost", http_port=3002) # Give the server a moment to start up and capture any immediate output await asyncio.sleep(1.0) # Check for immediate failures if http_server.poll() is not None: print(f"✗ HTTP server failed to start. Exit code: {http_server.returncode}") try: stdout, _ = http_server.communicate(timeout=2) if stdout: print(f"Server output: {stdout}") except (subprocess.TimeoutExpired, OSError): pass results["http"] = False else: # Wait for server to be ready instead of fixed sleep server_ready = await wait_for_server_ready( "localhost", 3002, max_retries=20, retry_delay=0.5 ) if not server_ready: print("✗ HTTP server is running but not responding to requests") # Try to get some output from the server to diagnose the issue try: # Read some output without blocking import select if select.select([http_server.stdout], [], [], 0)[0]: output = http_server.stdout.read(1024) if output: print(f"Server output: {output}") except Exception as e: print(f"Could not read server output: {e}") results["http"] = False else: results["http"] = await test_http_transport("localhost", 3002) # Always clean up the HTTP server try: http_server.terminate() http_server.wait(timeout=5) except subprocess.TimeoutExpired: print("HTTP server didn't terminate gracefully, force killing...") http_server.kill() except Exception as e: print(f"Error terminating HTTP server: {e}") # Test SSE transport print("\nStarting SSE server for testing...") # Add a small delay to ensure previous server is fully cleaned up await asyncio.sleep(0.5) sse_server = run_server_background("sse", host="localhost", sse_port=3001) # Give SSE server more time to start up await asyncio.sleep(2.0) # Check for immediate SSE server failures if sse_server.poll() is not None: print(f"✗ SSE server failed to start. Exit code: {sse_server.returncode}") try: stdout, _ = sse_server.communicate(timeout=2) if stdout: print(f"SSE server output: {stdout}") except (subprocess.TimeoutExpired, OSError): pass results["sse"] = False else: # Wait for SSE server to be ready instead of fixed sleep # Note: SSE endpoint might not have a health endpoint, so we'll use the existing test function # which already has retry logic built in results["sse"] = await test_sse_transport("localhost", 3001) # Always clean up the SSE server try: sse_server.terminate() sse_server.wait(timeout=5) except subprocess.TimeoutExpired: print("SSE server didn't terminate gracefully, force killing...") sse_server.kill() except Exception as e: print(f"Error terminating SSE server: {e}") # Print results print("\n" + "=" * 40) print("Test Results:") print("=" * 40) for transport, success in results.items(): status = "✓ PASS" if success else "✗ FAIL" print(f"{transport.upper()} transport: {status}") all_passed = all(results.values()) print(f"\nOverall: {'✓ ALL TESTS PASSED' if all_passed else '✗ SOME TESTS FAILED'}") return 0 if all_passed else 1 if __name__ == "__main__": sys.exit(asyncio.run(main()))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/thetom42/perplexica-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server