"""
Wireshark Project Management Tools
Manages PCAP file organization in project directories, similar to Batfish snapshots.
"""
import logging
import os
import shutil
import subprocess
import tempfile
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base directory for all Wireshark projects. It lives under the system
# temporary directory; each project is a sub-directory of this path.
WIRESHARK_WORKSPACE = os.path.join(tempfile.gettempdir(), "wireshark_workspace")
def _ensure_workspace_exists():
    """Create the shared workspace directory unless it is already present."""
    workspace_dir = WIRESHARK_WORKSPACE
    os.makedirs(workspace_dir, exist_ok=True)
def _get_project_path(project_name: str) -> str:
    """Return the directory inside the workspace that belongs to *project_name*."""
    workspace_root = WIRESHARK_WORKSPACE
    return os.path.join(workspace_root, project_name)
def _get_project_metadata_path(project_name: str) -> str:
    """Return the location of the ``.metadata.json`` file for *project_name*."""
    project_dir = _get_project_path(project_name)
    return os.path.join(project_dir, ".metadata.json")
def _save_project_metadata(project_name: str, metadata: Dict[str, Any]):
    """Write *metadata* as indented JSON to the project's metadata file.

    Args:
        project_name: Project whose metadata file is overwritten.
        metadata: JSON-serialisable dictionary to store.
    """
    target = _get_project_metadata_path(project_name)
    with open(target, 'w') as handle:
        json.dump(metadata, handle, indent=2)
def _load_project_metadata(project_name: str) -> Dict[str, Any]:
    """Read the project's metadata file, or return an empty skeleton if absent.

    Args:
        project_name: Project whose metadata file is read.

    Returns:
        The parsed JSON content, or ``{"pcaps": {}, "created_at": None}``
        when no metadata file exists.
    """
    source = _get_project_metadata_path(project_name)
    if not os.path.exists(source):
        # Nothing stored yet: hand back the empty skeleton.
        return {"pcaps": {}, "created_at": None}
    with open(source, 'r') as handle:
        return json.load(handle)
def parse_github_url(url: str) -> Dict[str, str]:
    """
    Parse a GitHub URL to extract owner and repo.

    Supports formats:
    - https://github.com/owner/repo
    - https://github.com/owner/repo.git
    - git@github.com:owner/repo.git

    A trailing slash or extra path components after the repository
    (e.g. ``/tree/main``) are tolerated.

    Args:
        url: GitHub repository URL in one of the supported formats.

    Returns:
        Dictionary with the keys ``owner`` and ``repo``.

    Raises:
        ValueError: If the URL is not a recognised GitHub URL, or if the
            owner or repository component is empty.
    """
    cleaned = url.strip()
    ssh_prefix = 'git@github.com:'

    if cleaned.startswith(ssh_prefix):
        path = cleaned[len(ssh_prefix):]
    elif 'github.com/' in cleaned:
        path = cleaned.split('github.com/')[-1]
    else:
        raise ValueError(f"Could not parse GitHub URL: {cleaned}")

    parts = path.split('/')
    if len(parts) >= 2:
        owner, repo = parts[0], parts[1]
        # Strip the suffix from the repo component itself so that
        # "owner/repo.git/" is handled as well as "owner/repo.git".
        if repo.endswith('.git'):
            repo = repo[:-4]
        # "github.com/owner/" splits into ['owner', '']; an empty repo (or
        # owner) is not a usable repository reference.
        if owner and repo:
            return {'owner': owner, 'repo': repo}

    raise ValueError(f"Could not parse GitHub URL: {cleaned}")
