#!/usr/bin/env python3
"""
Test client for Agent Orchestration Platform MCP Server
This script tests the MCP server functionality by calling its tools
and verifying the responses.
"""
import asyncio
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from fastmcp import Client
async def test_server():
"""Test the MCP server functionality."""
print("π§ Testing Agent Orchestration Platform MCP Server...")
# Test with the server script directly
server_script = project_root / "src" / "main.py"
try:
async with Client(str(server_script)) as client:
print("β
Successfully connected to server")
# Test 1: Create a session
print("\nπ Testing create_session...")
session_result = await client.call_tool("create_session", {
"root_path": str(project_root),
"session_name": "test_session"
})
print(f" Result: {session_result[0].text}")
# Test 2: Create an agent
print("\nπ€ Testing create_agent...")
agent_result = await client.call_tool("create_agent", {
"session_id": "session_test_session",
"agent_name": "Agent_1",
"specialization": "testing"
})
print(f" Result: {agent_result[0].text}")
# Test 3: Send message to agent
print("\n㪠Testing send_message_to_agent...")
message_result = await client.call_tool("send_message_to_agent", {
"agent_name": "Agent_1",
"message": "Hello, Agent_1! This is a test message."
})
print(f" Result: {message_result[0].text}")
# Test 4: Get session status
print("\nπ Testing get_session_status...")
status_result = await client.call_tool("get_session_status", {
"session_id": "session_test_session"
})
print(f" Result: {status_result[0].text}")
# Test 5: List sessions
print("\nπ Testing list_sessions...")
list_result = await client.call_tool("list_sessions", {})
print(f" Result: {list_result[0].text}")
# Test 6: Test resource
print("\nπ Testing session resource...")
try:
resource_result = await client.read_resource("session://test_session/info")
print(f" Result: {resource_result[0].text}")
except Exception as e:
print(f" Resource test skipped: {e}")
# Test 7: Delete agent
print("\nποΈ Testing delete_agent...")
delete_agent_result = await client.call_tool("delete_agent", {
"agent_name": "Agent_1",
"force": False
})
print(f" Result: {delete_agent_result[0].text}")
# Test 8: Delete session
print("\nποΈ Testing delete_session...")
delete_session_result = await client.call_tool("delete_session", {
"session_id": "session_test_session",
"force": False
})
print(f" Result: {delete_session_result[0].text}")
print("\nπ All tests completed successfully!")
print("β
MCP Server is working correctly!")
except Exception as e:
print(f"β Test failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
print("Agent Orchestration Platform - MCP Server Test")
print("=" * 50)
asyncio.run(test_server())