Skip to main content
Glama
ping_mcp_server.py (34 kB)
#!/usr/bin/env python3
"""
Ping MCP Server v1.0
Provides network connectivity testing via ICMP ping across homelab infrastructure
Reads host configuration from Ansible inventory with fallback to .env

Features:
- Ping individual hosts by name
- Ping entire Ansible groups
- Ping all hosts
- Custom timeout and packet count
- Cross-platform support (Windows/Linux/macOS)
"""

import asyncio
import json
import logging
import os
import platform
import re
import sys
from pathlib import Path
from typing import Dict, List, Set, Tuple, Union

# Logging is configured before the MCP imports so that anything they log
# goes to stderr (stdout is reserved for the stdio transport).
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
logger = logging.getLogger(__name__)

import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions

from ansible_config_manager import AnsibleConfigManager
from mcp_config_loader import load_env_file, load_indexed_env_vars, COMMON_ALLOWED_ENV_VARS

server = Server("ping-info")

# Load .env with security hardening
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env"
PING_ALLOWED_VARS = COMMON_ALLOWED_ENV_VARS | {
    "PING_*",  # Pattern for ping-specific variables if needed
}

# Only load env file at module level if not in unified mode
if not os.getenv("MCP_UNIFIED_MODE"):
    load_env_file(ENV_FILE, allowed_vars=PING_ALLOWED_VARS, strict=True)

# Configuration
ANSIBLE_INVENTORY_PATH = os.getenv("ANSIBLE_INVENTORY_PATH", "")
logger.info(f"Ansible inventory: {ANSIBLE_INVENTORY_PATH}")

# Global inventory cache
INVENTORY_DATA = None

# Summary-line patterns for ping output. The Unix patterns accept both the
# iputils wording ("4 received", "rtt min/avg/max/mdev = ...") and the
# BSD/macOS/BusyBox wording ("4 packets received", "round-trip min/avg/max...").
_WINDOWS_RECEIVED_RE = re.compile(r"Received = (\d+)")
_WINDOWS_RTT_MIN_RE = re.compile(r"Minimum = (\d+)ms")
_WINDOWS_RTT_MAX_RE = re.compile(r"Maximum = (\d+)ms")
_WINDOWS_RTT_AVG_RE = re.compile(r"Average = (\d+)ms")
_UNIX_RECEIVED_RE = re.compile(r"(\d+) (?:packets )?received")
_UNIX_RTT_RE = re.compile(
    r"(?:rtt|round-trip) min/avg/max[/\w]* = ([\d.]+)/([\d.]+)/([\d.]+)"
)


def load_ping_targets_from_env():
    """
    Fallback: Load ping targets from environment variables.
    Returns dict with hosts and groups in same format as Ansible inventory.

    Expects environment variables like:
    - PING_TARGET1=8.8.8.8
    - PING_TARGET1_NAME=Google-DNS
    - PING_TARGET2=1.1.1.1
    - PING_TARGET2_NAME=Cloudflare-DNS

    Returns:
        {"hosts": {name: {"groups": [...], "vars": {"ansible_host": target}}},
         "groups": {"env_targets": [names...]}}; both containers are empty
        when no usable target is defined.
    """
    # Use generic function to parse indexed environment variables
    indexed_targets = load_indexed_env_vars(
        prefix="PING_TARGET",
        name_suffix="_NAME",
        target_suffix="",
        logger_obj=logger,
    )

    # Convert generic format to Ansible-like format
    hosts = {}
    for index, target_info in indexed_targets.items():
        target = target_info["target"]
        name = target_info["name"]

        if not target:
            logger.warning(f"PING_TARGET{index} name defined but no target IP/hostname provided")
            continue

        # Use provided name or derive one from the index
        hostname = name if name else f"ping-target-{index}"
        hosts[hostname] = {
            "groups": ["env_targets"],
            "vars": {"ansible_host": target},
        }
        logger.info(f"Added ping target: {hostname} ({target})")

    if not hosts:
        logger.warning("No ping targets found in environment variables")
        return {"hosts": {}, "groups": {}}

    logger.info(f"Loaded {len(hosts)} ping targets from environment variables")

    # Return in same format as Ansible inventory
    return {
        "hosts": hosts,
        "groups": {"env_targets": list(hosts.keys())},
    }