def create_project_execute(project_name: str) -> Dict[str, Any]:
    """
    Create a new Wireshark project directory.

    The project name must be a single path component: empty names, ``.``,
    ``..`` and names containing a path separator are rejected so that a
    project can never be created outside (or nested inside) the workspace.

    Args:
        project_name: Name of the project to create

    Returns:
        Dictionary containing:
        - ok: Success status
        - project_name: Name of created project
        - project_path: Full path to project directory
        - message: Confirmation message
        - error: Error message if creation failed
    """
    try:
        if (
            not project_name
            or project_name in (".", "..")
            or os.path.basename(project_name) != project_name
        ):
            return {
                "ok": False,
                "error": (
                    f"Invalid project name '{project_name}'. "
                    "Use a simple name without path separators."
                )
            }

        _ensure_workspace_exists()
        project_path = _get_project_path(project_name)

        # Create the directory and let the OS report a clash; this avoids the
        # window between a separate existence check and the creation.
        try:
            os.makedirs(project_path)
        except FileExistsError:
            return {
                "ok": False,
                "error": f"Project '{project_name}' already exists. Use a different name or remove the existing project first."
            }
        logger.info(f"Created project directory: {project_path}")

        # Create metadata
        metadata = {
            "project_name": project_name,
            "created_at": datetime.now().isoformat(),
            "pcaps": {}
        }
        _save_project_metadata(project_name, metadata)

        return {
            "ok": True,
            "project_name": project_name,
            "project_path": project_path,
            "message": f"Project '{project_name}' created successfully"
        }
    except Exception as e:
        # Tool boundary: report every failure as a result dictionary.
        logger.error(f"Error creating project: {e}", exc_info=True)
        return {
            "ok": False,
            "error": f"Failed to create project: {str(e)}"
        }
def _remove_partial_download(path: Optional[str]) -> None:
    """Best-effort removal of a file left behind by a failed download."""
    if path and os.path.exists(path):
        try:
            os.unlink(path)
        except OSError as exc:
            logger.warning(f"Could not remove partial download {path}: {exc}")


