#!/usr/bin/env python3
"""
MCP Server Diagnostics Module
Comprehensive diagnostic tools for monitoring server health,
performance metrics, and troubleshooting common issues.
Author: Kommandant
Version: 1.0.0
License: MIT
"""
import json
import logging
import os
import platform
import subprocess
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
import psutil
# Configure the root logger when this module is imported: INFO and above,
# with timestamp, logger name and level in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-scoped logger used by all diagnostics in this file.
logger = logging.getLogger(__name__)
@dataclass
class DiagnosticResult:
    """Outcome of one diagnostic check.

    ``name``, ``status`` and ``message`` are required; ``details`` defaults
    to ``None`` and ``timestamp`` is filled in with ``time.time()`` at
    construction when not supplied.
    """

    # Human-readable title of the check.
    name: str
    # One of 'pass', 'fail', 'warning', 'info'.
    status: str
    # One-line summary of what the check found.
    message: str
    # Optional extra key/value data attached to the result.
    details: Optional[Dict[str, Any]] = field(default=None)
    # Creation time as seconds since the epoch.
    timestamp: float = field(default_factory=time.time)
@dataclass
class SystemInfo:
    """Snapshot of the host and interpreter a diagnostic run executes on.

    All seven fields are required strings, in the order declared below.
    """

    # Operating system name and version.
    os_name: str
    os_version: str
    # Interpreter version and the path of its executable.
    python_version: str
    python_executable: str
    # Process working directory.
    working_directory: str
    # Account name and machine name.
    user: str
    hostname: str
