"""
Credential Extraction Tool
Extracts potential credentials and sensitive tokens from PCAP files for security audits.
"""
import base64
import logging
import re
import subprocess
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
def _run_tshark_fields(
    pcap_path: str,
    display_filter: str,
    fields: List[str],
    max_packets: int = 10000,
    timeout: float = 60,
) -> List[List[str]]:
    """Run tshark in field-extraction mode and return one list of columns per row.

    Args:
        pcap_path: Capture file handed to ``tshark -r``.
        display_filter: Optional display filter (``-Y``); an empty string
            means no filter.
        fields: Field names, each passed as ``-e``; the columns of every
            returned row follow this order.
        max_packets: Value passed to ``-c``. NOTE: per the tshark manual this
            caps the number of packets *read* from the file, not the number
            of packets that match ``display_filter``.
        timeout: Seconds to wait for tshark before giving up.

    Returns:
        A list of rows; each row is the tab-split list of field values.
        Lines that contain only whitespace are skipped.

    Raises:
        subprocess.CalledProcessError: tshark exited with a non-zero status.
        subprocess.TimeoutExpired: tshark ran longer than ``timeout``.
        OSError: the tshark executable could not be started.
    """
    cmd = ["tshark", "-r", pcap_path, "-T", "fields", "-c", str(max_packets)]
    if display_filter:
        cmd.extend(["-Y", display_filter])
    for field in fields:
        cmd.extend(["-e", field])

    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
        timeout=timeout,
    )

    # Packet payloads are arbitrary bytes; a strict decode would raise on the
    # first non-UTF-8 byte and discard the whole scan.
    text = result.stdout.decode("utf-8", errors="replace")

    rows = []
    # Split on "\n" only (not str.splitlines) so unusual separators inside a
    # field value cannot break a row apart; drop a trailing "\r" from CRLF
    # output. The text is deliberately not stripped as a whole, so rows keep
    # their empty leading/trailing columns and stay aligned with `fields`.
    for line in text.split("\n"):
        line = line.rstrip("\r")
        if line.strip():
            rows.append(line.split("\t"))
    return rows
def _safe_run(pcap_path: str, display_filter: str, fields: List[str]) -> List[List[str]]:
    """Best-effort wrapper around ``_run_tshark_fields``.

    Args:
        pcap_path: Capture file to read.
        display_filter: Display filter forwarded to tshark.
        fields: Field names to extract.

    Returns:
        The parsed rows, or an empty list when the tshark run fails for any
        reason. Failures are logged so that "tshark broke" can be told apart
        from "no matching packets"; they are never raised to the caller.
    """
    try:
        return _run_tshark_fields(pcap_path, display_filter, fields)
    except subprocess.CalledProcessError as exc:
        # str(exc) only carries the exit status; tshark's stderr has the reason.
        detail = exc.stderr.decode("utf-8", errors="replace").strip() if exc.stderr else str(exc)
        logger.warning("tshark scan failed (filter=%r): %s", display_filter, detail)
        return []
    except Exception as exc:
        logger.warning("tshark scan skipped (filter=%r): %s", display_filter, exc)
        return []
# Columns shared by every IP-based scan: row[0]=frame, row[1]=src, row[2]=dst.
_FLOW_FIELDS = ["frame.number", "ip.src", "ip.dst"]

# Substrings that mark a Cookie header as likely carrying a session/auth token.
_SESSION_COOKIE_HINTS = ("session", "token", "auth", "jwt", "sid", "phpsessid", "jsessionid")

# key=value pairs in a form body whose key looks like a secret.
_FORM_SECRET_RE = re.compile(
    r'(?:password|passwd|pass|pwd|secret|token|api_key|apikey)=([^&\s]{1,100})',
    re.IGNORECASE,
)

