#!/usr/bin/env python3
"""
Comprehensive MCP Server Test Script
This script tests the NetBox MCP Server end-to-end functionality
including tool discovery, tool execution, and error handling.
Usage:
python examples/test_mcp_server.py
"""
import asyncio
import os
import sys
# Add the src directory to the Python path
src_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "src")
)
sys.path.insert(0, src_path)
# Import MCP server functions
try:
from server import call_tool, list_tools
except ImportError:
# Fallback for different import paths
sys.path.insert(
0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
)
from src.server import call_tool, list_tools
class Colors:
    """ANSI color codes for terminal output.

    Each attribute is an SGR escape sequence; wrap text between a color
    code and ``END`` to render it, e.g. ``f"{Colors.GREEN}ok{Colors.END}"``.
    """
    # Bright foreground colors (SGR 90s range).
    GREEN = "\033[92m"
    RED = "\033[91m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"
    WHITE = "\033[97m"
    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    # Reset sequence: clears all colors/attributes set above.
    END = "\033[0m"
def print_header(text: str) -> None:
    """Render *text* as a bold cyan banner between two 60-char rules."""
    rule = f"{Colors.BOLD}{Colors.CYAN}{'=' * 60}{Colors.END}"
    print(f"\n{rule}")
    print(f"{Colors.BOLD}{Colors.CYAN}{text.center(60)}{Colors.END}")
    print(rule)
def print_success(text: str) -> None:
    """Emit *text* as a green ``[OK]`` status line."""
    line = f"{Colors.GREEN}[OK] {text}{Colors.END}"
    print(line)
def print_error(text: str) -> None:
    """Emit *text* as a red ``[ERROR]`` status line."""
    line = f"{Colors.RED}[ERROR] {text}{Colors.END}"
    print(line)
def print_warning(text: str) -> None:
    """Emit *text* as a yellow ``[WARN]`` status line."""
    line = f"{Colors.YELLOW}[WARN] {text}{Colors.END}"
    print(line)
def print_info(text: str) -> None:
    """Emit *text* as a blue ``[INFO]`` status line."""
    line = f"{Colors.BLUE}[INFO] {text}{Colors.END}"
    print(line)
async def test_tool_discovery() -> bool:
    """Test MCP tool discovery functionality.

    Lists the tools exposed by the server, prints them grouped by the
    first underscore-separated segment of their name, and verifies that
    every expected tool is registered.

    Returns:
        True when at least one tool is discovered and none of the
        expected tools are missing; False otherwise (including on any
        exception, which is reported rather than propagated).
    """
    print_header("Testing Tool Discovery")
    try:
        tools = await list_tools()
        if not tools:
            print_error("No tools discovered")
            return False
        print_success(f"Discovered {len(tools)} MCP tools")
        # Group tool names by their prefix (hosts, vms, ips, vlans).
        tool_categories = {}
        for tool in tools:
            category = tool.name.split("_")[0]
            tool_categories.setdefault(category, []).append(tool.name)
        # Display tool categories
        for category, tool_names in tool_categories.items():
            print_info(f"{category.upper()}: {len(tool_names)} tools")
            for tool_name in tool_names:
                print(f" - {tool_name}")
        # Verify expected tools are present
        expected_tools = [
            "list_hosts",
            "get_host",
            "search_hosts",
            "list_vms",
            "get_vm",
            "list_vm_interfaces",
            "list_ips",
            "get_ip",
            "search_ips",
            "list_vlans",
            "get_vlan",
            "list_vlan_ips",
        ]
        # Build the set of discovered names once; membership tests are
        # O(1) instead of rescanning the tool list per expected name.
        discovered_names = {tool.name for tool in tools}
        missing_tools = [
            name for name in expected_tools if name not in discovered_names
        ]
        if missing_tools:
            print_error(f"Missing expected tools: {missing_tools}")
            return False
        print_success("All expected tools are present")
        return True
    except Exception as e:
        print_error(f"Tool discovery failed: {e}")
        return False
