"""
Top Talkers Tool
Identifies the highest-volume source and destination IPs, ports,
and conversations to quickly spot heavy or unusual traffic.
"""
import logging
import subprocess
from collections import Counter
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
def _run_tshark_fields(pcap_path: str, display_filter: str, fields: List[str], max_packets: int = 50000) -> List[List[str]]:
    """Run tshark with field extraction and return parsed rows.

    Invokes ``tshark -r`` on *pcap_path*, reading at most *max_packets*
    packets and extracting one ``-e`` field per entry in *fields*.

    Args:
        pcap_path: Path to the capture file passed to ``tshark -r``.
        display_filter: Wireshark display filter (``-Y``); an empty
            string applies no filter.
        fields: Field names to extract; tshark emits one tab-separated
            column per field, in order.
        max_packets: Stop reading after this many packets (``-c``).

    Returns:
        One list of column strings per non-blank output line.

    Raises:
        subprocess.CalledProcessError: If tshark exits non-zero.
        subprocess.TimeoutExpired: If tshark exceeds the 120s timeout.
    """
    cmd = [
        "tshark", "-r", pcap_path,
        "-T", "fields",
        "-c", str(max_packets),
    ]
    if display_filter:
        cmd.extend(["-Y", display_filter])
    for field in fields:
        cmd.extend(["-e", field])
    result = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        check=True, timeout=120
    )
    rows = []
    # errors="replace": captured field data (hostnames, payload-derived
    # strings) is not guaranteed to be valid UTF-8; never let one bad
    # byte abort the whole analysis.
    # splitlines(): handles CRLF output (e.g. tshark on Windows), which
    # split("\n") would leave as a stray "\r" glued to the last column.
    for line in result.stdout.decode("utf-8", errors="replace").splitlines():
        if line.strip():
            rows.append(line.split("\t"))
    return rows
def _rank_by_bytes(
    packets_counter: Counter,
    bytes_counter: Counter,
    n: int,
    total_packets: int,
    total_bytes: int,
) -> List[Dict[str, Any]]:
    """Rank counter keys by byte volume (descending), top *n* entries.

    Each entry reports the key's packet count, byte count, and its share
    of total traffic by bytes and by packets (rounded to 0.1%). Guards
    against division by zero when the capture carried no counted bytes
    or packets.
    """
    return [
        {
            "name": key,
            "packets": packets_counter[key],
            "bytes": nbytes,
            "percent_bytes": round((nbytes / total_bytes) * 100, 1) if total_bytes > 0 else 0,
            "percent_packets": round((packets_counter[key] / total_packets) * 100, 1) if total_packets > 0 else 0,
        }
        for key, nbytes in bytes_counter.most_common(n)
    ]