class DiagnosticSuite:
    """
    Comprehensive diagnostic suite for MCP server.

    Provides health checks, performance monitoring, and troubleshooting tools.
    Each ``check_*`` method records its outcome through :meth:`add_result`;
    results accumulate on ``self.results`` in the order the checks ran and
    can be rendered with :meth:`generate_report`.
    """

    # Server source files inspected by the file, permission and syntax checks.
    _SERVER_SOURCES = ('mcp_server.py', 'tools.py', 'vscode_detector.py')
    # Language configuration file, looked up relative to the working directory.
    _CONFIG_FILE = 'languages.json'

    def __init__(self):
        """Initialize diagnostic suite with no results and start the timer."""
        self.results: List["DiagnosticResult"] = []
        self.start_time = time.time()

    def run_all_diagnostics(self) -> List["DiagnosticResult"]:
        """
        Run all diagnostic tests.

        Results are appended to ``self.results``; calling this twice on the
        same instance therefore yields each result twice.

        Returns:
            List of diagnostic results
        """
        logger.info("Starting comprehensive diagnostics...")
        self.check_system_info()
        self.check_python_version()
        self.check_dependencies()
        self.check_required_files()
        self.check_file_permissions()
        self.check_configuration_files()
        self.check_server_imports()
        self.check_server_syntax()
        self.check_disk_space()
        self.check_memory_usage()
        self.check_vscode_running()
        self.check_network_connectivity()
        logger.info(
            "Diagnostics completed in %.2fs", time.time() - self.start_time
        )
        return self.results

    def add_result(
        self,
        name: str,
        status: str,
        message: str,
        details: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Add a diagnostic result and log it.

        Args:
            name: Title of the check.
            status: 'pass', 'fail', 'warning' or 'info'; any other value is
                logged at INFO level with the 'INFO' prefix.
            message: One-line summary of the outcome.
            details: Optional extra key/value data stored on the result.
        """
        self.results.append(
            DiagnosticResult(
                name=name,
                status=status,
                message=message,
                details=details
            )
        )
        log_msg = f"{name}: {message}"
        if status == 'pass':
            logger.info("OK %s", log_msg)
        elif status == 'fail':
            logger.error("FAIL %s", log_msg)
        elif status == 'warning':
            logger.warning("WARN %s", log_msg)
        else:
            logger.info("INFO %s", log_msg)

    def check_system_info(self) -> None:
        """Check and report system information."""
        try:
            info = SystemInfo(
                os_name=platform.system(),
                os_version=platform.version(),
                python_version=platform.python_version(),
                python_executable=sys.executable,
                working_directory=os.getcwd(),
                # USER is set on POSIX shells, USERNAME on Windows.
                user=os.getenv('USER', os.getenv('USERNAME', 'unknown')),
                hostname=platform.node()
            )
            self.add_result(
                "System Information",
                "info",
                f"{info.os_name} {info.os_version}",
                {
                    "python_version": info.python_version,
                    "python_path": info.python_executable,
                    "working_dir": info.working_directory,
                    "user": info.user,
                    "hostname": info.hostname
                }
            )
        except Exception as e:
            # Best effort: system info is informational only.
            self.add_result(
                "System Information",
                "warning",
                f"Could not gather system info: {e}"
            )

    def check_python_version(self) -> None:
        """Check Python version compatibility (3.8 or newer passes)."""
        version_info = sys.version_info
        version_str = (
            f"{version_info.major}.{version_info.minor}.{version_info.micro}"
        )
        details = {"required": "3.8+", "current": version_str}
        if version_info >= (3, 8):
            self.add_result(
                "Python Version",
                "pass",
                f"Python {version_str} (compatible)",
                details
            )
        else:
            self.add_result(
                "Python Version",
                "fail",
                f"Python {version_str} is too old (requires 3.8+)",
                details
            )

    def check_dependencies(self) -> None:
        """Check for required Python modules by importing each one."""
        required_modules = [
            'json', 'pathlib', 'subprocess', 'asyncio',
            'hashlib', 'logging', 'dataclasses', 'typing'
        ]
        missing = []
        for module in required_modules:
            try:
                __import__(module)
            except ImportError:
                missing.append(module)
        if not missing:
            self.add_result(
                "Python Dependencies",
                "pass",
                "All required modules available",
                {"checked": len(required_modules)}
            )
        else:
            self.add_result(
                "Python Dependencies",
                "fail",
                f"Missing modules: {', '.join(missing)}",
                {"missing": missing}
            )

    def check_required_files(self) -> None:
        """Check if all required files exist in the working directory."""
        required_files = [*self._SERVER_SOURCES, self._CONFIG_FILE]
        found = [name for name in required_files if Path(name).exists()]
        missing = [name for name in required_files if name not in found]
        if not missing:
            self.add_result(
                "Required Files",
                "pass",
                f"All {len(required_files)} required files found",
                {"files": found}
            )
        else:
            self.add_result(
                "Required Files",
                "fail",
                f"Missing files: {', '.join(missing)}",
                {"missing": missing, "found": found}
            )

    def check_file_permissions(self) -> None:
        """
        Check file permissions of the server sources.

        Files that do not exist are skipped. The executable bit is only
        checked on non-Windows platforms.
        """
        permission_issues = []
        check_exec_bit = platform.system() != 'Windows'
        for filename in self._SERVER_SOURCES:
            filepath = Path(filename)
            if not filepath.exists():
                continue
            if not os.access(filepath, os.R_OK):
                permission_issues.append(f"{filename} (not readable)")
            if (
                check_exec_bit
                and filename.endswith('.py')
                and not os.access(filepath, os.X_OK)
            ):
                permission_issues.append(f"{filename} (not executable)")
        if not permission_issues:
            self.add_result(
                "File Permissions",
                "pass",
                "All files have correct permissions"
            )
        else:
            self.add_result(
                "File Permissions",
                "warning",
                f"Permission issues: {', '.join(permission_issues)}",
                {"issues": permission_issues}
            )

    def check_configuration_files(self) -> None:
        """
        Check configuration file validity.

        Records 'fail' when languages.json is missing, unreadable or not
        valid JSON, 'warning' when it has no usable 'languages' entry, and
        'pass' with the number of entries otherwise.
        """
        try:
            with open(self._CONFIG_FILE, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except FileNotFoundError:
            self.add_result(
                "Language Configuration",
                "fail",
                "languages.json not found"
            )
            return
        except json.JSONDecodeError as e:
            self.add_result(
                "Language Configuration",
                "fail",
                f"Invalid JSON in languages.json: {e}"
            )
            return
        except (OSError, UnicodeDecodeError) as e:
            # Permission problems or a non-UTF-8 file must not abort the run.
            self.add_result(
                "Language Configuration",
                "fail",
                f"Could not read languages.json: {e}"
            )
            return

        # The top-level JSON value may legally be a list or scalar; only a
        # mapping can carry the 'languages' key.
        if not isinstance(config, dict) or 'languages' not in config:
            self.add_result(
                "Language Configuration",
                "warning",
                "Configuration missing 'languages' key"
            )
            return

        try:
            lang_count = len(config['languages'])
        except TypeError:
            # e.g. "languages": 3 or null -- nothing to count.
            self.add_result(
                "Language Configuration",
                "warning",
                "Configuration 'languages' entry is not a collection"
            )
            return

        self.add_result(
            "Language Configuration",
            "pass",
            f"Valid configuration with {lang_count} languages",
            {"languages": lang_count}
        )

    def check_server_imports(self) -> None:
        """
        Check if server modules can be imported.

        A module is only imported when its file exists in the working
        directory. NOTE: importing executes the module's top-level code.
        """
        modules_to_test = [
            ('tools', 'tools.py'),
            ('vscode_detector', 'vscode_detector.py')
        ]
        import_issues = []
        for module_name, filename in modules_to_test:
            if not Path(filename).exists():
                continue
            try:
                __import__(module_name)
            except Exception as e:
                # Any failure while importing is what this check reports.
                import_issues.append(f"{module_name}: {str(e)}")
        if not import_issues:
            self.add_result(
                "Module Imports",
                "pass",
                "All modules import successfully"
            )
        else:
            self.add_result(
                "Module Imports",
                "fail",
                f"Import errors: {'; '.join(import_issues)}",
                {"errors": import_issues}
            )

    def check_server_syntax(self) -> None:
        """
        Check Python files for syntax errors.

        Each existing server source is compiled without being executed.
        Files that cannot be read or decoded as UTF-8 are reported as
        failures as well instead of aborting the diagnostic run.
        """
        syntax_errors = []
        for filename in self._SERVER_SOURCES:
            if not Path(filename).exists():
                continue
            try:
                with open(filename, 'r', encoding='utf-8') as f:
                    compile(f.read(), filename, 'exec')
            except SyntaxError as e:
                syntax_errors.append(f"{filename}: Line {e.lineno}")
            except (OSError, ValueError) as e:
                # ValueError covers UnicodeDecodeError (bad encoding) and the
                # "null bytes" error raised by compile() on older Pythons.
                syntax_errors.append(f"{filename}: {e}")
        if not syntax_errors:
            self.add_result(
                "Syntax Check",
                "pass",
                "No syntax errors found"
            )
        else:
            self.add_result(
                "Syntax Check",
                "fail",
                f"Syntax errors: {'; '.join(syntax_errors)}",
                {"errors": syntax_errors}
            )

    def check_disk_space(self) -> None:
        """
        Check available disk space on the volume of the working directory.

        More than 100 MB free passes; anything else is a warning.
        """
        # Local import: shutil is not among the module-level imports.
        import shutil

        try:
            # shutil.disk_usage works on POSIX and Windows and raises OSError
            # on failure, unlike the raw Win32 call whose return code was
            # never checked (a failed call used to look like "0 MB free").
            free_mb = shutil.disk_usage(os.getcwd()).free / (1024 * 1024)
            if free_mb > 100:
                self.add_result(
                    "Disk Space",
                    "pass",
                    f"{free_mb:.0f} MB available",
                    {"free_mb": int(free_mb)}
                )
            else:
                self.add_result(
                    "Disk Space",
                    "warning",
                    f"Low disk space: {free_mb:.0f} MB",
                    {"free_mb": int(free_mb)}
                )
        except Exception as e:
            # Best effort: never let this probe abort the run.
            self.add_result(
                "Disk Space",
                "warning",
                f"Could not check disk space: {e}"
            )

    def check_memory_usage(self) -> None:
        """
        Check memory usage via psutil.

        Below 90% used passes, otherwise a warning is recorded. When psutil
        cannot be imported an 'info' result is recorded instead.
        """
        try:
            # Imported here so the ImportError branch below is reachable;
            # when psutil is already loaded this is only a cache lookup.
            import psutil

            mem = psutil.virtual_memory()
            if mem.percent < 90:
                available_mb = mem.available // (1024**2)
                self.add_result(
                    "Memory Usage",
                    "pass",
                    f"{mem.percent:.1f}% used ({available_mb} MB available)",
                    {"percent": mem.percent, "available_mb": available_mb}
                )
            else:
                self.add_result(
                    "Memory Usage",
                    "warning",
                    f"High memory usage: {mem.percent:.1f}%",
                    {"percent": mem.percent}
                )
        except ImportError:
            self.add_result(
                "Memory Usage",
                "info",
                "psutil not installed (optional)"
            )
        except Exception as e:
            self.add_result(
                "Memory Usage",
                "warning",
                f"Could not check memory: {e}"
            )

    def check_vscode_running(self) -> None:
        """
        Check if VSCode is running.

        Uses ``tasklist`` on Windows and ``pgrep -x code`` elsewhere, each
        with a 3 second timeout.
        """
        try:
            if platform.system() == 'Windows':
                result = subprocess.run(
                    ['tasklist', '/FI', 'IMAGENAME eq Code.exe'],
                    capture_output=True,
                    text=True,
                    timeout=3
                )
                is_running = 'Code.exe' in result.stdout
            else:
                result = subprocess.run(
                    ['pgrep', '-x', 'code'],
                    capture_output=True,
                    timeout=3
                )
                # pgrep exits with 0 when at least one process matched.
                is_running = result.returncode == 0
            if is_running:
                self.add_result(
                    "VSCode Status",
                    "pass",
                    "VSCode is running"
                )
            else:
                self.add_result(
                    "VSCode Status",
                    "info",
                    "VSCode is not running"
                )
        except Exception as e:
            # Covers a missing tool (FileNotFoundError) and timeouts.
            self.add_result(
                "VSCode Status",
                "warning",
                f"Could not check VSCode status: {e}"
            )

    def check_network_connectivity(self) -> None:
        """
        Check basic network connectivity.

        Opens a TCP connection to 8.8.8.8 port 53 with a 3 second timeout
        and closes it again immediately.
        """
        import socket

        try:
            # The context manager closes the probe socket; previously it was
            # left open until garbage collection.
            with socket.create_connection(("8.8.8.8", 53), timeout=3):
                pass
            self.add_result(
                "Network Connectivity",
                "pass",
                "Network connection available"
            )
        except OSError:
            self.add_result(
                "Network Connectivity",
                "warning",
                "No network connection detected"
            )
        except Exception as e:
            self.add_result(
                "Network Connectivity",
                "warning",
                f"Could not check network: {e}"
            )

    def generate_report(self) -> str:
        """
        Generate a formatted diagnostic report.

        Returns:
            Multi-line text with a header, per-status summary counts and one
            entry per recorded result (including its details, if any).
        """
        report_lines = [
            "",
            "=" * 70,
            " " * 20 + "DIAGNOSTIC REPORT",
            "=" * 70,
            "",
            f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            f"Duration: {time.time() - self.start_time:.2f}s",
            ""
        ]
        status_counts = {
            'pass': 0,
            'fail': 0,
            'warning': 0,
            'info': 0
        }
        for result in self.results:
            # Unknown statuses are counted too but not shown in the summary.
            status_counts[result.status] = (
                status_counts.get(result.status, 0) + 1
            )
        report_lines.extend([
            "SUMMARY:",
            "-" * 70,
            f" Passed: {status_counts['pass']}",
            f" Failed: {status_counts['fail']}",
            f" Warnings: {status_counts['warning']}",
            f" Info: {status_counts['info']}",
            ""
        ])
        report_lines.extend([
            "DETAILED RESULTS:",
            "-" * 70,
            ""
        ])
        status_icons = {
            'pass': '[OK]',
            'fail': '[FAIL]',
            'warning': '[WARN]',
            'info': '[INFO]'
        }
        for result in self.results:
            status_icon = status_icons.get(result.status, '[?]')
            report_lines.append(f"{status_icon} {result.name}")
            report_lines.append(f" {result.message}")
            if result.details:
                for key, value in result.details.items():
                    report_lines.append(f" {key}: {value}")
            report_lines.append("")
        report_lines.append("=" * 70)
        return "\n".join(report_lines)

    def save_report(self, filename: str = "diagnostic_report.txt") -> None:
        """
        Save diagnostic report to file.

        Args:
            filename: Destination path; written as UTF-8, overwriting any
                existing file. Failures are logged, not raised.
        """
        report = self.generate_report()
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report)
            logger.info("Report saved to: %s", filename)
        except Exception as e:
            logger.error("Could not save report: %s", e)
def run_quick_diagnostics() -> None:
    """Run the fast subset of checks and print the resulting report.

    Covers the Python version, required files, configuration file and
    syntax checks, in that order.
    """
    quick_suite = DiagnosticSuite()
    quick_checks = (
        quick_suite.check_python_version,
        quick_suite.check_required_files,
        quick_suite.check_configuration_files,
        quick_suite.check_server_syntax,
    )
    for run_check in quick_checks:
        run_check()
    print(quick_suite.generate_report())
def run_full_diagnostics(save: bool = True) -> None:
    """Run every diagnostic, print the report and optionally save it.

    Args:
        save: When true, the report is also written via
            ``DiagnosticSuite.save_report()`` with its default filename.
    """
    runner = DiagnosticSuite()
    runner.run_all_diagnostics()
    report_text = runner.generate_report()
    print(report_text)
    if not save:
        return
    runner.save_report()
def get_health_status() -> Dict[str, Any]:
    """Return a JSON-serializable summary of server health.

    Runs the Python version, required files and module import checks.
    Any 'fail' result makes the status 'unhealthy'; otherwise any 'warning'
    makes it 'degraded'; otherwise it is 'healthy'.

    Returns:
        Dict with 'status', 'timestamp' (seconds since the epoch) and
        'checks' (name/status/message of every result).
    """
    probe = DiagnosticSuite()
    probe.check_python_version()
    probe.check_required_files()
    probe.check_server_imports()

    observed = {outcome.status for outcome in probe.results}
    if 'fail' in observed:
        overall = 'unhealthy'
    elif 'warning' in observed:
        overall = 'degraded'
    else:
        overall = 'healthy'

    summary: Dict[str, Any] = {'status': overall, 'timestamp': time.time()}
    summary['checks'] = [
        {
            'name': outcome.name,
            'status': outcome.status,
            'message': outcome.message
        }
        for outcome in probe.results
    ]
    return summary
if __name__ == '__main__':
    # Command-line entry point: pick a diagnostic mode and run it.
    import argparse

    cli = argparse.ArgumentParser(description='MCP Server Diagnostics')
    cli.add_argument(
        '--mode',
        default='full',
        choices=['quick', 'full', 'health'],
        help='Diagnostic mode'
    )
    cli.add_argument(
        '--no-save',
        action='store_true',
        help='Do not save report to file'
    )
    options = cli.parse_args()

    if options.mode == 'health':
        # Health mode prints machine-readable JSON instead of a text report.
        health = get_health_status()
        print(json.dumps(health, indent=2))
    elif options.mode == 'quick':
        run_quick_diagnostics()
    else:
        run_full_diagnostics(save=not options.no_save)