# tools_extended.py (43.6 kB)
"""
Extended Kali Linux Penetration Testing Tools Integration
Comprehensive tool suite covering all security testing domains
"""
import asyncio
import subprocess
import json
import re
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path
import xml.etree.ElementTree as ET
import base64
logger = logging.getLogger(__name__)
async def run_command(cmd: List[str], timeout: int = 300) -> Dict[str, Any]:
    """Run an external command (no shell) and capture its output.

    Args:
        cmd: Program and arguments, handed to ``asyncio.create_subprocess_exec``.
        timeout: Seconds to wait for the process before giving up.

    Returns:
        On completion: ``success`` (exit status was 0), ``returncode``,
        ``stdout`` and ``stderr`` (decoded as UTF-8, undecodable bytes dropped).
        On timeout: ``success`` False, ``error`` and the ``timeout`` used.
        On any other failure: ``success`` False and the ``error`` text.
    """
    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=timeout
        )
        return {
            "success": process.returncode == 0,
            "returncode": process.returncode,
            "stdout": stdout.decode('utf-8', errors='ignore'),
            "stderr": stderr.decode('utf-8', errors='ignore')
        }
    except asyncio.TimeoutError:
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # The child exited between the timeout and the kill.
                pass
            # Reap the child so it does not linger as a zombie.
            await process.wait()
        return {
            "success": False,
            "error": "Command timed out",
            "timeout": timeout
        }
    except Exception as e:
        logger.error("Error running command %s: %s", cmd, e)
        return {
            "success": False,
            "error": str(e)
        }
