"""
PCAP Analysis Tool
Analyzes packet capture data with comprehensive network analysis.
"""
import base64
import json
import logging
import os
import subprocess
import tempfile
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
def _parse_timestamp(timestamp_value: Any) -> float:
"""
Parse timestamp from tshark JSON output.
Handles both epoch floats and ISO format strings.
Args:
timestamp_value: Timestamp value from tshark (can be float, string, or list)
Returns:
Float timestamp (seconds since epoch)
"""
if isinstance(timestamp_value, list):
timestamp_value = timestamp_value[0] if timestamp_value else 0
if isinstance(timestamp_value, (int, float)):
return float(timestamp_value)
if isinstance(timestamp_value, str):
# Try parsing as float first
try:
return float(timestamp_value)
except ValueError:
pass
# Try parsing as ISO format
try:
dt = datetime.fromisoformat(timestamp_value.replace('Z', '+00:00'))
return dt.timestamp()
except (ValueError, AttributeError):
pass
return 0.0
class TsharkExtractor:
    """Extract packet data from pcap files by shelling out to tshark."""

    def __init__(self, tshark_path: str = "tshark"):
        """
        Initialize the tshark extractor.

        Args:
            tshark_path: Path to tshark executable

        Raises:
            ValueError: If tshark cannot be found or executed.
        """
        self.tshark_path = tshark_path
        self._verify_tshark()

    def _verify_tshark(self) -> None:
        """Verify tshark is available by running `tshark --version`."""
        try:
            result = subprocess.run(
                [self.tshark_path, "--version"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False,
                timeout=5
            )
            if result.returncode != 0:
                raise ValueError("tshark not working properly")
        except (FileNotFoundError, subprocess.SubprocessError) as e:
            raise ValueError(f"tshark not found or not working: {e}")

    @staticmethod
    def _first_value(layer: Dict[str, Any], key: str, default: Any = '') -> Any:
        """
        Fetch a field from a tshark JSON layer, unwrapping one-element lists.

        tshark -T json emits some field values as single-element lists and
        others as scalars; this normalizes both forms. A missing key or an
        empty list yields `default`.
        """
        value = layer.get(key)
        if isinstance(value, list):
            return value[0] if value else default
        return default if value is None else value

    def extract_packets(self, pcap_path: str, filter_str: str = "", max_packets: int = 100) -> List[Dict[str, Any]]:
        """
        Extract packet data from pcap file using tshark.

        Args:
            pcap_path: Path to pcap file
            filter_str: Wireshark display filter
            max_packets: Maximum number of packets to extract

        Returns:
            List of simplified packet dicts (timestamp, length, protocols,
            plus optional ip/tcp/udp/http sub-dicts); empty list on any
            tshark or parse failure.
        """
        cmd = [
            self.tshark_path,
            "-r", pcap_path,
            "-T", "json",
            "-c", str(max_packets)
        ]
        if filter_str:
            cmd.extend(["-Y", filter_str])
        try:
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
                timeout=30
            )
            packets = json.loads(result.stdout.decode('utf-8'))
            simplified_packets = []
            for packet in packets:
                # tshark -T json nests everything under _source.layers.
                layers = packet.get('_source', {}).get('layers', {})
                frame = layers.get('frame', {})
                simplified = {
                    'timestamp': _parse_timestamp(frame.get('frame.time_epoch', ['0'])),
                    'length': str(self._first_value(frame, 'frame.len', '0')),
                    'protocols': str(self._first_value(frame, 'frame.protocols', '')),
                    'layers': []
                }
                # IP addresses, when the packet has an IP layer.
                ip_layer = layers.get('ip', {})
                if ip_layer:
                    simplified['ip'] = {
                        'src': str(self._first_value(ip_layer, 'ip.src')),
                        'dst': str(self._first_value(ip_layer, 'ip.dst'))
                    }
                # Transport-layer ports (TCP and/or UDP).
                for proto in ('tcp', 'udp'):
                    layer = layers.get(proto, {})
                    if layer:
                        simplified[proto] = {
                            'srcport': str(self._first_value(layer, f'{proto}.srcport')),
                            'dstport': str(self._first_value(layer, f'{proto}.dstport'))
                        }
                # HTTP request/response metadata, when present.
                http_layer = layers.get('http', {})
                if http_layer:
                    simplified['http'] = {
                        'request_method': str(self._first_value(http_layer, 'http.request.method')),
                        'request_uri': str(self._first_value(http_layer, 'http.request.uri')),
                        'response_code': str(self._first_value(http_layer, 'http.response.code')),
                        'host': str(self._first_value(http_layer, 'http.host')),
                        'user_agent': str(self._first_value(http_layer, 'http.user_agent'))
                    }
                simplified_packets.append(simplified)
            return simplified_packets
        except subprocess.TimeoutExpired:
            logger.error("tshark command timed out")
            return []
        except subprocess.CalledProcessError as e:
            logger.error(f"tshark command failed: {e.stderr.decode('utf-8')}")
            return []
        except Exception as e:
            logger.error(f"Error extracting packets: {e}")
            return []