def load_ansible_inventory(inventory=None):
    """
    Load and cache the Ansible inventory with full variable inheritance.
    Falls back to environment variables if Ansible inventory not found.

    Uses centralized AnsibleConfigManager which implements proper two-pass
    variable inheritance (group vars → child vars → host vars).

    Args:
        inventory: Optional pre-loaded Ansible inventory dict (avoids file
            locking in unified mode).
            NOTE(review): this argument is accepted but not consulted here;
            the format of the pre-loaded dict is not visible in this module,
            so it is left unused rather than guessed at — confirm with the
            unified-mode caller.

    Returns:
        Dict with structure: {"hosts": {...}, "groups": {...}}
    """
    global INVENTORY_DATA

    # Use cached data if available
    if INVENTORY_DATA is not None:
        return INVENTORY_DATA

    # Get ansible inventory path
    ansible_inventory_path = os.getenv("ANSIBLE_INVENTORY_PATH", "")

    if not ansible_inventory_path or not Path(ansible_inventory_path).exists():
        logger.warning(f"Ansible inventory not found at: {ansible_inventory_path}")
        logger.info("Attempting to load ping targets from environment variables")
        INVENTORY_DATA = load_ping_targets_from_env()
        if INVENTORY_DATA and INVENTORY_DATA.get("hosts"):
            logger.info(f"Loaded {len(INVENTORY_DATA['hosts'])} ping targets from environment")
            return INVENTORY_DATA
        logger.error("No ping targets configured in Ansible inventory or environment variables")
        return {"hosts": {}, "groups": {}}

    # Use centralized config manager
    manager = AnsibleConfigManager(
        inventory_path=ansible_inventory_path,
        logger_obj=logger,
    )

    if not manager.is_available():
        logger.warning("Ansible inventory not accessible via AnsibleConfigManager")
        logger.info("Attempting to load ping targets from environment variables as fallback")
        INVENTORY_DATA = load_ping_targets_from_env()
        if INVENTORY_DATA and INVENTORY_DATA.get("hosts"):
            logger.info(f"Loaded {len(INVENTORY_DATA['hosts'])} ping targets from environment variables")
            return INVENTORY_DATA
        return {"hosts": {}, "groups": {}}

    # Get all hosts with proper inheritance
    INVENTORY_DATA = manager.get_all_hosts_with_inheritance()

    if not INVENTORY_DATA.get("hosts"):
        logger.warning("No hosts found in Ansible inventory, falling back to environment variables")
        INVENTORY_DATA = load_ping_targets_from_env()
        if INVENTORY_DATA and INVENTORY_DATA.get("hosts"):
            logger.info(f"Loaded {len(INVENTORY_DATA['hosts'])} ping targets from environment variables")
        return INVENTORY_DATA

    logger.info(
        f"Loaded {len(INVENTORY_DATA['hosts'])} hosts and {len(INVENTORY_DATA['groups'])} groups "
        f"with variable inheritance"
    )
    return INVENTORY_DATA


def get_host_ip(hostname: str, host_data: dict) -> str:
    """
    Extract IP address or hostname for pinging.

    Checks, in order: the ``ansible_host`` var, the ``static_ip`` var, and
    finally the inventory hostname itself (with a trailing ``:port`` removed).

    Args:
        hostname: Inventory name of the host.
        host_data: Inventory entry for the host; its optional ``vars`` mapping
            is consulted for the overrides above.

    Returns:
        The address or name to hand to ``ping``.
    """
    # A missing or null "vars" entry is treated as "no overrides".
    host_vars = host_data.get("vars") or {}

    if "ansible_host" in host_vars:
        return host_vars["ansible_host"]
    if "static_ip" in host_vars:
        return host_vars["static_ip"]

    # Handle special case: hostname with port (e.g., hostname.example.com:2222).
    # Only a single colon is treated as a port separator so that IPv6
    # literals (which contain several colons) are passed through intact.
    if hostname.count(":") == 1:
        hostname = hostname.split(":")[0]

    return hostname


