"""
Parser utilities for Active Directory tools
Handles parsing output from various AD tools
"""
import re
import json
from typing import List, Dict, Any, Optional
# Raw NetExec SMB output starts each row with "SMB <host> <port> <name>"
# columns; this pattern removes them so the share columns can be read.
_NETEXEC_SMB_PREFIX_RE = re.compile(r'^SMB\s+\S+\s+\d+\s+\S+\s+')


def parse_netexec_output(output: str, output_type: str) -> List[Dict[str, Any]]:
    """
    Parse NetExec output based on type.

    Args:
        output: Text produced by NetExec, one record per line.
        output_type: ``"users"`` or ``"shares"``. Any other value yields an
            empty list.

    Returns:
        For ``"users"``: one dict per accepted line with ``username`` (first
        whitespace-separated token) and ``full_line``.
        For ``"shares"``: one dict per line mentioning ``READ`` or ``WRITE``
        with ``name`` (share name), ``permissions`` (everything after the
        name, which includes any remark text) and ``full_line`` (the
        stripped line exactly as received).
    """
    results: List[Dict[str, Any]] = []
    lines = [raw.strip() for raw in output.split('\n')]

    if output_type == "users":
        for line in lines:
            # Blank lines, bracketed status lines and "SMB"-prefixed lines
            # are ignored in this mode.
            # NOTE(review): raw NetExec rows start with "SMB", so this mode
            # only picks up lines that were already stripped of that prefix
            # -- confirm against the caller.
            if not line or line.startswith(('[', 'SMB')):
                continue
            results.append({
                "username": line.split()[0],
                "full_line": line,
            })

    elif output_type == "shares":
        for line in lines:
            if 'READ' not in line and 'WRITE' not in line:
                continue
            # Drop the protocol/host/port/hostname columns when present so
            # the first remaining token is the share name, not "SMB".
            entry = _NETEXEC_SMB_PREFIX_RE.sub('', line, count=1)
            # Status lines such as "[+] DOMAIN\\user:..." are not shares even
            # if they happen to contain READ/WRITE.
            if entry.startswith('['):
                continue
            parts = entry.split()
            # A share needs at least a name and a permission; the remark
            # column is optional.
            if len(parts) < 2:
                continue
            results.append({
                "name": parts[0],
                "permissions": ' '.join(parts[1:]),
                "full_line": line,
            })

    return results
_ASREP_MARKER = '$krb5asrep$'
_TGS_MARKER = '$krb5tgs$'
# Well-known hashes of the empty password; compared case-insensitively.
_BLANK_LM_HASH = 'aad3b435b51404eeaad3b435b51404ee'
_BLANK_NT_HASH = '31d6cfe0d16ae931b73c59d7e0c089c0'
# "user@REALM:" -- the name may contain dots and hyphens (e.g. john.doe).
_KRB_PRINCIPAL_RE = re.compile(r'([\w.-]+)@.*?:')
_KRB_SPN_RE = re.compile(r'(\w+/\w+@\w+)')
# "$krb5tgs$<etype>$*user$REALM$spn*$..." layout.
_KRB_TGS_FIELDS_RE = re.compile(r'\$krb5tgs\$\d+\$\*([^$*]+)\$([^$*]+)\$([^*]+)\*\$')
_HEX32_RE = re.compile(r'[0-9a-fA-F]{32}')