async def test_tool_execution() -> bool:
    """Test MCP tool execution functionality.

    Runs a fixed set of read-only tool calls and validates that each
    one returns a non-empty list of content items that expose a
    ``text`` attribute.

    Returns:
        True when every test case executes and validates successfully.
    """
    print_header("Testing Tool Execution")
    test_cases = [
        {
            "name": "list_hosts",
            "args": {"limit": 5, "include_certainty": True},
            "description": "List hosts with certainty scores",
        },
        {
            "name": "search_hosts",
            "args": {"query": "test", "limit": 3},
            "description": "Search hosts by query",
        },
        {
            "name": "list_vms",
            "args": {"limit": 5, "include_certainty": True},
            "description": "List virtual machines",
        },
        {
            "name": "list_ips",
            "args": {"limit": 5, "include_certainty": True},
            "description": "List IP addresses",
        },
        {
            "name": "list_vlans",
            "args": {"limit": 5, "include_certainty": True},
            "description": "List VLANs",
        },
    ]
    success_count = 0
    total_count = len(test_cases)
    for test_case in test_cases:
        try:
            print_info(
                f"Testing {test_case['name']}: {test_case['description']}"
            )
            result = await call_tool(test_case["name"], test_case["args"])
            # Check None before truthiness: an empty list is falsy, so a
            # bare `not result` guard would misreport empty results as
            # "no results" and make the empty-result warning unreachable.
            if result is None:
                print_error(f"{test_case['name']} returned no results")
                continue
            if not isinstance(result, list):
                print_error(f"{test_case['name']} returned non-list result")
                continue
            if not result:
                print_warning(f"{test_case['name']} returned empty results")
                continue
            # Check result format
            if not all(hasattr(item, "text") for item in result):
                print_error(
                    f"{test_case['name']} returned invalid result format"
                )
                continue
            print_success(f"{test_case['name']} executed successfully")
            success_count += 1
        except Exception as e:
            print_error(f"{test_case['name']} failed: {e}")
    success_rate = (success_count / total_count) * 100
    print_info(
        f"Tool execution success rate: {success_count}/{total_count} ({success_rate:.1f}%)"
    )
    return success_count == total_count
async def test_error_handling() -> bool:
    """Test MCP tool error handling.

    Invokes each ``get_*`` tool without its required argument and
    checks that the response is a non-empty list whose first item's
    text mentions an error or the missing requirement.
    """
    print_header("Testing Error Handling")
    error_test_cases = [
        {
            "name": "get_host",
            "args": {},  # Missing required hostname
            "description": "Missing required parameter",
        },
        {
            "name": "get_vm",
            "args": {},  # Missing required hostname
            "description": "Missing required parameter",
        },
        {
            "name": "get_ip",
            "args": {},  # Missing required ip_address
            "description": "Missing required parameter",
        },
        {
            "name": "get_vlan",
            "args": {},  # Missing required vlan_id
            "description": "Missing required parameter",
        },
    ]
    success_count = 0
    total_count = len(error_test_cases)
    for case in error_test_cases:
        name = case["name"]
        try:
            print_info(f"Testing {name}: {case['description']}")
            result = await call_tool(name, case["args"])
            if not result:
                print_error(f"{name} returned no results for error case")
            elif not isinstance(result, list) or len(result) == 0:
                print_error(f"{name} returned invalid error response")
            else:
                # The error response's text should mention the problem.
                error_text = result[0].text.lower()
                if "error" not in error_text and "required" not in error_text:
                    print_error(f"{name} did not return proper error message")
                else:
                    print_success(f"{name} handled error correctly")
                    success_count += 1
        except Exception as e:
            print_error(f"{name} error handling failed: {e}")
    success_rate = (success_count / total_count) * 100
    print_info(
        f"Error handling success rate: {success_count}/{total_count} ({success_rate:.1f}%)"
    )
    return success_count == total_count
async def test_performance() -> bool:
    """Test MCP server performance.

    Measures single-call latency for ``list_hosts`` and the wall time
    of four concurrent tool calls, classifying each against fixed
    thresholds (informational only — timings never fail the test).

    Returns:
        True unless a concurrent task returned no results or the test
        raised an exception.
    """
    print_header("Testing Performance")
    try:
        import time
        # Use perf_counter for elapsed-time measurement: it is monotonic
        # and high-resolution, whereas time.time() can jump on system
        # clock adjustments and skew the interval.
        # Test response time
        start_time = time.perf_counter()
        await call_tool("list_hosts", {"limit": 10})
        response_time = time.perf_counter() - start_time
        if response_time < 1.0:
            print_success(f"Response time: {response_time:.3f}s (excellent)")
        elif response_time < 5.0:
            print_success(f"Response time: {response_time:.3f}s (good)")
        else:
            print_warning(f"Response time: {response_time:.3f}s (slow)")
        # Test concurrent execution
        print_info("Testing concurrent execution...")
        start_time = time.perf_counter()
        tasks = [
            call_tool("list_hosts", {"limit": 5}),
            call_tool("list_vms", {"limit": 5}),
            call_tool("list_ips", {"limit": 5}),
            call_tool("list_vlans", {"limit": 5}),
        ]
        results = await asyncio.gather(*tasks)
        concurrent_time = time.perf_counter() - start_time
        if concurrent_time < 2.0:
            print_success(
                f"Concurrent execution time: {concurrent_time:.3f}s (excellent)"
            )
        elif concurrent_time < 5.0:
            print_success(
                f"Concurrent execution time: {concurrent_time:.3f}s (good)"
            )
        else:
            print_warning(
                f"Concurrent execution time: {concurrent_time:.3f}s (slow)"
            )
        # Verify all concurrent tasks completed
        if all(len(result) > 0 for result in results):
            print_success("All concurrent tasks completed successfully")
        else:
            print_error("Some concurrent tasks failed")
            return False
        return True
    except Exception as e:
        print_error(f"Performance test failed: {e}")
        return False