# ==================== RECONNAISSANCE TOOLS ====================
async def nmap_scan(
    target: str,
    scan_type: str = "quick",
    ports: Optional[str] = None,
    scripts: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Run nmap against ``target`` and return the parsed XML report.

    Args:
        target: Host, address or range passed to nmap as the scan target.
        scan_type: One of ``quick``, ``full``, ``stealth``, ``aggressive`` or
            ``udp``; any other value adds no profile options.
        ports: Optional nmap port specification. When given it replaces the
            profile's own port selection (``-F`` / ``-p-``).
        scripts: Optional script names, joined with commas for ``--script``.

    Returns:
        Dict with ``tool``, ``target``, ``scan_type``, ``success``,
        ``findings`` (from ``parse_nmap_xml``) and ``raw_output`` (stdout).
    """
    # Keep the report name a single path component even for targets such as
    # "10.0.0.0/24"; plain dotted names map to the same file name as before.
    safe_name = re.sub(r"[^A-Za-z0-9_-]", "_", target)
    output_file = f"/tmp/nmap_{safe_name}.xml"

    profiles = {
        "quick": ["-T4", "-F"],
        "full": ["-p-", "-T4", "-A"],
        "stealth": ["-sS", "-T2"],
        "aggressive": ["-A", "-T4"],
        "udp": ["-sU", "--top-ports", "100"],
    }
    options = list(profiles.get(scan_type, []))
    if ports:
        # nmap rejects -F combined with -p and allows only one -p option,
        # so an explicit port list supersedes the profile's selection.
        options = [opt for opt in options if opt not in ("-F", "-p-")]
        options.extend(["-p", ports])
    if scripts:
        options.extend(["--script", ",".join(scripts)])

    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["nmap", *options, "-oX", output_file, target]
    result = await run_command(cmd, timeout=600)
    findings = parse_nmap_xml(output_file)
    return {
        "tool": "nmap",
        "target": target,
        "scan_type": scan_type,
        "success": result["success"],
        "findings": findings,
        "raw_output": result.get("stdout", "")
    }
def parse_nmap_xml(xml_file: str) -> Dict[str, Any]:
    """Parse an nmap XML report into a findings dictionary.

    Args:
        xml_file: Path of the XML file to read.

    Returns:
        Dict with ``hosts`` (one entry per host whose status is ``up``),
        ``open_ports`` (``"port/protocol"`` strings), ``services`` (details of
        each open port) and ``vulnerabilities`` (always empty here).
        If the file cannot be read or parsed, ``{"error": <message>}``.
    """
    try:
        root = ET.parse(xml_file).getroot()
    except (ET.ParseError, OSError) as e:
        logger.error(f"Error parsing Nmap XML: {e}")
        return {"error": str(e)}

    findings: Dict[str, Any] = {
        "hosts": [],
        "open_ports": [],
        "services": [],
        "vulnerabilities": []
    }
    for host in root.findall("host"):
        # A host entry without a usable status is skipped instead of
        # aborting the whole parse.
        status = host.find("status")
        if status is None or status.get("state") != "up":
            continue
        address = host.find("address")
        host_info: Dict[str, Any] = {
            "ip": address.get("addr") if address is not None else None,
            "hostname": None,
            "os": None,
            "ports": []
        }
        hostname = host.find("hostnames/hostname")
        if hostname is not None:
            host_info["hostname"] = hostname.get("name")
        osmatch = host.find("os/osmatch")
        if osmatch is not None:
            host_info["os"] = osmatch.get("name")
        for port in host.findall("ports/port"):
            state = port.find("state")
            if state is None or state.get("state") != "open":
                continue
            service = port.find("service")
            has_service = service is not None
            port_info = {
                "port": port.get("portid"),
                "protocol": port.get("protocol"),
                "service": service.get("name") if has_service else "unknown",
                "version": service.get("version") if has_service else None,
                "product": service.get("product") if has_service else None
            }
            host_info["ports"].append(port_info)
            findings["open_ports"].append(f"{port.get('portid')}/{port.get('protocol')}")
            findings["services"].append(port_info)
        findings["hosts"].append(host_info)
    return findings
async def masscan_scan(target: str, ports: str = "0-65535", rate: int = 1000) -> Dict[str, Any]:
    """Run masscan against ``target`` and load its JSON report.

    Args:
        target: Address or range handed to masscan.
        ports: Port specification for ``-p``.
        rate: Packet rate for ``--rate``.

    Returns:
        Dict with ``tool``, ``target``, ``success`` and ``findings`` (the
        decoded JSON report, or an empty list when it could not be read).
    """
    # Keep the report name a single path component even for CIDR targets
    # such as "10.0.0.0/24"; plain dotted names keep their previous file name.
    safe_name = re.sub(r"[^A-Za-z0-9_-]", "_", target)
    output_file = f"/tmp/masscan_{safe_name}.json"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["masscan", target, "-p", ports, "--rate", str(rate), "-oJ", output_file]
    result = await run_command(cmd, timeout=300)
    findings = []
    try:
        with open(output_file, "r") as f:
            findings = json.load(f)
    except Exception as e:
        logger.error(f"Error parsing masscan output: {e}")
    return {
        "tool": "masscan",
        "target": target,
        "success": result["success"],
        "findings": findings
    }
async def amass_enum(domain: str, passive: bool = True) -> Dict[str, Any]:
    """Enumerate subdomains of ``domain`` with amass.

    Args:
        domain: Domain passed to ``amass enum -d``.
        passive: When true, ``-passive`` is added to the command.

    Returns:
        Dict with ``tool``, ``domain``, ``success``, ``subdomains`` (one
        decoded JSON object per non-empty output line) and ``count``.
    """
    output_file = "/tmp/amass_output.json"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["amass", "enum", "-d", domain]
    if passive:
        cmd.append("-passive")
    cmd.extend(["-json", output_file])
    result = await run_command(cmd, timeout=600)

    subdomains = []
    try:
        with open(output_file, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    subdomains.append(json.loads(line))
                except json.JSONDecodeError as e:
                    # One bad line must not discard the records around it.
                    logger.error(f"Error parsing amass output: {e}")
    except (OSError, ValueError) as e:
        logger.error(f"Error parsing amass output: {e}")
    return {
        "tool": "amass",
        "domain": domain,
        "success": result["success"],
        "subdomains": subdomains,
        "count": len(subdomains)
    }
async def theHarvester(domain: str, sources: str = "all") -> Dict[str, Any]:
    """Run theHarvester for ``domain`` using the given data ``sources``.

    The tool is told to write its report to ``/tmp/theharvester_output``;
    only the captured stdout is returned here.
    """
    harvest = await run_command(
        ["theHarvester", "-d", domain, "-b", sources, "-f", "/tmp/theharvester_output"],
        timeout=300,
    )
    return dict(
        tool="theHarvester",
        domain=domain,
        success=harvest["success"],
        output=harvest.get("stdout", ""),
    )
async def recon_ng(workspace: str, module: str, target: str) -> Dict[str, Any]:
    """Run one recon-ng module in ``workspace`` with ``SOURCE`` set to ``target``.

    Returns the workspace, module, success flag and captured stdout.
    """
    argv = ["recon-ng", "-w", workspace, "-m", module]
    argv += ["-o", f"SOURCE={target}", "-x"]
    outcome = await run_command(argv, timeout=300)
    report: Dict[str, Any] = {"tool": "recon-ng", "workspace": workspace, "module": module}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def shodan_search(query: str, api_key: str) -> Dict[str, Any]:
    """Run ``shodan search`` for ``query`` and return its raw stdout.

    ``api_key`` is stored in the ``SHODAN_API_KEY`` environment variable of
    the current process before the command is started.
    """
    import os

    argv = ["shodan", "search", "--fields", "ip_str,port,org,hostnames", query]
    # NOTE(review): this changes the environment of the whole process, not
    # just of this one command.
    os.environ['SHODAN_API_KEY'] = api_key
    lookup = await run_command(argv, timeout=60)
    return dict(
        tool="shodan",
        query=query,
        success=lookup["success"],
        results=lookup.get("stdout", ""),
    )
async def whatweb(target: str, aggression: int = 1) -> Dict[str, Any]:
    """Fingerprint the web technologies of ``target`` with whatweb.

    Args:
        target: URL or host passed to whatweb.
        aggression: Value for ``--aggression``.

    Returns:
        Dict with ``tool``, ``target``, ``success`` and ``findings`` (the
        decoded JSON log, or an empty list when it could not be read).
    """
    output_file = "/tmp/whatweb_output.json"
    try:
        # Drop any log left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["whatweb", f"--aggression={aggression}", f"--log-json={output_file}", target]
    result = await run_command(cmd, timeout=120)
    findings = []
    try:
        with open(output_file, "r") as f:
            findings = json.load(f)
    except Exception as e:
        logger.error(f"Error parsing whatweb output: {e}")
    return {
        "tool": "whatweb",
        "target": target,
        "success": result["success"],
        "findings": findings
    }
async def wafw00f(target: str) -> Dict[str, Any]:
    """Run wafw00f against ``target``.

    ``waf_detected`` is true when the lower-cased stdout contains the word
    "behind"; the file written via ``-o`` is not read back.
    """
    probe = await run_command(
        ["wafw00f", target, "-o", "/tmp/wafw00f_output.json"], timeout=60
    )
    succeeded = probe["success"]
    text = probe.get("stdout", "")
    return {
        "tool": "wafw00f",
        "target": target,
        "success": succeeded,
        "output": text,
        "waf_detected": "behind" in text.lower(),
    }
# ==================== WEB APPLICATION TESTING ====================
async def burpsuite_scan(target: str, api_key: str, scan_type: str = "crawl_and_audit") -> Dict[str, Any]:
    """Ask a local Burp Suite REST endpoint to start a scan of ``target``.

    A POST is sent to ``http://localhost:8090/v0.1/scan`` with ``api_key`` in
    the ``X-Api-Key`` header. A 201 response yields a "started" result
    carrying the ``task_id`` from the response body; any other status yields
    a failure result.

    NOTE(review): a second ``burpsuite_scan`` is defined further down this
    module and replaces this one at import time.
    """
    import aiohttp

    burp_url = "http://localhost:8090/v0.1"
    payload = {
        "scope": {
            "include": [{"rule": target}]
        },
        "scan_configurations": [{"type": scan_type}]
    }
    auth_headers = {"X-Api-Key": api_key}
    async with aiohttp.ClientSession() as session:
        async with session.post(f"{burp_url}/scan", json=payload, headers=auth_headers) as resp:
            if resp.status == 201:
                body = await resp.json()
                return {
                    "tool": "burpsuite",
                    "target": target,
                    "success": True,
                    "scan_id": body.get("task_id"),
                    "status": "started"
                }
    return {
        "tool": "burpsuite",
        "target": target,
        "success": False,
        "error": "Failed to start scan"
    }
async def dirb(target: str, wordlist: Optional[str] = None) -> Dict[str, Any]:
    """Brute-force directories on ``target`` with dirb.

    Args:
        target: Base URL to scan.
        wordlist: Wordlist path; defaults to dirb's ``common.txt``.

    Returns:
        Dict with ``tool``, ``target``, ``success``, ``found_directories``
        (output lines containing ``CODE:``) and ``count``.
    """
    if wordlist is None:
        wordlist = "/usr/share/wordlists/dirb/common.txt"
    output_file = "/tmp/dirb_output.txt"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["dirb", target, wordlist, "-o", output_file, "-S"]
    result = await run_command(cmd, timeout=600)
    found_dirs = []
    try:
        with open(output_file, "r") as f:
            found_dirs = [line.strip() for line in f if "CODE:" in line]
    except (OSError, UnicodeDecodeError) as e:
        # Still best-effort, but no longer silent.
        logger.error(f"Error reading dirb output: {e}")
    return {
        "tool": "dirb",
        "target": target,
        "success": result["success"],
        "found_directories": found_dirs,
        "count": len(found_dirs)
    }
async def dirbuster(target: str, wordlist: str, threads: int = 10) -> Dict[str, Any]:
    """Run DirBuster against ``target`` with ``wordlist`` and ``threads``.

    The report path handed to DirBuster is returned as ``output_file``; its
    contents are not read here.
    """
    report_path = "/tmp/dirbuster_output.txt"
    argv = ["dirbuster", "-u", target, "-l", wordlist]
    argv += ["-t", str(threads), "-r", "/tmp/dirbuster_output.txt"]
    outcome = await run_command(argv, timeout=900)
    return dict(
        tool="dirbuster",
        target=target,
        success=outcome["success"],
        output_file=report_path,
    )
async def wfuzz(target: str, wordlist: str, filter_codes: Optional[List[int]] = None) -> Dict[str, Any]:
    """Fuzz ``target`` with wfuzz using ``wordlist``.

    When ``filter_codes`` is non-empty the codes are joined with commas and
    passed to ``--hc``. Output is requested as JSON and returned as raw stdout.
    """
    hide_args = ["--hc", ",".join(str(code) for code in filter_codes)] if filter_codes else []
    argv = ["wfuzz", "-w", wordlist, *hide_args, "-o", "json", target]
    outcome = await run_command(argv, timeout=600)
    report: Dict[str, Any] = {"tool": "wfuzz", "target": target}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def commix(target: str, data: Optional[str] = None) -> Dict[str, Any]:
    """Run commix in batch mode against ``target``.

    ``data``, when given, is passed via ``--data``. ``vulnerable`` is true
    when the lower-cased stdout contains the word "vulnerable".
    """
    argv = ["commix", "--url", target, "--batch"]
    if data:
        argv += ["--data", data]
    outcome = await run_command(argv, timeout=900)
    succeeded = outcome["success"]
    # NOTE(review): a plain substring test also matches "not vulnerable".
    flagged = "vulnerable" in outcome.get("stdout", "").lower()
    return dict(
        tool="commix",
        target=target,
        success=succeeded,
        vulnerable=flagged,
        output=outcome.get("stdout", ""),
    )
async def xsstrike(target: str) -> Dict[str, Any]:
    """Run xsstrike with crawling enabled against ``target``.

    ``vulnerable`` is true when the lower-cased stdout contains the word
    "vulnerable".
    """
    outcome = await run_command(["xsstrike", "-u", target, "--crawl"], timeout=600)
    succeeded = outcome["success"]
    # NOTE(review): a plain substring test also matches "not vulnerable".
    flagged = "vulnerable" in outcome.get("stdout", "").lower()
    return {
        "tool": "xsstrike",
        "target": target,
        "success": succeeded,
        "vulnerable": flagged,
        "output": outcome.get("stdout", ""),
    }
async def ssrf_detector(target: str, callback_url: str) -> Dict[str, Any]:
    """Return the list of SSRF payloads that would be tried against ``target``.

    No request is sent: every payload (the callback URL, two loopback URLs
    and a cloud metadata URL) is simply recorded with ``tested`` set to True.
    """
    candidates = (
        f"{callback_url}",
        "http://localhost",
        "http://127.0.0.1",
        "http://169.254.169.254/latest/meta-data/",
    )
    # Placeholder: real probing logic would go here.
    records = [{"payload": candidate, "tested": True} for candidate in candidates]
    return dict(tool="ssrf_detector", target=target, success=True, findings=records)
async def joomscan(target: str) -> Dict[str, Any]:
    """Run joomscan against ``target`` with component enumeration enabled.

    Returns the target, the success flag and the captured stdout.
    """
    outcome = await run_command(
        ["joomscan", "-u", target, "--enumerate-components"], timeout=300
    )
    report: Dict[str, Any] = {"tool": "joomscan", "target": target}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def droopescan(target: str, plugin: str = "drupal") -> Dict[str, Any]:
    """Run ``droopescan scan <plugin>`` against ``target``.

    Returns the target, plugin, success flag and captured stdout.
    """
    argv = ["droopescan", "scan", plugin]
    argv += ["-u", target]
    outcome = await run_command(argv, timeout=300)
    return dict(
        tool="droopescan",
        target=target,
        plugin=plugin,
        success=outcome["success"],
        output=outcome.get("stdout", ""),
    )
# ==================== WIRELESS SECURITY ====================
async def aircrack_suite(interface: str, target_bssid: str, channel: int) -> Dict[str, Any]:
    """Put ``interface`` into monitor mode and capture traffic of one access point.

    Args:
        interface: Wireless interface handed to ``airmon-ng start``; the
            capture then runs on ``<interface>mon``.
        target_bssid: BSSID filter for airodump-ng.
        channel: Channel for airodump-ng.

    Returns:
        Dict with ``tool``, ``interface``, ``target_bssid``, ``success`` (of
        the airodump-ng run) and ``capture_file``.
    """
    monitor = await run_command(["airmon-ng", "start", interface])
    if not monitor["success"]:
        # Not fatal: the capture is still attempted, but record why it may fail.
        logger.warning(
            "airmon-ng start %s failed: %s",
            interface,
            monitor.get("error") or monitor.get("stderr", ""),
        )

    dump_file = "/tmp/airodump"
    cmd = ["airodump-ng", "-c", str(channel), "--bssid", target_bssid, "-w", dump_file, f"{interface}mon"]
    result = await run_command(cmd, timeout=300)

    # airodump-ng appends a run counter to the prefix ("airodump-01.cap",
    # "airodump-02.cap", ...), so report the newest matching capture rather
    # than "<prefix>.cap", which is never written.
    def _mtime(path: Path) -> float:
        try:
            return path.stat().st_mtime
        except OSError:
            return 0.0

    captures = sorted(Path("/tmp").glob("airodump-*.cap"), key=_mtime)
    capture_file = str(captures[-1]) if captures else f"{dump_file}-01.cap"
    # NOTE(review): airodump-ng normally runs until stopped, so this call
    # presumably ends via the 300 s timeout and reports success=False - confirm.
    return {
        "tool": "aircrack-suite",
        "interface": interface,
        "target_bssid": target_bssid,
        "success": result["success"],
        "capture_file": capture_file
    }
async def reaver(interface: str, bssid: str) -> Dict[str, Any]:
    """Run reaver verbosely on ``interface`` against the access point ``bssid``.

    The command is given up to one hour; stdout is returned as ``output``.
    """
    argv = ["reaver", "-i", interface, "-b", bssid, "-vv"]
    outcome = await run_command(argv, timeout=3600)
    report: Dict[str, Any] = {"tool": "reaver", "interface": interface, "bssid": bssid}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def wifite(interface: str, wpa_only: bool = True) -> Dict[str, Any]:
    """Run wifite on ``interface``, adding ``--wpa`` when ``wpa_only`` is true.

    The command is given up to one hour; stdout is returned as ``output``.
    """
    argv = ["wifite", "-i", interface] + (["--wpa"] if wpa_only else [])
    outcome = await run_command(argv, timeout=3600)
    return dict(
        tool="wifite",
        interface=interface,
        success=outcome["success"],
        output=outcome.get("stdout", ""),
    )
async def bettercap(interface: str, caplet: Optional[str] = None) -> Dict[str, Any]:
    """Run bettercap on ``interface``, optionally loading ``caplet``.

    Returns the interface, success flag and captured stdout.
    """
    caplet_args = ["-caplet", caplet] if caplet else []
    argv = ["bettercap", "-iface", interface, *caplet_args]
    outcome = await run_command(argv, timeout=600)
    report: Dict[str, Any] = {"tool": "bettercap", "interface": interface}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def kismet_scan(interface: str) -> Dict[str, Any]:
    """Start kismet with ``--daemonize`` using ``interface`` as capture source.

    Only the success flag of the launch command is reported.
    """
    launch = await run_command(["kismet", "-c", interface, "--daemonize"], timeout=300)
    return dict(tool="kismet", interface=interface, success=launch["success"])
# ==================== SOCIAL ENGINEERING ====================
async def setoolkit(attack_vector: str, payload_type: str, lhost: str) -> Dict[str, Any]:
    """Describe a Social Engineering Toolkit run without launching it.

    The arguments are echoed back with a fixed "configured" status and a
    note that SET itself needs interactive configuration.
    """
    summary: Dict[str, Any] = {"tool": "setoolkit"}
    summary.update(attack_vector=attack_vector, payload_type=payload_type, lhost=lhost)
    summary["status"] = "configured"
    summary["note"] = "SET requires interactive configuration"
    return summary
async def gophish_campaign(name: str, template: str, targets: List[str]) -> Dict[str, Any]:
    """Describe a GoPhish campaign without contacting any GoPhish server.

    Returns the campaign name, template, the number of targets, a fixed
    "created" status and a note that API configuration is required.
    """
    target_count = len(targets)
    return dict(
        tool="gophish",
        campaign_name=name,
        template=template,
        targets=target_count,
        status="created",
        note="Requires GoPhish API configuration",
    )
async def king_phisher(template: str, server: str) -> Dict[str, Any]:
    """Describe a King Phisher campaign without contacting a server.

    Echoes ``template`` and ``server`` with a fixed "ready" status and a note.
    """
    summary: Dict[str, Any] = {"tool": "king_phisher", "template": template, "server": server}
    summary["status"] = "ready"
    summary["note"] = "Requires King Phisher server"
    return summary
# ==================== POST-EXPLOITATION ====================
async def mimikatz(command: str, target: str) -> Dict[str, Any]:
    """Describe a Mimikatz invocation; nothing is executed here.

    Echoes ``command`` and ``target`` with a note that execution has to
    happen on the Windows target.
    """
    return dict(
        tool="mimikatz",
        command=command,
        target=target,
        note="Requires execution on Windows target",
    )
async def bloodhound_ingest(domain: str, username: str, password: str) -> Dict[str, Any]:
    """Run ``bloodhound-python`` with collection method ``All`` for ``domain``.

    The credentials are passed as command-line arguments. Returns the
    domain, success flag and captured stdout.
    """
    argv = ["bloodhound-python", "-d", domain]
    argv += ["-u", username, "-p", password]
    argv += ["-c", "All"]
    outcome = await run_command(argv, timeout=600)
    report: Dict[str, Any] = {"tool": "bloodhound", "domain": domain}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def empire_agent(listener: str, stager: str) -> Dict[str, Any]:
    """Describe a PowerShell Empire agent request; nothing is executed.

    Echoes ``listener`` and ``stager`` with a note that an Empire server
    must be running.
    """
    summary: Dict[str, Any] = {"tool": "empire"}
    summary.update(listener=listener, stager=stager)
    summary["note"] = "Requires Empire server running"
    return summary
async def crackmapexec(target: str, username: str, password: str, module: str) -> Dict[str, Any]:
    """Run a CrackMapExec SMB module against ``target`` with the given credentials.

    Returns the target, module, success flag and captured stdout.
    """
    argv = ["crackmapexec", "smb", target]
    argv += ["-u", username, "-p", password, "-M", module]
    outcome = await run_command(argv, timeout=300)
    return dict(
        tool="crackmapexec",
        target=target,
        module=module,
        success=outcome["success"],
        output=outcome.get("stdout", ""),
    )
async def linpeas(target_ip: str) -> Dict[str, Any]:
    """Describe a linPEAS run; nothing is downloaded or executed here.

    Echoes ``target_ip`` as ``target`` with a note that the enumeration has
    to be executed on the target system.
    """
    return dict(
        tool="linpeas",
        target=target_ip,
        note="Execute on target system for enumeration",
    )
async def winpeas(target_ip: str) -> Dict[str, Any]:
    """Describe a winPEAS run; nothing is executed here.

    Echoes ``target_ip`` as ``target`` with a note that the enumeration has
    to be executed on the target system.
    """
    summary: Dict[str, Any] = {"tool": "winpeas", "target": target_ip}
    summary["note"] = "Execute on target system for enumeration"
    return summary
# ==================== MOBILE TESTING ====================
async def mobsf_scan(
    apk_path: str,
    api_url: str = "http://localhost:8000",
    api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Upload an APK to a Mobile Security Framework (MobSF) server.

    Args:
        apk_path: Path of the APK file to upload.
        api_url: Base URL of the MobSF server.
        api_key: Optional REST API key; when given it is sent in the
            ``Authorization`` header. Omitting it keeps the previous
            unauthenticated request.

    Returns:
        On HTTP 200: ``tool``, ``apk``, ``success`` True and ``scan_id`` (the
        ``hash`` field of the JSON response). Otherwise ``success`` False.

    Raises:
        OSError: If ``apk_path`` cannot be opened.
    """
    import aiohttp

    headers = {"Authorization": api_key} if api_key else {}
    async with aiohttp.ClientSession(headers=headers) as session:
        # Upload APK
        with open(apk_path, 'rb') as f:
            async with session.post(
                f"{api_url}/api/v1/upload",
                data={'file': f}
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    return {
                        "tool": "mobsf",
                        "apk": apk_path,
                        "success": True,
                        "scan_id": data.get("hash")
                    }
    return {
        "tool": "mobsf",
        "apk": apk_path,
        "success": False
    }
async def drozer_scan(package: str) -> Dict[str, Any]:
    """Run drozer's ``app.package.info`` module for ``package``.

    Returns the package, success flag and captured stdout.
    """
    module_call = f"run app.package.info -a {package}"
    argv = ["drozer", "console", "connect", "-c", module_call]
    outcome = await run_command(argv, timeout=300)
    report: Dict[str, Any] = {"tool": "drozer", "package": package}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def frida_trace(package: str, function: str) -> Dict[str, Any]:
    """Run ``frida-trace -U -f <package> -i <function>``.

    Returns the package, function, success flag and captured stdout.
    """
    argv = ["frida-trace", "-U"]
    argv += ["-f", package, "-i", function]
    outcome = await run_command(argv, timeout=300)
    return dict(
        tool="frida",
        package=package,
        function=function,
        success=outcome["success"],
        output=outcome.get("stdout", ""),
    )
# ==================== API TESTING ====================
async def postman_scan(collection_path: str, environment: str) -> Dict[str, Any]:
    """Run a Postman collection through newman with the given environment.

    Returns the collection path, success flag and captured stdout.
    """
    outcome = await run_command(
        ["newman", "run", collection_path, "-e", environment], timeout=300
    )
    report: Dict[str, Any] = {"tool": "postman_newman", "collection": collection_path}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def rest_api_fuzzer(target: str, spec_file: str) -> Dict[str, Any]:
    """Placeholder for REST API fuzzing; no request is made.

    Echoes ``target`` and ``spec_file`` with a note about the missing
    implementation.
    """
    return dict(
        tool="rest_api_fuzzer",
        target=target,
        spec_file=spec_file,
        note="Implement with RESTler or similar",
    )
async def graphql_scanner(endpoint: str) -> Dict[str, Any]:
    """Check whether a GraphQL endpoint answers an introspection query.

    Args:
        endpoint: URL the introspection query is POSTed to.

    Returns:
        On HTTP 200: ``tool``, ``endpoint``, ``success`` True,
        ``introspection_enabled`` (True only when the response actually
        carries a ``__schema`` object) and ``types`` (its ``types`` list,
        possibly empty). On any other status: ``success`` False.
    """
    import aiohttp

    introspection_query = {
        "query": "{ __schema { types { name } } }"
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, json=introspection_query) as resp:
            if resp.status == 200:
                body = await resp.json()
                # A refused query is commonly answered with
                # {"data": null, "errors": [...]}, so every level is
                # treated as optional.
                payload = body.get("data") if isinstance(body, dict) else None
                schema = payload.get("__schema") if isinstance(payload, dict) else None
                types = schema.get("types") if isinstance(schema, dict) else None
                return {
                    "tool": "graphql_scanner",
                    "endpoint": endpoint,
                    "success": True,
                    "introspection_enabled": isinstance(schema, dict),
                    "types": types or []
                }
    return {
        "tool": "graphql_scanner",
        "endpoint": endpoint,
        "success": False
    }
# ==================== FORENSICS & REVERSE ENGINEERING ====================
async def volatility_analyze(memory_dump: str, profile: str, plugin: str) -> Dict[str, Any]:
    """Run one Volatility plugin on ``memory_dump`` using ``profile``.

    Returns the inputs, the success flag and the captured stdout.
    """
    argv = ["volatility", "-f", memory_dump]
    argv += ["--profile", profile, plugin]
    outcome = await run_command(argv, timeout=600)
    return dict(
        tool="volatility",
        memory_dump=memory_dump,
        profile=profile,
        plugin=plugin,
        success=outcome["success"],
        output=outcome.get("stdout", ""),
    )
async def autopsy_case(evidence_file: str, case_name: str) -> Dict[str, Any]:
    """Describe an Autopsy case; nothing is launched here.

    Echoes ``evidence_file`` and ``case_name`` with a note that the Autopsy
    GUI is required.
    """
    summary: Dict[str, Any] = {"tool": "autopsy"}
    summary.update(evidence_file=evidence_file, case_name=case_name)
    summary["note"] = "Requires Autopsy GUI"
    return summary
async def binwalk_extract(firmware_file: str) -> Dict[str, Any]:
    """Run ``binwalk -e`` on ``firmware_file``.

    Returns the file path, success flag and captured stdout.
    """
    outcome = await run_command(["binwalk", "-e", firmware_file], timeout=300)
    report: Dict[str, Any] = {"tool": "binwalk", "firmware_file": firmware_file}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def ghidra_decompile(binary_path: str) -> Dict[str, Any]:
    """Describe a Ghidra analysis; nothing is executed here.

    Echoes ``binary_path`` as ``binary`` with a note that headless analysis
    is required.
    """
    return dict(
        tool="ghidra",
        binary=binary_path,
        note="Requires Ghidra headless analysis",
    )
async def radare2_analyze(binary_path: str, command: str = "aaa") -> Dict[str, Any]:
    """Run a single radare2 ``command`` quietly on ``binary_path``.

    Returns the binary path, success flag and captured stdout.
    """
    argv = ["r2", "-c", command]
    argv += ["-q", binary_path]
    outcome = await run_command(argv, timeout=300)
    return dict(
        tool="radare2",
        binary=binary_path,
        success=outcome["success"],
        output=outcome.get("stdout", ""),
    )
async def strings_extract(file_path: str, min_length: int = 4) -> Dict[str, Any]:
    """Extract printable strings from ``file_path`` with the ``strings`` tool.

    Args:
        file_path: File to scan.
        min_length: Minimum string length, passed to ``-n``.

    Returns:
        Dict with ``tool``, ``file``, ``success`` and ``strings`` (one entry
        per output line; empty list when there is no output).
    """
    cmd = ["strings", "-n", str(min_length), file_path]
    result = await run_command(cmd, timeout=60)
    return {
        "tool": "strings",
        "file": file_path,
        "success": result["success"],
        # splitlines() avoids the spurious trailing "" that split("\n")
        # produced after the final newline (and [""] on failure).
        "strings": result.get("stdout", "").splitlines()
    }
# ==================== CLOUD SECURITY ====================
async def scout_suite(provider: str, profile: str) -> Dict[str, Any]:
    """Run ``scout <provider> --profile <profile>``.

    Returns the provider, profile, success flag and captured stdout.
    """
    outcome = await run_command(["scout", provider, "--profile", profile], timeout=600)
    report: Dict[str, Any] = {"tool": "scout_suite", "provider": provider, "profile": profile}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def cloudfox_enum(profile: str, service: str) -> Dict[str, Any]:
    """Run ``cloudfox aws --profile <profile> <service>``.

    Returns the profile, service, success flag and captured stdout.
    """
    argv = ["cloudfox", "aws"]
    argv += ["--profile", profile, service]
    outcome = await run_command(argv, timeout=300)
    return dict(
        tool="cloudfox",
        profile=profile,
        service=service,
        success=outcome["success"],
        output=outcome.get("stdout", ""),
    )
async def pacu_module(profile: str, module: str) -> Dict[str, Any]:
    """Describe a Pacu module run; nothing is executed here.

    Echoes ``profile`` and ``module`` with a note that an interactive Pacu
    session is required.
    """
    summary: Dict[str, Any] = {"tool": "pacu"}
    summary.update(profile=profile, module=module)
    summary["note"] = "Requires Pacu interactive session"
    return summary
# ==================== CONTAINER SECURITY ====================
async def trivy_scan(image: str) -> Dict[str, Any]:
    """Scan a container ``image`` with trivy and load its JSON report.

    Args:
        image: Image reference passed to ``trivy image``.

    Returns:
        Dict with ``tool``, ``image``, ``success`` and ``findings`` (the
        decoded JSON report, or an empty list when it could not be read).
    """
    output_file = "/tmp/trivy_output.json"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["trivy", "image", "--format", "json", "-o", output_file, image]
    result = await run_command(cmd, timeout=300)
    findings = []
    try:
        with open(output_file, "r") as f:
            findings = json.load(f)
    except Exception as e:
        logger.error(f"Error parsing trivy output: {e}")
    return {
        "tool": "trivy",
        "image": image,
        "success": result["success"],
        "findings": findings
    }
async def docker_bench_security() -> Dict[str, Any]:
    """Run ``docker-bench-security`` and return its stdout.

    Returns the tool name, success flag and captured stdout.
    """
    audit = await run_command(["docker-bench-security"], timeout=300)
    return dict(
        tool="docker_bench_security",
        success=audit["success"],
        output=audit.get("stdout", ""),
    )
async def clair_scan(image: str) -> Dict[str, Any]:
    """Describe a Clair scan of ``image``; nothing is executed here.

    Echoes the image with a note that a Clair server is required.
    """
    summary: Dict[str, Any] = {"tool": "clair", "image": image}
    summary["note"] = "Requires Clair server"
    return summary
# ==================== EXISTING TOOLS (from original file) ====================
async def nikto_scan(target: str, ssl: bool = False, port: int = 80) -> Dict[str, Any]:
    """Scan a web server with Nikto and collect the reported vulnerabilities.

    Args:
        target: Host passed to ``-h``.
        ssl: When true, ``-ssl`` is added.
        port: Port passed to ``-port``.

    Returns:
        Dict with ``tool``, ``target``, ``port``, ``ssl``, ``success``,
        ``findings`` (the ``vulnerabilities`` entries of the JSON report) and
        ``raw_output`` (stdout).
    """
    output_file = "/tmp/nikto_output.json"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["nikto", "-h", target, "-port", str(port)]
    if ssl:
        cmd.append("-ssl")
    cmd.extend(["-Format", "json", "-output", output_file])
    result = await run_command(cmd, timeout=900)

    findings = []
    try:
        with open(output_file, "r") as f:
            nikto_data = json.load(f)
        # The report may be one object or a list of per-host objects;
        # accept both instead of failing on the list form.
        entries = nikto_data if isinstance(nikto_data, list) else [nikto_data]
        for entry in entries:
            if isinstance(entry, dict):
                findings.extend(entry.get("vulnerabilities", []))
    except Exception as e:
        logger.error(f"Error parsing Nikto output: {e}")
    return {
        "tool": "nikto",
        "target": target,
        "port": port,
        "ssl": ssl,
        "success": result["success"],
        "findings": findings,
        "raw_output": result.get("stdout", "")
    }
async def sqlmap_scan(target: str, data: Optional[str] = None, cookie: Optional[str] = None, level: int = 1, risk: int = 1) -> Dict[str, Any]:
    """Run sqlmap in batch mode against ``target``.

    ``data`` and ``cookie`` are forwarded when given; results are written
    under ``/tmp/sqlmap``. ``vulnerable`` is true when the lower-cased stdout
    contains "sqlmap identified".
    """
    argv = ["sqlmap", "-u", target, "--batch", f"--level={level}", f"--risk={risk}"]
    if data:
        argv += ["--data", data]
    if cookie:
        argv += ["--cookie", cookie]
    argv.append("--output-dir=/tmp/sqlmap")
    outcome = await run_command(argv, timeout=900)
    found_injection = "sqlmap identified" in outcome.get("stdout", "").lower()
    return dict(
        tool="sqlmap",
        target=target,
        success=outcome["success"],
        vulnerable=found_injection,
        details=outcome.get("stdout", ""),
    )
async def gobuster_scan(target: str, wordlist: Optional[str] = None, extensions: Optional[List[str]] = None) -> Dict[str, Any]:
    """Brute-force directories and files on ``target`` with gobuster.

    Args:
        target: Base URL passed to ``-u``.
        wordlist: Wordlist path; defaults to dirb's ``common.txt``.
        extensions: Optional file extensions, joined with commas for ``-x``.

    Returns:
        Dict with ``tool``, ``target``, ``wordlist``, ``success``,
        ``found_paths`` (``path`` / ``status_code`` pairs) and ``count``.
    """
    if wordlist is None:
        wordlist = "/usr/share/wordlists/dirb/common.txt"
    cmd = ["gobuster", "dir", "-u", target, "-w", wordlist, "-q"]
    if extensions:
        cmd.extend(["-x", ",".join(extensions)])
    result = await run_command(cmd, timeout=600)

    # Result lines look like "/admin (Status: 301) [Size: 178]"; the second
    # whitespace token is "(Status:", so the code is taken from the pattern.
    status_pattern = re.compile(r"\(Status:\s*(\d+)\)")
    found_paths = []
    if result["success"]:
        for raw_line in result["stdout"].split("\n"):
            line = raw_line.strip()
            if not line.startswith("/"):
                continue
            parts = line.split()
            if len(parts) < 2:
                continue
            match = status_pattern.search(line)
            found_paths.append({
                "path": parts[0],
                # Fall back to the raw second token for unexpected layouts.
                "status_code": match.group(1) if match else parts[1]
            })
    return {
        "tool": "gobuster",
        "target": target,
        "wordlist": wordlist,
        "success": result["success"],
        "found_paths": found_paths,
        "count": len(found_paths)
    }
async def nuclei_scan(target: str, templates: Optional[List[str]] = None, severity: Optional[List[str]] = None) -> Dict[str, Any]:
    """Scan ``target`` with nuclei and collect its JSON-lines findings.

    Args:
        target: URL passed to ``-u``.
        templates: Optional templates; each is passed with its own ``-t``.
        severity: Optional severities, joined with commas for ``-severity``.

    Returns:
        Dict with ``tool``, ``target``, ``success``, ``vulnerabilities`` (one
        decoded object per non-empty output line) and ``count``.
    """
    output_file = "/tmp/nuclei_output.json"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["nuclei", "-u", target, "-json", "-o", output_file]
    if templates:
        for template in templates:
            cmd.extend(["-t", template])
    if severity:
        cmd.extend(["-severity", ",".join(severity)])
    result = await run_command(cmd, timeout=600)

    vulnerabilities = []
    try:
        with open(output_file, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    vulnerabilities.append(json.loads(line))
                except json.JSONDecodeError as e:
                    # One bad line must not discard the findings around it.
                    logger.error(f"Error parsing Nuclei output: {e}")
    except (OSError, ValueError) as e:
        logger.error(f"Error parsing Nuclei output: {e}")
    return {
        "tool": "nuclei",
        "target": target,
        "success": result["success"],
        "vulnerabilities": vulnerabilities,
        "count": len(vulnerabilities)
    }
async def wpscan(target: str, enumerate: str = "vp", api_token: Optional[str] = None) -> Dict[str, Any]:
    """Scan a WordPress site with WPScan and load its JSON report.

    Args:
        target: URL passed to ``--url``.
        enumerate: Value for ``--enumerate``.
        api_token: Optional token passed to ``--api-token``.

    Returns:
        Dict with ``tool``, ``target``, ``success`` and ``findings`` (the
        decoded JSON report, or an empty dict when it could not be read).
    """
    output_file = "/tmp/wpscan_output.json"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["wpscan", "--url", target, "--enumerate", enumerate, "--format", "json"]
    if api_token:
        cmd.extend(["--api-token", api_token])
    cmd.extend(["-o", output_file])
    result = await run_command(cmd, timeout=600)
    findings = {}
    try:
        with open(output_file, "r") as f:
            findings = json.load(f)
    except Exception as e:
        logger.error(f"Error parsing WPScan output: {e}")
    return {
        "tool": "wpscan",
        "target": target,
        "success": result["success"],
        "findings": findings
    }
async def hydra_bruteforce(target: str, service: str, username: Optional[str] = None, password_list: str = "/usr/share/wordlists/rockyou.txt", port: Optional[int] = None) -> Dict[str, Any]:
    """Brute-force logins for ``service`` on ``target`` with hydra.

    A single ``username`` is used when given, otherwise a Metasploit user
    list. ``credentials_found`` holds every stdout line containing both
    "login:" and "password:", collected only when the run succeeded.
    """
    if username:
        login_args = ["-l", username]
    else:
        login_args = ["-L", "/usr/share/wordlists/metasploit/unix_users.txt"]
    port_args = ["-s", str(port)] if port else []
    argv = ["hydra", *login_args, "-P", password_list, "-t", "4", *port_args, target, service]
    outcome = await run_command(argv, timeout=1800)

    hits: List[str] = []
    if outcome["success"]:
        hits = [
            row.strip()
            for row in outcome["stdout"].split("\n")
            if "login:" in row and "password:" in row
        ]
    return dict(
        tool="hydra",
        target=target,
        service=service,
        success=outcome["success"],
        credentials_found=hits,
        count=len(hits),
    )
async def metasploit_search(query: str, type: Optional[str] = None) -> Dict[str, Any]:
    """Search Metasploit modules through ``msfconsole``.

    Args:
        query: Search terms.
        type: Optional module type, added as a ``type:<value>`` filter.

    Returns:
        Dict with ``tool``, ``query``, ``success``, ``modules`` (``name`` and
        ``description`` for each exploit/auxiliary/post row) and ``count``.
    """
    search_expr = f"type:{type} {query}" if type else query
    cmd = ["msfconsole", "-q", "-x", f"search {search_expr}; exit"]
    result = await run_command(cmd, timeout=60)

    module_prefixes = ("exploit/", "auxiliary/", "post/")
    modules = []
    if result["success"]:
        for line in result["stdout"].split("\n"):
            parts = line.split()
            if not parts:
                continue
            # Rows either start with the module path or with a numeric index
            # followed by the path; the index must not be taken as the name.
            if parts[0].startswith(module_prefixes):
                name_at = 0
            elif len(parts) > 1 and parts[0].isdigit() and parts[1].startswith(module_prefixes):
                name_at = 1
            else:
                continue
            modules.append({
                "name": parts[name_at],
                "description": " ".join(parts[name_at + 1:])
            })
    return {
        "tool": "metasploit",
        "query": query,
        "success": result["success"],
        "modules": modules,
        "count": len(modules)
    }
async def searchsploit(query: str) -> Dict[str, Any]:
    """Search Exploit-DB via ``searchsploit -j`` and return the exploit entries.

    ``exploits`` is the ``RESULTS_EXPLOIT`` list of the JSON output; it stays
    empty when the command fails or its output is not valid JSON.
    """
    lookup = await run_command(["searchsploit", "-j", query], timeout=30)
    hits: List[Any] = []
    if lookup["success"]:
        try:
            hits = json.loads(lookup["stdout"]).get("RESULTS_EXPLOIT", [])
        except json.JSONDecodeError:
            # Non-JSON output is treated as "no results".
            pass
    return dict(tool="searchsploit", query=query, exploits=hits, count=len(hits))
# Include other existing functions from original tools.py...
# (dns_enum, enum4linux, ssl_scan, owasp_zap_scan, ffuf_fuzz, john_crack, hashcat_crack, tcpdump_capture, snmp_check, etc.)
async def dns_enum(domain: str, record_types: Optional[List[str]] = None) -> Dict[str, Any]:
    """Collect DNS records with dig and subdomains with sublist3r.

    Args:
        domain: Domain to query.
        record_types: Record types to look up; defaults to A, AAAA, MX, NS,
            TXT and SOA.

    Returns:
        Dict with ``tool``, ``domain``, ``records`` (record type -> answer
        lines, only for types that returned output), ``subdomains`` and
        ``subdomain_count``.
    """
    if record_types is None:
        record_types = ["A", "AAAA", "MX", "NS", "TXT", "SOA"]
    records = {}
    for record_type in record_types:
        cmd = ["dig", "+short", domain, record_type]
        result = await run_command(cmd, timeout=30)
        if result["success"] and result["stdout"].strip():
            records[record_type] = result["stdout"].strip().split("\n")

    subdomains = []
    cmd = ["sublist3r", "-d", domain, "-n"]
    result = await run_command(cmd, timeout=300)
    if result["success"]:
        # The output also carries banner and progress lines; keep only lines
        # that are a single host name inside the requested domain.
        wanted = domain.lower().rstrip(".")
        for line in result["stdout"].split("\n"):
            candidate = line.strip()
            if not candidate or len(candidate.split()) != 1:
                continue
            lowered = candidate.lower().rstrip(".")
            if lowered == wanted or lowered.endswith("." + wanted):
                subdomains.append(candidate)
    return {
        "tool": "dns_enum",
        "domain": domain,
        "records": records,
        "subdomains": subdomains,
        "subdomain_count": len(subdomains)
    }
async def ssl_scan(target: str, port: int = 443) -> Dict[str, Any]:
    """Scan the TLS configuration of ``target:port`` with sslscan.

    Args:
        target: Host name or address.
        port: TCP port of the TLS service.

    Returns:
        Dict with ``tool``, ``target``, ``port``, ``success`` and
        ``findings`` (``weak_ciphers``, ``vulnerabilities`` and an always
        empty ``certificate_info``).
    """
    cmd = ["sslscan", f"{target}:{port}"]
    result = await run_command(cmd, timeout=120)
    findings = {
        "weak_ciphers": [],
        "vulnerabilities": [],
        "certificate_info": {}
    }
    if result["success"]:
        known_issues = (
            ("heartbleed", "Heartbleed"),
            ("poodle", "POODLE"),
            ("beast", "BEAST"),
        )
        for line in result["stdout"].split("\n"):
            lowered = line.lower()
            # Only a line stating "vulnerable" counts, and not "not
            # vulnerable to heartbleed"; section headings and negative
            # results must not be reported as findings.
            if "vulnerable" in lowered and "not vulnerable" not in lowered:
                for keyword, label in known_issues:
                    if keyword in lowered and label not in findings["vulnerabilities"]:
                        findings["vulnerabilities"].append(label)
            if "weak" in lowered or "export" in lowered:
                findings["weak_ciphers"].append(line.strip())
    return {
        "tool": "sslscan",
        "target": target,
        "port": port,
        "success": result["success"],
        "findings": findings
    }
async def vulnerability_assessment(target: str, assessment_type: str = "comprehensive") -> Dict[str, Any]:
    """Run a bundle of scanners against ``target``, one after another.

    ``web`` runs nikto and nuclei, ``network`` runs a full nmap scan and an
    SSL scan, ``comprehensive`` runs all four. Each scanner's result is
    stored under its name in ``scans``.
    """
    scans: Dict[str, Any] = {}
    results: Dict[str, Any] = {
        "target": target,
        "assessment_type": assessment_type,
        "scans": scans,
    }
    wants_web = assessment_type in ("web", "comprehensive")
    wants_network = assessment_type in ("network", "comprehensive")
    if wants_web:
        scans["nikto"] = await nikto_scan(target)
        scans["nuclei"] = await nuclei_scan(target)
    if wants_network:
        scans["nmap"] = await nmap_scan(target, scan_type="full")
        scans["ssl"] = await ssl_scan(target)
    return results
# Add remaining helper functions as needed...
async def enum4linux(target: str, username: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]:
    """Enumerate SMB information on ``target`` with ``enum4linux -a``.

    Credentials are added only when both ``username`` and ``password`` are
    given. On success the stdout is searched for the share table and for
    ``user:[...]`` / ``group:[...]`` entries; ``os_info`` is never filled.
    """
    argv = ["enum4linux", "-a", target]
    if username and password:
        argv += ["-u", username, "-p", password]
    run = await run_command(argv, timeout=300)

    findings: Dict[str, Any] = {
        "shares": [],
        "users": [],
        "groups": [],
        "os_info": None,
    }
    if run["success"]:
        text = run["stdout"]
        share_tables = re.findall(r"Sharename.*?Type.*?Comment\n(.*?)(?=\n\n|\Z)", text, re.DOTALL)
        if share_tables:
            # Only the first table is used; separator rows are skipped.
            findings["shares"].extend(
                row.strip()
                for row in share_tables[0].split("\n")
                if row.strip() and not row.startswith("---")
            )
        findings["users"] = re.findall(r"user:\[(.*?)\]", text)
        findings["groups"] = re.findall(r"group:\[(.*?)\]", text)
    return dict(
        tool="enum4linux",
        target=target,
        success=run["success"],
        findings=findings,
    )
async def owasp_zap_scan(target: str, scan_type: str = "quick") -> Dict[str, Any]:
    """Run a ZAP scan of ``target`` through ``zap-cli``.

    ``scan_type`` "full" selects ``active-scan``; every other value selects
    ``quick-scan``. Returns the target, scan type, success flag and stdout.
    """
    action = "active-scan" if scan_type == "full" else "quick-scan"
    outcome = await run_command(["zap-cli", action, target], timeout=1800)
    report: Dict[str, Any] = {"tool": "owasp_zap", "target": target, "scan_type": scan_type}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
async def ffuf_fuzz(target: str, wordlist: str, method: str = "GET") -> Dict[str, Any]:
    """Fuzz ``target`` with ffuf and load the ``results`` of its JSON report.

    Args:
        target: URL passed to ``-u``.
        wordlist: Wordlist path passed to ``-w``.
        method: HTTP method passed to ``-X``.

    Returns:
        Dict with ``tool``, ``target``, ``method``, ``success``, ``findings``
        (the report's ``results`` list, empty when unreadable) and ``count``.
    """
    output_file = "/tmp/ffuf_output.json"
    try:
        # Drop any report left by a previous run so stale results are never parsed.
        Path(output_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["ffuf", "-u", target, "-w", wordlist, "-X", method, "-o", output_file, "-of", "json"]
    result = await run_command(cmd, timeout=600)
    findings = []
    try:
        with open(output_file, "r") as f:
            data = json.load(f)
        findings = data.get("results", [])
    except Exception as e:
        logger.error(f"Error parsing ffuf output: {e}")
    return {
        "tool": "ffuf",
        "target": target,
        "method": method,
        "success": result["success"],
        "findings": findings,
        "count": len(findings)
    }
async def john_crack(hash_file: str, wordlist: Optional[str] = None, format: Optional[str] = None) -> Dict[str, Any]:
    """Crack the hashes in ``hash_file`` with John the Ripper.

    Args:
        hash_file: File containing the hashes.
        wordlist: Optional wordlist passed as ``--wordlist=``.
        format: Optional hash format passed as ``--format=``.

    Returns:
        Dict with ``tool``, ``hash_file``, ``success`` (of the cracking run),
        ``cracked_passwords`` (``john --show`` lines containing ``:``) and
        ``count``.
    """
    format_args = [f"--format={format}"] if format else []
    cmd = ["john", *format_args]
    if wordlist:
        cmd.append(f"--wordlist={wordlist}")
    cmd.append(hash_file)
    result = await run_command(cmd, timeout=3600)

    # --show must be told the same format as the cracking run, otherwise it
    # may look the hashes up under a different, auto-detected type.
    show_cmd = ["john", "--show", *format_args, hash_file]
    show_result = await run_command(show_cmd, timeout=10)
    cracked = []
    if show_result["success"]:
        cracked = [
            line.strip()
            for line in show_result["stdout"].split("\n")
            if ":" in line
        ]
    return {
        "tool": "john",
        "hash_file": hash_file,
        "success": result["success"],
        "cracked_passwords": cracked,
        "count": len(cracked)
    }
async def hashcat_crack(hash: str, hash_type: int, wordlist: Optional[str] = None, attack_mode: int = 0) -> Dict[str, Any]:
    """Crack a single hash with hashcat.

    Args:
        hash: The hash text; it is written to ``/tmp/hashcat_input.txt``.
        hash_type: Hashcat mode number for ``-m``.
        wordlist: Optional wordlist appended to the command.
        attack_mode: Hashcat attack mode for ``-a``.

    Returns:
        Dict with ``tool``, ``hash_type``, ``success`` and ``cracked`` (the
        stripped content of hashcat's outfile, or None when there is none).
    """
    hash_file = "/tmp/hashcat_input.txt"
    out_file = "/tmp/hashcat_output.txt"
    with open(hash_file, "w") as f:
        f.write(hash)
    try:
        # Remove the outfile of an earlier run so an old result can never be
        # reported for this hash.
        Path(out_file).unlink()
    except OSError:
        pass  # usually just "no such file"

    cmd = ["hashcat", "-m", str(hash_type), "-a", str(attack_mode), hash_file]
    if wordlist:
        cmd.append(wordlist)
    cmd.extend(["--outfile", out_file])
    result = await run_command(cmd, timeout=3600)

    cracked = None
    try:
        with open(out_file, "r") as f:
            cracked = f.read().strip()
    except (OSError, UnicodeDecodeError):
        # No readable outfile: nothing was cracked in this run.
        pass
    return {
        "tool": "hashcat",
        "hash_type": hash_type,
        "success": result["success"],
        "cracked": cracked
    }
async def tcpdump_capture(interface: str, filter: Optional[str] = None, duration: int = 60, output_file: Optional[str] = None) -> Dict[str, Any]:
    """Capture packets on ``interface`` for ``duration`` seconds with tcpdump.

    Args:
        interface: Interface passed to ``-i``.
        filter: Optional capture filter expression, appended as one argument.
        duration: Capture length in seconds, enforced by the ``timeout`` tool.
        output_file: Destination pcap; defaults to
            ``/tmp/capture_<interface>.pcap``.

    Returns:
        Dict with ``tool``, ``interface``, ``duration``, ``output_file`` and
        ``success``.
    """
    if output_file is None:
        output_file = f"/tmp/capture_{interface}.pcap"
    cmd = ["timeout", str(duration), "tcpdump", "-i", interface, "-w", output_file]
    if filter:
        cmd.append(filter)
    result = await run_command(cmd, timeout=duration + 10)
    # `timeout` exits with status 124 when it stops the command at the time
    # limit, which is the normal end of a fixed-length capture.
    completed = result["success"] or result.get("returncode") == 124
    return {
        "tool": "tcpdump",
        "interface": interface,
        "duration": duration,
        "output_file": output_file,
        "success": completed
    }
async def snmp_check(target: str, community: str = "public") -> Dict[str, Any]:
    """Run ``snmp-check`` against ``target`` using ``community``.

    Returns the target, community, success flag and captured stdout.
    """
    outcome = await run_command(["snmp-check", target, "-c", community], timeout=120)
    report: Dict[str, Any] = {"tool": "snmp-check", "target": target, "community": community}
    report["success"] = outcome["success"]
    report["output"] = outcome.get("stdout", "")
    return report
# Placeholder functions for tools that need more complex implementation
async def smbclient_enum(target: str) -> Dict[str, Any]:
    """List SMB shares of ``target`` with ``smbclient -L <target> -N``.

    Returns the target, success flag and captured stdout.
    """
    listing = await run_command(["smbclient", "-L", target, "-N"], timeout=60)
    return dict(
        tool="smbclient",
        target=target,
        success=listing["success"],
        output=listing.get("stdout", ""),
    )
async def wfuzz_scan(target: str, wordlist: str) -> Dict[str, Any]:
    """Alias that forwards ``target`` and ``wordlist`` to ``wfuzz`` unchanged."""
    report = await wfuzz(target, wordlist)
    return report
async def aircrack_ng(capture_file: str, wordlist: str) -> Dict[str, Any]:
    """Try to recover a wireless key from ``capture_file`` using ``wordlist``.

    ``key_found`` is true when stdout contains the text "KEY FOUND".
    """
    outcome = await run_command(
        ["aircrack-ng", capture_file, "-w", wordlist], timeout=1800
    )
    cracked = "KEY FOUND" in outcome.get("stdout", "")
    return dict(
        tool="aircrack-ng",
        capture_file=capture_file,
        success=outcome["success"],
        key_found=cracked,
        output=outcome.get("stdout", ""),
    )
async def wireshark_analyze(pcap_file: str) -> Dict[str, Any]:
    """Summarise TCP conversations in ``pcap_file`` with tshark.

    Runs ``tshark -r <file> -q -z conv,tcp`` and returns its stdout as
    ``analysis``.
    """
    argv = ["tshark", "-r", pcap_file]
    argv += ["-q", "-z", "conv,tcp"]
    outcome = await run_command(argv, timeout=120)
    report: Dict[str, Any] = {"tool": "wireshark", "pcap_file": pcap_file}
    report["success"] = outcome["success"]
    report["analysis"] = outcome.get("stdout", "")
    return report
async def burpsuite_scan(
    target: str,
    api_key: Optional[str] = None,
    scan_type: str = "crawl_and_audit",
) -> Dict[str, Any]:
    """Start a Burp Suite scan of ``target`` through the local REST API.

    This definition is the one that is live at import time (it replaces the
    earlier ``burpsuite_scan`` in this module), so it accepts both call
    shapes: ``burpsuite_scan(target)`` and
    ``burpsuite_scan(target, api_key, scan_type)``.

    Args:
        target: URL or scope rule to scan.
        api_key: Burp REST API key. Without it no request is made and the
            placeholder result is returned.
        scan_type: Scan configuration type sent with the request.

    Returns:
        Without ``api_key``: ``success`` False and an explanatory ``note``.
        With ``api_key``: on HTTP 201 ``success`` True, ``scan_id`` and
        ``status`` "started"; otherwise ``success`` False and ``error``.
    """
    if api_key is None:
        return {
            "tool": "burpsuite",
            "target": target,
            "success": False,
            "note": "Requires Burp Suite Pro with REST API enabled"
        }

    import aiohttp

    burp_url = "http://localhost:8090/v0.1"
    payload = {
        "scope": {
            "include": [{"rule": target}]
        },
        "scan_configurations": [{"type": scan_type}]
    }
    # NOTE(review): endpoint, header name and response shape are carried over
    # from the earlier definition - confirm against the Burp REST API in use.
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{burp_url}/scan",
            json=payload,
            headers={"X-Api-Key": api_key}
        ) as resp:
            if resp.status == 201:
                scan_id = (await resp.json()).get("task_id")
                return {
                    "tool": "burpsuite",
                    "target": target,
                    "success": True,
                    "scan_id": scan_id,
                    "status": "started"
                }
    return {
        "tool": "burpsuite",
        "target": target,
        "success": False,
        "error": "Failed to start scan"
    }
async def exploit_db_search(query: str) -> Dict[str, Any]:
    """Alias that forwards ``query`` to ``searchsploit`` unchanged."""
    report = await searchsploit(query)
    return report