def parse_impacket_hashes(output: str, hash_type: str) -> List[Dict[str, str]]:
    """
    Parse Impacket hash dumping output.

    Args:
        output: Raw tool output, one candidate record per line.
        hash_type: ``"asrep"``, ``"tgs"`` or ``"ntlm"``. Any other value
            yields an empty list.

    Returns:
        A list of dicts, one per recognised line:

        * ``"asrep"``: ``username`` and ``hash`` (text from ``$krb5asrep$``
          to the end of the line).
        * ``"tgs"``: ``username``, ``hash`` (text from ``$krb5tgs$`` to the
          end of the line) and ``spn`` (``None`` when it cannot be found).
        * ``"ntlm"``: ``username``, ``hash``, ``sid`` (second colon-separated
          field), ``lm_hash`` and ``nt_hash``. ``hash`` is the NT hash unless
          it is empty or the blank-password hash, in which case the LM hash
          is used. Records whose selected hash is the blank LM hash or is not
          32 hexadecimal characters are dropped, which also filters out
          banner lines that merely contain three colons.

        ``username`` is ``"unknown"`` when no name can be extracted.
    """
    hashes = []
    for raw_line in output.split('\n'):
        line = raw_line.strip()

        if hash_type == "asrep":
            if _ASREP_MARKER not in line:
                continue
            principal = _KRB_PRINCIPAL_RE.search(line)
            hashes.append({
                "username": principal.group(1) if principal else "unknown",
                "hash": line[line.index(_ASREP_MARKER):],
            })

        elif hash_type == "tgs":
            if _TGS_MARKER not in line:
                continue
            principal = _KRB_PRINCIPAL_RE.search(line)
            spn_match = _KRB_SPN_RE.search(line)
            username = principal.group(1) if principal else None
            spn = spn_match.group(1) if spn_match else None
            # Fall back to the fields embedded in the hash itself when the
            # line carries no "user@REALM:" / "svc/host@REALM" text.
            embedded = _KRB_TGS_FIELDS_RE.search(line)
            if embedded:
                username = username or embedded.group(1)
                spn = spn or embedded.group(3)
            hashes.append({
                "username": username or "unknown",
                "hash": line[line.index(_TGS_MARKER):],
                "spn": spn,
            })

        elif hash_type == "ntlm":
            fields = line.split(':')
            if len(fields) < 4:
                continue
            username, sid, lm_hash, nt_hash = fields[:4]
            if nt_hash and nt_hash.lower() != _BLANK_NT_HASH:
                hash_value = nt_hash
            else:
                hash_value = lm_hash
            if hash_value.lower() == _BLANK_LM_HASH:
                continue
            # Rejects empty values and non-hash text such as the
            # "(uid:rid:lmhash:nthash)" format hint in section headers.
            if not _HEX32_RE.fullmatch(hash_value):
                continue
            hashes.append({
                "username": username,
                "hash": hash_value,
                "sid": sid,
                "lm_hash": lm_hash,
                "nt_hash": nt_hash,
            })

    return hashes
def _bloodhound_records(data: Any) -> List[Dict[str, Any]]:
    """Return the list of object dicts contained in a loaded BloodHound file.

    Accepts either a bare list of objects or a wrapper dict holding the list
    under ``"data"``, ``"users"`` or ``"computers"``. Non-dict entries are
    ignored. Raises ``ValueError`` for any other top-level structure.
    """
    if isinstance(data, dict):
        for key in ("data", "users", "computers"):
            if isinstance(data.get(key), list):
                data = data[key]
                break
    if not isinstance(data, list):
        raise ValueError("unrecognized BloodHound JSON structure")
    return [entry for entry in data if isinstance(entry, dict)]


def _bloodhound_field(record: Dict[str, Any], key: str, default: Any) -> Any:
    """Look up *key* on the record itself, then in its nested ``Properties``."""
    if key in record:
        return record[key]
    properties = record.get("Properties")
    if isinstance(properties, dict):
        return properties.get(key, default)
    return default