def analyze_pcap_data_execute(
    pcap_data: str,
    encoding: str = "base64",
    max_packets: int = 100,
    focus_protocols: Optional[List[str]] = None,
    include_statistics: bool = True,
    include_security_analysis: bool = True,
    include_flow_analysis: bool = True
) -> Dict[str, Any]:
    """
    Analyze packet capture data passed directly as encoded string.

    This tool performs comprehensive network packet analysis including:
    - Protocol distribution and statistics
    - Security analysis (port scanning, suspicious ports, unencrypted traffic)
    - Flow analysis (conversation tracking, anomaly detection)
    - Top talkers and port usage

    Args:
        pcap_data: Base64 or hex-encoded pcap file data
        encoding: Encoding format - "base64" (default) or "hex"
        max_packets: Maximum number of packets to analyze (default: 100)
        focus_protocols: Optional list of protocols to focus on (e.g., ["HTTP", "DNS", "TLS"])
        include_statistics: Whether to include statistical summaries (default: True)
        include_security_analysis: Whether to perform security analysis (default: True)
        include_flow_analysis: Whether to perform flow analysis (default: True)

    Returns:
        Dictionary containing:
        - ok: Success status
        - summary: Packet capture summary (packet count, duration, protocols)
        - statistics: Network statistics (top talkers, top ports, packet sizes)
        - security_findings: Security analysis results (if enabled)
        - flow_analysis: Flow analysis results (if enabled)
        - protocol_details: Focused protocol analysis (if focus_protocols specified)
        - error: Error message if analysis failed

    Example:
        # Analyze a pcap file with security and flow analysis
        with open("capture.pcap", "rb") as f:
            pcap_bytes = f.read()
        pcap_base64 = base64.b64encode(pcap_bytes).decode('utf-8')
        result = analyze_pcap_data(
            pcap_data=pcap_base64,
            encoding="base64",
            max_packets=500,
            focus_protocols=["HTTP", "DNS"],
            include_security_analysis=True
        )
    """
    # Decode the payload; decode failures are reported, never raised.
    if encoding == "base64":
        try:
            binary_data = base64.b64decode(pcap_data)
        except Exception as e:
            return {"ok": False, "error": f"Failed to decode base64 data: {str(e)}"}
    elif encoding == "hex":
        try:
            binary_data = bytes.fromhex(pcap_data)
        except Exception as e:
            return {"ok": False, "error": f"Failed to decode hex data: {str(e)}"}
    else:
        return {"ok": False, "error": f"Unsupported encoding: {encoding}. Use 'base64' or 'hex'"}

    # Temp-file staging, tshark extraction and all analysis stages are
    # shared with the raw-bytes entry point; delegate to avoid duplication.
    return analyze_pcap_bytes_execute(
        binary_data,
        max_packets=max_packets,
        focus_protocols=focus_protocols,
        include_statistics=include_statistics,
        include_security_analysis=include_security_analysis,
        include_flow_analysis=include_flow_analysis,
    )
