"""
GitHub PCAP Analysis Tool
Download and analyze pcap files directly from GitHub repositories.
Supports both public and private repositories (with authentication).
This tool:
1. Accepts a GitHub repository URL and path to pcap file
2. Downloads the pcap file from GitHub (supports private repos with PAT)
3. Performs comprehensive packet analysis
4. Returns structured analysis results
5. Cleans up temporary files
"""
import os
import tempfile
import logging
import subprocess
import base64
from typing import Dict, Any, Optional, List
from pydantic import BaseModel, Field
from .analyze_pcap_tool import (
TsharkExtractor,
_calculate_summary,
_generate_statistics,
_perform_security_analysis,
_perform_flow_analysis,
_analyze_protocols
)
# Configure logging
logger = logging.getLogger(__name__)
class GitHubPcapInput(BaseModel):
    """Input model for GitHub pcap analysis.

    Credentials are optional; ``github_username`` and ``github_pat`` must be
    supplied together to download from a private repository.
    """
    repo_url: str = Field(..., description="GitHub repository URL (https://github.com/owner/repo)")
    pcap_path: str = Field(..., description="Path to pcap file within repository (e.g., 'captures/network.pcap' or 'folder/subfolder/capture.pcap')")
    # Optional[str] (not plain str): these default to None, and pydantic v2
    # rejects an explicit None value for a non-optional str field.
    github_username: Optional[str] = Field(None, description="GitHub username (required for private repos)")
    github_pat: Optional[str] = Field(None, description="GitHub Personal Access Token (required for private repos)")
    branch: str = Field("main", description="Branch to download from (default: main)")
    max_packets: int = Field(100, description="Maximum number of packets to analyze (default: 100)")
    focus_protocols: Optional[List[str]] = Field(None, description="List of protocols to focus on (e.g., ['HTTP', 'DNS', 'TLS'])")
    include_statistics: bool = Field(True, description="Include network statistics")
    include_security_analysis: bool = Field(True, description="Include security findings")
    include_flow_analysis: bool = Field(True, description="Include flow analysis")
def parse_github_url(url: str) -> Dict[str, str]:
    """
    Parse a GitHub URL to extract owner and repo.

    Supports formats:
    - https://github.com/owner/repo
    - https://github.com/owner/repo.git
    - git@github.com:owner/repo.git

    Args:
        url: GitHub URL

    Returns:
        Dictionary with 'owner', 'repo' and 'base_url' keys.

    Raises:
        ValueError: If the URL is not a recognizable GitHub URL.
    """
    # Strip trailing slashes, then a *trailing* '.git' suffix only.
    # (A blanket str.replace('.git', '') would also mangle owners/repos
    # that merely contain '.git' somewhere in their name.)
    url = url.rstrip('/')
    if url.endswith('.git'):
        url = url[:-len('.git')]
    # Handle SSH format: git@github.com:owner/repo
    if url.startswith('git@'):
        try:
            parts = url.split(':', 1)[1].split('/')
            owner, repo = parts[0], parts[1]
        except IndexError:
            # Malformed SSH URL (missing ':' payload or repo segment) —
            # surface the documented ValueError, not a bare IndexError.
            raise ValueError(f"Invalid GitHub URL format: {url}")
        if not owner or not repo:
            raise ValueError(f"Invalid GitHub URL format: {url}")
        return {
            'owner': owner,
            'repo': repo,
            'base_url': f"https://github.com/{owner}/{repo}"
        }
    # Handle HTTPS format
    parts = url.split('/')
    try:
        github_idx = parts.index('github.com')
        owner = parts[github_idx + 1]
        repo = parts[github_idx + 2]
        return {
            'owner': owner,
            'repo': repo,
            'base_url': f"https://github.com/{owner}/{repo}"
        }
    except (ValueError, IndexError):
        raise ValueError(f"Invalid GitHub URL format: {url}")
