Skip to main content
Glama

MCP Kali Pentest

by Root1856
tools.pyβ€’22.3 kB
""" Kali Linux Penetration Testing Tools Integration Wrapper functions for common pentesting tools """ import asyncio import subprocess import json import re import logging from typing import Dict, List, Optional, Any from pathlib import Path import xml.etree.ElementTree as ET logger = logging.getLogger(__name__) async def run_command(cmd: List[str], timeout: int = 300) -> Dict[str, Any]: """Run shell command and return output""" try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) return { "success": process.returncode == 0, "returncode": process.returncode, "stdout": stdout.decode('utf-8', errors='ignore'), "stderr": stderr.decode('utf-8', errors='ignore') } except asyncio.TimeoutError: process.kill() return { "success": False, "error": "Command timed out", "timeout": timeout } except Exception as e: logger.error(f"Error running command {cmd}: {e}") return { "success": False, "error": str(e) } # ==================== RECONNAISSANCE TOOLS ==================== async def nmap_scan( target: str, scan_type: str = "quick", ports: Optional[str] = None, scripts: Optional[List[str]] = None ) -> Dict[str, Any]: """Perform Nmap scan""" output_file = f"/tmp/nmap_{target.replace('.', '_')}.xml" # Build nmap command cmd = ["nmap"] if scan_type == "quick": cmd.extend(["-T4", "-F"]) elif scan_type == "full": cmd.extend(["-p-", "-T4", "-A"]) elif scan_type == "stealth": cmd.extend(["-sS", "-T2"]) elif scan_type == "aggressive": cmd.extend(["-A", "-T4"]) elif scan_type == "udp": cmd.extend(["-sU", "--top-ports", "100"]) if ports: cmd.extend(["-p", ports]) if scripts: cmd.extend(["--script", ",".join(scripts)]) cmd.extend(["-oX", output_file, target]) result = await run_command(cmd, timeout=600) # Parse XML output findings = parse_nmap_xml(output_file) return { "tool": "nmap", "target": target, "scan_type": scan_type, "success": result["success"], "findings": findings, "raw_output": result.get("stdout", "") } def parse_nmap_xml(xml_file: str) -> Dict[str, Any]: """Parse Nmap XML output""" try: tree = ET.parse(xml_file) root = tree.getroot() findings = { "hosts": [], "open_ports": [], "services": [], "vulnerabilities": [] } for host in root.findall("host"): if host.find("status").get("state") != "up": continue host_info = { "ip": host.find("address").get("addr"), "hostname": None, "os": None, "ports": [] } # Get hostname hostnames = host.find("hostnames") if hostnames is not None: hostname = hostnames.find("hostname") if hostname is not None: host_info["hostname"] = hostname.get("name") # Get OS os_elem = host.find("os") if os_elem is not None: osmatch = os_elem.find("osmatch") if osmatch is not None: host_info["os"] = osmatch.get("name") # Get ports ports_elem = host.find("ports") if ports_elem is not None: for port in ports_elem.findall("port"): state = port.find("state") if state.get("state") == "open": service = port.find("service") port_info = { "port": port.get("portid"), "protocol": port.get("protocol"), "service": service.get("name") if service is not None else "unknown", "version": service.get("version") if service is not None else None, "product": service.get("product") if service is not None else None } host_info["ports"].append(port_info) findings["open_ports"].append(f"{port.get('portid')}/{port.get('protocol')}") findings["services"].append(port_info) findings["hosts"].append(host_info) return findings except Exception as e: logger.error(f"Error parsing Nmap XML: {e}") return {"error": str(e)} async def nikto_scan(target: str, ssl: bool = False, port: int = 80) -> Dict[str, Any]: """Perform Nikto web server scan""" cmd = ["nikto", "-h", target, "-port", str(port)] if ssl: cmd.append("-ssl") cmd.extend(["-Format", "json", "-output", "/tmp/nikto_output.json"]) result = await run_command(cmd, timeout=900) # Try to read JSON output findings = [] try: with open("/tmp/nikto_output.json", "r") as f: nikto_data = json.load(f) findings = nikto_data.get("vulnerabilities", []) except Exception as e: logger.error(f"Error parsing Nikto output: {e}") return { "tool": "nikto", "target": target, "port": port, "ssl": ssl, "success": result["success"], "findings": findings, "raw_output": result.get("stdout", "") } async def gobuster_scan( target: str, wordlist: Optional[str] = None, extensions: Optional[List[str]] = None ) -> Dict[str, Any]: """Directory and file brute-forcing""" if wordlist is None: wordlist = "/usr/share/wordlists/dirb/common.txt" cmd = ["gobuster", "dir", "-u", target, "-w", wordlist, "-q"] if extensions: cmd.extend(["-x", ",".join(extensions)]) result = await run_command(cmd, timeout=600) # Parse output found_paths = [] if result["success"]: for line in result["stdout"].split("\n"): if line.startswith("/"): parts = line.split() if len(parts) >= 2: found_paths.append({ "path": parts[0], "status_code": parts[1] if len(parts) > 1 else None }) return { "tool": "gobuster", "target": target, "wordlist": wordlist, "success": result["success"], "found_paths": found_paths, "count": len(found_paths) } async def dns_enum(domain: str, record_types: List[str] = None) -> Dict[str, Any]: """DNS enumeration""" if record_types is None: record_types = ["A", "AAAA", "MX", "NS", "TXT", "SOA"] records = {} for record_type in record_types: cmd = ["dig", "+short", domain, record_type] result = await run_command(cmd, timeout=30) if result["success"] and result["stdout"].strip(): records[record_type] = result["stdout"].strip().split("\n") # Try subdomain enumeration with sublist3r subdomains = [] cmd = ["sublist3r", "-d", domain, "-n"] result = await run_command(cmd, timeout=300) if result["success"]: subdomains = [line.strip() for line in result["stdout"].split("\n") if line.strip()] return { "tool": "dns_enum", "domain": domain, "records": records, "subdomains": subdomains, "subdomain_count": len(subdomains) } async def enum4linux(target: str, username: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]: """SMB enumeration""" cmd = ["enum4linux", "-a", target] if username and password: cmd.extend(["-u", username, "-p", password]) result = await run_command(cmd, timeout=300) findings = { "shares": [], "users": [], "groups": [], "os_info": None } # Parse output if result["success"]: output = result["stdout"] # Extract shares shares_match = re.findall(r"Sharename.*?Type.*?Comment\n(.*?)(?=\n\n|\Z)", output, re.DOTALL) if shares_match: for line in shares_match[0].split("\n"): if line.strip() and not line.startswith("---"): findings["shares"].append(line.strip()) # Extract users users_match = re.findall(r"user:\[(.*?)\]", output) findings["users"] = users_match # Extract groups groups_match = re.findall(r"group:\[(.*?)\]", output) findings["groups"] = groups_match return { "tool": "enum4linux", "target": target, "success": result["success"], "findings": findings } # ==================== VULNERABILITY SCANNING ==================== async def sqlmap_scan( target: str, data: Optional[str] = None, cookie: Optional[str] = None, level: int = 1, risk: int = 1 ) -> Dict[str, Any]: """SQL injection testing""" cmd = ["sqlmap", "-u", target, "--batch", f"--level={level}", f"--risk={risk}"] if data: cmd.extend(["--data", data]) if cookie: cmd.extend(["--cookie", cookie]) cmd.extend(["--output-dir=/tmp/sqlmap"]) result = await run_command(cmd, timeout=900) vulnerable = "sqlmap identified" in result.get("stdout", "").lower() return { "tool": "sqlmap", "target": target, "success": result["success"], "vulnerable": vulnerable, "details": result.get("stdout", "") } async def nuclei_scan( target: str, templates: Optional[List[str]] = None, severity: Optional[List[str]] = None ) -> Dict[str, Any]: """Nuclei vulnerability scanner""" cmd = ["nuclei", "-u", target, "-json", "-o", "/tmp/nuclei_output.json"] if templates: for template in templates: cmd.extend(["-t", template]) if severity: cmd.extend(["-severity", ",".join(severity)]) result = await run_command(cmd, timeout=600) vulnerabilities = [] try: with open("/tmp/nuclei_output.json", "r") as f: for line in f: vuln = json.loads(line.strip()) vulnerabilities.append(vuln) except Exception as e: logger.error(f"Error parsing Nuclei output: {e}") return { "tool": "nuclei", "target": target, "success": result["success"], "vulnerabilities": vulnerabilities, "count": len(vulnerabilities) } async def wpscan(target: str, enumerate: str = "vp", api_token: Optional[str] = None) -> Dict[str, Any]: """WordPress vulnerability scanner""" cmd = ["wpscan", "--url", target, "--enumerate", enumerate, "--format", "json"] if api_token: cmd.extend(["--api-token", api_token]) cmd.extend(["-o", "/tmp/wpscan_output.json"]) result = await run_command(cmd, timeout=600) findings = {} try: with open("/tmp/wpscan_output.json", "r") as f: findings = json.load(f) except Exception as e: logger.error(f"Error parsing WPScan output: {e}") return { "tool": "wpscan", "target": target, "success": result["success"], "findings": findings } async def owasp_zap_scan(target: str, scan_type: str = "quick") -> Dict[str, Any]: """OWASP ZAP scan""" # Note: This requires ZAP to be running in daemon mode # zap.sh -daemon -port 8080 -config api.key=changeme cmd = ["zap-cli", "quick-scan", target] if scan_type == "full": cmd = ["zap-cli", "active-scan", target] result = await run_command(cmd, timeout=1800) return { "tool": "owasp_zap", "target": target, "scan_type": scan_type, "success": result["success"], "output": result.get("stdout", "") } async def ssl_scan(target: str, port: int = 443) -> Dict[str, Any]: """SSL/TLS configuration scan""" cmd = ["sslscan", f"{target}:{port}"] result = await run_command(cmd, timeout=120) findings = { "weak_ciphers": [], "vulnerabilities": [], "certificate_info": {} } if result["success"]: output = result["stdout"] # Check for vulnerabilities if "heartbleed" in output.lower(): findings["vulnerabilities"].append("Heartbleed") if "poodle" in output.lower(): findings["vulnerabilities"].append("POODLE") if "beast" in output.lower(): findings["vulnerabilities"].append("BEAST") # Extract weak ciphers (this is simplified) for line in output.split("\n"): if "weak" in line.lower() or "export" in line.lower(): findings["weak_ciphers"].append(line.strip()) return { "tool": "sslscan", "target": target, "port": port, "success": result["success"], "findings": findings } # ==================== EXPLOITATION TOOLS ==================== async def metasploit_search(query: str, type: Optional[str] = None) -> Dict[str, Any]: """Search Metasploit modules""" cmd = ["msfconsole", "-q", "-x", f"search {query}; exit"] if type: cmd = ["msfconsole", "-q", "-x", f"search type:{type} {query}; exit"] result = await run_command(cmd, timeout=60) modules = [] if result["success"]: # Parse msfconsole output lines = result["stdout"].split("\n") for line in lines: if "exploit/" in line or "auxiliary/" in line or "post/" in line: parts = line.split() if len(parts) >= 2: modules.append({ "name": parts[0], "description": " ".join(parts[1:]) }) return { "tool": "metasploit", "query": query, "modules": modules, "count": len(modules) } async def searchsploit(query: str) -> Dict[str, Any]: """Search Exploit-DB""" cmd = ["searchsploit", "-j", query] result = await run_command(cmd, timeout=30) exploits = [] if result["success"]: try: data = json.loads(result["stdout"]) exploits = data.get("RESULTS_EXPLOIT", []) except json.JSONDecodeError: pass return { "tool": "searchsploit", "query": query, "exploits": exploits, "count": len(exploits) } # ==================== BRUTE FORCE TOOLS ==================== async def hydra_bruteforce( target: str, service: str, username: Optional[str] = None, password_list: str = "/usr/share/wordlists/rockyou.txt", port: Optional[int] = None ) -> Dict[str, Any]: """Network login brute-force""" cmd = ["hydra"] if username: cmd.extend(["-l", username]) else: cmd.extend(["-L", "/usr/share/wordlists/metasploit/unix_users.txt"]) cmd.extend(["-P", password_list, "-t", "4"]) if port: cmd.extend(["-s", str(port)]) cmd.extend([target, service]) result = await run_command(cmd, timeout=1800) credentials = [] if result["success"]: # Parse found credentials for line in result["stdout"].split("\n"): if "login:" in line and "password:" in line: credentials.append(line.strip()) return { "tool": "hydra", "target": target, "service": service, "success": result["success"], "credentials_found": credentials, "count": len(credentials) } async def ffuf_fuzz(target: str, wordlist: str, method: str = "GET") -> Dict[str, Any]: """Fast web fuzzer""" cmd = ["ffuf", "-u", target, "-w", wordlist, "-X", method, "-o", "/tmp/ffuf_output.json", "-of", "json"] result = await run_command(cmd, timeout=600) findings = [] try: with open("/tmp/ffuf_output.json", "r") as f: data = json.load(f) findings = data.get("results", []) except Exception as e: logger.error(f"Error parsing ffuf output: {e}") return { "tool": "ffuf", "target": target, "method": method, "success": result["success"], "findings": findings, "count": len(findings) } # ==================== PASSWORD CRACKING ==================== async def john_crack( hash_file: str, wordlist: Optional[str] = None, format: Optional[str] = None ) -> Dict[str, Any]: """John the Ripper password cracking""" cmd = ["john"] if format: cmd.extend([f"--format={format}"]) if wordlist: cmd.extend([f"--wordlist={wordlist}"]) cmd.append(hash_file) result = await run_command(cmd, timeout=3600) # Get cracked passwords show_cmd = ["john", "--show", hash_file] show_result = await run_command(show_cmd, timeout=10) cracked = [] if show_result["success"]: for line in show_result["stdout"].split("\n"): if ":" in line: cracked.append(line.strip()) return { "tool": "john", "hash_file": hash_file, "success": result["success"], "cracked_passwords": cracked, "count": len(cracked) } async def hashcat_crack( hash: str, hash_type: int, wordlist: Optional[str] = None, attack_mode: int = 0 ) -> Dict[str, Any]: """Hashcat GPU password cracking""" hash_file = "/tmp/hashcat_input.txt" with open(hash_file, "w") as f: f.write(hash) cmd = ["hashcat", "-m", str(hash_type), "-a", str(attack_mode), hash_file] if wordlist: cmd.append(wordlist) cmd.extend(["--outfile", "/tmp/hashcat_output.txt"]) result = await run_command(cmd, timeout=3600) cracked = None try: with open("/tmp/hashcat_output.txt", "r") as f: cracked = f.read().strip() except Exception: pass return { "tool": "hashcat", "hash_type": hash_type, "success": result["success"], "cracked": cracked } # ==================== NETWORK ANALYSIS ==================== async def tcpdump_capture( interface: str, filter: Optional[str] = None, duration: int = 60, output_file: Optional[str] = None ) -> Dict[str, Any]: """Capture network packets""" if output_file is None: output_file = f"/tmp/capture_{interface}.pcap" cmd = ["timeout", str(duration), "tcpdump", "-i", interface, "-w", output_file] if filter: cmd.append(filter) result = await run_command(cmd, timeout=duration + 10) return { "tool": "tcpdump", "interface": interface, "duration": duration, "output_file": output_file, "success": result["success"] } async def snmp_check(target: str, community: str = "public") -> Dict[str, Any]: """SNMP enumeration""" cmd = ["snmp-check", target, "-c", community] result = await run_command(cmd, timeout=120) return { "tool": "snmp-check", "target": target, "community": community, "success": result["success"], "output": result.get("stdout", "") } # ==================== HELPER TOOLS ==================== async def smbclient_enum(target: str) -> Dict[str, Any]: """SMB share enumeration""" cmd = ["smbclient", "-L", target, "-N"] result = await run_command(cmd, timeout=60) return { "tool": "smbclient", "target": target, "success": result["success"], "output": result.get("stdout", "") } async def wfuzz_scan(target: str, wordlist: str) -> Dict[str, Any]: """Web application fuzzer""" cmd = ["wfuzz", "-w", wordlist, "-o", "json", target] result = await run_command(cmd, timeout=600) return { "tool": "wfuzz", "target": target, "success": result["success"], "output": result.get("stdout", "") } async def aircrack_ng(capture_file: str, wordlist: str) -> Dict[str, Any]: """Wireless password cracking""" cmd = ["aircrack-ng", capture_file, "-w", wordlist] result = await run_command(cmd, timeout=1800) key_found = "KEY FOUND" in result.get("stdout", "") return { "tool": "aircrack-ng", "capture_file": capture_file, "success": result["success"], "key_found": key_found, "output": result.get("stdout", "") } async def wireshark_analyze(pcap_file: str) -> Dict[str, Any]: """Analyze packet capture with tshark""" cmd = ["tshark", "-r", pcap_file, "-q", "-z", "conv,tcp"] result = await run_command(cmd, timeout=120) return { "tool": "wireshark", "pcap_file": pcap_file, "success": result["success"], "analysis": result.get("stdout", "") } async def burpsuite_scan(target: str) -> Dict[str, Any]: """Burp Suite scan (requires Burp Suite Pro)""" # This is a placeholder - actual implementation would use Burp Suite REST API return { "tool": "burpsuite", "target": target, "success": False, "note": "Requires Burp Suite Pro with REST API enabled" } async def exploit_db_search(query: str) -> Dict[str, Any]: """Search Exploit-DB (alias for searchsploit)""" return await searchsploit(query) async def vulnerability_assessment(target: str, assessment_type: str = "comprehensive") -> Dict[str, Any]: """Comprehensive vulnerability assessment""" results = { "target": target, "assessment_type": assessment_type, "scans": {} } if assessment_type in ["web", "comprehensive"]: results["scans"]["nikto"] = await nikto_scan(target) results["scans"]["nuclei"] = await nuclei_scan(target) if assessment_type in ["network", "comprehensive"]: results["scans"]["nmap"] = await nmap_scan(target, scan_type="full") results["scans"]["ssl"] = await ssl_scan(target) return results

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Root1856/mcpkali'

If you have feedback or need assistance with the MCP directory API, please join our Discord server