def _calculate_summary(packets: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Calculate summary information from packets."""
if not packets:
return {"total_packets": 0}
protocols = {}
for packet in packets:
proto_str = packet.get('protocols', '')
for proto in proto_str.split(':'):
proto = proto.strip().upper()
if proto:
protocols[proto] = protocols.get(proto, 0) + 1
# Calculate duration
timestamps = [p.get('timestamp', 0) for p in packets if p.get('timestamp')]
timestamps = [t if isinstance(t, (int, float)) else 0 for t in timestamps]
duration = max(timestamps) - min(timestamps) if len(timestamps) > 1 else 0
return {
"total_packets": len(packets),
"capture_duration_seconds": round(duration, 2),
"protocols": protocols,
"protocols_count": len(protocols)
}
def _generate_statistics(packets: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate statistical summaries from packet data."""
if not packets:
return {}
ip_counts = {}
port_counts = {}
packet_sizes = []
for packet in packets:
# Collect packet sizes
packet_sizes.append(int(packet.get('length', 0)))
# Collect IP addresses
if 'ip' in packet:
src_ip = packet['ip'].get('src')
dst_ip = packet['ip'].get('dst')
if src_ip:
ip_counts[src_ip] = ip_counts.get(src_ip, 0) + 1
if dst_ip:
ip_counts[dst_ip] = ip_counts.get(dst_ip, 0) + 1
# Collect port information
if 'tcp' in packet:
src_port = packet['tcp'].get('srcport')
dst_port = packet['tcp'].get('dstport')
if src_port:
port_counts[f"TCP:{src_port}"] = port_counts.get(f"TCP:{src_port}", 0) + 1
if dst_port:
port_counts[f"TCP:{dst_port}"] = port_counts.get(f"TCP:{dst_port}", 0) + 1
if 'udp' in packet:
src_port = packet['udp'].get('srcport')
dst_port = packet['udp'].get('dstport')
if src_port:
port_counts[f"UDP:{src_port}"] = port_counts.get(f"UDP:{src_port}", 0) + 1
if dst_port:
port_counts[f"UDP:{dst_port}"] = port_counts.get(f"UDP:{dst_port}", 0) + 1
# Find top talkers
top_talkers = dict(sorted(ip_counts.items(), key=lambda x: x[1], reverse=True)[:10])
# Find top ports
top_ports = dict(sorted(port_counts.items(), key=lambda x: x[1], reverse=True)[:10])
# Calculate packet size statistics
avg_size = sum(packet_sizes) / len(packet_sizes) if packet_sizes else 0
min_size = min(packet_sizes) if packet_sizes else 0
max_size = max(packet_sizes) if packet_sizes else 0
return {
"top_talkers": top_talkers,
"top_ports": top_ports,
"packet_sizes": {
"average": round(avg_size, 2),
"min": min_size,
"max": max_size
}
}
def _perform_security_analysis(packets: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Perform security-focused analysis on packets."""
findings = []
# Suspicious ports
suspicious_ports = {
'4444': 'Metasploit default',
'1080': 'SOCKS proxy',
'6667': 'IRC (often used by botnets)',
'31337': 'Back Orifice',
'12345': 'NetBus',
'5900': 'VNC'
}
# Track IPs and their behaviors
ip_data = {}
unencrypted_protocols = set()
for packet in packets:
# Check for suspicious ports
for proto_key in ['tcp', 'udp']:
if proto_key in packet:
dst_port = packet[proto_key].get('dstport', '')
if dst_port in suspicious_ports:
findings.append({
"type": "suspicious_port",
"severity": "high",
"port": dst_port,
"description": suspicious_ports[dst_port],
"protocol": proto_key.upper()
})
# Track IP behavior for port scanning detection
if 'ip' in packet and 'tcp' in packet:
src_ip = packet['ip'].get('src')
dst_port = packet['tcp'].get('dstport')
if src_ip and dst_port:
if src_ip not in ip_data:
ip_data[src_ip] = {'dst_ports': set(), 'dst_ips': set()}
ip_data[src_ip]['dst_ports'].add(dst_port)
if 'ip' in packet:
dst_ip = packet['ip'].get('dst')
if dst_ip:
ip_data[src_ip]['dst_ips'].add(dst_ip)
# Check for unencrypted protocols
protocols = packet.get('protocols', '').upper()
if 'HTTP' in protocols and 'TLS' not in protocols and 'SSL' not in protocols:
unencrypted_protocols.add('HTTP')
if 'FTP' in protocols:
unencrypted_protocols.add('FTP')
if 'TELNET' in protocols:
unencrypted_protocols.add('TELNET')
# Detect port scanning
for src_ip, data in ip_data.items():
unique_ports = len(data['dst_ports'])
unique_ips = len(data['dst_ips'])
if unique_ports > 20 and unique_ips > 1:
findings.append({
"type": "port_scan_detected",
"severity": "critical",
"src_ip": src_ip,
"unique_ports_contacted": unique_ports,
"unique_ips_contacted": unique_ips,
"description": f"Possible port scanning activity from {src_ip}"
})
# Report unencrypted protocols
if unencrypted_protocols:
findings.append({
"type": "unencrypted_protocols",
"severity": "medium",
"protocols": list(unencrypted_protocols),
"description": "Unencrypted protocols detected in traffic"
})
return {
"total_findings": len(findings),
"findings": findings
}
def _perform_flow_analysis(packets: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Perform flow analysis on packets."""
flows = {}
for packet in packets:
if 'ip' in packet:
src_ip = packet['ip'].get('src')
dst_ip = packet['ip'].get('dst')
src_port = None
dst_port = None
if 'tcp' in packet:
src_port = packet['tcp'].get('srcport')
dst_port = packet['tcp'].get('dstport')
elif 'udp' in packet:
src_port = packet['udp'].get('srcport')
dst_port = packet['udp'].get('dstport')
if src_ip and dst_ip:
flow_key = f"{src_ip}:{src_port or 'any'} -> {dst_ip}:{dst_port or 'any'}"
if flow_key not in flows:
ts = packet.get('timestamp', 0)
ts = ts if isinstance(ts, (int, float)) else 0
flows[flow_key] = {
'packet_count': 0,
'src_ip': src_ip,
'dst_ip': dst_ip,
'src_port': src_port,
'dst_port': dst_port,
'first_seen': ts,
'last_seen': ts
}
flows[flow_key]['packet_count'] += 1
ts = packet.get('timestamp', 0)
ts = ts if isinstance(ts, (int, float)) else 0
flows[flow_key]['last_seen'] = ts
# Calculate flow durations and identify significant flows
significant_flows = []
for flow_key, flow_data in flows.items():
duration = flow_data['last_seen'] - flow_data['first_seen']
flow_data['duration_seconds'] = round(duration, 2)
# Consider flows with more than 5 packets as significant
if flow_data['packet_count'] >= 5:
significant_flows.append(flow_data)
# Sort by packet count
significant_flows.sort(key=lambda x: x['packet_count'], reverse=True)
return {
"total_flows": len(flows),
"significant_flows": significant_flows[:10], # Top 10
"flow_summary": {
"total_conversations": len(flows),
"flows_with_5plus_packets": len(significant_flows)
}
}
def _analyze_protocols(packets: List[Dict[str, Any]], focus_protocols: List[str]) -> Dict[str, Any]:
"""Analyze specific protocols in detail."""
protocol_data = {}
for protocol in focus_protocols:
protocol_upper = protocol.upper()
protocol_packets = []
for packet in packets:
protocols = packet.get('protocols', '').upper()
if protocol_upper in protocols:
protocol_packets.append(packet)
if protocol_upper == 'HTTP':
http_analysis = _analyze_http(protocol_packets)
protocol_data['HTTP'] = http_analysis
elif protocol_upper == 'DNS':
dns_analysis = _analyze_dns(protocol_packets)
protocol_data['DNS'] = dns_analysis
else:
protocol_data[protocol_upper] = {
"packet_count": len(protocol_packets),
"message": f"Basic analysis for {protocol_upper}"
}
return protocol_data
def _analyze_http(packets: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze HTTP traffic."""
methods = {}
status_codes = {}
hosts = {}
for packet in packets:
if 'http' in packet:
http = packet['http']
method = http.get('request_method')
if method:
methods[method] = methods.get(method, 0) + 1
status = http.get('response_code')
if status:
status_codes[status] = status_codes.get(status, 0) + 1
host = http.get('host')
if host:
hosts[host] = hosts.get(host, 0) + 1
return {
"packet_count": len(packets),
"request_methods": methods,
"response_codes": status_codes,
"hosts_contacted": hosts,
"unique_hosts": len(hosts)
}
def _analyze_dns(packets: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze DNS traffic."""
# Basic DNS analysis
return {
"packet_count": len(packets),
"message": "DNS queries and responses detected",
"note": "Detailed DNS analysis requires additional packet parsing"
}
def analyze_pcap_file_execute(
    pcap_path: str,
    max_packets: int = 100,
    focus_protocols: Optional[List[str]] = None,
    include_statistics: bool = True,
    include_security_analysis: bool = True,
    include_flow_analysis: bool = True
) -> Dict[str, Any]:
    """
    Analyze a locally stored PCAP file with comprehensive network analysis.

    Reads directly from a file path on the container - no data transfer
    through MCP.

    Args:
        pcap_path: Full path to the PCAP file on disk
        max_packets: Maximum number of packets to analyze (default: 100)
        focus_protocols: Optional list of protocols to focus on
        include_statistics: Include statistical summaries (default: True)
        include_security_analysis: Perform security analysis (default: True)
        include_flow_analysis: Perform flow analysis (default: True)

    Returns:
        Dictionary with ok flag, file size, summary, and the requested
        analysis sections; {"ok": False, "error": ...} on failure.
    """
    try:
        if not os.path.exists(pcap_path):
            return {"ok": False, "error": f"PCAP file not found: {pcap_path}"}

        file_size = os.path.getsize(pcap_path)
        logger.info(f"Analyzing PCAP file: {pcap_path} ({file_size} bytes), max_packets: {max_packets}")

        packets = TsharkExtractor().extract_packets(pcap_path, max_packets=max_packets)
        if not packets:
            return {
                "ok": True,
                "file_size_bytes": file_size,
                "summary": {
                    "total_packets": 0,
                    "message": "No packets found or pcap file is empty"
                }
            }

        # Collect distinct endpoint addresses and reconstruct request URLs.
        seen_ips = set()
        request_urls = []
        for pkt in packets:
            ip_info = pkt.get('ip')
            if ip_info:
                for addr in (ip_info.get('src'), ip_info.get('dst')):
                    if addr:
                        seen_ips.add(addr)
            http_info = pkt.get('http')
            if http_info:
                host = http_info.get('host', '')
                uri = http_info.get('request_uri', '')
                if host and uri:
                    request_urls.append(f"http://{host}{uri}")

        result = {
            "ok": True,
            "file_size_bytes": file_size,
            "summary": _calculate_summary(packets),
            "unique_ips": sorted(seen_ips),
            "urls": request_urls[:50],       # cap the URL list
            "packet_samples": packets[:20]   # first packets for inspection
        }
        if include_statistics:
            result["statistics"] = _generate_statistics(packets)
        if include_security_analysis:
            result["security_findings"] = _perform_security_analysis(packets)
        if include_flow_analysis:
            result["flow_analysis"] = _perform_flow_analysis(packets)
        if focus_protocols:
            result["protocol_details"] = _analyze_protocols(packets, focus_protocols)
        return result
    except Exception as e:
        logger.error(f"Error analyzing pcap file: {e}", exc_info=True)
        return {"ok": False, "error": f"Analysis failed: {str(e)}"}
def analyze_pcap_bytes_execute(
    pcap_bytes: bytes,
    max_packets: int = 100,
    focus_protocols: Optional[List[str]] = None,
    include_statistics: bool = True,
    include_security_analysis: bool = True,
    include_flow_analysis: bool = True
) -> Dict[str, Any]:
    """
    Analyze packet capture data passed directly as raw bytes.

    A simpler variant of analyze_pcap_data that takes raw pcap bytes with
    no base64/hex encoding step - useful for small pcap files or direct
    data handoff.

    The analysis covers protocol distribution, optional statistics,
    security findings (port scanning, suspicious ports, unencrypted
    traffic), and flow/conversation tracking.

    Args:
        pcap_bytes: Raw pcap file bytes
        max_packets: Maximum number of packets to analyze (default: 100)
        focus_protocols: Optional list of protocols to focus on (e.g., ["HTTP", "DNS", "TLS"])
        include_statistics: Whether to include statistical summaries (default: True)
        include_security_analysis: Whether to perform security analysis (default: True)
        include_flow_analysis: Whether to perform flow analysis (default: True)

    Returns:
        Dictionary with ok flag, summary, unique_ips, urls, packet_samples,
        and the requested analysis sections; {"ok": False, "error": ...}
        on failure.
    """
    try:
        if not pcap_bytes:
            return {"ok": False, "error": "No data provided or data is empty"}

        # tshark reads from disk, so stage the bytes in a temp .pcap file.
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap')
        try:
            tmp.write(pcap_bytes)
            tmp.close()

            logger.info(f"Processing pcap file: {len(pcap_bytes)} bytes, max_packets: {max_packets}")

            packets = TsharkExtractor().extract_packets(tmp.name, max_packets=max_packets)
            if not packets:
                return {
                    "ok": True,
                    "summary": {
                        "total_packets": 0,
                        "message": "No packets found or pcap file is empty"
                    }
                }

            # Collect distinct endpoint addresses and reconstruct request URLs.
            seen_ips = set()
            request_urls = []
            for pkt in packets:
                ip_info = pkt.get('ip')
                if ip_info:
                    for addr in (ip_info.get('src'), ip_info.get('dst')):
                        if addr:
                            seen_ips.add(addr)
                http_info = pkt.get('http')
                if http_info:
                    host = http_info.get('host', '')
                    uri = http_info.get('request_uri', '')
                    if host and uri:
                        request_urls.append(f"http://{host}{uri}")

            result = {
                "ok": True,
                "summary": _calculate_summary(packets),
                "unique_ips": sorted(seen_ips),
                "urls": request_urls[:50],       # cap the URL list
                "packet_samples": packets[:20]   # first packets for inspection
            }
            if include_statistics:
                result["statistics"] = _generate_statistics(packets)
            if include_security_analysis:
                result["security_findings"] = _perform_security_analysis(packets)
            if include_flow_analysis:
                result["flow_analysis"] = _perform_flow_analysis(packets)
            if focus_protocols:
                result["protocol_details"] = _analyze_protocols(packets, focus_protocols)
            return result
        finally:
            # Best-effort cleanup of the staged temp file.
            if os.path.exists(tmp.name):
                try:
                    os.unlink(tmp.name)
                except Exception as e:
                    logger.warning(f"Failed to delete temporary file: {e}")
    except Exception as e:
        logger.error(f"Error analyzing pcap bytes: {e}", exc_info=True)
        return {"ok": False, "error": f"Analysis failed: {str(e)}"}