"""
Tests for Pydantic AI integration with BioMCP.
These tests verify the examples provided in the documentation work correctly.
"""
import asyncio
import os
import sys
import httpx
import pytest
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio
try:
from pydantic_ai.mcp import MCPServerStreamableHTTP # noqa: F401
HAS_STREAMABLE_HTTP = True
except ImportError:
HAS_STREAMABLE_HTTP = False
from pydantic_ai.models.test import TestModel
def worker_dependencies_available():
"""Check if worker dependencies (FastAPI, Starlette) are available."""
try:
import fastapi # noqa: F401
import starlette # noqa: F401
return True
except ImportError:
return False
# Skip marker for tests requiring worker dependencies
requires_worker = pytest.mark.skipif(
not worker_dependencies_available(),
reason="Worker dependencies (FastAPI/Starlette) not installed. Install with: pip install biomcp-python[worker]",
)
# Skip marker for tests requiring MCPServerStreamableHTTP
requires_streamable_http = pytest.mark.skipif(
not HAS_STREAMABLE_HTTP,
reason="MCPServerStreamableHTTP not available. Requires pydantic-ai>=0.6.9",
)
def get_free_port():
"""Get a free port for testing."""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
return port
async def wait_for_server(
url: str, max_retries: int = 60, process=None
) -> None:
"""Wait for server to be ready with retries."""
import sys
for i in range(max_retries):
# Check if process has exited with error
if process and process.poll() is not None:
stdout, stderr = process.communicate()
pytest.fail(
f"Server process exited with code {process.returncode}. Stderr: {stderr.decode() if stderr else 'None'}"
)
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=2)
if response.status_code == 200:
print(
f"\nServer ready after {i + 1} seconds",
file=sys.stderr,
)
return
except (httpx.ConnectError, httpx.ReadTimeout):
if i % 10 == 0:
print(
f"\nWaiting for server... ({i} seconds elapsed)",
file=sys.stderr,
)
await asyncio.sleep(1)
pytest.fail(f"Server at {url} did not start within {max_retries} seconds")
@pytest.mark.asyncio
async def test_stdio_mode_connection():
"""Test STDIO mode connection and tool listing."""
server = MCPServerStdio(
"python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20
)
# Use TestModel to avoid needing API keys
model = TestModel(call_tools=["search"])
agent = Agent(model=model, toolsets=[server])
async with agent:
# Test a simple query to verify connection works
result = await agent.run("List available tools")
# Should get a response without errors
assert result is not None
assert result.output is not None
@pytest.mark.asyncio
async def test_stdio_mode_simple_query():
"""Test STDIO mode with a simple search query."""
server = MCPServerStdio(
"python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20
)
# Use TestModel configured to call search
model = TestModel(call_tools=["search"])
agent = Agent(model=model, toolsets=[server])
async with agent:
result = await agent.run("Find 1 melanoma clinical trial")
# TestModel will have called the search tool
assert result.output is not None
# The TestModel returns mock data, but we're testing the connection works
assert result.output != ""
@pytest.mark.asyncio
async def test_stdio_mode_with_openai():
"""Test STDIO mode with OpenAI (requires OPENAI_API_KEY)."""
# Skip if no API key
if not os.getenv("OPENAI_API_KEY"):
pytest.skip("OPENAI_API_KEY not set")
server = MCPServerStdio(
"python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=30
)
agent = Agent("openai:gpt-4o-mini", toolsets=[server])
async with agent:
result = await agent.run(
"Find 1 article about BRAF V600E mutations. Return just the title."
)
# Should get a real result
assert result.output is not None
assert len(result.output) > 0
@requires_worker
@requires_streamable_http
@pytest.mark.asyncio
async def test_streamable_http_mode_connection():
"""Test Streamable HTTP mode connection for Pydantic AI."""
import subprocess
from pydantic_ai.mcp import MCPServerStreamableHTTP
port = get_free_port()
# Start server in streamable_http mode
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"streamable_http",
"--port",
str(port),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
# Wait for server to be ready
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
# Connect to the /mcp endpoint
server = MCPServerStreamableHTTP(f"http://localhost:{port}/mcp")
# Use TestModel to avoid needing API keys
model = TestModel(call_tools=["search"])
agent = Agent(model=model, toolsets=[server])
async with agent:
# Test a simple query to verify connection
result = await agent.run("Test connection")
assert result is not None
assert result.output is not None
finally:
# Clean up server process
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@requires_streamable_http
@pytest.mark.asyncio
async def test_streamable_http_simple_query():
"""Test a simple biomedical query using Streamable HTTP."""
import subprocess
from pydantic_ai.mcp import MCPServerStreamableHTTP
port = get_free_port()
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"streamable_http",
"--port",
str(port),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
# Wait for server to be ready
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
# Connect to the /mcp endpoint
server = MCPServerStreamableHTTP(f"http://localhost:{port}/mcp")
# Use TestModel with tool calls for search
model = TestModel(call_tools=["search"])
agent = Agent(model=model, toolsets=[server])
async with agent:
result = await agent.run(
"Find 1 article about BRAF mutations. Return just the title."
)
# Should get a result
assert result.output is not None
assert len(result.output) > 0
finally:
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@pytest.mark.asyncio
async def test_worker_mode_streamable_http():
"""Test worker mode which now uses streamable HTTP under the hood."""
import subprocess
port = get_free_port()
# Start server in worker mode (which uses streamable HTTP)
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"worker",
"--port",
str(port),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
# Wait for server to be ready
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
# Worker mode exposes /mcp endpoint through streamable HTTP
async with httpx.AsyncClient() as client:
# Test the /mcp endpoint with initialize request
response = await client.post(
f"http://localhost:{port}/mcp",
json={
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "test", "version": "1.0"},
},
"id": 1,
},
headers={
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
},
)
# Worker mode may return various codes depending on initialization state
# 200 = success, 406 = accept header issue, 500 = initialization incomplete
assert response.status_code in [200, 406, 500]
# Health endpoint should work
health_response = await client.get(
f"http://localhost:{port}/health"
)
assert health_response.status_code == 200
assert health_response.json()["status"] == "healthy"
finally:
server_process.terminate()
server_process.wait(timeout=5)
@pytest.mark.asyncio
async def test_connection_verification_script():
"""Test the connection verification script from documentation."""
server = MCPServerStdio(
"python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20
)
# Use TestModel to avoid needing LLM credentials
agent = Agent(model=TestModel(call_tools=["search"]), toolsets=[server])
async with agent:
# Test a simple search to verify connection
result = await agent.run("Test search for BRAF")
# Verify connection successful
assert result is not None
assert result.output is not None
@pytest.mark.asyncio
async def test_biomedical_research_workflow():
"""Test a complete biomedical research workflow."""
server = MCPServerStdio(
"python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=30
)
# Use TestModel configured to use multiple tools
model = TestModel(call_tools=["think", "search", "fetch"])
agent = Agent(model=model, toolsets=[server])
async with agent:
# Complex multi-step query
result = await agent.run("""
First use the think tool to plan your approach, then:
1. Search for articles about BRAF mutations
2. Find relevant clinical trials
""")
# Should complete without errors
assert result is not None
assert result.output is not None
@requires_worker
@pytest.mark.asyncio
async def test_health_endpoint():
"""Test that the health endpoint is accessible."""
import subprocess
port = get_free_port()
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"worker",
"--port",
str(port),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
# Give subprocess a moment to start
await asyncio.sleep(2)
# Wait for server to be ready
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
async with httpx.AsyncClient() as client:
response = await client.get(f"http://localhost:{port}/health")
assert response.status_code == 200
data = response.json()
assert "status" in data
assert data["status"] in ["healthy", "ok"]
finally:
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@pytest.mark.asyncio
async def test_auth_token_required():
"""Test that requests without auth fail when MCP_AUTH_TOKEN is set."""
import subprocess
port = get_free_port()
token = "test_auth_token_" + "a" * 16 # 32 chars total
# Start server with auth token
env = os.environ.copy()
env["MCP_AUTH_TOKEN"] = token
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"streamable_http",
"--port",
str(port),
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
# Wait for server to be ready (health endpoint bypasses auth)
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
async with httpx.AsyncClient() as client:
# Request without auth should fail
response = await client.post(
f"http://localhost:{port}/mcp",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1},
)
assert response.status_code == 401
data = response.json()
assert data["error"] == "unauthorized"
finally:
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@pytest.mark.asyncio
async def test_auth_token_valid():
"""Test that requests with valid auth succeed when MCP_AUTH_TOKEN is set."""
import subprocess
port = get_free_port()
token = "test_auth_token_" + "b" * 16 # 32 chars total
env = os.environ.copy()
env["MCP_AUTH_TOKEN"] = token
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"streamable_http",
"--port",
str(port),
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
async with httpx.AsyncClient() as client:
# Request with valid auth should not get 401
response = await client.post(
f"http://localhost:{port}/mcp",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1},
headers={"Authorization": f"Bearer {token}"},
)
# Should not be 401 - actual status depends on MCP state
assert response.status_code != 401
finally:
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@pytest.mark.asyncio
async def test_auth_token_invalid():
"""Test that requests with wrong auth token fail."""
import subprocess
port = get_free_port()
token = "test_auth_token_" + "c" * 16 # 32 chars total
wrong_token = "wrong_auth_token" + "d" * 16 # 32 chars total
env = os.environ.copy()
env["MCP_AUTH_TOKEN"] = token
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"streamable_http",
"--port",
str(port),
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
async with httpx.AsyncClient() as client:
# Request with wrong token should fail
response = await client.post(
f"http://localhost:{port}/mcp",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1},
headers={"Authorization": f"Bearer {wrong_token}"},
)
assert response.status_code == 401
data = response.json()
assert data["error"] == "unauthorized"
assert "Invalid token" in data["error_description"]
finally:
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@pytest.mark.asyncio
async def test_auth_health_no_auth_required():
"""Test that /health endpoint works without auth even when token is set."""
import subprocess
port = get_free_port()
token = "test_auth_token_" + "e" * 16 # 32 chars total
env = os.environ.copy()
env["MCP_AUTH_TOKEN"] = token
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"streamable_http",
"--port",
str(port),
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
# Give subprocess a moment to start.
# Note: We use sleep instead of wait_for_server() to avoid circular
# dependency since wait_for_server() relies on the /health endpoint.
await asyncio.sleep(2)
async with httpx.AsyncClient() as client:
# Health endpoint should work without auth
response = await client.get(f"http://localhost:{port}/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
finally:
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@pytest.mark.asyncio
async def test_auth_no_token_set():
"""Test that requests work without auth when MCP_AUTH_TOKEN is not set."""
import subprocess
port = get_free_port()
# Ensure MCP_AUTH_TOKEN is not set
env = os.environ.copy()
env.pop("MCP_AUTH_TOKEN", None)
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"streamable_http",
"--port",
str(port),
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
await wait_for_server(
f"http://localhost:{port}/health", process=server_process
)
async with httpx.AsyncClient() as client:
# Request without auth should work when no token is configured
response = await client.post(
f"http://localhost:{port}/mcp",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1},
)
# Should not be 401 - auth is disabled
assert response.status_code != 401
finally:
server_process.terminate()
server_process.wait(timeout=5)
@requires_worker
@pytest.mark.asyncio
async def test_worker_mode_rejects_auth_token():
"""Test that worker mode fails fast when MCP_AUTH_TOKEN is set."""
import subprocess
port = get_free_port()
token = "test_auth_token_" + "f" * 16 # 32 chars total
# Set auth token with worker mode - should fail
env = os.environ.copy()
env["MCP_AUTH_TOKEN"] = token
server_process = subprocess.Popen( # noqa: S603
[
sys.executable,
"-m",
"biomcp",
"run",
"--mode",
"worker",
"--port",
str(port),
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Server should exit with code 1
return_code = server_process.wait(timeout=10)
assert return_code == 1
# Check stderr for the error message
stderr = server_process.stderr.read().decode()
assert "does not support authentication" in stderr