# Display filters used to report which protocols occur in the capture.
_PRESENCE_FILTERS = {
    "HTTP (unencrypted)": "http",
    "FTP": "ftp",
    "Telnet": "telnet",
    "SMTP": "smtp",
    "IMAP": "imap",
    "POP3": "pop",
    "SNMP": "snmp",
    "Kerberos": "kerberos",
    "TLS/SSL": "tls",
    "SSH": "ssh",
}


def _col(row: List[str], index: int) -> str:
    """Return ``row[index]``, or ``""`` when the row has fewer columns."""
    return row[index] if len(row) > index else ""


def _packet_origin(row: List[str]) -> Dict[str, str]:
    """Return the src/dst/frame entries common to every IP-based finding."""
    return {"src_ip": row[1], "dst_ip": row[2], "frame": row[0]}


def _split_basic_credentials(value: str):
    """Split an HTTP Basic credential into ``(username, password)`` or ``None``.

    Wireshark exposes ``http.authbasic`` as the already-decoded ``user:pass``
    string, so a value containing ":" is used as-is (":" is not part of the
    base64 alphabet, so such a value cannot be raw base64). A value without
    ":" is tried as strict base64 for robustness.
    """
    if ":" in value:
        decoded = value
    else:
        try:
            decoded = base64.b64decode(value.strip(), validate=True).decode("utf-8", errors="replace")
        except ValueError:  # binascii.Error is a ValueError subclass
            return None
        if ":" not in decoded:
            return None
    username, password = decoded.split(":", 1)
    return username, password