def _build_ping_command(system: str, host: str, count: int, timeout: int) -> list:
    """Return the platform-specific ``ping`` argv for *host*."""
    if system == "windows":
        # -w is the per-reply timeout in milliseconds
        return ["ping", "-n", str(count), "-w", str(timeout * 1000), host]
    if system == "darwin":
        # macOS: -W is the per-reply wait time in milliseconds (not seconds)
        return ["ping", "-c", str(count), "-W", str(timeout * 1000), host]
    # Linux and others: -W is the per-reply timeout in seconds
    return ["ping", "-c", str(count), "-W", str(timeout), host]


def _parse_ping_statistics(system: str, output: str, count: int, result: Dict) -> None:
    """Fill packet and RTT statistics in *result* from ping's summary text."""
    if system == "windows":
        # Windows format: "Packets: Sent = 4, Received = 4, Lost = 0 (0% loss)"
        received = _WINDOWS_RECEIVED_RE.search(output)
        # Windows format: "Minimum = 1ms, Maximum = 2ms, Average = 1ms"
        for key, pattern in (
            ("rtt_min", _WINDOWS_RTT_MIN_RE),
            ("rtt_max", _WINDOWS_RTT_MAX_RE),
            ("rtt_avg", _WINDOWS_RTT_AVG_RE),
        ):
            rtt = pattern.search(output)
            if rtt:
                result[key] = float(rtt.group(1))
    else:
        # Unix format: "4 packets transmitted, 4 received, 0% packet loss"
        # (macOS/BusyBox: "... 4 packets received ...")
        received = _UNIX_RECEIVED_RE.search(output)
        # "rtt min/avg/max/mdev = 1.234/2.345/3.456/0.123 ms"
        # (macOS: "round-trip min/avg/max/stddev = ...")
        rtt = _UNIX_RTT_RE.search(output)
        if rtt:
            result["rtt_min"] = float(rtt.group(1))
            result["rtt_avg"] = float(rtt.group(2))
            result["rtt_max"] = float(rtt.group(3))

    if received:
        result["packets_received"] = int(received.group(1))
        result["packet_loss"] = ((count - result["packets_received"]) / count) * 100


async def ping_host(host: str, count: int = 4, timeout: int = 5) -> Dict:
    """
    Ping a single host using the system ping command.

    Args:
        host: Hostname or IP address to ping
        count: Number of ping packets to send (must be >= 1)
        timeout: Per-reply timeout in seconds (must be >= 1)

    Returns:
        Dict with ``host`` and ``reachable`` always present. On a completed
        run it also carries ``packets_sent``, ``packets_received``,
        ``packet_loss`` and ``rtt_min``/``rtt_avg``/``rtt_max``; on failure it
        carries ``error``. This function does not raise: every failure is
        reported through the returned dict.
    """
    # Validate up front: the values are interpolated into the command line and
    # `count` is used as a divisor when computing packet loss.
    try:
        count = int(count)
        timeout = int(timeout)
    except (TypeError, ValueError):
        return {
            "host": host,
            "reachable": False,
            "error": "count and timeout must be integers",
        }
    if count < 1 or timeout < 1:
        return {
            "host": host,
            "reachable": False,
            "error": "count and timeout must be at least 1",
        }

    system = platform.system().lower()
    cmd = _build_ping_command(system, host, count, timeout)

    # Overall deadline for the subprocess. The per-reply timeout applies to
    # every packet (on Windows the waits are sequential), so the deadline has
    # to scale with `count`; a flat `timeout + 5` cuts off legitimate runs.
    deadline = count * timeout + 5

    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            stdout, _stderr = await asyncio.wait_for(
                process.communicate(), timeout=deadline
            )
        except asyncio.TimeoutError:
            # wait_for only cancels communicate(); the child keeps running
            # unless it is killed and reaped explicitly.
            try:
                process.kill()
            except ProcessLookupError:
                pass  # already exited between the timeout and the kill
            await process.wait()
            return {
                "host": host,
                "reachable": False,
                "error": f"Ping timeout after {deadline} seconds",
            }

        output = stdout.decode("utf-8", errors="ignore")

        result = {
            "host": host,
            "reachable": process.returncode == 0,
            "packets_sent": count,
            "packets_received": 0,
            "packet_loss": 100.0,
            "rtt_min": None,
            "rtt_avg": None,
            "rtt_max": None,
        }

        if process.returncode == 0:
            _parse_ping_statistics(system, output, count, result)
        else:
            result["error"] = f"Ping failed with return code {process.returncode}"

        return result

    except Exception as e:
        # Boundary handler: a failure to spawn/parse one ping must not abort a
        # whole group run, so it is logged and reported in the result instead.
        logger.error(f"Ping exception for {host}: {e}", exc_info=True)
        return {
            "host": host,
            "reachable": False,
            "error": f"{type(e).__name__}: {str(e)}",
        }


