"""
TLS Analysis Tool
Extracts SNI, cipher suites, TLS versions, handshake details,
certificate info, and identifies weak configurations.
"""
import logging
import subprocess
from collections import Counter
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
TLS_VERSION_MAP = {
"0x0300": "SSL 3.0",
"0x0301": "TLS 1.0",
"0x0302": "TLS 1.1",
"0x0303": "TLS 1.2",
"0x0304": "TLS 1.3",
"768": "SSL 3.0",
"769": "TLS 1.0",
"770": "TLS 1.1",
"771": "TLS 1.2",
"772": "TLS 1.3",
}
WEAK_CIPHERS_KEYWORDS = [
"RC4", "DES", "3DES", "NULL", "EXPORT", "anon", "MD5",
]
DEPRECATED_VERSIONS = {"SSL 3.0", "TLS 1.0", "TLS 1.1"}
def _run_tshark_fields(pcap_path: str, display_filter: str, fields: List[str], max_packets: int = 10000) -> List[List[str]]:
"""Run tshark with field extraction and return parsed rows."""
cmd = [
"tshark", "-r", pcap_path,
"-T", "fields",
"-c", str(max_packets),
]
if display_filter:
cmd.extend(["-Y", display_filter])
for field in fields:
cmd.extend(["-e", field])
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
check=True, timeout=120
)
rows = []
for line in result.stdout.decode("utf-8").strip().split("\n"):
if line.strip():
rows.append(line.split("\t"))
return rows
def _safe_run(pcap_path: str, display_filter: str, fields: List[str]) -> List[List[str]]:
try:
return _run_tshark_fields(pcap_path, display_filter, fields)
except Exception:
return []
def _safe_count(pcap_path: str, display_filter: str) -> int:
try:
rows = _run_tshark_fields(pcap_path, display_filter, ["frame.number"], max_packets=50000)
return len(rows) if rows and rows[0][0] else 0
except Exception:
return 0
def _resolve_version(raw: str) -> str:
"""Resolve raw tshark version value to a human-readable name."""
return TLS_VERSION_MAP.get(raw.strip(), raw.strip())
def _is_weak_cipher(cipher_name: str) -> bool:
upper = cipher_name.upper()
return any(kw in upper for kw in WEAK_CIPHERS_KEYWORDS)
def _detect_tls_prefix(pcap_path: str) -> str:
"""Detect whether tshark uses 'tls' or 'ssl' field prefix for this capture."""
for prefix in ["tls", "ssl"]:
try:
rows = _run_tshark_fields(
pcap_path, f"{prefix}.handshake.type == 1",
["frame.number"], max_packets=5
)
if rows and rows[0][0]:
return prefix
except Exception:
pass
return "tls"
def tls_analysis_execute(
project_name: str,
pcap_name: str,
) -> Dict[str, Any]:
"""
Analyze TLS/SSL traffic from a locally synced PCAP file.
Extracts: SNI values, TLS versions, cipher suites offered and selected,
certificate subjects/issuers, handshake success/failure, ALPN,
and flags weak or deprecated configurations.
Args:
project_name: Name of the project containing the PCAP
pcap_name: Name of the PCAP file
"""
try:
from .workspace_sync import get_pcap_path as get_project_pcap_path
pcap_path = get_project_pcap_path(project_name, pcap_name)
if not pcap_path:
return {"ok": False, "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"}
logger.info(f"Analyzing TLS traffic in: {pcap_path}")
# Try both 'tls' and 'ssl' since tshark version / build determines the prefix
tls_total = _safe_count(pcap_path, "tls")
ssl_total = _safe_count(pcap_path, "ssl") if tls_total == 0 else 0
total = tls_total + ssl_total
if total == 0:
return {
"ok": True,
"project_name": project_name,
"pcap_name": pcap_name,
"total_tls_packets": 0,
"message": "No TLS/SSL traffic found in capture",
}
# Auto-detect field prefix
p = _detect_tls_prefix(pcap_path)
logger.info(f"Using tshark field prefix: '{p}'")
# ── Client Hello (handshake type 1) ──
ch_fields = [
"frame.number", "ip.src", "ip.dst",
f"{p}.handshake.extensions_server_name",
f"{p}.handshake.version",
f"{p}.handshake.extensions.supported_version",
f"{p}.handshake.ciphersuite",
f"{p}.handshake.extensions_alpn_str",
]
ch_rows = _safe_run(pcap_path, f"{p}.handshake.type == 1", ch_fields)
sni_counter = Counter()
client_versions = Counter()
offered_ciphers = Counter()
alpn_counter = Counter()
client_ips = Counter()
server_ips = Counter()
connections = []
for row in ch_rows:
if len(row) < 5:
continue
src_ip = row[1]
dst_ip = row[2]
sni = row[3].strip() if row[3] else ""
hs_version = row[4].strip() if row[4] else ""
supported_versions = row[5].strip() if len(row) > 5 and row[5] else ""
ciphers = row[6].strip() if len(row) > 6 and row[6] else ""
alpn = row[7].strip() if len(row) > 7 and row[7] else ""
if sni:
sni_counter[sni] += 1
if src_ip:
client_ips[src_ip] += 1
if dst_ip:
server_ips[dst_ip] += 1
if supported_versions:
for v in supported_versions.split(","):
resolved = _resolve_version(v)
client_versions[resolved] += 1
elif hs_version:
resolved = _resolve_version(hs_version)
client_versions[resolved] += 1
if ciphers:
for c in ciphers.split(","):
c = c.strip()
if c:
offered_ciphers[c] += 1
if alpn:
for a in alpn.split(","):
a = a.strip()
if a:
alpn_counter[a] += 1
if len(connections) < 50:
connections.append({
"frame": row[0],
"client": src_ip,
"server": dst_ip,
"sni": sni,
"version": _resolve_version(supported_versions.split(",")[0]) if supported_versions else _resolve_version(hs_version),
})
# ── Server Hello (handshake type 2) ──
sh_fields = [
"frame.number", "ip.src", "ip.dst",
f"{p}.handshake.version",
f"{p}.handshake.extensions.supported_version",
f"{p}.handshake.ciphersuite",
]
sh_rows = _safe_run(pcap_path, f"{p}.handshake.type == 2", sh_fields)
negotiated_versions = Counter()
selected_ciphers = Counter()
for row in sh_rows:
if len(row) < 4:
continue
hs_version = row[3].strip() if row[3] else ""
sup_version = row[4].strip() if len(row) > 4 and row[4] else ""
cipher = row[5].strip() if len(row) > 5 and row[5] else ""
version = sup_version or hs_version
if version:
resolved = _resolve_version(version.split(",")[0])
negotiated_versions[resolved] += 1
if cipher:
selected_ciphers[cipher] += 1
# ── Certificate info ──
cert_fields = [
"frame.number",
"x509sat.uTF8String",
"x509ce.dNSName",
"x509af.utcTime",
]
cert_filter = f"{p}.handshake.type == 11"
cert_rows = _safe_run(pcap_path, cert_filter, cert_fields)
certificates = []
for row in cert_rows:
if len(row) < 2:
continue
cert_info = {
"frame": row[0],
"subject_cn": row[1] if row[1] else "",
"san_dns": row[2] if len(row) > 2 and row[2] else "",
"validity": row[3] if len(row) > 3 and row[3] else "",
}
if cert_info["subject_cn"] or cert_info["san_dns"]:
certificates.append(cert_info)
# ── TLS Alerts (handshake failures) ──
alert_filter = f"{p}.alert_message"
alert_rows = _safe_run(pcap_path, alert_filter, [
"frame.number", "ip.src", "ip.dst",
f"{p}.alert_message.level", f"{p}.alert_message.desc",
])
alerts = []
for row in alert_rows:
if len(row) >= 5:
alerts.append({
"frame": row[0],
"src": row[1],
"dst": row[2],
"level": row[3],
"description": row[4],
})
# ── Security findings ──
findings = []
deprecated_found = {v for v in negotiated_versions if v in DEPRECATED_VERSIONS}
if deprecated_found:
findings.append({
"severity": "high",
"type": "deprecated_tls_version",
"detail": f"Deprecated TLS versions negotiated: {', '.join(deprecated_found)}. Upgrade to TLS 1.2 or 1.3.",
"affected_versions": list(deprecated_found),
})
weak_selected = [c for c in selected_ciphers if _is_weak_cipher(c)]
if weak_selected:
findings.append({
"severity": "high",
"type": "weak_cipher_negotiated",
"detail": f"Weak cipher suites were negotiated: {', '.join(weak_selected[:5])}",
"ciphers": weak_selected[:10],
})
if alerts:
findings.append({
"severity": "medium",
"type": "tls_alerts",
"detail": f"{len(alerts)} TLS alert(s) detected - may indicate handshake failures or certificate issues",
"count": len(alerts),
})
if not negotiated_versions.get("TLS 1.3", 0) and negotiated_versions:
findings.append({
"severity": "low",
"type": "no_tls_13",
"detail": "No TLS 1.3 connections observed. Consider enabling TLS 1.3 for improved security and performance.",
})
return {
"ok": True,
"project_name": project_name,
"pcap_name": pcap_name,
"total_tls_packets": total,
"tshark_field_prefix": p,
"total_client_hellos": len(ch_rows),
"total_server_hellos": len(sh_rows),
"sni_values": dict(sni_counter.most_common(25)),
"unique_sni": len(sni_counter),
"client_offered_versions": dict(client_versions),
"negotiated_versions": dict(negotiated_versions),
"selected_cipher_suites": dict(selected_ciphers.most_common(20)),
"alpn_protocols": dict(alpn_counter),
"top_client_ips": dict(client_ips.most_common(10)),
"top_server_ips": dict(server_ips.most_common(10)),
"certificates": certificates[:20],
"tls_alerts": alerts[:20],
"security_findings": findings,
"sample_connections": connections[:20],
}
except subprocess.TimeoutExpired:
logger.error("tshark command timed out")
return {"ok": False, "error": "Analysis timed out"}
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode("utf-8") if e.stderr else str(e)
logger.error(f"tshark failed: {error_msg}")
return {"ok": False, "error": f"tshark failed: {error_msg}"}
except Exception as e:
logger.error(f"Error in TLS analysis: {e}", exc_info=True)
return {"ok": False, "error": f"Failed: {str(e)}"}