#!/usr/bin/env python3
"""
Test script to verify FastAPI + MCP Server integration
"""
import asyncio
import json
import subprocess
import sys
import time

import requests
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
async def test_fastapi_app():
    """Smoke-test the FastAPI app's HTTP endpoints on localhost:8000.

    Issues, in order, ``GET /``, ``GET /health``, ``POST /users`` (with a
    sample user payload) and ``GET /users``, printing the decoded JSON body
    of each response.

    Returns:
        bool: True if every request completed with a non-error HTTP status
        and a JSON-decodable body; False as soon as any step raises.
    """
    print("🧪 Testing FastAPI app...")

    base_url = "http://localhost:8000"
    # requests has no default timeout; without one an unresponsive server
    # would hang this script indefinitely.
    timeout_seconds = 10
    user_data = {"name": "Test User", "email": "test@example.com", "age": 30}

    # (label printed on success, HTTP method, path, JSON payload or None)
    checks = [
        ("Root endpoint", "GET", "/", None),
        ("Health endpoint", "GET", "/health", None),
        ("Create user", "POST", "/users", user_data),
        ("Get users", "GET", "/users", None),
    ]

    try:
        for label, method, path, payload in checks:
            response = requests.request(
                method,
                f"{base_url}{path}",
                json=payload,
                timeout=timeout_seconds,
            )
            # A 4xx/5xx reply may still carry a JSON body; treat it as a
            # failure instead of printing it as a pass.
            response.raise_for_status()
            print(f"✅ {label}: {response.json()}")
        return True
    except Exception as e:
        # Broad on purpose: this is the reporting boundary of a test script,
        # and any failure (connection, HTTP status, JSON decoding) maps to
        # a False result for the caller.
        print(f"❌ FastAPI test failed: {e}")
        return False
async def test_mcp_server():
    """Exercise the MCP server over stdio and report whether it responds.

    Launches ``mcp_server.py`` through the MCP stdio client, performs the
    initialize handshake, lists the advertised tools, and calls the
    ``get_app_info`` tool with no arguments.

    Returns:
        bool: True if every step succeeded; False if any step raised or the
        tool call came back flagged as an error or without content.
    """
    print("\n🧪 Testing MCP server...")

    # stdio_client takes launch parameters (not pipe objects), spawns the
    # server itself, and shuts the child down when the context exits, so the
    # process is cleaned up on failure as well as on success.
    # sys.executable keeps the server on the same interpreter/virtualenv as
    # this script instead of whatever "python" resolves to on PATH.
    server_params = StdioServerParameters(
        command=sys.executable,
        args=["mcp_server.py"],
    )

    try:
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                tools = await session.list_tools()
                print(f"✅ MCP tools available: {len(tools.tools)}")

                result = await session.call_tool("get_app_info", {})
                # A tool-level failure is reported in the result rather than
                # raised, so check for it explicitly.
                if getattr(result, "isError", False):
                    raise RuntimeError(
                        f"get_app_info reported an error: {result.content}"
                    )
                if not result.content:
                    raise RuntimeError("get_app_info returned no content")
                print(f"✅ MCP tool call result: {result.content[0].text}")
        return True
    except Exception as e:
        # Broad on purpose: any failure in spawning, handshake or tool call
        # is reported and mapped to a False result for the caller.
        print(f"❌ MCP test failed: {e}")
        return False
async def main():
    """Run the FastAPI and MCP integration checks end to end.

    Starts ``sample_app.py`` as a child process, waits a fixed three seconds
    for it to come up, runs the FastAPI checks and, only if those pass, the
    MCP server checks. The child process is always stopped before this
    coroutine finishes, including when a check raises.

    Returns:
        bool: True only if both the FastAPI and the MCP checks passed.
    """
    print("🚀 Starting FastAPI + MCP Server Integration Test")

    print("\n📡 Starting FastAPI app...")
    # The server's output is discarded rather than piped: a PIPE that is
    # never read fills up and eventually blocks the child on write.
    fastapi_process = subprocess.Popen(
        [sys.executable, "sample_app.py"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    all_ok = False
    try:
        # Fixed grace period for the server to start listening.
        await asyncio.sleep(3)

        fastapi_ok = await test_fastapi_app()
        if fastapi_ok:
            mcp_ok = await test_mcp_server()
            if mcp_ok:
                print("\n🎉 All tests passed! Integration is working correctly.")
                all_ok = True
            else:
                print("\n⚠️ MCP server test failed, but FastAPI is working.")
        else:
            print("\n❌ FastAPI test failed.")
    finally:
        # Always stop the server, and reap it so no zombie is left behind;
        # escalate to kill if it ignores the terminate request.
        fastapi_process.terminate()
        try:
            fastapi_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            fastapi_process.kill()
            fastapi_process.wait()

    print("\n✅ Test completed.")
    return all_ok


if __name__ == "__main__":
    # Exit non-zero when any check failed so shells and CI can detect it.
    sys.exit(0 if asyncio.run(main()) else 1)