FastAPI-MCP
by tadata-org
- tests
#!/usr/bin/env python
"""
Test script to verify that an MCP server is properly exposing tools.
This script connects to the MCP server, initializes a session, and requests a list of available tools.
"""
# TODO: Turn this into a pytest test
import json
import sys
import asyncio
import httpx
from urllib.parse import urljoin
# Default MCP server URL
MCP_URL = "http://localhost:8000/mcp"
async def test_mcp_tools(url=MCP_URL):
"""Connect to the MCP server and test tool exposure."""
print(f"Connecting to MCP server at {url}...")
# Connect to the SSE endpoint to establish connection
async with httpx.AsyncClient(timeout=httpx.Timeout(120.0)) as client:
# First establish an SSE connection
endpoint_url = None
base_url = url.rsplit("/", 1)[0] + "/" # Extract base URL (everything up to the last path component)
response_queue = asyncio.Queue()
# Task to send requests and receive responses through the SSE channel
async def send_request(request_data, request_id):
# Send the request
await client.post(endpoint_url, json=request_data)
print(f"Sent request with ID: {request_id}")
# Wait for the response with matching ID
while True:
response = await response_queue.get()
if "id" in response and response["id"] == request_id:
return response
else:
# Not our response, put it back in the queue for someone else
await response_queue.put(response)
# Start the SSE connection
async with client.stream("GET", url) as response:
response.raise_for_status()
print("Connected to MCP server")
# Process the SSE stream
current_event = None
async def process_sse_stream():
nonlocal current_event, endpoint_url
async for line in response.aiter_lines():
line = line.strip()
if not line:
continue
if line.startswith("event:"):
current_event = line[len("event:") :].strip()
print(f"Received event: {current_event}")
elif line.startswith("data:"):
data = line[len("data:") :].strip()
if current_event == "endpoint":
endpoint_path = data
endpoint_url = urljoin(base_url, endpoint_path.lstrip("/"))
print(f"Endpoint URL: {endpoint_url}")
elif current_event == "message":
try:
message = json.loads(data)
# Pretty print the JSON message
print("Received message:")
print(json.dumps(message, indent=2))
# Add to queue for request handlers
await response_queue.put(message)
except json.JSONDecodeError:
print(f"Failed to parse message: {data}")
# Start processing the SSE stream in the background
background_task = asyncio.create_task(process_sse_stream())
# Wait for the endpoint URL to be set
while endpoint_url is None:
await asyncio.sleep(0.1)
try:
# 1. Initialize request
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {"experimental": {}},
"clientInfo": {"name": "mcp-test-client", "version": "0.1.0"},
},
}
print("\nSending initialize request...")
init_result = await send_request(init_request, 1)
print("\nInitialization response:")
print(json.dumps(init_result, indent=2))
# 2. Send initialized notification
init_notification = {"jsonrpc": "2.0", "method": "notifications/initialized"}
await client.post(endpoint_url, json=init_notification)
print("\nSent initialized notification")
# 3. List tools request
list_tools_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
print("\nSending tools/list request...")
tools_result = await send_request(list_tools_request, 2)
print("\nTools list response:")
print(json.dumps(tools_result, indent=2))
# Check if we got a valid response
if "result" in tools_result and "tools" in tools_result["result"]:
tools = tools_result["result"]["tools"]
if tools:
print(f"\nFound {len(tools)} tools:")
for i, tool in enumerate(tools):
print(f"{i + 1}. {tool['name']}")
print(f"{tool.get('description', 'No description')}")
# 4. Find and call the get_item_count tool
get_item_count_tool = None
for tool in tools:
if tool["name"] == "get_item_count":
get_item_count_tool = tool
break
if get_item_count_tool:
print(f"\nTrying to call tool: {get_item_count_tool['name']}")
call_tool_request = {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": get_item_count_tool["name"],
"arguments": {}, # Empty arguments for get_item_count which doesn't require any
},
}
print("\nSending tools/call request...")
tool_result = await send_request(call_tool_request, 3)
print("\nTool call response:")
print(json.dumps(tool_result, indent=2))
else:
print("\nCould not find get_item_count tool")
else:
print("\nNo tools found")
else:
print("\nInvalid tools/list response format")
finally:
# Clean up and cancel the background task
background_task.cancel()
try:
await background_task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
url = sys.argv[1] if len(sys.argv) > 1 else MCP_URL
asyncio.run(test_mcp_tools(url))