def download_pcap_from_github(
    repo_url: str,
    pcap_path: str,
    branch: str = "main",
    username: Optional[str] = None,
    pat: Optional[str] = None
) -> str:
    """
    Download a specific pcap file from GitHub.

    Args:
        repo_url: GitHub repository URL
        pcap_path: Path to pcap file within the repo
        branch: Branch to download from
        username: GitHub username for private repos
        pat: Personal Access Token for private repos

    Returns:
        Path to downloaded pcap file (a temporary file; the caller is
        responsible for deleting it).

    Raises:
        ValueError: If the URL is invalid, the HTTP download fails, or the
            downloaded file is empty.
    """
    # Parse the GitHub URL
    parsed = parse_github_url(repo_url)
    owner = parsed['owner']
    repo = parsed['repo']
    # Construct raw GitHub URL
    raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{pcap_path}"
    logger.info("Downloading pcap from: %s", raw_url)
    # Create temp file for the pcap
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap')
    temp_file.close()
    try:
        # --fail: make curl exit non-zero on HTTP errors (404/403/...).
        # Without it an HTML error page is silently saved as the "pcap"
        # and would pass the non-empty check below.
        # -sS: suppress the progress meter but keep real error messages on
        # stderr so CalledProcessError.stderr stays informative.
        base_cmd = ['curl', '-L', '--fail', '-sS', '-o', temp_file.name]
        if username and pat:
            # Use GitHub contents API with token auth for private repos.
            # NOTE(review): the PAT on argv is visible in the process list;
            # consider a header file or netrc for hardening.
            api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{pcap_path}?ref={branch}"
            cmd = base_cmd + [
                '-H', f'Authorization: token {pat}',
                '-H', 'Accept: application/vnd.github.v3.raw',
                api_url
            ]
            logger.info("Using authenticated download for private repository")
        else:
            # Public repo - use raw URL
            cmd = base_cmd + [raw_url]
            logger.info("Using unauthenticated download for public repository")
        # Execute download (list argv, shell=False by default)
        subprocess.run(
            cmd,
            check=True,
            capture_output=True,
            text=True,
            timeout=60
        )
        # Verify file was downloaded and has content
        if not os.path.exists(temp_file.name) or os.path.getsize(temp_file.name) == 0:
            raise ValueError("Downloaded file is empty or does not exist")
        file_size = os.path.getsize(temp_file.name)
        logger.info("Successfully downloaded pcap: %d bytes", file_size)
        return temp_file.name
    except subprocess.CalledProcessError as e:
        logger.error("Failed to download pcap: %s", e.stderr)
        # Clean up temp file on error
        if os.path.exists(temp_file.name):
            os.unlink(temp_file.name)
        raise ValueError(f"Failed to download pcap from GitHub: {e.stderr}")
    except Exception as e:
        # Covers timeouts, the empty-file ValueError above, etc.
        logger.error("Error downloading pcap: %s", e)
        # Clean up temp file on error
        if os.path.exists(temp_file.name):
            os.unlink(temp_file.name)
        raise
def _extract_quick_references(packets: List[Dict[str, Any]]) -> "tuple":
    """Collect unique IP addresses and HTTP URLs from extracted packets.

    Tolerates packets whose 'ip'/'http' key is present but None.

    Returns:
        Tuple of (sorted list of unique IPs, list of "http://host/uri" URLs).
    """
    unique_ips = set()
    urls = []
    for packet in packets:
        ip_layer = packet.get('ip')
        if ip_layer:
            if ip_layer.get('src'):
                unique_ips.add(ip_layer['src'])
            if ip_layer.get('dst'):
                unique_ips.add(ip_layer['dst'])
        http_layer = packet.get('http')
        if http_layer:
            host = http_layer.get('host', '')
            uri = http_layer.get('request_uri', '')
            if host and uri:
                urls.append(f"http://{host}{uri}")
    return sorted(unique_ips), urls