def upload_github_pcap_execute(
    repo_url: str,
    pcap_path: str,
    project_name: Optional[str] = None,
    github_username: Optional[str] = None,
    github_pat: Optional[str] = None,
    branch: str = "main"
) -> Dict[str, Any]:
    """
    Upload a PCAP file from GitHub to a project.

    The file is fetched with ``curl`` from ``raw.githubusercontent.com`` and
    stored in the project directory under its base name. If a file with that
    name already exists, a timestamp is appended to keep both. The project is
    created on demand.

    Args:
        repo_url: GitHub repository URL
        pcap_path: Path to PCAP file within the repository
        project_name: Project to upload to (auto-generated if None)
        github_username: GitHub username for private repos
        github_pat: GitHub Personal Access Token for private repos
        branch: Branch to download from (default: "main")

    Returns:
        Dictionary containing:
        - ok: Success status
        - project_name: Name of the project
        - pcap_name: Name of the uploaded PCAP file
        - pcap_path: Full path to the PCAP file
        - file_size_bytes: Size of the uploaded file
        - uploaded_at: Timestamp of upload
        - message: Confirmation message
        - error: Error message if upload failed
    """
    # Set just before the download starts so the error handlers know which
    # file (if any) may have been left half-written.
    download_target = None
    try:
        _ensure_workspace_exists()

        # Parse once, before touching the filesystem, so an invalid URL
        # cannot leave an empty project behind.
        parsed = parse_github_url(repo_url)
        owner = parsed['owner']
        repo = parsed['repo']

        # Extract filename from pcap_path; it must name an actual file.
        pcap_filename = os.path.basename(pcap_path)
        if not pcap_filename or pcap_filename in (".", ".."):
            return {
                "ok": False,
                "error": f"Invalid PCAP path '{pcap_path}': it must end in a file name"
            }

        # Auto-generate project name if not provided
        if not project_name:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            project_name = f"{repo}_{timestamp}"
            logger.info(f"Auto-generated project name: {project_name}")

        # A project name is a single path component; anything else could
        # place the download outside the workspace.
        if project_name in (".", "..") or os.path.basename(project_name) != project_name:
            return {
                "ok": False,
                "error": (
                    f"Invalid project name '{project_name}'. "
                    "Use a simple name without path separators."
                )
            }

        project_path = _get_project_path(project_name)

        # Create project if it doesn't exist
        if not os.path.exists(project_path):
            creation = create_project_execute(project_name)
            if not creation["ok"]:
                return creation

        # Build raw GitHub URL (a leading slash in pcap_path would otherwise
        # produce an empty path segment).
        raw_url = (
            f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/"
            f"{pcap_path.lstrip('/')}"
        )

        dest_path = os.path.join(project_path, pcap_filename)
        if os.path.exists(dest_path):
            # Add timestamp to make it unique
            name, ext = os.path.splitext(pcap_filename)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            pcap_filename = f"{name}_{timestamp}{ext}"
            dest_path = os.path.join(project_path, pcap_filename)

        logger.info(f"Downloading PCAP from GitHub: {raw_url}")

        # Download using curl with optional authentication
        cmd = ["curl", "-L", "-f"]
        if github_username and github_pat:
            # NOTE(review): the credentials are passed on curl's command line
            # and are therefore visible in the process argument list.
            cmd += ["-u", f"{github_username}:{github_pat}"]
            logger.info("Using authenticated download for private repository")
        else:
            logger.info("Using unauthenticated download for public repository")
        cmd += ["-o", dest_path, raw_url]

        download_target = dest_path
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            timeout=120
        )

        # Verify file was downloaded
        if not os.path.exists(dest_path):
            return {
                "ok": False,
                "error": "Failed to download PCAP file"
            }

        file_size = os.path.getsize(dest_path)
        uploaded_at = datetime.now().isoformat()
        logger.info(f"Successfully downloaded PCAP: {file_size} bytes")

        # Update project metadata
        metadata = _load_project_metadata(project_name)
        metadata.setdefault("pcaps", {})[pcap_filename] = {
            "source": "github",
            "repo_url": repo_url,
            "pcap_path": pcap_path,
            "branch": branch,
            "file_size_bytes": file_size,
            "uploaded_at": uploaded_at
        }
        _save_project_metadata(project_name, metadata)

        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_filename,
            "pcap_path": dest_path,
            "file_size_bytes": file_size,
            "uploaded_at": uploaded_at,
            "message": f"PCAP '{pcap_filename}' uploaded to project '{project_name}'"
        }
    except subprocess.CalledProcessError as e:
        # errors='replace' keeps undecodable curl output from raising here.
        error_msg = e.stderr.decode('utf-8', errors='replace') if e.stderr else str(e)
        logger.error(f"Download failed: {error_msg}")
        _remove_partial_download(download_target)
        return {
            "ok": False,
            "error": f"Failed to download PCAP: {error_msg}"
        }
    except subprocess.TimeoutExpired as e:
        # curl was killed mid-transfer; whatever it wrote is incomplete and
        # must not be left behind to be listed as a PCAP.
        logger.error(f"Download timed out after {e.timeout} seconds")
        _remove_partial_download(download_target)
        return {
            "ok": False,
            "error": f"Failed to download PCAP: download timed out after {e.timeout} seconds"
        }
    except Exception as e:
        # Tool boundary: report every other failure as a result dictionary.
        logger.error(f"Error uploading PCAP: {e}", exc_info=True)
        return {
            "ok": False,
            "error": f"Failed to upload PCAP: {str(e)}"
        }
def _humanize_timestamp(iso_value: Any) -> Any:
    """Format an ISO-8601 timestamp for display.

    Returns ``"Unknown"`` for a missing value and the raw value unchanged
    when it cannot be parsed.
    """
    if not iso_value:
        return "Unknown"
    try:
        return datetime.fromisoformat(iso_value).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        return iso_value


def _load_metadata_for_listing(project_name: str) -> Dict[str, Any]:
    """Load project metadata, degrading to an empty dict if it is unreadable."""
    try:
        metadata = _load_project_metadata(project_name)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError; a damaged metadata file should
        # not prevent the files on disk from being listed.
        logger.warning(f"Ignoring unreadable metadata for project '{project_name}': {exc}")
        return {}
    return metadata if isinstance(metadata, dict) else {}