async def test_environment() -> bool:
    """Test environment configuration.

    Checks that the expected environment variables are set and probes
    the NetBox/Vault mock endpoints over HTTP. Problems are reported as
    warnings only, so this test always returns True.
    """
    print_header("Testing Environment Configuration")
    # Check environment variables
    env_vars = [
        "NETBOX_URL",
        "VAULT_ADDR",
        "POSTGRES_HOST",
        "POSTGRES_DB",
        "MCP_SERVER_LOG_LEVEL",
    ]
    missing_vars = [var for var in env_vars if not os.getenv(var)]
    if missing_vars:
        print_warning(f"Missing environment variables: {missing_vars}")
    else:
        print_success("All required environment variables are set")
    # Test external service connectivity
    print_info("Testing external service connectivity...")
    import aiohttp
    services = [
        {
            "name": "NetBox Mock",
            "url": os.getenv("NETBOX_URL", "http://localhost:8000"),
        },
        {
            "name": "Vault Mock",
            "url": os.getenv("VAULT_ADDR", "http://localhost:8200"),
        },
    ]
    # aiohttp deprecates passing a bare number as `timeout`; use an
    # explicit ClientTimeout (5s total per request, as before).
    request_timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession() as session:
        for service in services:
            try:
                async with session.get(
                    service["url"], timeout=request_timeout
                ) as response:
                    if response.status == 200:
                        print_success(f"{service['name']} is accessible")
                    else:
                        print_warning(
                            f"{service['name']} returned status {response.status}"
                        )
            except Exception as e:
                print_warning(f"{service['name']} is not accessible: {e}")
    return True
async def main():
    """Run every test section and report an overall pass/fail summary."""
    print_header("NetBox MCP Server Comprehensive Test Suite")
    print_info("Starting comprehensive MCP server testing...")
    print_info(
        "This test suite validates tool discovery, execution, error handling, and performance."
    )
    # Each entry pairs a human-readable section name with its coroutine.
    tests = [
        ("Tool Discovery", test_tool_discovery),
        ("Tool Execution", test_tool_execution),
        ("Error Handling", test_error_handling),
        ("Performance", test_performance),
        ("Environment", test_environment),
    ]
    results = {}
    for test_name, test_func in tests:
        try:
            results[test_name] = await test_func()
        except Exception as e:
            # A crashing section counts as a failure but never aborts
            # the remaining sections.
            print_error(f"{test_name} test crashed: {e}")
            results[test_name] = False
    # Display summary
    print_header("Test Results Summary")
    total_tests = len(results)
    passed_tests = sum(results.values())
    for test_name, passed in results.items():
        if passed:
            print_success(f"{test_name}: PASSED")
        else:
            print_error(f"{test_name}: FAILED")
    print(
        f"\n{Colors.BOLD}Overall Result: {passed_tests}/{total_tests} tests passed{Colors.END}"
    )
    if passed_tests == total_tests:
        print_success("All tests passed! MCP server is working correctly.")
        return True
    print_error(
        f"{total_tests - passed_tests} test(s) failed. Please check the output above."
    )
    return False
if __name__ == "__main__":
    # Exit 0 only when the whole suite passes; Ctrl-C and unexpected
    # crashes both map to exit code 1.
    try:
        exit_code = 0 if asyncio.run(main()) else 1
    except KeyboardInterrupt:
        print("\n\n[WARN] Test interrupted by user")
        exit_code = 1
    except Exception as e:
        print(f"\n\n[ERROR] Test suite crashed: {e}")
        exit_code = 1
    sys.exit(exit_code)