"""
Conversation Statistics Tool
Provides TCP/UDP conversation tracking and statistics from PCAP data.
"""
import logging
import re
import subprocess
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
def _parse_conversation_line(line: str) -> Dict[str, Any]:
"""
Parse a tshark conversation output line.
Handles formats like:
192.168.0.10:60225 <-> 52.24.183.209:443 14 1,478 bytes 14 15 kB 28 17 kB 2.416573000 8.8668
"""
match = re.match(
r'\s*(\S+)\s+<->\s+(\S+)\s+(.*)',
line
)
if not match:
return None
address_a = match.group(1)
address_b = match.group(2)
rest = match.group(3).strip()
# Extract all numbers from the rest of the line, ignoring units like "bytes", "kB", "MB"
# tshark format: frames_ab bytes_ab frames_ba bytes_ba frames_total bytes_total rel_start duration
# The byte values can have commas and units
numbers = []
for token in rest.split():
cleaned = token.replace(',', '')
try:
numbers.append(float(cleaned))
except ValueError:
continue
# tshark outputs: frames_a_to_b, bytes_a_to_b, frames_b_to_a, bytes_b_to_a, frames_total, bytes_total, rel_start, duration
frames_ab = int(numbers[0]) if len(numbers) > 0 else 0
bytes_ab = int(numbers[1]) if len(numbers) > 1 else 0
frames_ba = int(numbers[2]) if len(numbers) > 2 else 0
bytes_ba = int(numbers[3]) if len(numbers) > 3 else 0
frames_total = int(numbers[4]) if len(numbers) > 4 else frames_ab + frames_ba
bytes_total = int(numbers[5]) if len(numbers) > 5 else bytes_ab + bytes_ba
rel_start = numbers[6] if len(numbers) > 6 else 0.0
duration = numbers[7] if len(numbers) > 7 else 0.0
return {
"address_a": address_a,
"address_b": address_b,
"frames_a_to_b": frames_ab,
"bytes_a_to_b": bytes_ab,
"frames_b_to_a": frames_ba,
"bytes_b_to_a": bytes_ba,
"frames_total": frames_total,
"bytes_total": bytes_total,
"relative_start": round(rel_start, 6),
"duration_seconds": round(duration, 4),
}
def get_conversations_execute(
    project_name: str,
    pcap_name: str,
    conversation_type: str = "tcp"
) -> Dict[str, Any]:
    """
    Get conversation statistics from a PCAP file in a project.

    Resolves the capture through ``workspace_sync.get_pcap_path``, runs
    ``tshark -r <pcap> -qz conv,<type>`` and parses every ``A <-> B`` row of
    the first ``===``-delimited table with ``_parse_conversation_line``.

    Args:
        project_name: Project the capture belongs to.
        pcap_name: Name of the capture inside that project.
        conversation_type: One of ``tcp``, ``udp`` or ``ip`` (case-insensitive).

    Returns:
        On success: ``{"ok": True, ...}`` carrying the raw tshark text, the
        parsed conversations sorted by ``bytes_total`` descending (at most the
        first 50) and the total number of parsed conversations.
        On failure: ``{"ok": False, "error": <message>}``. The function never
        raises; every error is reported through the returned dict.
    """
    valid_types = ("tcp", "udp", "ip")
    timeout_seconds = 60
    max_returned = 50

    try:
        # A non-string value (e.g. None) is reported as an invalid type
        # instead of surfacing as an AttributeError from .lower().
        if isinstance(conversation_type, str):
            normalized_type = conversation_type.lower()
        else:
            normalized_type = None
        if normalized_type not in valid_types:
            return {
                "ok": False,
                "error": f"Invalid conversation type: {conversation_type}. Must be one of: {', '.join(valid_types)}"
            }

        # Imported lazily, as in the original, so an import failure is
        # reported through the generic error path below.
        from .workspace_sync import get_pcap_path as get_project_pcap_path

        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {
                "ok": False,
                "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"
            }

        logger.info("Getting %s conversations for: %s", conversation_type, pcap_path)

        cmd = [
            "tshark",
            "-r", pcap_path,
            "-qz", f"conv,{normalized_type}"
        ]
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            timeout=timeout_seconds
        )
        # Undecodable bytes must not abort the whole analysis.
        conversation_output = result.stdout.decode('utf-8', errors='replace')

        conversations = []
        in_data = False
        for line in conversation_output.split('\n'):
            stripped = line.strip()
            if not stripped:
                continue
            if stripped.startswith('==='):
                # The first ruler opens the table, the second one closes it.
                if in_data:
                    break
                in_data = True
                continue
            if not in_data:
                continue
            # Skip the filter line and the "|"-delimited column headers.
            if 'Filter:' in stripped or '| Frames' in stripped or '| Bytes' in stripped:
                continue
            if stripped.startswith('|'):
                continue
            if '<->' in stripped:
                parsed = _parse_conversation_line(stripped)
                if parsed:
                    conversations.append(parsed)

        conversations.sort(key=lambda x: x.get('bytes_total', 0), reverse=True)

        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "conversation_type": conversation_type,
            "conversations_text": conversation_output,
            "conversations": conversations[:max_returned],
            "total_conversations": len(conversations)
        }
    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out")
        return {
            "ok": False,
            "error": f"Analysis timed out after {timeout_seconds} seconds"
        }
    except subprocess.CalledProcessError as e:
        # errors='replace' matters here: an exception raised inside this
        # handler would escape the function instead of being reported.
        error_msg = e.stderr.decode('utf-8', errors='replace') if e.stderr else str(e)
        logger.error("tshark command failed: %s", error_msg)
        return {
            "ok": False,
            "error": f"tshark analysis failed: {error_msg}"
        }
    except Exception as e:
        logger.error("Error analyzing conversations: %s", e, exc_info=True)
        return {
            "ok": False,
            "error": f"Analysis failed: {str(e)}"
        }