def list_pcaps_execute(project_name: Optional[str] = None) -> Dict[str, Any]:
    """
    List projects and/or PCAPs.

    PCAPs are the ``.pcap`` / ``.pcapng`` files found in the project
    directory, enriched with whatever the project metadata records about
    them. Results are ordered newest first by the stored ISO timestamp;
    entries without a timestamp come last.

    Args:
        project_name: Specific project to list PCAPs from (lists all projects if None)

    Returns:
        Dictionary containing:
        - ok: Success status
        - projects: List of projects with their PCAPs (if project_name is None)
        - total_projects: Number of projects (if project_name is None)
        - message: Hint shown when no projects exist (if project_name is None)
        - project_name: Name of listed project (if project_name provided)
        - pcaps: List of PCAPs in the project (if project_name provided)
        - total_pcaps: Number of PCAPs (if project_name provided)
        - error: Error message if listing failed
    """
    try:
        _ensure_workspace_exists()
        pcap_suffixes = ('.pcap', '.pcapng')

        if project_name:
            # List PCAPs in specific project
            project_path = _get_project_path(project_name)
            if not os.path.exists(project_path):
                return {
                    "ok": False,
                    "error": f"Project '{project_name}' not found"
                }

            pcap_metadata = _load_metadata_for_listing(project_name).get("pcaps")
            if not isinstance(pcap_metadata, dict):
                pcap_metadata = {}

            # Pair each entry with its raw ISO timestamp so sorting does not
            # depend on the display string ("Unknown" would sort as newest).
            dated_pcaps = []
            for pcap_file in os.listdir(project_path):
                if not pcap_file.endswith(pcap_suffixes):
                    continue
                pcap_info = pcap_metadata.get(pcap_file, {})
                raw_uploaded_at = pcap_info.get("uploaded_at", "")
                sort_key = raw_uploaded_at if isinstance(raw_uploaded_at, str) else ""
                dated_pcaps.append((sort_key, {
                    "pcap_name": pcap_file,
                    "file_size_bytes": os.path.getsize(os.path.join(project_path, pcap_file)),
                    "uploaded_at": _humanize_timestamp(raw_uploaded_at),
                    "source": pcap_info.get("source", "unknown"),
                    "metadata": pcap_info
                }))

            # Sort by upload time (newest first)
            dated_pcaps.sort(key=lambda pair: pair[0], reverse=True)
            pcaps = [entry for _, entry in dated_pcaps]

            return {
                "ok": True,
                "project_name": project_name,
                "pcaps": pcaps,
                "total_pcaps": len(pcaps)
            }

        # List all projects
        dated_projects = []
        for proj_name in os.listdir(WIRESHARK_WORKSPACE):
            project_path = _get_project_path(proj_name)
            if not os.path.isdir(project_path):
                continue
            metadata = _load_metadata_for_listing(proj_name)
            raw_created_at = metadata.get("created_at", "")
            sort_key = raw_created_at if isinstance(raw_created_at, str) else ""
            pcap_count = sum(
                1 for f in os.listdir(project_path) if f.endswith(pcap_suffixes)
            )
            dated_projects.append((sort_key, {
                "project_name": proj_name,
                "created_at": _humanize_timestamp(raw_created_at),
                "pcap_count": pcap_count,
                "project_path": project_path
            }))

        # Sort by creation time (newest first)
        dated_projects.sort(key=lambda pair: pair[0], reverse=True)
        projects = [entry for _, entry in dated_projects]

        response = {
            "ok": True,
            "projects": projects,
            "total_projects": len(projects)
        }
        if not projects:
            response["message"] = "No projects found. Use wireshark_create_project to create one."
        return response
    except Exception as e:
        # Tool boundary: report every failure as a result dictionary.
        logger.error(f"Error listing: {e}", exc_info=True)
        return {
            "ok": False,
            "error": f"Failed to list: {str(e)}"
        }
