"""
Follow Stream Tool
Follow and extract TCP/UDP stream payloads from a PCAP file.
"""
import logging
import subprocess
from typing import Dict, Any
logger = logging.getLogger(__name__)

# Protocols accepted for the ``stream_type`` argument.
_VALID_STREAM_TYPES = ("tcp", "udp")

# Time limit, in seconds, applied to each individual tshark invocation.
_TSHARK_TIMEOUT_SECONDS = 60

# Number of "-e" columns requested from tshark for the per-packet statistics.
_STATS_FIELD_COUNT = 7


def _normalize_stream_id(stream_id):
    """Return *stream_id* as a non-negative int, or None when it is unusable.

    Ints and strings made only of decimal digits are accepted. Everything
    else (bools, floats, negative numbers, arbitrary text) is rejected so
    that nothing but a plain number is ever interpolated into the tshark
    follow specification and display filter.
    """
    if isinstance(stream_id, bool):
        # bool is a subclass of int, but "True"/"False" is not a stream index.
        return None
    if isinstance(stream_id, int):
        return stream_id if stream_id >= 0 else None
    if isinstance(stream_id, str):
        text = stream_id.strip()
        if text.isdecimal():
            return int(text)
    return None


def _run_tshark(args):
    """Run ``tshark`` with *args* and return its stdout decoded as text.

    Raises subprocess.CalledProcessError on a non-zero exit status and
    subprocess.TimeoutExpired when the time limit is exceeded.
    """
    completed = subprocess.run(
        ["tshark", *args],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
        timeout=_TSHARK_TIMEOUT_SECONDS,
    )
    # Captured traffic is arbitrary bytes; never let decoding fail.
    return completed.stdout.decode("utf-8", errors="replace")


def _format_endpoint(address: str, port: str, bracket: bool) -> str:
    """Join *address* and *port*; return the bare address if no port is known."""
    if not (address and port):
        return address
    if bracket:
        # IPv6 addresses contain colons, so bracket them to keep the port unambiguous.
        return f"[{address}]:{port}"
    return f"{address}:{port}"


def _summarize_packets(stats_output: str):
    """Parse tab-separated per-packet rows into (packet_count, total_bytes, endpoints).

    Each row is expected to hold, in order: ip.src, ip.dst, ipv6.src,
    ipv6.dst, source port, destination port, frame.len. ``endpoints`` is a
    set of formatted address/port strings.
    """
    packet_count = 0
    total_bytes = 0
    endpoints = set()

    # Do not strip the output as a whole: leading tabs are empty columns
    # (e.g. no IPv4 address on an IPv6 packet) and removing them would shift
    # every field of the first row.
    for row in stats_output.splitlines():
        if not row.strip():
            continue
        packet_count += 1

        columns = row.split("\t")
        # Pad short rows so the unpacking below is always safe.
        columns += [""] * (_STATS_FIELD_COUNT - len(columns))
        (ip4_src, ip4_dst, ip6_src, ip6_dst,
         src_port, dst_port, frame_len) = columns[:_STATS_FIELD_COUNT]

        # Prefer the IPv4 address when present; fall back to IPv6 otherwise.
        source = _format_endpoint(ip4_src or ip6_src, src_port, bracket=not ip4_src)
        destination = _format_endpoint(ip4_dst or ip6_dst, dst_port, bracket=not ip4_dst)
        if source:
            endpoints.add(source)
        if destination:
            endpoints.add(destination)

        try:
            total_bytes += int(frame_len)
        except ValueError:
            # A missing or non-numeric length still counts as a packet; it
            # just contributes no bytes to the total.
            pass

    return packet_count, total_bytes, endpoints