def format_ping_result(result: Dict) -> str:
    """
    Format a single ping result for display.

    Args:
        result: Dict as returned by ``ping_host``.

    Returns:
        Multi-line text: a status line, then packet/RTT lines for reachable
        hosts or the error line for unreachable ones.
    """
    output = []

    if result["reachable"]:
        output.append(f"✓ {result['host']}: REACHABLE")
        if result.get("packets_received") is not None:
            output.append(
                f" Packets: {result['packets_received']}/{result['packets_sent']} received ({result['packet_loss']:.1f}% loss)"
            )
        # All three values are formatted, so all three must have been parsed.
        rtt_values = [result.get(key) for key in ("rtt_min", "rtt_avg", "rtt_max")]
        if all(value is not None for value in rtt_values):
            output.append(
                f" RTT: min={result['rtt_min']:.2f}ms avg={result['rtt_avg']:.2f}ms max={result['rtt_max']:.2f}ms"
            )
    else:
        output.append(f"✗ {result['host']}: UNREACHABLE")
        if "error" in result:
            output.append(f" Error: {result['error']}")

    return "\n".join(output)


def format_inventory_error(item_type: str, requested_name: str, inventory: dict, discovery_tool_base: str) -> str:
    """
    Format a helpful error message when a host/group is not found.
    Shows first 10 available options + count of remaining, suggests discovery tool.

    Args:
        item_type: "host" or "group"
        requested_name: The name that was requested but not found
        inventory: The full inventory dict
        discovery_tool_base: Base name of tool (e.g., "list_hosts" or "list_groups")

    Returns:
        Formatted error message with suggestions
    """
    if item_type == "host":
        available = sorted(inventory["hosts"].keys())
        container_name = "hosts"
    elif item_type == "group":
        available = sorted(inventory["groups"].keys())
        container_name = "groups"
    else:
        return f"Error: {item_type} '{requested_name}' not found in inventory"

    error_msg = f"Error: {item_type.capitalize()} '{requested_name}' not found in inventory.\n\n"
    error_msg += f"Available {container_name} ({len(available)} total):\n"

    # Show first 10, then "and X more"
    for item in available[:10]:
        error_msg += f" • {item}\n"

    if len(available) > 10:
        error_msg += f" • ... and {len(available) - 10} more\n"

    # Suggest both tool names (standalone and unified modes)
    tool_names = f"'{discovery_tool_base}'"
    if not discovery_tool_base.startswith("ping_"):
        # Add unified mode variant if this is the base name
        tool_names += f" or 'ping_{discovery_tool_base}'"

    error_msg += f"\nRun {tool_names} to see all available {container_name}."

    return error_msg


def _count_property(default: int) -> dict:
    """JSON-schema fragment for the ``count`` tool argument."""
    return {
        "type": "integer",
        "description": f"Number of ping packets to send (default: {default})",
        "default": default,
    }


def _timeout_property(default: int) -> dict:
    """JSON-schema fragment for the ``timeout`` tool argument."""
    return {
        "type": "integer",
        "description": f"Timeout in seconds per ping (default: {default})",
        "default": default,
    }


def _tool_annotations(idempotent: bool) -> types.ToolAnnotations:
    """Annotations shared by every ping tool; only the idempotent hint varies."""
    return types.ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=idempotent,
        openWorldHint=True,
    )