def github_analyze_pcap_execute(
    repo_url: str,
    pcap_path: str,
    github_username: Optional[str] = None,
    github_pat: Optional[str] = None,
    branch: str = "main",
    max_packets: int = 100,
    focus_protocols: Optional[List[str]] = None,
    include_statistics: bool = True,
    include_security_analysis: bool = True,
    include_flow_analysis: bool = True
) -> Dict[str, Any]:
    """
    Download and analyze a pcap file from a GitHub repository.

    This tool downloads a pcap file directly from GitHub (public or private)
    and performs comprehensive network packet analysis. Supports authentication
    for private repositories using GitHub username and Personal Access Token.

    Workflow:
    1. Downloads pcap file from GitHub (with auth if provided)
    2. Extracts packets using tshark
    3. Performs security, flow, and protocol analysis
    4. Returns structured results
    5. Cleans up temporary files

    Args:
        repo_url: GitHub repository URL (e.g., "https://github.com/owner/repo")
        pcap_path: Path to pcap file within repo (e.g., "captures/network.pcap")
        github_username: GitHub username (required for private repos)
        github_pat: GitHub Personal Access Token (required for private repos)
        branch: Branch to download from (default: "main")
        max_packets: Maximum number of packets to analyze (default: 100)
        focus_protocols: Optional list of protocols for detailed analysis
        include_statistics: Include network statistics (default: True)
        include_security_analysis: Include security findings (default: True)
        include_flow_analysis: Include flow analysis (default: True)

    Returns:
        Dictionary containing:
        - ok: Success status
        - repo_url: GitHub repository URL
        - pcap_path: Path to pcap within repo
        - branch: Branch downloaded from
        - file_size_bytes: Size of downloaded file
        - summary: Packet capture summary
        - statistics: Network statistics (if enabled)
        - security_findings: Security analysis results (if enabled)
        - flow_analysis: Flow/conversation analysis (if enabled)
        - protocol_details: Detailed protocol analysis (if focus_protocols specified)
        - error: Error message if analysis failed

    Example - Public Repository:
        result = github_analyze_pcap(
            repo_url="https://github.com/user/network-captures",
            pcap_path="captures/traffic.pcap",
            branch="main",
            max_packets=500,
            focus_protocols=["HTTP", "DNS"]
        )

    Example - Private Repository:
        result = github_analyze_pcap(
            repo_url="https://github.com/company/private-captures",
            pcap_path="security/incident-2024.pcap",
            github_username="your-username",
            github_pat="ghp_xxxxxxxxxxxxx",
            branch="main",
            include_security_analysis=True
        )
    """
    temp_pcap_path = None
    try:
        # Download the pcap file from GitHub
        logger.info("Downloading pcap from GitHub")
        logger.info("Repository: %s", repo_url)
        logger.info("PCAP path: %s", pcap_path)
        logger.info("Branch: %s", branch)
        temp_pcap_path = download_pcap_from_github(
            repo_url=repo_url,
            pcap_path=pcap_path,
            branch=branch,
            username=github_username,
            pat=github_pat
        )
        file_size = os.path.getsize(temp_pcap_path)
        logger.info("Downloaded pcap file: %d bytes", file_size)
        # Extract packets using tshark
        extractor = TsharkExtractor()
        packets = extractor.extract_packets(temp_pcap_path, max_packets=max_packets)
        if not packets:
            # Empty capture is a success with a zero-packet summary, not an error.
            return {
                "ok": True,
                "repo_url": repo_url,
                "pcap_path": pcap_path,
                "branch": branch,
                "file_size_bytes": file_size,
                "summary": {
                    "total_packets": 0,
                    "message": "No packets found or pcap file is empty"
                }
            }
        # Calculate basic summary
        summary = _calculate_summary(packets)
        # Extract unique IPs and URLs for quick reference
        unique_ips, urls = _extract_quick_references(packets)
        result = {
            "ok": True,
            "repo_url": repo_url,
            "pcap_path": pcap_path,
            "branch": branch,
            "file_size_bytes": file_size,
            "summary": summary,
            "unique_ips": unique_ips,
            "urls": urls[:50],  # Limit to first 50 URLs
            "packet_samples": packets[:20]  # Include first 20 packets for LLM inspection
        }
        # Add optional analysis sections as requested
        if include_statistics:
            result["statistics"] = _generate_statistics(packets)
        if include_security_analysis:
            result["security_findings"] = _perform_security_analysis(packets)
        if include_flow_analysis:
            result["flow_analysis"] = _perform_flow_analysis(packets)
        if focus_protocols:
            result["protocol_details"] = _analyze_protocols(packets, focus_protocols)
        logger.info("Analysis completed successfully")
        return result
    except ValueError as e:
        # Raised for bad URLs, failed downloads, and empty downloads.
        logger.error("Validation error: %s", e)
        return {
            "ok": False,
            "error": str(e),
            "repo_url": repo_url,
            "pcap_path": pcap_path
        }
    except subprocess.SubprocessError as e:
        # e.g. TimeoutExpired from the curl download.
        logger.error("Download error: %s", e)
        return {
            "ok": False,
            "error": f"Failed to download from GitHub: {str(e)}",
            "repo_url": repo_url,
            "pcap_path": pcap_path
        }
    except Exception as e:
        logger.error("Error analyzing GitHub pcap: %s", e, exc_info=True)
        return {
            "ok": False,
            "error": f"Analysis failed: {str(e)}",
            "repo_url": repo_url,
            "pcap_path": pcap_path
        }
    finally:
        # Clean up temporary pcap file
        if temp_pcap_path and os.path.exists(temp_pcap_path):
            try:
                os.unlink(temp_pcap_path)
                logger.info("Cleaned up temporary file: %s", temp_pcap_path)
            except Exception as e:
                logger.warning("Failed to clean up temporary file: %s", e)