enhanced_mcp_harness.pyโข12.6 kB
#!/usr/bin/env python3
"""
Enhanced MCP Testing Harness with Print Server Integration
"""
import asyncio
import json
import logging
import sys
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any
# Add src to path
sys.path.insert(0, str(Path(__file__).parent / "src"))
from core.server import MCPServer, MCPTool, MCPServerRegistry
from core.testing import MCPServerTester
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EnhancedMCPServer(MCPServer):
"""Enhanced MCP server with print functionality and server management."""
def __init__(self, host="localhost", port=8000, debug=True):
super().__init__(host=host, port=port, debug=debug)
self.messages = []
self.managed_servers = {} # Track other MCP servers
self._setup_enhanced_tools()
async def _send_response(self, writer, response):
"""Send a response to the client, newline-terminated."""
data = (json.dumps(response) + '\n').encode()
writer.write(data)
await writer.drain()
async def _send_error(self, writer, message):
"""Send an error response to the client, newline-terminated."""
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": message
}
}
await self._send_response(writer, error_response)
def _setup_enhanced_tools(self):
"""Setup enhanced tools including print functionality."""
# Print message tool
print_tool = MCPTool(
name="print_message",
description="Print a message that will be displayed on the web interface",
input_schema={
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The message to print"
},
"level": {
"type": "string",
"enum": ["info", "warning", "error", "success"],
"default": "info",
"description": "Message level"
}
},
"required": ["message"]
},
output_schema={
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"},
"timestamp": {"type": "string"}
}
},
handler=self._print_message
)
# List messages tool
list_tool = MCPTool(
name="list_messages",
description="List all printed messages",
input_schema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"default": 10,
"description": "Maximum number of messages to return"
}
}
},
output_schema={
"type": "object",
"properties": {
"messages": {
"type": "array",
"items": {
"type": "object",
"properties": {
"message": {"type": "string"},
"level": {"type": "string"},
"timestamp": {"type": "string"}
}
}
}
}
},
handler=self._list_messages
)
# Clear messages tool
clear_tool = MCPTool(
name="clear_messages",
description="Clear all stored messages",
input_schema={},
output_schema={
"type": "object",
"properties": {
"success": {"type": "boolean"},
"count": {"type": "integer"}
}
},
handler=self._clear_messages
)
# Register managed server tool
register_server_tool = MCPTool(
name="register_managed_server",
description="Register another MCP server to be managed by this harness",
input_schema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the server"
},
"host": {
"type": "string",
"description": "Host address"
},
"port": {
"type": "integer",
"description": "Port number"
},
"description": {
"type": "string",
"description": "Server description"
}
},
"required": ["name", "host", "port"]
},
output_schema={
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"}
}
},
handler=self._register_managed_server
)
# List managed servers tool
list_servers_tool = MCPTool(
name="list_managed_servers",
description="List all managed MCP servers",
input_schema={},
output_schema={
"type": "object",
"properties": {
"servers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"host": {"type": "string"},
"port": {"type": "integer"},
"description": {"type": "string"},
"status": {"type": "string"}
}
}
}
}
},
handler=self._list_managed_servers
)
# Test managed server tool
test_server_tool = MCPTool(
name="test_managed_server",
description="Test a managed MCP server",
input_schema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the server to test"
}
},
"required": ["name"]
},
output_schema={
"type": "object",
"properties": {
"success": {"type": "boolean"},
"results": {"type": "object"}
}
},
handler=self._test_managed_server
)
# Register all tools
tools = [
print_tool, list_tool, clear_tool,
register_server_tool, list_servers_tool, test_server_tool
]
for tool in tools:
asyncio.create_task(self.registry.register_tool(tool))
async def _print_message(self, arguments):
"""Handle print message tool."""
message = arguments.get("message", "")
level = arguments.get("level", "info")
if not message:
return {
"success": False,
"error": "Message is required"
}
# Add message to storage
msg_data = {
"message": message,
"level": level,
"timestamp": datetime.now().isoformat()
}
self.messages.append(msg_data)
# Keep only last 100 messages
if len(self.messages) > 100:
self.messages = self.messages[-100:]
logger.info(f"PRINT [{level.upper()}]: {message}")
return {
"success": True,
"message": f"Message printed: {message}",
"timestamp": msg_data["timestamp"]
}
async def _list_messages(self, arguments):
"""Handle list messages tool."""
limit = arguments.get("limit", 10)
# Get recent messages
recent_messages = self.messages[-limit:] if self.messages else []
return {
"messages": recent_messages
}
async def _clear_messages(self, arguments):
"""Handle clear messages tool."""
count = len(self.messages)
self.messages.clear()
return {
"success": True,
"count": count
}
async def _register_managed_server(self, arguments):
"""Handle register managed server tool."""
name = arguments.get("name")
host = arguments.get("host")
port = arguments.get("port")
description = arguments.get("description", "")
if not all([name, host, port]):
return {
"success": False,
"error": "Name, host, and port are required"
}
# Register the server
self.managed_servers[name] = {
"name": name,
"host": host,
"port": port,
"description": description,
"status": "registered"
}
logger.info(f"Registered managed server: {name} at {host}:{port}")
return {
"success": True,
"message": f"Server '{name}' registered successfully"
}
async def _list_managed_servers(self, arguments):
"""Handle list managed servers tool."""
servers = list(self.managed_servers.values())
return {
"servers": servers
}
async def _test_managed_server(self, arguments):
"""Handle test managed server tool."""
name = arguments.get("name")
if name not in self.managed_servers:
return {
"success": False,
"error": f"Server '{name}' not found"
}
server_info = self.managed_servers[name]
try:
# Create tester
tester = MCPServerTester()
# Test the server
results = await tester.test_server(
host=server_info["host"],
port=server_info["port"]
)
# Update status
self.managed_servers[name]["status"] = "tested"
return {
"success": True,
"results": results
}
except Exception as e:
self.managed_servers[name]["status"] = "error"
return {
"success": False,
"error": str(e)
}
async def main():
"""Start the enhanced MCP server."""
print("๐ Starting Enhanced MCP Testing Harness")
print("=" * 45)
try:
# Create server
server = EnhancedMCPServer(host="localhost", port=8000, debug=True)
print(f"๐ก Server will start on localhost:8000")
print("๐ Starting server...")
# Start the server
await server.start()
except KeyboardInterrupt:
print("\n๐ Server stopped by user")
except Exception as e:
print(f"โ Error starting server: {e}")
sys.exit(1)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n๐ Server stopped")
except Exception as e:
print(f"โ Fatal error: {e}")
sys.exit(1)