"""
PCAP Triage Tool
Automated high-level analysis combining capture overview, top talkers,
flow summary, TCP health, DNS, TLS, and anomaly indicators into a
single structured report. Designed as the first tool an LLM should call
on any new PCAP.
"""
import logging
import os
import subprocess
from collections import Counter
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
def _run_tshark_fields(pcap_path: str, display_filter: str, fields: List[str], max_packets: int = 10000) -> List[List[str]]:
    """Extract named fields from a capture using tshark.

    Builds a ``tshark -T fields`` command (one ``-e`` per field, optional
    ``-Y`` display filter, packet cap via ``-c``) and returns one
    tab-split list of field values per non-blank output line.
    """
    argv = ["tshark", "-r", pcap_path, "-T", "fields", "-c", str(max_packets)]
    if display_filter:
        argv += ["-Y", display_filter]
    for field_name in fields:
        argv += ["-e", field_name]
    proc = subprocess.run(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
        timeout=120,
    )
    text = proc.stdout.decode("utf-8").strip()
    # Blank lines (including a fully empty output) produce no rows.
    return [line.split("\t") for line in text.split("\n") if line.strip()]
def _count_filter(pcap_path: str, display_filter: str) -> int:
    """Count packets matching a display filter.

    Extracts only ``frame.number`` for matching packets and counts the
    resulting output lines; raises on tshark failure or timeout.
    """
    argv = [
        "tshark", "-r", pcap_path,
        "-Y", display_filter,
        "-T", "fields", "-e", "frame.number",
    ]
    proc = subprocess.run(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
        timeout=120,
    )
    text = proc.stdout.decode("utf-8").strip()
    if not text:
        return 0
    return len(text.split("\n"))
def _safe_count(pcap_path: str, display_filter: str) -> int:
    """Best-effort packet count for a filter; degrades to 0 on any failure.

    Used for optional enrichment passes where a tshark error (bad filter,
    timeout, missing dissector) should not abort the whole triage run.
    """
    try:
        matched = _count_filter(pcap_path, display_filter)
    except Exception:
        return 0
    return matched
def _aggregate_packets(all_rows: List[List[str]]) -> Dict[str, Any]:
    """Single pass over extracted field rows building all overview counters.

    Expected row layout (matches the overview field list in
    ``pcap_triage_execute``):
    [frame.time_epoch, frame.len, frame.protocols, ip.src, ip.dst,
     tcp.srcport, tcp.dstport, udp.srcport, udp.dstport]

    Returns a dict with timestamps, total byte count, and per-protocol /
    per-IP / per-conversation / per-port Counters.
    """
    timestamps: List[float] = []
    total_bytes = 0
    protocol_counter: Counter = Counter()
    src_ip_bytes: Counter = Counter()
    dst_ip_bytes: Counter = Counter()
    src_ip_packets: Counter = Counter()
    dst_ip_packets: Counter = Counter()
    conversation_bytes: Counter = Counter()
    port_counter: Counter = Counter()
    for row in all_rows:
        if len(row) < 5:
            # Truncated/malformed row - cannot attribute it to any host.
            continue
        # Only record real timestamps. Appending a 0 placeholder for an
        # empty field (as the original code did) poisons min(timestamps)
        # with epoch-0 and inflates the computed capture duration.
        if row[0]:
            try:
                timestamps.append(float(row[0]))
            except ValueError:
                pass
        try:
            pkt_len = int(row[1]) if row[1] else 0
        except ValueError:
            pkt_len = 0
        total_bytes += pkt_len
        # frame.protocols is a colon-separated stack, e.g. "eth:ip:tcp:tls"
        for proto in (row[2] or "").split(":"):
            proto = proto.strip().upper()
            if proto:
                protocol_counter[proto] += 1
        src_ip = row[3] if len(row) > 3 and row[3] else ""
        dst_ip = row[4] if len(row) > 4 and row[4] else ""
        if src_ip:
            src_ip_bytes[src_ip] += pkt_len
            src_ip_packets[src_ip] += 1
        if dst_ip:
            dst_ip_bytes[dst_ip] += pkt_len
            dst_ip_packets[dst_ip] += 1
        if src_ip and dst_ip:
            # Direction-agnostic conversation key so A->B and B->A merge.
            conversation_bytes[tuple(sorted((src_ip, dst_ip)))] += pkt_len
        # Destination ports only - enough for a service-level overview.
        tcp_dst = row[6] if len(row) > 6 and row[6] else ""
        udp_dst = row[8] if len(row) > 8 and row[8] else ""
        if tcp_dst:
            port_counter[f"TCP/{tcp_dst}"] += 1
        if udp_dst:
            port_counter[f"UDP/{udp_dst}"] += 1
    return {
        "timestamps": timestamps,
        "total_bytes": total_bytes,
        "protocol_counter": protocol_counter,
        "src_ip_bytes": src_ip_bytes,
        "dst_ip_bytes": dst_ip_bytes,
        "src_ip_packets": src_ip_packets,
        "dst_ip_packets": dst_ip_packets,
        "conversation_bytes": conversation_bytes,
        "port_counter": port_counter,
    }
