check_firewall_config
Verify firewall rules and open ports on remote servers to identify security vulnerabilities and ensure proper network access controls.
Instructions
检查防火墙配置和开放端口
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hostname | Yes | | |
| username | Yes | | |
| password | No | | |
| port | No | | |
| timeout | No | | |
Implementation Reference
- Primary handler implementation for the 'check_firewall_config' MCP tool. Performs detailed SSH-based checks for UFW, firewalld, iptables status, rules, and open ports.def check_firewall_config( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """检查防火墙配置和开放端口""" result = {"status": "unknown", "firewall": {"active": False, "type": "unknown"}, "open_ports": [], "rules": [], "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查UFW状态(Ubuntu/Debian) ufw_command = "which ufw > /dev/null && ufw status || echo 'ufw not found'" stdin, stdout, stderr = ssh.exec_command(ufw_command, timeout=timeout) ufw_output = stdout.read().decode().strip() # 检查firewalld状态(CentOS/RHEL) firewalld_command = "which firewall-cmd > /dev/null && firewall-cmd --state || echo 'firewalld not found'" stdin, stdout, stderr = ssh.exec_command(firewalld_command, timeout=timeout) firewalld_output = stdout.read().decode().strip() # 检查iptables状态 iptables_command = "which iptables > /dev/null && iptables -L -n || echo 'iptables not found'" stdin, stdout, stderr = ssh.exec_command(iptables_command, timeout=timeout) iptables_output = stdout.read().decode().strip() # 确定防火墙类型和状态 if "Status: active" in ufw_output: result["firewall"]["type"] = "ufw" result["firewall"]["active"] = True # 获取UFW规则 ufw_rules_command = "ufw status numbered" stdin, stdout, stderr = ssh.exec_command(ufw_rules_command, timeout=timeout) ufw_rules = stdout.read().decode().strip() # 解析UFW规则和开放端口 for line in ufw_rules.split('\n'): if "ALLOW" in line or "DENY" in line: result["rules"].append(line.strip()) # 提取端口 port_match = re.search(r'(\d+)/tcp', line) if port_match: result["open_ports"].append(port_match.group(1)) elif "running" in firewalld_output: result["firewall"]["type"] = "firewalld" result["firewall"]["active"] = True # 获取firewalld区域和规则 zones_command = "firewall-cmd --list-all-zones" stdin, stdout, stderr = ssh.exec_command(zones_command, 
timeout=timeout) zones_output = stdout.read().decode().strip() # 解析firewalld规则 current_zone = None for line in zones_output.split('\n'): if line.endswith("(active)"): current_zone = line.split()[0] if current_zone and "ports:" in line: ports = line.split("ports:")[1].strip() for port in ports.split(): if "/" in port: result["open_ports"].append(port.split("/")[0]) result["rules"].append(f"{current_zone} zone: {port}") elif "Chain INPUT" in iptables_output: result["firewall"]["type"] = "iptables" result["firewall"]["active"] = True # 解析iptables规则 for line in iptables_output.split('\n'): if "ACCEPT" in line and "dpt:" in line: port_match = re.search(r'dpt:(\d+)', line) if port_match: result["open_ports"].append(port_match.group(1)) result["rules"].append(line.strip()) else: result["firewall"]["type"] = "none" result["firewall"]["active"] = False result["rules"].append("未检测到活动的防火墙") # 如果没有检测到防火墙规则,尝试使用netstat或ss检查开放端口 if not result["open_ports"]: ports_command = "ss -tuln || netstat -tuln" stdin, stdout, stderr = ssh.exec_command(ports_command, timeout=timeout) ports_output = stdout.read().decode().strip() for line in ports_output.split('\n'): if "LISTEN" in line: port_match = re.search(r':(\d+)', line) if port_match: result["open_ports"].append(port_match.group(1)) # 去重开放端口 result["open_ports"] = list(set(result["open_ports"])) result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result
- Secondary handler for SSE version of 'check_firewall_config'. Simpler check for firewall services status via SSH.def check_firewall_config( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """检查防火墙配置""" result = {"status": "unknown", "firewall_info": {}, "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查常见的防火墙服务 firewall_services = ["ufw", "firewalld", "iptables"] firewall_info = {} for fw in firewall_services: if fw == "ufw": stdin, stdout, stderr = ssh.exec_command("ufw status", timeout=timeout) output = stdout.read().decode().strip() if "Status: active" in output: firewall_info["ufw"] = { "active": True, "rules": output.split('\n')[1:] if len(output.split('\n')) > 1 else [] } else: firewall_info["ufw"] = {"active": False} elif fw == "firewalld": stdin, stdout, stderr = ssh.exec_command("firewall-cmd --state", timeout=timeout) output = stdout.read().decode().strip() if output == "running": # 获取区域信息 stdin, stdout, stderr = ssh.exec_command("firewall-cmd --list-all", timeout=timeout) zones_output = stdout.read().decode().strip() firewall_info["firewalld"] = { "active": True, "config": zones_output } else: firewall_info["firewalld"] = {"active": False} elif fw == "iptables": stdin, stdout, stderr = ssh.exec_command("iptables -L", timeout=timeout) output = stdout.read().decode().strip() # 简单检查是否有规则 has_rules = len(output.split('\n')) > 6 # 基本的链定义通常有6行 firewall_info["iptables"] = { "active": has_rules, "rules": output.split('\n') } result["firewall_info"] = firewall_info result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result
- server_monitor/main.py:42-66 (registration)Registration of the check_firewall_config handler in the MCP tools dictionary and dynamic @mcp.tool() decoration in server_monitor.tools_dict = { 'get_memory_info': get_memory_info, 'remote_server_inspection': remote_server_inspection, 'get_system_load': get_system_load, 'monitor_processes': monitor_processes, 'check_service_status': check_service_status, 'get_os_details': get_os_details, 'check_ssh_risk_logins': check_ssh_risk_logins, 'check_firewall_config': check_firewall_config, 'security_vulnerability_scan': security_vulnerability_scan, 'backup_critical_files': backup_critical_files, 'inspect_network': inspect_network, 'analyze_logs': analyze_logs, 'list_docker_containers': list_docker_containers, 'list_docker_images': list_docker_images, 'list_docker_volumes': list_docker_volumes, 'get_container_logs': get_container_logs, 'monitor_container_stats': monitor_container_stats, 'check_docker_health': check_docker_health } # 使用装饰器动态注册所有工具 for name, func in tools_dict.items(): mcp.tool()(func)
- server_monitor_sse/server.py:143-155 (registration)Tool dispatch registration in SSE server's @app.call_tool() handler for check_firewall_config.elif name == "check_firewall_config": required_args = ["hostname", "username"] for arg in required_args: if arg not in arguments: raise ValueError(f"Missing required argument '{arg}'") result = check_firewall_config( hostname=arguments["hostname"], username=arguments["username"], password=arguments.get("password", ""), port=arguments.get("port", 22), timeout=arguments.get("timeout", 30) )
- JSON schema definition for the check_firewall_config tool parameters and description.{"name": "check_firewall_config", "description": "检查防火墙配置和开放端口", "parameters": [ {"name": "hostname", "type": "str", "default": None}, {"name": "username", "type": "str", "default": None}, {"name": "password", "type": "str", "default": ""}, {"name": "port", "type": "int", "default": 22}, {"name": "timeout", "type": "int", "default": 30} ]},