"""
Analyze PCAP from URL Tool
Downloads and analyzes packet capture files from URLs (GitHub, S3, HTTP, etc.).
"""
import logging
import os
import tempfile
from typing import Any, Dict, List, Optional
from urllib.parse import urlsplit, urlunsplit

import httpx
from .analyze_pcap_tool import (
TsharkExtractor,
_calculate_summary,
_generate_statistics,
_perform_security_analysis,
_perform_flow_analysis,
_analyze_protocols
)
logger = logging.getLogger(__name__)
def analyze_pcap_from_url_execute(
    url: str,
    max_packets: int = 100,
    focus_protocols: Optional[List[str]] = None,
    include_statistics: bool = True,
    include_security_analysis: bool = True,
    include_flow_analysis: bool = True,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Download and analyze a packet capture file from a URL.

    This tool downloads a pcap file from a URL (GitHub, S3, HTTP server, etc.)
    and performs comprehensive network packet analysis. Useful when pcap files
    are stored in repositories or cloud storage.

    Supports:
    - GitHub raw URLs (e.g., https://raw.githubusercontent.com/user/repo/main/capture.pcap)
    - GitHub repo URLs (automatically converts to raw URL)
    - S3 presigned URLs
    - Any HTTP/HTTPS accessible pcap file

    This tool performs comprehensive network packet analysis including:
    - Protocol distribution and statistics
    - Security analysis (port scanning, suspicious ports, unencrypted traffic)
    - Flow analysis (conversation tracking, anomaly detection)
    - Top talkers and port usage

    Args:
        url: URL to the pcap file (required)
        max_packets: Maximum number of packets to analyze (default: 100)
        focus_protocols: Optional list of protocols to focus on (e.g., ["HTTP", "DNS", "TLS"])
        include_statistics: Whether to include statistical summaries (default: True)
        include_security_analysis: Whether to perform security analysis (default: True)
        include_flow_analysis: Whether to perform flow analysis (default: True)
        timeout: Download timeout in seconds (default: 30)

    Returns:
        Dictionary containing:
        - ok: Success status
        - url: Original URL provided
        - file_size_bytes: Size of downloaded file
        - summary: Packet capture summary (packet count, duration, protocols)
        - statistics: Network statistics (top talkers, top ports, packet sizes)
        - security_findings: Security analysis results (if enabled)
        - flow_analysis: Flow analysis results (if enabled)
        - protocol_details: Focused protocol analysis (if focus_protocols specified)
        - error: Error message if analysis failed

    Example:
        # Analyze pcap from GitHub
        result = analyze_pcap_from_url(
            url="https://raw.githubusercontent.com/user/repo/main/capture.pcap",
            max_packets=500,
            focus_protocols=["HTTP", "DNS"],
            include_security_analysis=True
        )
        # Analyze from S3
        result = analyze_pcap_from_url(
            url="https://bucket.s3.amazonaws.com/captures/network.pcap",
            max_packets=200
        )
    """
    try:
        # Convert GitHub repo URLs to raw URLs if needed
        processed_url = _process_github_url(url)
        logger.info(f"Downloading pcap from URL: {processed_url}")
        # Path of the temp file holding the download; None until the file
        # has been fully written, so cleanup in `finally` is exact.
        tmp_path: Optional[str] = None
        try:
            # Download first, create the temp file second: a failed download
            # then leaks neither an open file handle nor an empty on-disk file.
            with httpx.Client(timeout=timeout, follow_redirects=True) as client:
                response = client.get(processed_url)
                response.raise_for_status()
            content = response.content
            file_size = len(content)
            logger.info(f"Downloaded {file_size} bytes")
            # Context manager guarantees the handle is closed before tshark
            # reads the file (required on Windows) and before cleanup.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pcap') as temp_file:
                temp_file.write(content)
                tmp_path = temp_file.name
            # Extract packets using tshark
            extractor = TsharkExtractor()
            packets = extractor.extract_packets(tmp_path, max_packets=max_packets)
            if not packets:
                # Empty capture is a success case, not an error.
                return {
                    "ok": True,
                    "url": url,
                    "file_size_bytes": file_size,
                    "summary": {
                        "total_packets": 0,
                        "message": "No packets found or pcap file is empty"
                    }
                }
            # Calculate basic summary
            summary = _calculate_summary(packets)
            result: Dict[str, Any] = {
                "ok": True,
                "url": url,
                "file_size_bytes": file_size,
                "summary": summary
            }
            # Optional analysis sections, each gated by its flag.
            if include_statistics:
                result["statistics"] = _generate_statistics(packets)
            if include_security_analysis:
                result["security_findings"] = _perform_security_analysis(packets)
            if include_flow_analysis:
                result["flow_analysis"] = _perform_flow_analysis(packets)
            if focus_protocols:
                result["protocol_details"] = _analyze_protocols(packets, focus_protocols)
            return result
        except httpx.HTTPStatusError as e:
            # Server responded, but with a non-2xx status.
            logger.error(f"HTTP error downloading file: {e.response.status_code}")
            return {
                "ok": False,
                "error": f"Failed to download file: HTTP {e.response.status_code}",
                "url": url
            }
        except httpx.RequestError as e:
            # Transport-level failure (DNS, connection, timeout, ...).
            logger.error(f"Request error downloading file: {e}")
            return {
                "ok": False,
                "error": f"Failed to download file: {str(e)}",
                "url": url
            }
        finally:
            # Clean up the temporary file; best-effort — a leftover temp file
            # should not mask the real result/error.
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except Exception as e:
                    logger.warning(f"Failed to delete temporary file: {e}")
    except Exception as e:
        # Catch-all boundary: this tool reports errors as data, never raises.
        logger.error(f"Error analyzing pcap from URL: {e}", exc_info=True)
        return {"ok": False, "error": f"Analysis failed: {str(e)}", "url": url}
def _process_github_url(url: str) -> str:
"""
Convert GitHub repository URLs to raw content URLs.
Converts:
- https://github.com/user/repo/blob/main/file.pcap
To:
- https://raw.githubusercontent.com/user/repo/main/file.pcap
Args:
url: Original URL
Returns:
Processed URL (raw GitHub URL if applicable, otherwise unchanged)
"""
if "github.com" in url and "/blob/" in url:
# Convert blob URL to raw URL
processed = url.replace("github.com", "raw.githubusercontent.com")
processed = processed.replace("/blob/", "/")
logger.info(f"Converted GitHub URL: {url} -> {processed}")
return processed
return url