def _tcp_health_report(pcap_path: str) -> Dict[str, Any]:
    """Count TCP pathology indicators and grade overall TCP health."""
    tcp_total = _safe_count(pcap_path, "tcp")
    retrans = _safe_count(pcap_path, "tcp.analysis.retransmission")
    dup_ack = _safe_count(pcap_path, "tcp.analysis.duplicate_ack")
    zero_win = _safe_count(pcap_path, "tcp.analysis.zero_window")
    ooo = _safe_count(pcap_path, "tcp.analysis.out_of_order")
    rst = _safe_count(pcap_path, "tcp.flags.reset == 1")
    # Resets are reported but deliberately excluded from the issue rate:
    # a RST can be a normal close, not necessarily a network problem.
    issues_total = retrans + dup_ack + zero_win + ooo
    issue_rate = (issues_total / tcp_total * 100) if tcp_total > 0 else 0
    if issue_rate < 0.1:
        status = "excellent"
    elif issue_rate < 1.0:
        status = "good"
    elif issue_rate < 5.0:
        status = "degraded"
    elif issue_rate < 15.0:
        status = "poor"
    else:
        status = "critical"
    return {
        "total_tcp_packets": tcp_total,
        "retransmissions": retrans,
        "duplicate_acks": dup_ack,
        "zero_windows": zero_win,
        "out_of_order": ooo,
        "resets": rst,
        "issue_rate_percent": round(issue_rate, 3),
        "health_status": status,
    }
def _dns_report(pcap_path: str) -> Dict[str, Any]:
    """Summarize DNS traffic: query/response counts, failure rates, top domains."""
    dns_total = _safe_count(pcap_path, "dns")
    dns_queries = _safe_count(pcap_path, "dns.flags.response == 0")
    dns_responses = _safe_count(pcap_path, "dns.flags.response == 1")
    dns_nxdomain = _safe_count(pcap_path, "dns.flags.rcode == 3")
    dns_servfail = _safe_count(pcap_path, "dns.flags.rcode == 2")
    summary: Dict[str, Any] = {
        "total_dns_packets": dns_total,
        "queries": dns_queries,
        "responses": dns_responses,
        "nxdomain": dns_nxdomain,
        "servfail": dns_servfail,
        "nxdomain_rate_percent": round((dns_nxdomain / dns_responses * 100), 1) if dns_responses > 0 else 0,
    }
    if dns_total > 0:
        # Top queried domains: best-effort enrichment, never fatal.
        try:
            domain_rows = _run_tshark_fields(
                pcap_path, "dns.flags.response == 0", ["dns.qry.name"], max_packets=5000
            )
            domain_counter = Counter(
                row[0].strip() for row in domain_rows if row and row[0]
            )
            summary["top_queried_domains"] = dict(domain_counter.most_common(10))
            summary["unique_domains"] = len(domain_counter)
        except Exception:
            pass
    return summary
def _tls_report(pcap_path: str) -> Dict[str, Any]:
    """Summarize TLS traffic: packet count, top SNI values, negotiated versions."""
    tls_total = _safe_count(pcap_path, "tls")
    summary: Dict[str, Any] = {"total_tls_packets": tls_total}
    if tls_total == 0:
        return summary
    # SNI values from ClientHello extensions (best-effort).
    try:
        sni_rows = _run_tshark_fields(
            pcap_path,
            "tls.handshake.extensions_server_name",
            ["tls.handshake.extensions_server_name"],
            max_packets=5000
        )
        sni_counter = Counter(row[0].strip() for row in sni_rows if row and row[0])
        summary["top_sni"] = dict(sni_counter.most_common(10))
        summary["unique_sni"] = len(sni_counter)
    except Exception:
        pass
    # Handshake versions from ClientHello (type == 1), mapped to names.
    try:
        ver_rows = _run_tshark_fields(
            pcap_path,
            "tls.handshake.type == 1",
            ["tls.handshake.version"],
            max_packets=5000
        )
        version_map = {
            "0x0301": "TLS 1.0",
            "0x0302": "TLS 1.1",
            "0x0303": "TLS 1.2",
            "0x0304": "TLS 1.3",
        }
        ver_counter: Counter = Counter()
        for row in ver_rows:
            if row and row[0]:
                ver_hex = row[0].strip()
                ver_counter[version_map.get(ver_hex, ver_hex)] += 1
        summary["tls_versions"] = dict(ver_counter)
    except Exception:
        pass
    return summary