def follow_stream_execute(
    project_name: str,
    pcap_name: str,
    stream_id: int = 0,
    stream_type: str = "tcp",
) -> Dict[str, Any]:
    """
    Follow a TCP or UDP stream and extract the payload data.

    Reconstructs the application-layer conversation for a given stream ID,
    showing the full back-and-forth exchange (e.g., HTTP request/response,
    FTP login, DNS query/response).

    Three tshark passes are made over the capture: one collecting per-packet
    statistics for the stream, then one "follow" pass each for the ASCII and
    hex renderings. The follow passes are skipped when the statistics pass
    finds no packets for the stream.

    Args:
        project_name: Name of the project containing the PCAP
        pcap_name: Name of the PCAP file in the project
        stream_id: Stream index to follow (default: 0). Must be a
            non-negative integer; a string of decimal digits is also accepted.
        stream_type: Protocol stream type - "tcp" or "udp" (default: "tcp").
            Matching is case-insensitive.

    Returns:
        A dict that always contains "ok". On success it also holds
        "project_name", "pcap_name", "stream_type", "stream_id",
        "stream_ascii", "stream_hex", "packet_count", "total_bytes" and
        "endpoints" (a sorted list of "addr:port" strings, "[addr]:port" for
        IPv6). On failure it holds "error" with a human-readable message.
        Errors are reported through the dict rather than raised.
    """
    protocol = stream_type.lower() if isinstance(stream_type, str) else None
    if protocol not in _VALID_STREAM_TYPES:
        return {
            "ok": False,
            "error": f"Invalid stream_type: {stream_type}. Must be one of: {', '.join(_VALID_STREAM_TYPES)}"
        }

    stream_index = _normalize_stream_id(stream_id)
    if stream_index is None:
        return {
            "ok": False,
            "error": f"Invalid stream_id: {stream_id!r}. Must be a non-negative integer."
        }

    try:
        from .workspace_sync import get_pcap_path as get_project_pcap_path

        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {
                "ok": False,
                "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"
            }

        logger.info("Following %s stream %s in %s", stream_type, stream_index, pcap_path)

        # Per-packet statistics first: they tell us whether the stream exists
        # at all, which lets us skip the two follow passes when it does not.
        stats_output = _run_tshark([
            "-r", pcap_path,
            "-Y", f"{protocol}.stream eq {stream_index}",
            "-T", "fields",
            "-e", "ip.src",
            "-e", "ip.dst",
            "-e", "ipv6.src",
            "-e", "ipv6.dst",
            "-e", f"{protocol}.srcport",
            "-e", f"{protocol}.dstport",
            "-e", "frame.len",
        ])
        packet_count, total_bytes, endpoints = _summarize_packets(stats_output)

        # A stream that matches no packets has nothing to show, whatever
        # banner text the follow tap might print around an empty payload.
        if packet_count == 0:
            return {
                "ok": False,
                "error": f"Stream {stream_type.upper()} #{stream_index} not found or empty. Try a different stream_id.",
                "stream_type": stream_type,
                "stream_id": stream_index,
            }

        # ASCII rendering of the conversation.
        stream_ascii = _run_tshark([
            "-r", pcap_path,
            "-qz", f"follow,{protocol},ascii,{stream_index}",
        ])

        # Hex rendering of the same conversation, for binary analysis.
        stream_hex = _run_tshark([
            "-r", pcap_path,
            "-qz", f"follow,{protocol},hex,{stream_index}",
        ])

        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "stream_type": stream_type,
            "stream_id": stream_index,
            "stream_ascii": stream_ascii,
            "stream_hex": stream_hex,
            "packet_count": packet_count,
            "total_bytes": total_bytes,
            "endpoints": sorted(endpoints),
        }

    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out")
        return {
            "ok": False,
            "error": f"Analysis timed out after {_TSHARK_TIMEOUT_SECONDS} seconds",
        }
    except subprocess.CalledProcessError as e:
        # Decode leniently: an exception raised inside this handler would
        # escape the function instead of being reported in the result dict.
        error_msg = e.stderr.decode("utf-8", errors="replace") if e.stderr else str(e)
        logger.error("tshark command failed: %s", error_msg)
        return {"ok": False, "error": f"tshark failed: {error_msg}"}
    except Exception as e:
        logger.error("Error following stream: %s", e, exc_info=True)
        return {"ok": False, "error": f"Failed: {str(e)}"}