# test_mcp_server.py (13.5 kB)
#!/usr/bin/env python3
"""
Comprehensive MCP Server Test Script
This script tests the NetBox MCP Server end-to-end functionality
including tool discovery, tool execution, and error handling.
Usage:
python examples/test_mcp_server.py
"""
import asyncio
import json
import os
import sys
from typing import Any, Dict, List
# Put the sibling ``src`` directory (one level above this script) at the
# front of the import path so that ``server`` can be imported directly.
_script_dir = os.path.dirname(__file__)
src_path = os.path.abspath(os.path.join(_script_dir, os.pardir, 'src'))
sys.path.insert(0, src_path)

# Bring in the two MCP server entry points exercised by this script.
try:
    from server import list_tools, call_tool
except ImportError:
    # Direct import failed: expose the parent directory instead and import
    # the module through its ``src`` package path.
    _parent_dir = os.path.abspath(os.path.join(_script_dir, os.pardir))
    sys.path.insert(0, _parent_dir)
    from src.server import list_tools, call_tool
class Colors:
    """Namespace of ANSI escape sequences used to style terminal output."""

    # Foreground colors.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    MAGENTA = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Appended after styled text by the print helpers to end the styling.
    END = '\033[0m'
def print_header(text: str) -> None:
    """Print ``text`` centered in a 60-column banner framed by ``=`` rules.

    The banner is preceded by a blank line and rendered in bold cyan.
    """
    style = f"{Colors.BOLD}{Colors.CYAN}"
    rule = '=' * 60
    print(f"\n{style}{rule}{Colors.END}")
    print(f"{style}{text:^60}{Colors.END}")
    print(f"{style}{rule}{Colors.END}")
def print_success(text: str) -> None:
    """Print ``text`` in green with an ``[OK]`` prefix."""
    line = f"[OK] {text}"
    print(f"{Colors.GREEN}{line}{Colors.END}")
def print_error(text: str) -> None:
    """Print ``text`` in red with an ``[ERROR]`` prefix."""
    line = f"[ERROR] {text}"
    print(f"{Colors.RED}{line}{Colors.END}")
def print_warning(text: str) -> None:
    """Print ``text`` in yellow with a ``[WARN]`` prefix."""
    line = f"[WARN] {text}"
    print(f"{Colors.YELLOW}{line}{Colors.END}")
def print_info(text: str) -> None:
    """Print ``text`` in blue with an ``[INFO]`` prefix."""
    line = f"[INFO] {text}"
    print(f"{Colors.BLUE}{line}{Colors.END}")
def _tool_category(tool_name: str) -> str:
    """Return the resource a tool name refers to, e.g. ``list_hosts`` -> ``host``.

    The first ``_``-separated token is treated as the verb (``list``, ``get``,
    ``search``) and the token after it as the resource. A single trailing
    ``s`` is dropped so plural and singular forms share one category.
    """
    _verb, _, remainder = tool_name.partition('_')
    # A name without an underscore has no verb prefix; use the whole name.
    resource = remainder.split('_')[0] if remainder else tool_name
    if len(resource) > 1 and resource.endswith('s'):
        resource = resource[:-1]
    return resource


async def test_tool_discovery() -> bool:
    """Test MCP tool discovery functionality.

    Awaits ``list_tools()``, prints the discovered tools grouped by the
    resource they operate on (host, vm, ip, vlan), and verifies that every
    expected tool name is present.

    Returns:
        True when at least one tool was discovered and none of the expected
        tools is missing; False otherwise, including when discovery raises.
    """
    print_header("Testing Tool Discovery")
    try:
        tools = await list_tools()
        if not tools:
            print_error("No tools discovered")
            return False
        print_success(f"Discovered {len(tools)} MCP tools")

        # Categorize tools by resource. Grouping on the first token would
        # bucket them by verb (list/get/search) instead.
        tool_categories = {}
        for tool in tools:
            tool_categories.setdefault(_tool_category(tool.name), []).append(tool.name)

        # Display tool categories
        for category, tool_names in tool_categories.items():
            print_info(f"{category.upper()}: {len(tool_names)} tools")
            for tool_name in tool_names:
                print(f" - {tool_name}")

        # Verify expected tools are present
        expected_tools = [
            'list_hosts', 'get_host', 'search_hosts',
            'list_vms', 'get_vm', 'list_vm_interfaces',
            'list_ips', 'get_ip', 'search_ips',
            'list_vlans', 'get_vlan', 'list_vlan_ips',
        ]
        # Build the name set once instead of rescanning ``tools`` per expected name.
        discovered_names = {tool.name for tool in tools}
        missing_tools = [name for name in expected_tools if name not in discovered_names]
        if missing_tools:
            print_error(f"Missing expected tools: {missing_tools}")
            return False
        print_success("All expected tools are present")
        return True
    except Exception as e:
        # Any failure during discovery is reported and counted as a failed test.
        print_error(f"Tool discovery failed: {e}")
        return False