def _anomaly_indicators(
    pcap_path: str,
    tcp_health: Dict[str, Any],
    dns_summary: Dict[str, Any],
    http_count: int,
) -> List[Dict[str, str]]:
    """Derive a flat list of anomaly indicators from the per-protocol summaries."""
    anomalies: List[Dict[str, str]] = []
    tcp_issue_rate = tcp_health["issue_rate_percent"]
    tcp_total = tcp_health["total_tcp_packets"]
    tcp_rst = tcp_health["resets"]
    if tcp_health["health_status"] in ("poor", "critical"):
        anomalies.append({
            "category": "tcp_health",
            "severity": "high",
            "detail": f"TCP issue rate is {tcp_issue_rate:.1f}% - significant packet loss or congestion",
        })
    if dns_summary.get("nxdomain_rate_percent", 0) > 20:
        anomalies.append({
            "category": "dns",
            "severity": "high",
            "detail": f"NXDOMAIN rate is {dns_summary['nxdomain_rate_percent']}% - possible misconfiguration or malware",
        })
    if tcp_rst > 0 and tcp_total > 0 and (tcp_rst / tcp_total * 100) > 10:
        anomalies.append({
            "category": "connection_failures",
            "severity": "high",
            "detail": f"RST rate is {tcp_rst/tcp_total*100:.1f}% ({tcp_rst} RSTs) - many connections being rejected/reset",
        })
    if http_count > 0:
        anomalies.append({
            "category": "security",
            "severity": "medium",
            "detail": f"{http_count} unencrypted HTTP packets detected",
        })
    # Cleartext-auth protocols are always worth flagging.
    ftp_count = _safe_count(pcap_path, "ftp")
    if ftp_count > 0:
        anomalies.append({
            "category": "security",
            "severity": "high",
            "detail": f"{ftp_count} FTP packets - credentials transmitted in cleartext",
        })
    telnet_count = _safe_count(pcap_path, "telnet")
    if telnet_count > 0:
        anomalies.append({
            "category": "security",
            "severity": "high",
            "detail": f"{telnet_count} Telnet packets - credentials transmitted in cleartext",
        })
    return anomalies
def _next_step_recommendations(
    tcp_health: Dict[str, Any],
    dns_summary: Dict[str, Any],
    tls_total: int,
    http_count: int,
) -> List[str]:
    """Suggest follow-up tools based on what the triage pass surfaced."""
    recommendations: List[str] = []
    if tcp_health["health_status"] in ("degraded", "poor", "critical"):
        recommendations.append("Run wireshark_tcp_health for detailed retransmission and congestion analysis")
    if dns_summary["total_dns_packets"] > 0 and (
        dns_summary.get("nxdomain_rate_percent", 0) > 5 or dns_summary.get("unique_domains", 0) > 50
    ):
        recommendations.append("Run wireshark_dns_analysis for detailed DNS query pattern analysis")
    if http_count > 0:
        recommendations.append("Run wireshark_display_filter with filter 'http' to inspect HTTP traffic")
    if tls_total > 0:
        recommendations.append("Review TLS SNI values to identify encrypted destinations")
    if tcp_health["resets"] > 10:
        recommendations.append("Run wireshark_display_filter with filter 'tcp.flags.reset == 1' to investigate connection resets")
    if not recommendations:
        recommendations.append("Use wireshark_display_filter to drill into specific protocols or hosts")
        recommendations.append("Use wireshark_follow_stream to inspect individual conversation payloads")
    return recommendations
