"""
Protocol Hierarchy Statistics Tool
Provides protocol distribution and hierarchy statistics from PCAP data.
"""
import logging
import os
import subprocess
import tempfile
from typing import Dict, Any, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def get_protocol_hierarchy_execute(
    project_name: str,
    pcap_name: str
) -> Dict[str, Any]:
    """
    Get protocol hierarchy statistics from a PCAP file in a project.

    Runs ``tshark -r <pcap> -qz io,phs`` and parses its report into a flat
    list of protocol records.

    This tool provides:
    - Protocol distribution by packet (frame) count and bytes
    - Protocol hierarchy, exposed as a per-protocol nesting ``depth``
    - The raw tshark report text

    Args:
        project_name: Name of the project containing the PCAP
        pcap_name: Name of the PCAP file in the project

    Returns:
        Dictionary. On success it contains:
        - ok: True
        - project_name: Name of the project
        - pcap_name: Name of the PCAP file
        - hierarchy_text: Raw tshark protocol hierarchy output
        - protocols: List of dicts with keys ``protocol``, ``frames``,
          ``bytes`` and ``depth``
        - total_protocols: Number of entries in ``protocols``

        On failure it contains only:
        - ok: False
        - error: Error message describing why the analysis failed

        Errors raised while locating the PCAP or running tshark are caught
        and reported through this dictionary.

    Example:
        result = get_protocol_hierarchy_execute(
            project_name="network_analysis",
            pcap_name="capture.pcap"
        )
    """
    # Single source of truth for the limit used both by subprocess.run and
    # by the timeout error message.
    timeout_seconds = 60

    try:
        from .workspace_sync import get_pcap_path as get_project_pcap_path

        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {
                "ok": False,
                "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"
            }

        logger.info("Getting protocol hierarchy for: %s", pcap_path)

        # Run tshark protocol hierarchy statistics
        cmd = [
            "tshark",
            "-r", pcap_path,
            "-qz", "io,phs",
        ]
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            timeout=timeout_seconds,
        )

        # Decode leniently: a stray non-UTF-8 byte in the report must not
        # turn an otherwise successful tshark run into a failure.
        hierarchy_output = result.stdout.decode("utf-8", errors="replace")
        logger.info("Protocol hierarchy generated successfully")

        protocols = _parse_protocol_hierarchy(hierarchy_output)

        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "hierarchy_text": hierarchy_output,
            "protocols": protocols,
            "total_protocols": len(protocols),
        }
    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out")
        return {
            "ok": False,
            "error": f"Analysis timed out after {timeout_seconds} seconds"
        }
    except subprocess.CalledProcessError as e:
        # An exception raised inside this handler would NOT be caught by the
        # generic handler below, so stderr is decoded with errors="replace"
        # to guarantee that an error dict is still returned.
        if e.stderr:
            error_msg = e.stderr.decode("utf-8", errors="replace")
        else:
            error_msg = str(e)
        logger.error("tshark command failed: %s", error_msg)
        return {
            "ok": False,
            "error": f"tshark analysis failed: {error_msg}"
        }
    except Exception as e:
        logger.error("Error analyzing protocol hierarchy: %s", e, exc_info=True)
        return {
            "ok": False,
            "error": f"Analysis failed: {str(e)}"
        }


def _parse_protocol_hierarchy(hierarchy_text: str) -> list:
    """
    Parse the text report of ``tshark -qz io,phs`` into protocol records.

    Expected report format:
        ===================================================================
        Protocol Hierarchy Statistics
        Filter:

        frame                                    frames:1312 bytes:398176
          eth                                    frames:1312 bytes:398176
        ===================================================================

    Args:
        hierarchy_text: Decoded tshark protocol hierarchy report.

    Returns:
        One dict per statistics row, in report order, with keys
        ``protocol`` (name), ``frames`` (int), ``bytes`` (int) and
        ``depth`` (leading-whitespace width of the row divided by two).
        A counter that cannot be parsed is reported as 0.
    """
    protocols = []
    for line in hierarchy_text.splitlines():
        # Statistics rows carry both counters, with exactly one "frames:"
        # marker; banner, title and filter lines are skipped.
        if "bytes:" not in line or line.count("frames:") != 1:
            continue

        name_part, _, stats_part = line.partition("frames:")
        protocol_name = name_part.strip()
        if not protocol_name:
            continue

        frame_count = 0
        byte_count = 0
        frame_field, _, byte_field = stats_part.partition("bytes:")
        try:
            # int() tolerates surrounding whitespace (including a trailing
            # carriage return), so no explicit strip is needed.
            frame_count = int(frame_field)
            byte_count = int(byte_field)
        except ValueError:
            # Best effort: keep the row with whatever was parsed so far,
            # but leave a trace instead of swallowing the problem silently.
            logger.debug("Could not parse counters in tshark row: %r", line)

        # Determine nesting depth from leading whitespace
        indent = len(line) - len(line.lstrip())

        protocols.append({
            "protocol": protocol_name,
            "frames": frame_count,
            "bytes": byte_count,
            "depth": indent // 2,
        })
    return protocols