def _parse_http_basic(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Turn ``http.authbasic`` rows into username/password findings."""
    found = []
    for row in rows:
        if len(row) < 4 or not row[3]:
            continue
        credentials = _split_basic_credentials(row[3])
        if credentials is None:
            continue
        username, password = credentials
        found.append({
            "type": "HTTP Basic Auth",
            "username": username,
            "password": password,
            "host": _col(row, 4),
            "uri": _col(row, 5),
            **_packet_origin(row),
        })
    return found


def _parse_http_authorization(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Collect non-Basic Authorization header values (Bearer, Token, API keys)."""
    found = []
    for row in rows:
        if len(row) < 4 or not row[3]:
            continue
        auth_val = row[3].strip()
        if auth_val.lower().startswith("basic "):
            continue  # reported by the HTTP Basic Auth scan
        found.append({
            "type": "HTTP Authorization",
            "value": auth_val[:200],
            "host": _col(row, 4),
            **_packet_origin(row),
        })
    return found


def _parse_http_cookies(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Collect Cookie headers that look like they carry a session/auth token."""
    found = []
    for row in rows:
        if len(row) < 4 or not row[3]:
            continue
        cookie_val = row[3].strip()
        lowered = cookie_val.lower()
        if not any(hint in lowered for hint in _SESSION_COOKIE_HINTS):
            continue
        found.append({
            "type": "HTTP Cookie (session/auth)",
            "value": cookie_val[:300],
            "host": _col(row, 4),
            **_packet_origin(row),
        })
    return found


def _parse_http_post(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Collect secret-looking form values from POST bodies (at most 5 per frame)."""
    found = []
    for row in rows:
        if len(row) < 4 or not row[3]:
            continue
        secrets_found = _FORM_SECRET_RE.findall(row[3])
        if not secrets_found:
            continue
        found.append({
            "type": "HTTP POST form",
            "password_fields": secrets_found[:5],
            "host": _col(row, 4),
            "uri": _col(row, 5),
            **_packet_origin(row),
        })
    return found


def _parse_user_pass(rows: List[List[str]], cred_type: str) -> List[Dict[str, Any]]:
    """Pair USER/PASS commands (FTP, POP3) into username/password findings.

    Rows are ``frame, src, dst, command, argument``. The pending username is
    tracked per (src, dst) pair so that one client logging in to several
    servers does not have its usernames mixed up. Commands are compared
    case-insensitively.
    """
    pending_users = {}
    found = []
    for row in rows:
        if len(row) < 5:
            continue
        command, argument = row[3].upper(), row[4]
        if not argument:
            continue
        flow = (row[1], row[2])
        if command == "USER":
            pending_users[flow] = argument
        elif command == "PASS":
            found.append({
                "type": cred_type,
                "username": pending_users.get(flow, "unknown"),
                "password": argument,
                **_packet_origin(row),
            })
    return found


def _parse_ftp(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Pair FTP USER/PASS commands."""
    return _parse_user_pass(rows, "FTP")


def _parse_pop3(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Pair POP3 USER/PASS commands."""
    return _parse_user_pass(rows, "POP3")


def _parse_telnet(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Capture the data that answers a Telnet login/password prompt.

    The prompt travels one way and the answer travels the other, so the
    session is keyed on the unordered endpoint pair and the answer is only
    taken from the side that did NOT send the prompt. Further data from the
    prompting side (echo, banners) is ignored while waiting.

    NOTE: only the first answering packet is recorded, so a
    character-at-a-time session yields just the first fragment.
    """
    awaiting = {}  # frozenset({src, dst}) -> (credential kind, prompt sender)
    found = []
    for row in rows:
        if len(row) < 4 or not row[3]:
            continue
        src, dst = row[1], row[2]
        data = row[3].strip()
        lowered = data.lower()
        session = frozenset((src, dst))
        if "login:" in lowered or "username:" in lowered:
            awaiting[session] = ("username", src)
        elif "password:" in lowered:
            awaiting[session] = ("password", src)
        elif session in awaiting and data and len(data) < 100:
            kind, prompt_sender = awaiting[session]
            # When src == dst (e.g. loopback) direction cannot be told apart
            # by address, so accept the packet rather than never matching.
            if src == prompt_sender and src != dst:
                continue
            found.append({
                "type": "Telnet",
                "credential_type": kind,
                "value": data,
                **_packet_origin(row),
            })
            del awaiting[session]
    return found


def _parse_smtp_auth(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Decode the initial response sent along with an SMTP AUTH command."""
    found = []
    for row in rows:
        if len(row) < 4 or not row[3]:
            continue
        parts = row[3].strip().split()
        if len(parts) < 2:
            continue  # mechanism only, no credential data on this line
        mechanism, encoded = parts[0], parts[1]
        try:
            decoded = base64.b64decode(encoded).decode("utf-8", errors="replace")
        except ValueError:
            decoded = encoded  # not base64: report the raw parameter
        found.append({
            "type": "SMTP AUTH",
            "mechanism": mechanism,
            "credential_data": decoded or encoded,
            **_packet_origin(row),
        })
    return found


def _parse_imap(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Collect IMAP request lines that contain LOGIN (truncated to 200 chars)."""
    found = []
    for row in rows:
        if len(row) >= 4 and row[3] and "LOGIN" in row[3].upper():
            found.append({
                "type": "IMAP LOGIN",
                "command": row[3][:200],
                **_packet_origin(row),
            })
    return found


def _parse_snmp(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Collect each distinct (src, dst, community) SNMP community string once."""
    seen = set()
    found = []
    for row in rows:
        if len(row) < 4 or not row[3]:
            continue
        community = row[3].strip()
        key = (row[1], row[2], community)
        if key in seen:
            continue
        seen.add(key)
        found.append({
            "type": "SNMP Community String",
            "community": community,
            "version": _col(row, 4),
            **_packet_origin(row),
        })
    return found


def _parse_kerberos(rows: List[List[str]]) -> List[Dict[str, Any]]:
    """Build hash previews from Kerberos cipher fields.

    Rows are ``frame, cname, realm, cipher, msg_type``. Message types 10/30
    get a ``$krb5pa$`` preview and type 11 a ``$krb5asrep$`` preview; any
    other type is ignored. Only the first 80 cipher characters are kept.
    """
    found = []
    for row in rows:
        if len(row) < 5:
            continue
        frame, cname, realm, cipher, msg_type = row[0], row[1], row[2], row[3], row[4]
        if not (cipher and cname):
            continue
        if msg_type in ("10", "30"):
            hash_preview = f"$krb5pa$23${cname}${realm}${cipher[:80]}..."
            cracking_command = "hashcat -m 7500 hash.txt wordlist.txt"
        elif msg_type == "11":
            hash_preview = f"$krb5asrep$23${cname}@{realm}${cipher[:80]}..."
            cracking_command = "hashcat -m 18200 hash.txt wordlist.txt"
        else:
            continue
        found.append({
            "type": "Kerberos",
            "username": cname,
            "realm": realm,
            "hash_preview": hash_preview,
            "cracking_command": cracking_command,
            "frame": frame,
        })
    return found


# (label, display filter, fields, row parser, result bucket) in scan order.
_CREDENTIAL_SCANS = (
    ("HTTP Basic Auth", "http.authbasic",
     _FLOW_FIELDS + ["http.authbasic", "http.host", "http.request.uri"],
     _parse_http_basic, "plaintext"),
    ("HTTP Authorization headers", "http.authorization",
     _FLOW_FIELDS + ["http.authorization", "http.host"],
     _parse_http_authorization, "token"),
    ("HTTP Cookies", "http.cookie",
     _FLOW_FIELDS + ["http.cookie", "http.host"],
     _parse_http_cookies, "token"),
    ("HTTP POST form data", "http.request.method == \"POST\" && http.file_data",
     _FLOW_FIELDS + ["http.file_data", "http.host", "http.request.uri"],
     _parse_http_post, "plaintext"),
    ("FTP", "ftp.request.command",
     _FLOW_FIELDS + ["ftp.request.command", "ftp.request.arg"],
     _parse_ftp, "plaintext"),
    ("Telnet", "telnet.data",
     _FLOW_FIELDS + ["telnet.data"],
     _parse_telnet, "plaintext"),
    ("SMTP AUTH", "smtp.req.command == \"AUTH\"",
     _FLOW_FIELDS + ["smtp.req.parameter"],
     _parse_smtp_auth, "plaintext"),
    ("IMAP", "imap.request contains \"LOGIN\"",
     _FLOW_FIELDS + ["imap.request"],
     _parse_imap, "plaintext"),
    ("POP3", "pop.request.command == \"USER\" || pop.request.command == \"PASS\"",
     _FLOW_FIELDS + ["pop.request.command", "pop.request.parameter"],
     _parse_pop3, "plaintext"),
    ("SNMP community strings", "snmp.community",
     _FLOW_FIELDS + ["snmp.community", "snmp.version"],
     _parse_snmp, "plaintext"),
    ("Kerberos", "kerberos.cipher",
     ["frame.number", "kerberos.CNameString", "kerberos.realm",
      "kerberos.cipher", "kerberos.msg_type"],
     _parse_kerberos, "encrypted"),
)


def _detect_protocols(pcap_path: str) -> Dict[str, str]:
    """Report each protocol in ``_PRESENCE_FILTERS`` as "present" or "not found".

    The scan uses the default packet budget rather than ``max_packets=1``:
    tshark's ``-c`` limits packets read, not packets matched, so a limit of 1
    would only ever look at the first frame of the capture. A failed tshark
    run counts as "not found".
    """
    presence = {}
    for label, display_filter in _PRESENCE_FILTERS.items():
        rows = _safe_run(pcap_path, display_filter, ["frame.number"])
        presence[label] = "present" if rows else "not found"
    return presence


def _recommendations(presence: Dict[str, str]) -> List[str]:
    """Return the hardening advice that applies to the protocols seen."""
    def seen(label: str) -> bool:
        return presence.get(label) == "present"

    advice = [
        (seen("HTTP (unencrypted)"), "Use TLS/SSL for all authentication protocols"),
        (seen("Telnet"), "Use SSH instead of Telnet"),
        (seen("FTP"), "Use SFTP or FTPS instead of plain FTP"),
        (seen("SNMP"), "Use SNMPv3 with authentication instead of community strings"),
        (seen("SMTP"), "Use encrypted SMTP (STARTTLS or SMTPS) for email"),
        (seen("IMAP") or seen("POP3"), "Use IMAPS/POP3S for encrypted email retrieval"),
    ]
    return [text for applies, text in advice if applies]


def extract_credentials_execute(
    project_name: str,
    pcap_name: str
) -> Dict[str, Any]:
    """
    Extract potential credentials from a PCAP file in a project.

    Scans for credentials and sensitive tokens across protocols:
    - HTTP Basic Auth, Authorization headers, cookies, form POST data
    - FTP USER/PASS
    - Telnet login/password
    - SMTP AUTH
    - IMAP/POP3 login
    - SNMPv1/v2 community strings
    - Kerberos hashes (crackable offline)

    Args:
        project_name: Project that owns the capture.
        pcap_name: Name of the capture inside that project.

    Returns:
        On success, a dict with ``ok=True``, the three finding lists
        (``plaintext_credentials``, ``encrypted_credentials``,
        ``sensitive_tokens``), their totals, ``protocols_scanned`` (scans
        whose filter matched at least one packet), ``protocol_presence`` and
        ``security_notes``. On failure, ``{"ok": False, "error": <message>}``;
        this function does not raise.
    """
    try:
        from .workspace_sync import get_pcap_path as get_project_pcap_path

        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {"ok": False, "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"}

        logger.info("Extracting credentials from: %s", pcap_path)

        findings = {"plaintext": [], "encrypted": [], "token": []}
        protocols_scanned = []
        for label, display_filter, fields, parse_rows, bucket in _CREDENTIAL_SCANS:
            rows = _safe_run(pcap_path, display_filter, fields)
            if not rows:
                continue
            protocols_scanned.append(label)
            findings[bucket].extend(parse_rows(rows))

        plaintext_creds = findings["plaintext"]
        encrypted_creds = findings["encrypted"]
        sensitive_tokens = findings["token"]

        protocol_presence = _detect_protocols(pcap_path)

        logger.info(
            "Found %d plaintext, %d encrypted, %d tokens",
            len(plaintext_creds), len(encrypted_creds), len(sensitive_tokens),
        )

        nothing_found = not (plaintext_creds or encrypted_creds or sensitive_tokens)
        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "plaintext_credentials": plaintext_creds,
            "encrypted_credentials": encrypted_creds,
            "sensitive_tokens": sensitive_tokens,
            "total_plaintext": len(plaintext_creds),
            "total_encrypted": len(encrypted_creds),
            "total_tokens": len(sensitive_tokens),
            "protocols_scanned": protocols_scanned,
            "protocol_presence": protocol_presence,
            "security_notes": {
                "plaintext_warning": "Plaintext credentials were transmitted unencrypted and are immediately usable by attackers" if plaintext_creds else None,
                "encrypted_note": "Encrypted credentials can be cracked offline using tools like hashcat or john" if encrypted_creds else None,
                "token_warning": "Session tokens and API keys can be replayed for unauthorized access" if sensitive_tokens else None,
                "all_clear": "No credentials or sensitive tokens found in cleartext. Traffic may be encrypted (TLS/IPsec)." if nothing_found else None,
                "recommendations": _recommendations(protocol_presence),
            }
        }
    # The tshark scans above swallow their own failures; these handlers are
    # kept so the error contract stays the same if anything else raises them.
    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out")
        return {"ok": False, "error": "Analysis timed out after 60 seconds"}
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.decode("utf-8", errors="replace") if e.stderr else str(e)
        logger.error("tshark command failed: %s", error_msg)
        return {"ok": False, "error": f"tshark analysis failed: {error_msg}"}
    except Exception as e:
        logger.error("Error extracting credentials: %s", e, exc_info=True)
        return {"ok": False, "error": f"Analysis failed: {str(e)}"}