def pcap_triage_execute(
    project_name: str,
    pcap_name: str,
) -> Dict[str, Any]:
    """
    Perform automated triage on a locally synced PCAP file.
    Runs multiple lightweight tshark passes to produce a single structured
    overview covering: capture stats, protocol breakdown, top talkers,
    TCP health, DNS summary, TLS summary, and anomaly indicators.
    Args:
        project_name: Name of the project containing the PCAP
        pcap_name: Name of the PCAP file
    Returns:
        Dict with "ok": True and the triage sections, or "ok": False
        plus an "error" message on failure (never raises).
    """
    try:
        # Local import avoids a circular dependency at module load time.
        from .workspace_sync import get_pcap_path as get_project_pcap_path
        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {"ok": False, "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"}
        logger.info(f"Running PCAP triage on: {pcap_path}")
        file_size = os.path.getsize(pcap_path)
        # ── 1. Capture overview (single bulk field-extraction pass) ──
        overview_fields = [
            "frame.time_epoch", "frame.len", "frame.protocols",
            "ip.src", "ip.dst",
            "tcp.srcport", "tcp.dstport",
            "udp.srcport", "udp.dstport",
        ]
        all_rows = _run_tshark_fields(pcap_path, "", overview_fields, max_packets=50000)
        total_packets = len(all_rows)
        if total_packets == 0:
            return {
                "ok": True,
                "project_name": project_name,
                "pcap_name": pcap_name,
                "file_size_bytes": file_size,
                "total_packets": 0,
                "message": "No packets found in capture",
            }
        stats = _aggregate_packets(all_rows)
        timestamps = stats["timestamps"]
        total_bytes = stats["total_bytes"]
        duration = (max(timestamps) - min(timestamps)) if len(timestamps) > 1 else 0
        capture_overview = {
            "total_packets": total_packets,
            "total_bytes": total_bytes,
            "file_size_bytes": file_size,
            "duration_seconds": round(duration, 3),
            # Zero/unknown duration: fall back to raw totals rather than
            # dividing by zero.
            "packets_per_second": round(total_packets / duration, 1) if duration > 0 else total_packets,
            "bytes_per_second": round(total_bytes / duration, 1) if duration > 0 else total_bytes,
        }
        # ── 2. Protocol breakdown ──
        protocol_breakdown = dict(stats["protocol_counter"].most_common(20))
        # ── 3. Top talkers (ranked by bytes; packet counts attached) ──
        top_sources = [
            {"ip": ip, "packets": stats["src_ip_packets"][ip], "bytes": stats["src_ip_bytes"][ip]}
            for ip, _ in stats["src_ip_bytes"].most_common(10)
        ]
        top_destinations = [
            {"ip": ip, "packets": stats["dst_ip_packets"][ip], "bytes": stats["dst_ip_bytes"][ip]}
            for ip, _ in stats["dst_ip_bytes"].most_common(10)
        ]
        top_conversations = [
            {"endpoints": list(pair), "bytes": b}
            for pair, b in stats["conversation_bytes"].most_common(10)
        ]
        top_ports = dict(stats["port_counter"].most_common(15))
        # ── 4-6. Per-protocol summaries ──
        tcp_health = _tcp_health_report(pcap_path)
        dns_summary = _dns_report(pcap_path)
        tls_summary = _tls_report(pcap_path)
        # ── 7. Anomaly indicators ──
        http_count = _safe_count(pcap_path, "http")
        anomalies = _anomaly_indicators(pcap_path, tcp_health, dns_summary, http_count)
        # ── 8. Recommended next steps ──
        recommendations = _next_step_recommendations(
            tcp_health, dns_summary, tls_summary["total_tls_packets"], http_count
        )
        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "capture_overview": capture_overview,
            "protocol_breakdown": protocol_breakdown,
            "top_talkers": {
                "top_sources": top_sources,
                "top_destinations": top_destinations,
                "top_conversations": top_conversations,
                "top_ports": top_ports,
            },
            "tcp_health": tcp_health,
            "dns_summary": dns_summary,
            "tls_summary": tls_summary,
            "anomalies": anomalies,
            "recommended_next_tools": recommendations,
        }
    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out during triage")
        return {"ok": False, "error": "Analysis timed out"}
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.decode("utf-8") if e.stderr else str(e)
        logger.error(f"tshark failed during triage: {error_msg}")
        return {"ok": False, "error": f"tshark failed: {error_msg}"}
    except Exception as e:
        logger.error(f"Error during PCAP triage: {e}", exc_info=True)
        return {"ok": False, "error": f"Failed: {str(e)}"}