def _build_tool_definitions(prefix: str, group_property: dict) -> list[types.Tool]:
    """
    Build the six ping tool definitions.

    The standalone server and the class-based (unified) server expose the same
    tools; they differ only in the name prefix and in the schema of the
    ``group`` argument, so both are parameters here.

    Args:
        prefix: String prepended to each base tool name ("" or "ping_").
        group_property: JSON-schema fragment for the ``group`` argument.
    """

    def empty_schema() -> dict:
        return {"type": "object", "properties": {}, "required": []}

    return [
        types.Tool(
            name=f"{prefix}ping_host",
            description="Ping a specific host by hostname from Ansible inventory",
            inputSchema={
                "type": "object",
                "properties": {
                    "hostname": {
                        "type": "string",
                        "description": "Hostname from Ansible inventory (e.g., 'server1.example.com', 'server2.example.com')",
                    },
                    "count": _count_property(4),
                    "timeout": _timeout_property(5),
                },
                "required": ["hostname"],
            },
            title="Ping Host",
            annotations=_tool_annotations(idempotent=False),
        ),
        types.Tool(
            name=f"{prefix}ping_group",
            description="Ping all hosts in an Ansible group",
            inputSchema={
                "type": "object",
                "properties": {
                    "group": group_property,
                    "count": _count_property(2),
                    "timeout": _timeout_property(3),
                },
                "required": ["group"],
            },
            title="Ping Group",
            annotations=_tool_annotations(idempotent=False),
        ),
        types.Tool(
            name=f"{prefix}ping_all",
            description="Ping all hosts in the infrastructure",
            inputSchema={
                "type": "object",
                "properties": {
                    "count": _count_property(2),
                    "timeout": _timeout_property(3),
                },
                "required": [],
            },
            title="Ping All Hosts",
            annotations=_tool_annotations(idempotent=False),
        ),
        types.Tool(
            name=f"{prefix}list_groups",
            description="List all available Ansible groups for pinging (call this first to discover valid group names)",
            inputSchema=empty_schema(),
            title="List Ansible Groups",
            annotations=_tool_annotations(idempotent=True),
        ),
        types.Tool(
            name=f"{prefix}list_hosts",
            description="List all hosts in the Ansible inventory with their resolved IPs (call this first to discover valid hostnames)",
            inputSchema=empty_schema(),
            title="List Inventory Hosts",
            annotations=_tool_annotations(idempotent=True),
        ),
        types.Tool(
            name=f"{prefix}reload_inventory",
            description="Reload Ansible inventory from disk (useful after inventory changes)",
            inputSchema=empty_schema(),
            title="Reload Inventory",
            annotations=_tool_annotations(idempotent=False),
        ),
    ]


class PingMCPServer:
    """Ping MCP Server - Class-based implementation"""

    def __init__(self, ansible_inventory=None, ansible_config=None):
        """Initialize configuration using existing config loading logic

        Args:
            ansible_inventory: Optional pre-loaded Ansible inventory dict (for unified mode)
            ansible_config: Optional AnsibleConfigManager instance (for enum generation)
        """
        # Load environment configuration (skip if in unified mode)
        if not os.getenv("MCP_UNIFIED_MODE"):
            load_env_file(ENV_FILE, allowed_vars=PING_ALLOWED_VARS, strict=True)

        self.ansible_inventory_path = os.getenv("ANSIBLE_INVENTORY_PATH", "")
        logger.info(f"[PingMCPServer] Ansible inventory: {self.ansible_inventory_path}")

        # Store config manager for enum generation
        self.ansible_config = ansible_config

        # Load inventory (with caching)
        self.inventory_data = None
        self.ansible_inventory = ansible_inventory  # Store pre-loaded inventory
        self._load_inventory()

    def _load_inventory(self):
        """Load Ansible inventory (internal method)"""
        self.inventory_data = load_ansible_inventory(self.ansible_inventory)
        logger.info(f"[PingMCPServer] Loaded {len(self.inventory_data['hosts'])} hosts")

    async def list_tools(self) -> list[types.Tool]:
        """Return list of Tool objects this server provides (with ping_ prefix)"""
        # Get dynamic enums from Ansible inventory
        ansible_groups = []
        if self.ansible_config and self.ansible_config.is_available():
            ansible_groups = self.ansible_config.get_all_groups()

        # Build group parameter schema with optional enum
        group_property = {
            "type": "string",
            "description": "Ansible group name from your inventory",
        }
        if ansible_groups:
            group_property["enum"] = ansible_groups

        return _build_tool_definitions("ping_", group_property)

    async def handle_tool(self, tool_name: str, arguments: dict | None) -> list[types.TextContent]:
        """Route tool calls to appropriate handler methods"""
        # Strip the ping_ prefix (once) for routing
        name = tool_name.removeprefix("ping_")

        logger.info(f"[PingMCPServer] Tool called: {tool_name} -> {name} with args: {arguments}")

        # Call the shared implementation with this instance's inventory
        return await handle_call_tool_impl(name, arguments, self.inventory_data, self._reload_inventory_impl)

    async def _reload_inventory_impl(self):
        """Reload inventory for this instance and return the fresh data."""
        # load_ansible_inventory() short-circuits on the module-level cache, so
        # the cache has to be dropped as well or the "reload" would just hand
        # back the stale inventory.
        global INVENTORY_DATA
        INVENTORY_DATA = None

        self.inventory_data = None
        self._load_inventory()
        return self.inventory_data


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """List available ping tools"""
    return _build_tool_definitions(
        "",
        {
            "type": "string",
            "description": "Ansible group name (e.g., 'webservers', 'databases', 'docker_hosts')",
        },
    )