def top_talkers_execute(
    project_name: str,
    pcap_name: str,
    top_n: int = 15,
) -> Dict[str, Any]:
    """
    Identify top talkers in a locally synced PCAP file.

    Returns highest-volume source IPs, destination IPs, source-destination
    pairs, ports, and protocols, ranked by byte volume; each ranked entry
    also carries its packet count and percentage shares.

    Args:
        project_name: Name of the project containing the PCAP
        pcap_name: Name of the PCAP file
        top_n: Number of top entries to return per category (default: 15)

    Returns:
        Dict with ``ok: True`` and ranking lists on success, or
        ``ok: False`` plus an ``error`` string on failure.
    """
    try:
        # Imported lazily to avoid a circular/import-time dependency on
        # the workspace sync machinery.
        from .workspace_sync import get_pcap_path as get_project_pcap_path
        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {"ok": False, "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"}
        # Lazy %-style args: no string formatting when INFO is disabled.
        logger.info("Analyzing top talkers in: %s", pcap_path)
        fields = [
            "frame.len", "frame.protocols",
            "ip.src", "ip.dst",
            "tcp.srcport", "tcp.dstport",
            "udp.srcport", "udp.dstport",
        ]
        rows = _run_tshark_fields(pcap_path, "", fields)
        if not rows or (len(rows) == 1 and not rows[0][0]):
            return {
                "ok": True,
                "project_name": project_name,
                "pcap_name": pcap_name,
                "total_packets": 0,
                "message": "No packets found in capture",
            }
        total_packets = len(rows)
        total_bytes = 0
        src_ip_packets = Counter()
        src_ip_bytes = Counter()
        dst_ip_packets = Counter()
        dst_ip_bytes = Counter()
        conv_packets = Counter()
        conv_bytes = Counter()
        dst_port_packets = Counter()
        dst_port_bytes = Counter()
        protocol_packets = Counter()
        protocol_bytes = Counter()
        src_dst_port_packets = Counter()
        src_dst_port_bytes = Counter()
        for row in rows:
            # Require at least frame.len / protocols / ip.src / ip.dst.
            if len(row) < 4:
                continue
            try:
                pkt_len = int(row[0]) if row[0] else 0
            except ValueError:
                # Unparseable frame.len: count the packet, not its bytes.
                pkt_len = 0
            total_bytes += pkt_len
            protos = row[1] if row[1] else ""
            src_ip = row[2] if row[2] else ""
            dst_ip = row[3] if row[3] else ""
            # Port columns may be absent when tshark truncates trailing
            # empty fields, hence the length guards.
            tcp_src = row[4] if len(row) > 4 and row[4] else ""
            tcp_dst = row[5] if len(row) > 5 and row[5] else ""
            udp_src = row[6] if len(row) > 6 and row[6] else ""
            udp_dst = row[7] if len(row) > 7 and row[7] else ""
            if src_ip:
                src_ip_packets[src_ip] += 1
                src_ip_bytes[src_ip] += pkt_len
            if dst_ip:
                dst_ip_packets[dst_ip] += 1
                dst_ip_bytes[dst_ip] += pkt_len
            if src_ip and dst_ip:
                conv_key = f"{src_ip} -> {dst_ip}"
                conv_packets[conv_key] += 1
                conv_bytes[conv_key] += pkt_len
            # Destination port (service port)
            dst_port = tcp_dst or udp_dst
            proto_label = "TCP" if tcp_dst else ("UDP" if udp_dst else "")
            if dst_port and proto_label:
                port_key = f"{proto_label}/{dst_port}"
                dst_port_packets[port_key] += 1
                dst_port_bytes[port_key] += pkt_len
            # Full 5-tuple style: src:port -> dst:port
            src_port = tcp_src or udp_src
            if src_ip and dst_ip and src_port and dst_port and proto_label:
                tuple_key = f"{src_ip}:{src_port} -> {dst_ip}:{dst_port} ({proto_label})"
                src_dst_port_packets[tuple_key] += 1
                src_dst_port_bytes[tuple_key] += pkt_len
            # Top-level protocol (last in the colon-separated chain)
            if protos:
                top_proto = protos.split(":")[-1].strip().upper()
                if top_proto:
                    protocol_packets[top_proto] += 1
                    protocol_bytes[top_proto] += pkt_len
        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "total_packets": total_packets,
            "total_bytes": total_bytes,
            "top_sources_by_bytes": _rank_by_bytes(src_ip_packets, src_ip_bytes, top_n, total_packets, total_bytes),
            "top_destinations_by_bytes": _rank_by_bytes(dst_ip_packets, dst_ip_bytes, top_n, total_packets, total_bytes),
            "top_conversations_by_bytes": _rank_by_bytes(conv_packets, conv_bytes, top_n, total_packets, total_bytes),
            "top_destination_ports_by_bytes": _rank_by_bytes(dst_port_packets, dst_port_bytes, top_n, total_packets, total_bytes),
            "top_protocols_by_bytes": _rank_by_bytes(protocol_packets, protocol_bytes, top_n, total_packets, total_bytes),
            "top_flows_by_bytes": _rank_by_bytes(src_dst_port_packets, src_dst_port_bytes, top_n, total_packets, total_bytes),
            "unique_source_ips": len(src_ip_packets),
            "unique_destination_ips": len(dst_ip_packets),
            "unique_conversations": len(conv_packets),
            "unique_destination_ports": len(dst_port_packets),
        }
    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out")
        return {"ok": False, "error": "Analysis timed out"}
    except subprocess.CalledProcessError as e:
        # errors="replace": tshark's stderr is not guaranteed UTF-8; a
        # decode failure here would mask the real tshark error.
        error_msg = e.stderr.decode("utf-8", errors="replace") if e.stderr else str(e)
        logger.error("tshark failed: %s", error_msg)
        return {"ok": False, "error": f"tshark failed: {error_msg}"}
    except Exception as e:
        logger.error("Error in top talkers analysis: %s", e, exc_info=True)
        return {"ok": False, "error": f"Failed: {str(e)}"}