async def test_tool_execution() -> bool:
    """Test MCP tool execution functionality.

    Calls a fixed set of list/search tools through ``call_tool`` and checks
    that each returns a non-empty list whose items all expose a ``text``
    attribute. Progress and the final success rate are printed.

    Returns:
        True only if every test case succeeds. A case that raises, returns
        ``None``, a non-list, an empty list, or malformed items counts as a
        failure.
    """
    print_header("Testing Tool Execution")
    test_cases = [
        {
            'name': 'list_hosts',
            'args': {'limit': 5, 'include_certainty': True},
            'description': 'List hosts with certainty scores',
        },
        {
            'name': 'search_hosts',
            'args': {'query': 'test', 'limit': 3},
            'description': 'Search hosts by query',
        },
        {
            'name': 'list_vms',
            'args': {'limit': 5, 'include_certainty': True},
            'description': 'List virtual machines',
        },
        {
            'name': 'list_ips',
            'args': {'limit': 5, 'include_certainty': True},
            'description': 'List IP addresses',
        },
        {
            'name': 'list_vlans',
            'args': {'limit': 5, 'include_certainty': True},
            'description': 'List VLANs',
        },
    ]
    success_count = 0
    total_count = len(test_cases)
    for test_case in test_cases:
        try:
            print_info(f"Testing {test_case['name']}: {test_case['description']}")
            result = await call_tool(test_case['name'], test_case['args'])
            # Check for None explicitly: a plain truthiness test would also
            # swallow the empty list and make the warning branch unreachable.
            if result is None:
                print_error(f"{test_case['name']} returned no results")
                continue
            if not isinstance(result, list):
                print_error(f"{test_case['name']} returned non-list result")
                continue
            if not result:
                print_warning(f"{test_case['name']} returned empty results")
                continue
            # Check result format
            if not all(hasattr(item, 'text') for item in result):
                print_error(f"{test_case['name']} returned invalid result format")
                continue
            print_success(f"{test_case['name']} executed successfully")
            success_count += 1
        except Exception as e:
            # One failing tool must not abort the remaining cases.
            print_error(f"{test_case['name']} failed: {e}")
    success_rate = (success_count / total_count) * 100
    print_info(f"Tool execution success rate: {success_count}/{total_count} ({success_rate:.1f}%)")
    return success_count == total_count
async def test_error_handling() -> bool:
    """Check that tools report an error when their required argument is omitted.

    Each ``get_*`` tool is called with an empty argument dict. A case passes
    when the call returns a non-empty list and the lower-cased ``text`` of its
    first item contains ``error`` or ``required``.

    Returns:
        True only if every case passes.
    """
    print_header("Testing Error Handling")
    # All cases pass no arguments, so the required one is missing
    # (hostname for get_host/get_vm, ip_address for get_ip, vlan_id for get_vlan).
    error_test_cases = [
        {'name': tool_name, 'args': {}, 'description': 'Missing required parameter'}
        for tool_name in ('get_host', 'get_vm', 'get_ip', 'get_vlan')
    ]
    total_count = len(error_test_cases)
    success_count = 0
    for case in error_test_cases:
        try:
            print_info(f"Testing {case['name']}: {case['description']}")
            response = await call_tool(case['name'], case['args'])
            if not response:
                print_error(f"{case['name']} returned no results for error case")
            elif not (isinstance(response, list) and len(response) != 0):
                print_error(f"{case['name']} returned invalid error response")
            else:
                # Look for an error indication in the first returned item.
                message = response[0].text.lower()
                if 'error' in message or 'required' in message:
                    print_success(f"{case['name']} handled error correctly")
                    success_count += 1
                else:
                    print_error(f"{case['name']} did not return proper error message")
        except Exception as exc:
            # An exception escaping call_tool counts as a failed case.
            print_error(f"{case['name']} error handling failed: {exc}")
    success_rate = (success_count / total_count) * 100
    print_info(f"Error handling success rate: {success_count}/{total_count} ({success_rate:.1f}%)")
    return success_count == total_count