async def _ping_inventory_hosts(inventory: dict, hostnames, count, timeout) -> list:
    """
    Ping the given inventory hosts concurrently.

    Names that have no entry in ``inventory["hosts"]`` are skipped. Results
    are sorted with reachable hosts first, then by pinged address.
    """
    tasks = [
        ping_host(get_host_ip(hostname, inventory["hosts"][hostname]), count, timeout)
        for hostname in hostnames
        if hostname in inventory["hosts"]
    ]
    results = list(await asyncio.gather(*tasks))
    results.sort(key=lambda r: (not r["reachable"], r["host"]))
    return results


async def handle_call_tool_impl(
    name: str, arguments: dict | None, inventory: dict, reload_inventory_func=None
) -> list[types.TextContent]:
    """
    Core tool execution logic that can be called by both class and module-level handlers.

    Args:
        name: Base tool name without the unified-mode prefix (e.g. "ping_host").
        arguments: Tool arguments, or None when the caller supplied none.
        inventory: Inventory dict with "hosts" and "groups".
        reload_inventory_func: Optional async callable returning a freshly
            loaded inventory; when omitted, the module-level cache is reset
            and reloaded.

    Returns:
        A single-element list of TextContent. Errors are reported as text
        rather than raised.
    """
    global INVENTORY_DATA

    try:
        if name == "list_groups":
            output = "=== AVAILABLE ANSIBLE GROUPS ===\n\n"

            if not inventory["groups"]:
                output += "No groups found in inventory\n"
            else:
                for group_name, hosts in sorted(inventory["groups"].items()):
                    output += f"• {group_name} ({len(hosts)} hosts)\n"

            return [types.TextContent(type="text", text=output)]

        elif name == "list_hosts":
            output = "=== ALL HOSTS IN INVENTORY ===\n\n"

            if not inventory["hosts"]:
                output += "No hosts found in inventory\n"
            else:
                for hostname in sorted(inventory["hosts"].keys()):
                    host_data = inventory["hosts"][hostname]
                    target = get_host_ip(hostname, host_data)
                    host_groups = host_data.get("groups", [])
                    # Show at most three groups per host to keep the listing short
                    groups = ", ".join(host_groups[:3])
                    if len(host_groups) > 3:
                        groups += ", ..."

                    output += f"• {hostname}\n"
                    output += f" Target: {target}\n"
                    if groups:
                        output += f" Groups: {groups}\n"
                    output += "\n"

                output += f"Total: {len(inventory['hosts'])} hosts\n"

            return [types.TextContent(type="text", text=output)]

        elif name == "reload_inventory":
            # Use provided reload function or reload global inventory
            if reload_inventory_func:
                inventory = await reload_inventory_func()
            else:
                INVENTORY_DATA = None
                inventory = load_ansible_inventory()

            output = "=== INVENTORY RELOADED ===\n\n"
            output += f"✓ Loaded {len(inventory['hosts'])} hosts\n"
            output += f"✓ Loaded {len(inventory['groups'])} groups\n"

            return [types.TextContent(type="text", text=output)]

        elif name == "ping_host":
            if not arguments or "hostname" not in arguments:
                return [
                    types.TextContent(
                        type="text", text="Error: hostname parameter required"
                    )
                ]

            hostname = arguments["hostname"]
            count = arguments.get("count", 4)
            timeout = arguments.get("timeout", 5)

            if hostname not in inventory["hosts"]:
                error_msg = format_inventory_error("host", hostname, inventory, "list_hosts")
                return [types.TextContent(type="text", text=error_msg)]

            host_data = inventory["hosts"][hostname]
            target = get_host_ip(hostname, host_data)

            output = f"=== PINGING {hostname} ===\n"
            output += f"Target: {target}\n"
            output += f"Packets: {count}, Timeout: {timeout}s\n\n"

            result = await ping_host(target, count, timeout)
            output += format_ping_result(result)

            return [types.TextContent(type="text", text=output)]

        elif name == "ping_group":
            if not arguments or "group" not in arguments:
                return [
                    types.TextContent(
                        type="text", text="Error: group parameter required"
                    )
                ]

            group_name = arguments["group"]
            count = arguments.get("count", 2)
            timeout = arguments.get("timeout", 3)

            if group_name not in inventory["groups"]:
                error_msg = format_inventory_error("group", group_name, inventory, "list_groups")
                return [types.TextContent(type="text", text=error_msg)]

            hostnames = inventory["groups"][group_name]

            output = f"=== PINGING GROUP: {group_name} ===\n"
            output += (
                f"Hosts: {len(hostnames)}, Packets: {count}, Timeout: {timeout}s\n\n"
            )

            # Ping all hosts concurrently (reachable first in the report)
            results = await _ping_inventory_hosts(inventory, hostnames, count, timeout)

            for result in results:
                output += format_ping_result(result) + "\n"

            # Summary
            reachable = sum(1 for r in results if r["reachable"])
            output += "\n--- SUMMARY ---\n"
            output += f"Reachable: {reachable}/{len(results)}\n"

            return [types.TextContent(type="text", text=output)]

        elif name == "ping_all":
            count = arguments.get("count", 2) if arguments else 2
            timeout = arguments.get("timeout", 3) if arguments else 3

            hostnames = list(inventory["hosts"].keys())

            output = "=== PINGING ALL HOSTS ===\n"
            output += f"Total: {len(hostnames)} hosts, Packets: {count}, Timeout: {timeout}s\n\n"

            if not hostnames:
                output += "No hosts found in inventory\n"
                return [types.TextContent(type="text", text=output)]

            # Ping all hosts concurrently (reachable first in the report)
            results = await _ping_inventory_hosts(inventory, hostnames, count, timeout)

            for result in results:
                output += format_ping_result(result) + "\n"

            # Summary
            reachable = sum(1 for r in results if r["reachable"])
            output += "\n--- SUMMARY ---\n"
            if len(results) > 0:
                output += f"Reachable: {reachable}/{len(results)} ({(reachable/len(results)*100):.1f}%)\n"
            else:
                output += "No results to summarize\n"

            return [types.TextContent(type="text", text=output)]

        else:
            return [types.TextContent(type="text", text=f"Unknown tool: {name}")]

    except Exception as e:
        # Top-level boundary for a tool call: log with traceback and report the
        # failure to the client as text instead of propagating.
        logger.error(f"Error in tool {name}: {str(e)}", exc_info=True)
        return [types.TextContent(type="text", text=f"Error: {str(e)}")]


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent]:
    """Handle tool calls (module-level wrapper for standalone mode)"""
    # Returns the cached inventory, loading it on first use
    inventory = load_ansible_inventory()
    return await handle_call_tool_impl(name, arguments, inventory)


async def main():
    """Main entry point"""
    # Load inventory on startup
    inventory = load_ansible_inventory()

    logger.info(f"Ping MCP Server starting with {len(inventory['hosts'])} hosts")

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="ping-info",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bjeans/homelab-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.