def parse_bloodhound_json(json_files: List[str], analysis_type: str = "all") -> Dict[str, Any]:
    """
    Parse BloodHound JSON files for quick analysis.

    A file is treated as a users file when its base name contains ``users``
    and as a computers file when it contains ``computers`` (checked in that
    order, case-insensitively); other files are loaded but contribute
    nothing. Only the file name is inspected, so directory names such as
    ``C:\\Users`` do not affect classification.

    Object attributes are read from the object itself and, failing that,
    from its nested ``Properties`` dict.

    Args:
        json_files: Paths of the JSON files to read.
        analysis_type: Accepted for interface compatibility; not used by the
            current implementation.

    Returns:
        Dict with the keys ``kerberoastable``, ``asreproastable``,
        ``high_privilege``, ``unconstrained_delegation`` and
        ``constrained_delegation``, each mapping to a list of object names.
        ``high_privilege`` is never populated by this function.

    Files that cannot be opened, decoded or understood are reported with
    ``print`` and skipped; they never abort the run.
    """
    results = {
        "kerberoastable": [],
        "asreproastable": [],
        "high_privilege": [],
        "unconstrained_delegation": [],
        "constrained_delegation": []
    }

    for json_file in json_files:
        try:
            # utf-8-sig transparently strips a UTF-8 BOM, which json.load
            # would otherwise reject; plain UTF-8 is read unchanged.
            with open(json_file, 'r', encoding='utf-8-sig') as f:
                records = _bloodhound_records(json.load(f))
        except (OSError, ValueError) as e:
            # ValueError covers JSON and Unicode decode errors as well as an
            # unrecognized top-level structure.
            print(f"Error parsing {json_file}: {e}")
            continue

        # Split on both separators so Windows-style paths classify correctly
        # regardless of the platform running this code.
        file_name = re.split(r'[\\/]', json_file)[-1].lower()

        if "users" in file_name:
            for user in records:
                name = _bloodhound_field(user, "name", "")
                if _bloodhound_field(user, "hasspn", False):
                    results["kerberoastable"].append(name)
                if _bloodhound_field(user, "dontreqpreauth", False):
                    results["asreproastable"].append(name)
        elif "computers" in file_name:
            for computer in records:
                name = _bloodhound_field(computer, "name", "")
                if _bloodhound_field(computer, "unconstraineddelegation", False):
                    results["unconstrained_delegation"].append(name)
                delegates = (
                    _bloodhound_field(computer, "allowedtodelegate", [])
                    or computer.get("AllowedToDelegate")
                )
                if delegates:
                    results["constrained_delegation"].append(name)

    return results
# Any ESC<number> identifier as a whole word (ESC1, ESC6, ESC13, ...).
_CERTIPY_ESC_RE = re.compile(r'\bESC\d+\b')


def parse_certipy_output(output: str) -> List[Dict[str, str]]:
    """
    Parse Certipy output for ADCS vulnerabilities.

    The text is scanned line by line. A line containing an ``ESC<number>``
    identifier starts a new finding. Until the next such line, lines
    containing "vulnerable" or "misconfigured" (case-insensitive) are
    appended to the finding's description, and lines containing "recommend"
    or "fix" replace its recommendation. Lines seen before the first finding
    are ignored.

    Args:
        output: Raw Certipy output.

    Returns:
        One dict per finding, in order of appearance, with the keys ``type``
        (the first ESC identifier on the starting line), ``description`` and
        ``recommendation`` (empty string when none was seen).
    """
    vulnerabilities = []
    current_vuln = None

    for raw_line in output.split('\n'):
        line = raw_line.strip()
        lowered = line.lower()

        esc_match = _CERTIPY_ESC_RE.search(line)
        if esc_match:
            # A new identifier closes the finding collected so far.
            if current_vuln:
                vulnerabilities.append(current_vuln)
            current_vuln = {
                "type": esc_match.group(0),
                "description": line,
                "recommendation": ""
            }
        elif current_vuln is None:
            continue
        elif "vulnerable" in lowered or "misconfigured" in lowered:
            current_vuln["description"] += f" {line}"
        elif "recommend" in lowered or "fix" in lowered:
            current_vuln["recommendation"] = line

    if current_vuln:
        vulnerabilities.append(current_vuln)
    return vulnerabilities