async def test_performance() -> bool:
    """Test MCP server performance.

    Times a single ``list_hosts`` call, then times four list calls run
    concurrently with ``asyncio.gather``. Timings are only reported (as
    excellent, good or slow) and never fail the test on their own.

    Returns:
        True when every concurrent call returned a non-empty result; False
        when any of them returned nothing or when any call raised.
    """
    print_header("Testing Performance")
    try:
        import time

        # Test response time. perf_counter() is monotonic, so the measured
        # interval cannot be distorted by system clock adjustments.
        start_time = time.perf_counter()
        await call_tool('list_hosts', {'limit': 10})
        response_time = time.perf_counter() - start_time
        if response_time < 1.0:
            print_success(f"Response time: {response_time:.3f}s (excellent)")
        elif response_time < 5.0:
            print_success(f"Response time: {response_time:.3f}s (good)")
        else:
            print_warning(f"Response time: {response_time:.3f}s (slow)")

        # Test concurrent execution
        print_info("Testing concurrent execution...")
        start_time = time.perf_counter()
        tasks = [
            call_tool('list_hosts', {'limit': 5}),
            call_tool('list_vms', {'limit': 5}),
            call_tool('list_ips', {'limit': 5}),
            call_tool('list_vlans', {'limit': 5}),
        ]
        results = await asyncio.gather(*tasks)
        concurrent_time = time.perf_counter() - start_time
        if concurrent_time < 2.0:
            print_success(f"Concurrent execution time: {concurrent_time:.3f}s (excellent)")
        elif concurrent_time < 5.0:
            print_success(f"Concurrent execution time: {concurrent_time:.3f}s (good)")
        else:
            print_warning(f"Concurrent execution time: {concurrent_time:.3f}s (slow)")

        # Verify all concurrent tasks completed. Truthiness treats both an
        # empty result and None as a failed task without raising on None.
        if all(results):
            print_success("All concurrent tasks completed successfully")
        else:
            print_error("Some concurrent tasks failed")
            return False
        return True
    except Exception as e:
        # Any exception from the tool calls is reported as a failed test.
        print_error(f"Performance test failed: {e}")
        return False
async def test_environment() -> bool:
    """Test environment configuration.

    Reports which of the expected environment variables are unset, then
    issues an HTTP GET to the NetBox and Vault URLs. These are taken from
    ``NETBOX_URL`` and ``VAULT_ADDR``, with localhost defaults.

    Returns:
        Always True. Missing variables and unreachable services are only
        reported as warnings.

    Raises:
        ImportError: if ``aiohttp`` is not installed.
    """
    print_header("Testing Environment Configuration")
    # Check environment variables
    env_vars = [
        'NETBOX_URL',
        'VAULT_ADDR',
        'POSTGRES_HOST',
        'POSTGRES_DB',
        'MCP_SERVER_LOG_LEVEL',
    ]
    # Unset and empty-string values are both treated as missing.
    missing_vars = [var for var in env_vars if not os.getenv(var)]
    if missing_vars:
        print_warning(f"Missing environment variables: {missing_vars}")
    else:
        print_success("All required environment variables are set")

    # Test external service connectivity
    print_info("Testing external service connectivity...")
    import aiohttp

    services = [
        {'name': 'NetBox Mock', 'url': os.getenv('NETBOX_URL', 'http://localhost:8000')},
        {'name': 'Vault Mock', 'url': os.getenv('VAULT_ADDR', 'http://localhost:8200')},
    ]
    # aiohttp deprecates bare numeric timeouts; use ClientTimeout for the
    # same 5-second overall limit per request.
    request_timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession() as session:
        for service in services:
            try:
                async with session.get(service['url'], timeout=request_timeout) as response:
                    if response.status == 200:
                        print_success(f"{service['name']} is accessible")
                    else:
                        print_warning(f"{service['name']} returned status {response.status}")
            except Exception as e:
                # Connectivity is advisory: report it and move on to the next service.
                print_warning(f"{service['name']} is not accessible: {e}")
    return True
async def main():
    """Run every test group in order and print a pass/fail summary.

    A test group that raises is reported as crashed and counted as failed.

    Returns:
        True if all test groups passed, False otherwise.
    """
    print_header("NetBox MCP Server Comprehensive Test Suite")
    print_info("Starting comprehensive MCP server testing...")
    print_info("This test suite validates tool discovery, execution, error handling, and performance.")

    # (label, coroutine function) pairs, executed sequentially.
    suite = (
        ("Tool Discovery", test_tool_discovery),
        ("Tool Execution", test_tool_execution),
        ("Error Handling", test_error_handling),
        ("Performance", test_performance),
        ("Environment", test_environment),
    )
    outcomes = {}
    for label, run_test in suite:
        try:
            outcomes[label] = await run_test()
        except Exception as exc:
            print_error(f"{label} test crashed: {exc}")
            outcomes[label] = False

    # Per-test summary followed by the overall tally.
    print_header("Test Results Summary")
    for label, passed in outcomes.items():
        if passed:
            print_success(f"{label}: PASSED")
        else:
            print_error(f"{label}: FAILED")
    total_tests = len(outcomes)
    passed_tests = len([passed for passed in outcomes.values() if passed])
    print(f"\n{Colors.BOLD}Overall Result: {passed_tests}/{total_tests} tests passed{Colors.END}")

    if passed_tests != total_tests:
        print_error(f"{total_tests - passed_tests} test(s) failed. Please check the output above.")
        return False
    print_success("All tests passed! MCP server is working correctly.")
    return True
if __name__ == "__main__":
    # Script entry point: exit status 0 when every test passed, 1 otherwise
    # (including on Ctrl-C or an unexpected crash of the suite itself).
    try:
        result = asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n[WARN] Test interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n\n[ERROR] Test suite crashed: {e}")
        sys.exit(1)
    else:
        sys.exit(0 if result else 1)