tools.pyβ’22.3 kB
"""
Kali Linux Penetration Testing Tools Integration
Wrapper functions for common pentesting tools
"""
import asyncio
import subprocess
import json
import re
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path
import xml.etree.ElementTree as ET
logger = logging.getLogger(__name__)
async def run_command(cmd: List[str], timeout: int = 300) -> Dict[str, Any]:
"""Run shell command and return output"""
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
return {
"success": process.returncode == 0,
"returncode": process.returncode,
"stdout": stdout.decode('utf-8', errors='ignore'),
"stderr": stderr.decode('utf-8', errors='ignore')
}
except asyncio.TimeoutError:
process.kill()
return {
"success": False,
"error": "Command timed out",
"timeout": timeout
}
except Exception as e:
logger.error(f"Error running command {cmd}: {e}")
return {
"success": False,
"error": str(e)
}
# ==================== RECONNAISSANCE TOOLS ====================
async def nmap_scan(
target: str,
scan_type: str = "quick",
ports: Optional[str] = None,
scripts: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Perform Nmap scan"""
output_file = f"/tmp/nmap_{target.replace('.', '_')}.xml"
# Build nmap command
cmd = ["nmap"]
if scan_type == "quick":
cmd.extend(["-T4", "-F"])
elif scan_type == "full":
cmd.extend(["-p-", "-T4", "-A"])
elif scan_type == "stealth":
cmd.extend(["-sS", "-T2"])
elif scan_type == "aggressive":
cmd.extend(["-A", "-T4"])
elif scan_type == "udp":
cmd.extend(["-sU", "--top-ports", "100"])
if ports:
cmd.extend(["-p", ports])
if scripts:
cmd.extend(["--script", ",".join(scripts)])
cmd.extend(["-oX", output_file, target])
result = await run_command(cmd, timeout=600)
# Parse XML output
findings = parse_nmap_xml(output_file)
return {
"tool": "nmap",
"target": target,
"scan_type": scan_type,
"success": result["success"],
"findings": findings,
"raw_output": result.get("stdout", "")
}
def parse_nmap_xml(xml_file: str) -> Dict[str, Any]:
"""Parse Nmap XML output"""
try:
tree = ET.parse(xml_file)
root = tree.getroot()
findings = {
"hosts": [],
"open_ports": [],
"services": [],
"vulnerabilities": []
}
for host in root.findall("host"):
if host.find("status").get("state") != "up":
continue
host_info = {
"ip": host.find("address").get("addr"),
"hostname": None,
"os": None,
"ports": []
}
# Get hostname
hostnames = host.find("hostnames")
if hostnames is not None:
hostname = hostnames.find("hostname")
if hostname is not None:
host_info["hostname"] = hostname.get("name")
# Get OS
os_elem = host.find("os")
if os_elem is not None:
osmatch = os_elem.find("osmatch")
if osmatch is not None:
host_info["os"] = osmatch.get("name")
# Get ports
ports_elem = host.find("ports")
if ports_elem is not None:
for port in ports_elem.findall("port"):
state = port.find("state")
if state.get("state") == "open":
service = port.find("service")
port_info = {
"port": port.get("portid"),
"protocol": port.get("protocol"),
"service": service.get("name") if service is not None else "unknown",
"version": service.get("version") if service is not None else None,
"product": service.get("product") if service is not None else None
}
host_info["ports"].append(port_info)
findings["open_ports"].append(f"{port.get('portid')}/{port.get('protocol')}")
findings["services"].append(port_info)
findings["hosts"].append(host_info)
return findings
except Exception as e:
logger.error(f"Error parsing Nmap XML: {e}")
return {"error": str(e)}
async def nikto_scan(target: str, ssl: bool = False, port: int = 80) -> Dict[str, Any]:
"""Perform Nikto web server scan"""
cmd = ["nikto", "-h", target, "-port", str(port)]
if ssl:
cmd.append("-ssl")
cmd.extend(["-Format", "json", "-output", "/tmp/nikto_output.json"])
result = await run_command(cmd, timeout=900)
# Try to read JSON output
findings = []
try:
with open("/tmp/nikto_output.json", "r") as f:
nikto_data = json.load(f)
findings = nikto_data.get("vulnerabilities", [])
except Exception as e:
logger.error(f"Error parsing Nikto output: {e}")
return {
"tool": "nikto",
"target": target,
"port": port,
"ssl": ssl,
"success": result["success"],
"findings": findings,
"raw_output": result.get("stdout", "")
}
async def gobuster_scan(
target: str,
wordlist: Optional[str] = None,
extensions: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Directory and file brute-forcing"""
if wordlist is None:
wordlist = "/usr/share/wordlists/dirb/common.txt"
cmd = ["gobuster", "dir", "-u", target, "-w", wordlist, "-q"]
if extensions:
cmd.extend(["-x", ",".join(extensions)])
result = await run_command(cmd, timeout=600)
# Parse output
found_paths = []
if result["success"]:
for line in result["stdout"].split("\n"):
if line.startswith("/"):
parts = line.split()
if len(parts) >= 2:
found_paths.append({
"path": parts[0],
"status_code": parts[1] if len(parts) > 1 else None
})
return {
"tool": "gobuster",
"target": target,
"wordlist": wordlist,
"success": result["success"],
"found_paths": found_paths,
"count": len(found_paths)
}
async def dns_enum(domain: str, record_types: List[str] = None) -> Dict[str, Any]:
"""DNS enumeration"""
if record_types is None:
record_types = ["A", "AAAA", "MX", "NS", "TXT", "SOA"]
records = {}
for record_type in record_types:
cmd = ["dig", "+short", domain, record_type]
result = await run_command(cmd, timeout=30)
if result["success"] and result["stdout"].strip():
records[record_type] = result["stdout"].strip().split("\n")
# Try subdomain enumeration with sublist3r
subdomains = []
cmd = ["sublist3r", "-d", domain, "-n"]
result = await run_command(cmd, timeout=300)
if result["success"]:
subdomains = [line.strip() for line in result["stdout"].split("\n") if line.strip()]
return {
"tool": "dns_enum",
"domain": domain,
"records": records,
"subdomains": subdomains,
"subdomain_count": len(subdomains)
}
async def enum4linux(target: str, username: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]:
"""SMB enumeration"""
cmd = ["enum4linux", "-a", target]
if username and password:
cmd.extend(["-u", username, "-p", password])
result = await run_command(cmd, timeout=300)
findings = {
"shares": [],
"users": [],
"groups": [],
"os_info": None
}
# Parse output
if result["success"]:
output = result["stdout"]
# Extract shares
shares_match = re.findall(r"Sharename.*?Type.*?Comment\n(.*?)(?=\n\n|\Z)", output, re.DOTALL)
if shares_match:
for line in shares_match[0].split("\n"):
if line.strip() and not line.startswith("---"):
findings["shares"].append(line.strip())
# Extract users
users_match = re.findall(r"user:\[(.*?)\]", output)
findings["users"] = users_match
# Extract groups
groups_match = re.findall(r"group:\[(.*?)\]", output)
findings["groups"] = groups_match
return {
"tool": "enum4linux",
"target": target,
"success": result["success"],
"findings": findings
}
# ==================== VULNERABILITY SCANNING ====================
async def sqlmap_scan(
target: str,
data: Optional[str] = None,
cookie: Optional[str] = None,
level: int = 1,
risk: int = 1
) -> Dict[str, Any]:
"""SQL injection testing"""
cmd = ["sqlmap", "-u", target, "--batch", f"--level={level}", f"--risk={risk}"]
if data:
cmd.extend(["--data", data])
if cookie:
cmd.extend(["--cookie", cookie])
cmd.extend(["--output-dir=/tmp/sqlmap"])
result = await run_command(cmd, timeout=900)
vulnerable = "sqlmap identified" in result.get("stdout", "").lower()
return {
"tool": "sqlmap",
"target": target,
"success": result["success"],
"vulnerable": vulnerable,
"details": result.get("stdout", "")
}
async def nuclei_scan(
target: str,
templates: Optional[List[str]] = None,
severity: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Nuclei vulnerability scanner"""
cmd = ["nuclei", "-u", target, "-json", "-o", "/tmp/nuclei_output.json"]
if templates:
for template in templates:
cmd.extend(["-t", template])
if severity:
cmd.extend(["-severity", ",".join(severity)])
result = await run_command(cmd, timeout=600)
vulnerabilities = []
try:
with open("/tmp/nuclei_output.json", "r") as f:
for line in f:
vuln = json.loads(line.strip())
vulnerabilities.append(vuln)
except Exception as e:
logger.error(f"Error parsing Nuclei output: {e}")
return {
"tool": "nuclei",
"target": target,
"success": result["success"],
"vulnerabilities": vulnerabilities,
"count": len(vulnerabilities)
}
async def wpscan(target: str, enumerate: str = "vp", api_token: Optional[str] = None) -> Dict[str, Any]:
"""WordPress vulnerability scanner"""
cmd = ["wpscan", "--url", target, "--enumerate", enumerate, "--format", "json"]
if api_token:
cmd.extend(["--api-token", api_token])
cmd.extend(["-o", "/tmp/wpscan_output.json"])
result = await run_command(cmd, timeout=600)
findings = {}
try:
with open("/tmp/wpscan_output.json", "r") as f:
findings = json.load(f)
except Exception as e:
logger.error(f"Error parsing WPScan output: {e}")
return {
"tool": "wpscan",
"target": target,
"success": result["success"],
"findings": findings
}
async def owasp_zap_scan(target: str, scan_type: str = "quick") -> Dict[str, Any]:
"""OWASP ZAP scan"""
# Note: This requires ZAP to be running in daemon mode
# zap.sh -daemon -port 8080 -config api.key=changeme
cmd = ["zap-cli", "quick-scan", target]
if scan_type == "full":
cmd = ["zap-cli", "active-scan", target]
result = await run_command(cmd, timeout=1800)
return {
"tool": "owasp_zap",
"target": target,
"scan_type": scan_type,
"success": result["success"],
"output": result.get("stdout", "")
}
async def ssl_scan(target: str, port: int = 443) -> Dict[str, Any]:
"""SSL/TLS configuration scan"""
cmd = ["sslscan", f"{target}:{port}"]
result = await run_command(cmd, timeout=120)
findings = {
"weak_ciphers": [],
"vulnerabilities": [],
"certificate_info": {}
}
if result["success"]:
output = result["stdout"]
# Check for vulnerabilities
if "heartbleed" in output.lower():
findings["vulnerabilities"].append("Heartbleed")
if "poodle" in output.lower():
findings["vulnerabilities"].append("POODLE")
if "beast" in output.lower():
findings["vulnerabilities"].append("BEAST")
# Extract weak ciphers (this is simplified)
for line in output.split("\n"):
if "weak" in line.lower() or "export" in line.lower():
findings["weak_ciphers"].append(line.strip())
return {
"tool": "sslscan",
"target": target,
"port": port,
"success": result["success"],
"findings": findings
}
# ==================== EXPLOITATION TOOLS ====================
async def metasploit_search(query: str, type: Optional[str] = None) -> Dict[str, Any]:
"""Search Metasploit modules"""
cmd = ["msfconsole", "-q", "-x", f"search {query}; exit"]
if type:
cmd = ["msfconsole", "-q", "-x", f"search type:{type} {query}; exit"]
result = await run_command(cmd, timeout=60)
modules = []
if result["success"]:
# Parse msfconsole output
lines = result["stdout"].split("\n")
for line in lines:
if "exploit/" in line or "auxiliary/" in line or "post/" in line:
parts = line.split()
if len(parts) >= 2:
modules.append({
"name": parts[0],
"description": " ".join(parts[1:])
})
return {
"tool": "metasploit",
"query": query,
"modules": modules,
"count": len(modules)
}
async def searchsploit(query: str) -> Dict[str, Any]:
"""Search Exploit-DB"""
cmd = ["searchsploit", "-j", query]
result = await run_command(cmd, timeout=30)
exploits = []
if result["success"]:
try:
data = json.loads(result["stdout"])
exploits = data.get("RESULTS_EXPLOIT", [])
except json.JSONDecodeError:
pass
return {
"tool": "searchsploit",
"query": query,
"exploits": exploits,
"count": len(exploits)
}
# ==================== BRUTE FORCE TOOLS ====================
async def hydra_bruteforce(
target: str,
service: str,
username: Optional[str] = None,
password_list: str = "/usr/share/wordlists/rockyou.txt",
port: Optional[int] = None
) -> Dict[str, Any]:
"""Network login brute-force"""
cmd = ["hydra"]
if username:
cmd.extend(["-l", username])
else:
cmd.extend(["-L", "/usr/share/wordlists/metasploit/unix_users.txt"])
cmd.extend(["-P", password_list, "-t", "4"])
if port:
cmd.extend(["-s", str(port)])
cmd.extend([target, service])
result = await run_command(cmd, timeout=1800)
credentials = []
if result["success"]:
# Parse found credentials
for line in result["stdout"].split("\n"):
if "login:" in line and "password:" in line:
credentials.append(line.strip())
return {
"tool": "hydra",
"target": target,
"service": service,
"success": result["success"],
"credentials_found": credentials,
"count": len(credentials)
}
async def ffuf_fuzz(target: str, wordlist: str, method: str = "GET") -> Dict[str, Any]:
"""Fast web fuzzer"""
cmd = ["ffuf", "-u", target, "-w", wordlist, "-X", method, "-o", "/tmp/ffuf_output.json", "-of", "json"]
result = await run_command(cmd, timeout=600)
findings = []
try:
with open("/tmp/ffuf_output.json", "r") as f:
data = json.load(f)
findings = data.get("results", [])
except Exception as e:
logger.error(f"Error parsing ffuf output: {e}")
return {
"tool": "ffuf",
"target": target,
"method": method,
"success": result["success"],
"findings": findings,
"count": len(findings)
}
# ==================== PASSWORD CRACKING ====================
async def john_crack(
hash_file: str,
wordlist: Optional[str] = None,
format: Optional[str] = None
) -> Dict[str, Any]:
"""John the Ripper password cracking"""
cmd = ["john"]
if format:
cmd.extend([f"--format={format}"])
if wordlist:
cmd.extend([f"--wordlist={wordlist}"])
cmd.append(hash_file)
result = await run_command(cmd, timeout=3600)
# Get cracked passwords
show_cmd = ["john", "--show", hash_file]
show_result = await run_command(show_cmd, timeout=10)
cracked = []
if show_result["success"]:
for line in show_result["stdout"].split("\n"):
if ":" in line:
cracked.append(line.strip())
return {
"tool": "john",
"hash_file": hash_file,
"success": result["success"],
"cracked_passwords": cracked,
"count": len(cracked)
}
async def hashcat_crack(
hash: str,
hash_type: int,
wordlist: Optional[str] = None,
attack_mode: int = 0
) -> Dict[str, Any]:
"""Hashcat GPU password cracking"""
hash_file = "/tmp/hashcat_input.txt"
with open(hash_file, "w") as f:
f.write(hash)
cmd = ["hashcat", "-m", str(hash_type), "-a", str(attack_mode), hash_file]
if wordlist:
cmd.append(wordlist)
cmd.extend(["--outfile", "/tmp/hashcat_output.txt"])
result = await run_command(cmd, timeout=3600)
cracked = None
try:
with open("/tmp/hashcat_output.txt", "r") as f:
cracked = f.read().strip()
except Exception:
pass
return {
"tool": "hashcat",
"hash_type": hash_type,
"success": result["success"],
"cracked": cracked
}
# ==================== NETWORK ANALYSIS ====================
async def tcpdump_capture(
interface: str,
filter: Optional[str] = None,
duration: int = 60,
output_file: Optional[str] = None
) -> Dict[str, Any]:
"""Capture network packets"""
if output_file is None:
output_file = f"/tmp/capture_{interface}.pcap"
cmd = ["timeout", str(duration), "tcpdump", "-i", interface, "-w", output_file]
if filter:
cmd.append(filter)
result = await run_command(cmd, timeout=duration + 10)
return {
"tool": "tcpdump",
"interface": interface,
"duration": duration,
"output_file": output_file,
"success": result["success"]
}
async def snmp_check(target: str, community: str = "public") -> Dict[str, Any]:
"""SNMP enumeration"""
cmd = ["snmp-check", target, "-c", community]
result = await run_command(cmd, timeout=120)
return {
"tool": "snmp-check",
"target": target,
"community": community,
"success": result["success"],
"output": result.get("stdout", "")
}
# ==================== HELPER TOOLS ====================
async def smbclient_enum(target: str) -> Dict[str, Any]:
"""SMB share enumeration"""
cmd = ["smbclient", "-L", target, "-N"]
result = await run_command(cmd, timeout=60)
return {
"tool": "smbclient",
"target": target,
"success": result["success"],
"output": result.get("stdout", "")
}
async def wfuzz_scan(target: str, wordlist: str) -> Dict[str, Any]:
"""Web application fuzzer"""
cmd = ["wfuzz", "-w", wordlist, "-o", "json", target]
result = await run_command(cmd, timeout=600)
return {
"tool": "wfuzz",
"target": target,
"success": result["success"],
"output": result.get("stdout", "")
}
async def aircrack_ng(capture_file: str, wordlist: str) -> Dict[str, Any]:
"""Wireless password cracking"""
cmd = ["aircrack-ng", capture_file, "-w", wordlist]
result = await run_command(cmd, timeout=1800)
key_found = "KEY FOUND" in result.get("stdout", "")
return {
"tool": "aircrack-ng",
"capture_file": capture_file,
"success": result["success"],
"key_found": key_found,
"output": result.get("stdout", "")
}
async def wireshark_analyze(pcap_file: str) -> Dict[str, Any]:
"""Analyze packet capture with tshark"""
cmd = ["tshark", "-r", pcap_file, "-q", "-z", "conv,tcp"]
result = await run_command(cmd, timeout=120)
return {
"tool": "wireshark",
"pcap_file": pcap_file,
"success": result["success"],
"analysis": result.get("stdout", "")
}
async def burpsuite_scan(target: str) -> Dict[str, Any]:
"""Burp Suite scan (requires Burp Suite Pro)"""
# This is a placeholder - actual implementation would use Burp Suite REST API
return {
"tool": "burpsuite",
"target": target,
"success": False,
"note": "Requires Burp Suite Pro with REST API enabled"
}
async def exploit_db_search(query: str) -> Dict[str, Any]:
"""Search Exploit-DB (alias for searchsploit)"""
return await searchsploit(query)
async def vulnerability_assessment(target: str, assessment_type: str = "comprehensive") -> Dict[str, Any]:
"""Comprehensive vulnerability assessment"""
results = {
"target": target,
"assessment_type": assessment_type,
"scans": {}
}
if assessment_type in ["web", "comprehensive"]:
results["scans"]["nikto"] = await nikto_scan(target)
results["scans"]["nuclei"] = await nuclei_scan(target)
if assessment_type in ["network", "comprehensive"]:
results["scans"]["nmap"] = await nmap_scan(target, scan_type="full")
results["scans"]["ssl"] = await ssl_scan(target)
return results