def remove_pcap_execute(project_name: str, pcap_name: str) -> Dict[str, Any]:
    """
    Remove a PCAP file from a project.

    Both names must be single path components, and the project's metadata
    file cannot be removed through this function. This keeps the deletion
    confined to files directly inside the named project directory.

    Args:
        project_name: Name of the project
        pcap_name: Name of the PCAP file to remove

    Returns:
        Dictionary containing:
        - ok: Success status
        - project_name: Name of the project
        - pcap_name: Name of removed PCAP
        - message: Confirmation message
        - error: Error message if removal failed
    """
    try:
        if (
            not project_name
            or project_name in (".", "..")
            or os.path.basename(project_name) != project_name
        ):
            return {
                "ok": False,
                "error": f"Invalid project name '{project_name}'"
            }

        project_path = _get_project_path(project_name)
        if not os.path.exists(project_path):
            return {
                "ok": False,
                "error": f"Project '{project_name}' not found"
            }

        metadata_filename = os.path.basename(_get_project_metadata_path(project_name))
        if (
            not pcap_name
            or pcap_name in (".", "..")
            or os.path.basename(pcap_name) != pcap_name
            or pcap_name == metadata_filename
        ):
            return {
                "ok": False,
                "error": f"Invalid PCAP name '{pcap_name}'"
            }

        pcap_path = os.path.join(project_path, pcap_name)
        if not os.path.isfile(pcap_path):
            return {
                "ok": False,
                "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"
            }

        # Remove the file
        os.unlink(pcap_path)
        logger.info(f"Removed PCAP: {pcap_path}")

        # Update metadata. A metadata file without a "pcaps" mapping must not
        # turn an already completed deletion into a reported failure.
        metadata = _load_project_metadata(project_name)
        tracked_pcaps = metadata.get("pcaps")
        if isinstance(tracked_pcaps, dict):
            tracked_pcaps.pop(pcap_name, None)
        _save_project_metadata(project_name, metadata)

        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "message": f"PCAP '{pcap_name}' removed from project '{project_name}'"
        }
    except Exception as e:
        # Tool boundary: report every failure as a result dictionary.
        logger.error(f"Error removing PCAP: {e}", exc_info=True)
        return {
            "ok": False,
            "error": f"Failed to remove PCAP: {str(e)}"
        }
def clean_project_execute(project_name: str) -> Dict[str, Any]:
    """
    Remove an entire project directory and all its PCAPs.

    The project name must be a single path component. An empty name would
    resolve to the workspace itself and ``..`` to its parent, so both are
    rejected before anything is deleted.

    Args:
        project_name: Name of the project to remove

    Returns:
        Dictionary containing:
        - ok: Success status
        - project_name: Name of removed project
        - message: Confirmation message
        - error: Error message if removal failed
    """
    try:
        if (
            not project_name
            or project_name in (".", "..")
            or os.path.basename(project_name) != project_name
        ):
            return {
                "ok": False,
                "error": f"Invalid project name '{project_name}'"
            }

        project_path = _get_project_path(project_name)
        if not os.path.exists(project_path):
            return {
                "ok": False,
                "error": f"Project '{project_name}' not found"
            }

        # Remove entire project directory
        shutil.rmtree(project_path)
        logger.info(f"Removed project directory: {project_path}")

        return {
            "ok": True,
            "project_name": project_name,
            "message": f"Project '{project_name}' and all its PCAPs removed successfully"
        }
    except Exception as e:
        # Tool boundary: report every failure as a result dictionary.
        logger.error(f"Error cleaning project: {e}", exc_info=True)
        return {
            "ok": False,
            "error": f"Failed to clean project: {str(e)}"
        }
def get_pcap_path(project_name: str, pcap_name: str) -> Optional[str]:
    """
    Get the full path to a PCAP file in a project.

    Both arguments must be single path components. An absolute ``pcap_name``
    or one containing ``..`` would otherwise make ``os.path.join`` return a
    path outside the project directory, so such names yield ``None``.

    Args:
        project_name: Name of the project
        pcap_name: Name of the PCAP file

    Returns:
        Full path to PCAP file if it exists, None otherwise
    """
    for component in (project_name, pcap_name):
        if (
            not component
            or component in (".", "..")
            or os.path.basename(component) != component
        ):
            return None

    pcap_path = os.path.join(_get_project_path(project_name), pcap_name)
    return pcap_path if os.path.exists(pcap_path) else None