# test_client.py • 5.73 kB
#!/usr/bin/env python3
"""
Test client for the MCP Testing Harness
"""
import asyncio
import json
import logging
import sys
from pathlib import Path
# Set up logging.
# basicConfig() runs at import time and configures the root logger at INFO;
# it does nothing if the root logger already has handlers installed.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
# NOTE(review): not referenced in the visible part of this file, which
# reports progress with print() instead.
logger = logging.getLogger(__name__)
def _decode_response(raw):
    """Parse one raw line read from the server into a JSON value.

    Raises:
        ConnectionError: if ``raw`` is empty, i.e. the stream hit EOF.
        json.JSONDecodeError: if the line is not valid JSON (the raw text is
            printed first so the failure can be diagnosed).
    """
    if not raw:
        # StreamReader.readline() returns b"" at EOF; report that explicitly
        # instead of letting json.loads("") fail with an opaque message.
        raise ConnectionError("Server closed the connection before replying")
    text = raw.decode().strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        print("Raw response:", text)
        raise


def _has_result(response):
    """Return True if ``response`` is a JSON object carrying a "result" key."""
    return isinstance(response, dict) and "result" in response


async def _exchange(reader, writer, request, timeout):
    """Send one JSON-RPC request and return the decoded one-line reply."""
    # Replies are read with readline(), so requests are newline-terminated as
    # well. This assumes the server frames messages by line; a server that
    # parses raw chunks is unaffected because JSON ignores trailing whitespace.
    writer.write((json.dumps(request) + "\n").encode())
    await writer.drain()
    raw = await asyncio.wait_for(reader.readline(), timeout)
    return _decode_response(raw)


async def test_mcp_client(*, host: str = "localhost", port: int = 8000,
                          timeout: float = 10.0) -> None:
    """Test connecting to our MCP server.

    Opens a TCP connection and runs three JSON-RPC requests in order:
    ``initialize``, ``tools/list`` and a ``tools/call`` of ``list_servers``.
    Each later step runs only if the previous reply contained what it needs.
    Progress and replies are printed to stdout. Failures are reported there
    too rather than raised, and the connection is always closed.

    Args:
        host: Server host name or address.
        port: Server TCP port.
        timeout: Seconds to wait for the connection and for each reply.
    """
    print("🧪 Testing MCP Client Connection")
    print("=" * 35)

    writer = None
    response = None  # last successfully parsed reply, shown on failure
    try:
        # Connect to the server
        print(f"📡 Connecting to {host}:{port}...")
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout
        )
        print("✅ Connected successfully!")

        # Send initialization request
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {
                    "name": "MCP Test Client",
                    "version": "1.0.0",
                },
            },
        }
        print("📤 Sending initialization request...")
        response = await _exchange(reader, writer, init_request, timeout)
        print("📥 Received response:")
        print(json.dumps(response, indent=2))
        if not _has_result(response):
            print("❌ Initialization failed; skipping remaining checks.")
            return
        print("✅ Initialization successful!")

        # Test tools listing
        tools_request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
            "params": {},
        }
        print("\n📤 Sending tools/list request...")
        response = await _exchange(reader, writer, tools_request, timeout)
        print("📥 Received tools response:")
        print(json.dumps(response, indent=2))
        result = response["result"] if _has_result(response) else None
        if not (isinstance(result, dict) and "tools" in result):
            print("❌ No tools listed; skipping tool call.")
            return
        tools = result["tools"]
        print(f"✅ Found {len(tools)} tools:")
        for tool in tools:
            print(f" - {tool.get('name', 'unknown')}: {tool.get('description', 'No description')}")

        # Test a built-in tool
        list_servers_request = {
            "jsonrpc": "2.0",
            "id": 3,
            "method": "tools/call",
            "params": {
                "name": "list_servers",
                "arguments": {},
            },
        }
        print("\n📤 Testing list_servers tool...")
        response = await _exchange(reader, writer, list_servers_request, timeout)
        print("📥 Received list_servers response:")
        print(json.dumps(response, indent=2))
    except Exception as e:
        # Top-level boundary of the script: report the failure, do not raise.
        # Timeouts have an empty message, so fall back to the exception type.
        print(f"❌ Error during test: {str(e) or type(e).__name__}")
        # Print the last response if available
        if response is not None:
            print("Last response:", response)
    finally:
        if writer is not None:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # Best-effort shutdown: the peer may already have reset the
                # connection, which must not mask the actual test outcome.
                pass
# ...existing code...
if __name__ == "__main__":
    # Script entry point: run the async client test to completion on a new
    # event loop. Nothing runs when this module is merely imported.
    asyncio.run(test_mcp_client())