#!/usr/bin/env python3
"""
Test the MCP server functionality
"""
import asyncio
import json
import subprocess
import sys
async def _send_request(process, request, timeout=60.0):
    """Send one JSON-RPC request to the server and return the decoded reply.

    The request is written to the server's stdin as a single
    newline-terminated JSON line, then exactly one line is read back from
    its stdout. The blocking read runs in the default executor so it can be
    bounded by ``timeout`` instead of hanging forever on a silent server.

    Args:
        process: The ``subprocess.Popen`` object for the server, opened in
            text mode with stdin and stdout pipes.
        request: The JSON-RPC request as a dict; must contain ``"method"``.
        timeout: Maximum number of seconds to wait for the response line.

    Returns:
        The response line decoded from JSON.

    Raises:
        TimeoutError: No response line arrived within ``timeout`` seconds.
        RuntimeError: The server closed stdout, or the line was not JSON.
    """
    process.stdin.write(json.dumps(request) + "\n")
    process.stdin.flush()

    loop = asyncio.get_running_loop()
    try:
        line = await asyncio.wait_for(
            loop.run_in_executor(None, process.stdout.readline), timeout
        )
    except asyncio.TimeoutError:
        # asyncio's timeout error has an empty message; raise one that says
        # which request went unanswered.
        raise TimeoutError(
            f"no response to {request['method']!r} within {timeout}s"
        ) from None

    if not line:
        # readline() returns "" only at EOF, i.e. the server is gone.
        raise RuntimeError(
            f"server closed stdout before answering {request['method']!r} "
            f"(exit code: {process.poll()})"
        )
    try:
        return json.loads(line)
    except json.JSONDecodeError as err:
        raise RuntimeError(f"server sent invalid JSON: {line!r}") from err


async def _stop_server(process, stderr_reader, grace=5.0):
    """Shut the server down and return the text it wrote to stderr.

    Args:
        process: The ``subprocess.Popen`` object for the server.
        stderr_reader: Awaitable that resolves to the server's full stderr
            output once the pipe reaches EOF.
        grace: Seconds to wait for the process to exit after ``terminate()``
            and, separately, for the stderr reader to finish.

    Returns:
        The captured stderr text, or ``""`` if it could not be collected
        within ``grace`` seconds.
    """
    try:
        process.stdin.close()
    except OSError:
        pass  # The server already exited; there is nothing left to flush.

    process.terminate()
    try:
        process.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        # The server ignored the terminate request; force it down so the
        # test run itself cannot hang here.
        process.kill()
        process.wait()

    try:
        return await asyncio.wait_for(stderr_reader, grace)
    except asyncio.TimeoutError:
        # Something (e.g. a grandchild process) still holds the pipe open.
        return ""


async def test_mcp_server():
    """Test the MCP server by sending JSON-RPC requests over stdio.

    Starts ``mcp_server.py`` with the current interpreter, sends a
    ``tools/list`` request followed by a ``tools/call`` request for
    ``start_gemini_collaboration``, and prints a summary of each response.
    Any exception raised while talking to the server is reported on stdout
    (together with the server's stderr output, if any) rather than
    propagated. The server process is always terminated before returning.

    Returns:
        None.
    """
    print("🧪 Testing MCP server functionality...")

    # Start the MCP server as a subprocess
    process = subprocess.Popen(
        [sys.executable, "mcp_server.py"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    # Drain stderr in a worker thread for the server's whole lifetime: an
    # unread pipe eventually fills up and would block a chatty server.
    stderr_reader = asyncio.get_running_loop().run_in_executor(
        None, process.stderr.read
    )
    failed = False

    try:
        # NOTE(review): the MCP specification describes an "initialize"
        # handshake before other requests; this test assumes mcp_server.py
        # answers without it -- confirm against the server implementation.

        # Test 1: List tools
        print("1. Testing tools/list...")
        response = await _send_request(
            process,
            {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/list",
                "params": {},
            },
        )
        if "result" in response and "tools" in response["result"]:
            tools = response["result"]["tools"]
            print(f"✅ Found {len(tools)} tools:")
            for tool in tools:
                # A tool may omit its description; do not abort the run.
                print(f" - {tool['name']}: {tool.get('description', '')}")
        else:
            print(f"❌ Unexpected response: {response}")

        # Test 2: Start collaboration
        print("\n2. Testing start_gemini_collaboration...")
        response = await _send_request(
            process,
            {
                "jsonrpc": "2.0",
                "id": 2,
                "method": "tools/call",
                "params": {
                    "name": "start_gemini_collaboration",
                    "arguments": {"topic": "Test Collaboration"},
                },
            },
        )
        if "result" in response and "content" in response["result"]:
            content = response["result"]["content"][0]["text"]
            print(f"✅ Collaboration started: {content}")
        else:
            print(f"❌ Unexpected response: {response}")

        print("\n🎉 MCP server test completed!")
    except Exception as e:
        failed = True
        print(f"❌ Test failed: {e}")
    finally:
        server_stderr = await _stop_server(process, stderr_reader)
        if failed and server_stderr.strip():
            # Show the server's own diagnostics next to the failure.
            print(f"Server stderr:\n{server_stderr.rstrip()}")
if __name__ == "__main__":
    # Executed directly as a script: build the test coroutine and drive it
    # to completion on a fresh event loop.
    smoke_test = test_mcp_server()
    asyncio.run(smoke_test)