test_pydantic_ai_integration.py
""" Tests for Pydantic AI integration with BioMCP. These tests verify the examples provided in the documentation work correctly. """ import asyncio import os import sys import httpx import pytest from pydantic_ai import Agent from pydantic_ai.mcp import MCPServerStdio try: from pydantic_ai.mcp import MCPServerStreamableHTTP # noqa: F401 HAS_STREAMABLE_HTTP = True except ImportError: HAS_STREAMABLE_HTTP = False from pydantic_ai.models.test import TestModel def worker_dependencies_available(): """Check if worker dependencies (FastAPI, Starlette) are available.""" try: import fastapi # noqa: F401 import starlette # noqa: F401 return True except ImportError: return False # Skip marker for tests requiring worker dependencies requires_worker = pytest.mark.skipif( not worker_dependencies_available(), reason="Worker dependencies (FastAPI/Starlette) not installed. Install with: pip install biomcp-python[worker]", ) # Skip marker for tests requiring MCPServerStreamableHTTP requires_streamable_http = pytest.mark.skipif( not HAS_STREAMABLE_HTTP, reason="MCPServerStreamableHTTP not available. Requires pydantic-ai>=0.6.9", ) def get_free_port(): """Get a free port for testing.""" import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) s.listen(1) port = s.getsockname()[1] return port async def wait_for_server( url: str, max_retries: int = 60, process=None ) -> None: """Wait for server to be ready with retries.""" import sys for i in range(max_retries): # Check if process has exited with error if process and process.poll() is not None: stdout, stderr = process.communicate() pytest.fail( f"Server process exited with code {process.returncode}. Stderr: {stderr.decode() if stderr else 'None'}" ) try: async with httpx.AsyncClient() as client: response = await client.get(url, timeout=2) if response.status_code == 200: print( f"\nServer ready after {i + 1} seconds", file=sys.stderr, ) return except (httpx.ConnectError, httpx.ReadTimeout): if i % 10 == 0: print( f"\nWaiting for server... 
({i} seconds elapsed)", file=sys.stderr, ) await asyncio.sleep(1) pytest.fail(f"Server at {url} did not start within {max_retries} seconds") @pytest.mark.asyncio async def test_stdio_mode_connection(): """Test STDIO mode connection and tool listing.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20 ) # Use TestModel to avoid needing API keys model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: # Test a simple query to verify connection works result = await agent.run("List available tools") # Should get a response without errors assert result is not None assert result.output is not None @pytest.mark.asyncio async def test_stdio_mode_simple_query(): """Test STDIO mode with a simple search query.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20 ) # Use TestModel configured to call search model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: result = await agent.run("Find 1 melanoma clinical trial") # TestModel will have called the search tool assert result.output is not None # The TestModel returns mock data, but we're testing the connection works assert result.output != "" @pytest.mark.asyncio async def test_stdio_mode_with_openai(): """Test STDIO mode with OpenAI (requires OPENAI_API_KEY).""" # Skip if no API key if not os.getenv("OPENAI_API_KEY"): pytest.skip("OPENAI_API_KEY not set") server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=30 ) agent = Agent("openai:gpt-4o-mini", toolsets=[server]) async with agent: result = await agent.run( "Find 1 article about BRAF V600E mutations. Return just the title." ) # Should get a real result assert result.output is not None assert len(result.output) > 0 @requires_worker @requires_streamable_http @pytest.mark.asyncio async def test_streamable_http_mode_connection(): """Test Streamable HTTP mode connection for Pydantic AI.""" import subprocess from pydantic_ai.mcp import MCPServerStreamableHTTP port = get_free_port() # Start server in streamable_http mode server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "streamable_http", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) # Connect to the /mcp endpoint server = MCPServerStreamableHTTP(f"http://localhost:{port}/mcp") # Use TestModel to avoid needing API keys model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: # Test a simple query to verify connection result = await agent.run("Test connection") assert result is not None assert result.output is not None finally: # Clean up server process server_process.terminate() server_process.wait(timeout=5) @requires_worker @requires_streamable_http @pytest.mark.asyncio async def test_streamable_http_simple_query(): """Test a simple biomedical query using Streamable HTTP.""" import subprocess from pydantic_ai.mcp import MCPServerStreamableHTTP port = get_free_port() server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "streamable_http", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) # Connect to the 
/mcp endpoint server = MCPServerStreamableHTTP(f"http://localhost:{port}/mcp") # Use TestModel with tool calls for search model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: result = await agent.run( "Find 1 article about BRAF mutations. Return just the title." ) # Should get a result assert result.output is not None assert len(result.output) > 0 finally: server_process.terminate() server_process.wait(timeout=5) @requires_worker @pytest.mark.asyncio async def test_worker_mode_streamable_http(): """Test worker mode which now uses streamable HTTP under the hood.""" import subprocess port = get_free_port() # Start server in worker mode (which uses streamable HTTP) server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "worker", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) # Worker mode exposes /mcp endpoint through streamable HTTP async with httpx.AsyncClient() as client: # Test the /mcp endpoint with initialize request response = await client.post( f"http://localhost:{port}/mcp", json={ "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2025-06-18", "capabilities": {}, "clientInfo": {"name": "test", "version": "1.0"}, }, "id": 1, }, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }, ) # Worker mode may return various codes depending on initialization state # 200 = success, 406 = accept header issue, 500 = initialization incomplete assert response.status_code in [200, 406, 500] # Health endpoint should work health_response = await client.get( f"http://localhost:{port}/health" ) assert health_response.status_code == 200 assert health_response.json()["status"] == "healthy" finally: server_process.terminate() server_process.wait(timeout=5) @pytest.mark.asyncio async def test_connection_verification_script(): """Test the connection verification script from documentation.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20 ) # Use TestModel to avoid needing LLM credentials agent = Agent(model=TestModel(call_tools=["search"]), toolsets=[server]) async with agent: # Test a simple search to verify connection result = await agent.run("Test search for BRAF") # Verify connection successful assert result is not None assert result.output is not None @pytest.mark.asyncio async def test_biomedical_research_workflow(): """Test a complete biomedical research workflow.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=30 ) # Use TestModel configured to use multiple tools model = TestModel(call_tools=["think", "search", "fetch"]) agent = Agent(model=model, toolsets=[server]) async with agent: # Complex multi-step query result = await agent.run(""" First use the think tool to plan your approach, then: 1. Search for articles about BRAF mutations 2. 
Find relevant clinical trials """) # Should complete without errors assert result is not None assert result.output is not None @requires_worker @pytest.mark.asyncio async def test_health_endpoint(): """Test that the health endpoint is accessible.""" import subprocess port = get_free_port() server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "worker", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Give subprocess a moment to start await asyncio.sleep(2) # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) async with httpx.AsyncClient() as client: response = await client.get(f"http://localhost:{port}/health") assert response.status_code == 200 data = response.json() assert "status" in data assert data["status"] in ["healthy", "ok"] finally: server_process.terminate() server_process.wait(timeout=5)
