"""
Workspace Sync Module
Manages PCAP files between GitHub and user's local project workspace.
Uses credentials from HTTP headers (X-GitHub-*) following the SDWAN/CML pattern.
Project name is always derived from the GitHub username for per-user isolation.
"""
import json
import logging
import os
import shutil
import subprocess
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Root directory (under the system temp dir) holding one subdirectory per
# user/project of synced PCAP files.
WIRESHARK_WORKSPACE = os.path.join(tempfile.gettempdir(), "wireshark_workspace")
# Idle time in seconds before cleanup_stale_projects removes a project;
# overridable via the WIRESHARK_PROJECT_TTL env var (default: 24 hours).
PROJECT_TTL_SECONDS = int(os.getenv("WIRESHARK_PROJECT_TTL", str(24 * 60 * 60)))
def _ensure_workspace_exists():
    """Create the shared workspace root directory if missing (idempotent)."""
    os.makedirs(WIRESHARK_WORKSPACE, exist_ok=True)
def _get_project_path(project_name: str) -> str:
    """Resolve a project's directory inside the workspace.

    project_name is derived from the X-GitHub-Username HTTP header, i.e.
    untrusted input, so every character that is not alphanumeric or one of
    ``-_.`` is replaced with ``_``. This prevents path separators or ``..``
    components from escaping WIRESHARK_WORKSPACE. Legitimate GitHub
    usernames (alphanumerics and hyphens) map to themselves unchanged.
    """
    safe_name = "".join(
        ch if ch.isalnum() or ch in "-_." else "_" for ch in project_name
    )
    # A name made only of dots ('.', '..') would still resolve to the
    # workspace itself or its parent — substitute a harmless placeholder.
    if not safe_name.strip("."):
        safe_name = "_"
    return os.path.join(WIRESHARK_WORKSPACE, safe_name)
def _get_metadata_path(project_name: str) -> str:
    """Location of the per-project metadata JSON file."""
    project_dir = _get_project_path(project_name)
    return os.path.join(project_dir, ".metadata.json")
def _save_metadata(project_name: str, metadata: Dict[str, Any]):
    """Persist a project's metadata dict as pretty-printed JSON."""
    metadata_file = _get_metadata_path(project_name)
    with open(metadata_file, 'w') as handle:
        json.dump(metadata, handle, indent=2)
def _load_metadata(project_name: str) -> Dict[str, Any]:
    """Read a project's metadata JSON.

    Returns the default empty structure when the file is missing — and also
    when it is unreadable or corrupt. Previously a truncated/garbled
    metadata file raised JSONDecodeError and crashed every caller
    (list_pcaps, sync_pcap, cleanup_stale_projects).
    """
    path = _get_metadata_path(project_name)
    try:
        with open(path, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        # No metadata yet — normal for a fresh project.
        pass
    except (OSError, json.JSONDecodeError) as e:
        logger.warning(f"Ignoring unreadable metadata for '{project_name}': {e}")
    return {"pcaps": {}, "created_at": None}
def _parse_github_url(url: str) -> Dict[str, str]:
    """Extract ``{'owner': ..., 'repo': ...}`` from a GitHub URL.

    Accepts both SSH form ('git@github.com:owner/repo.git') and HTTPS form
    ('https://github.com/owner/repo/'); a trailing slash and a '.git'
    suffix are ignored. Raises ValueError when no owner/repo pair can be
    extracted.
    """
    cleaned = url.strip().rstrip('/')
    if cleaned.endswith('.git'):
        cleaned = cleaned[:-4]
    candidates = []
    if cleaned.startswith('git@github.com:'):
        candidates.append(cleaned.replace('git@github.com:', ''))
    if 'github.com/' in cleaned:
        candidates.append(cleaned.split('github.com/')[-1])
    for candidate in candidates:
        segments = candidate.split('/')
        if len(segments) >= 2:
            return {'owner': segments[0], 'repo': segments[1]}
    raise ValueError(f"Could not parse GitHub URL: {cleaned}")
def _validate_creds(github_creds: Dict[str, Optional[str]]) -> Optional[Dict[str, Any]]:
    """Validate GitHub creds. Returns error dict if invalid, None if ok.

    All three of 'username', 'pat' and 'repo' must be present and
    non-empty (they arrive via the X-GitHub-* MCP headers).
    """
    if all(github_creds.get(field) for field in ('username', 'pat', 'repo')):
        return None
    return {
        "ok": False,
        "error": "GitHub credentials incomplete. Required MCP headers: X-GitHub-Username, X-GitHub-PAT, X-GitHub-Repo"
    }
def _list_remote_pcaps(github_creds: Dict[str, Optional[str]]) -> Dict[str, Any]:
    """
    List PCAP files available in the GitHub repo.

    Calls the GitHub contents API via urllib so the PAT travels only in a
    request header — the previous curl invocation put the token on the
    process command line, where it is visible to other local users.
    The user-supplied path and branch are URL-quoted.

    Returns dict with:
      - ok: bool
      - files: list of dicts with name, size, download_url, path
      - error: str if failed
    """
    pat = github_creds['pat']
    repo = github_creds['repo']
    # 'or' (not a .get default) so an explicit None header still maps
    # to a sane value instead of the string "None" in the URL.
    github_path = github_creds.get('path') or ''
    branch = github_creds.get('branch') or 'main'
    try:
        parsed = _parse_github_url(repo)
        owner = parsed['owner']
        repo_name = parsed['repo']
    except ValueError as e:
        return {"ok": False, "files": [], "error": str(e)}
    api_url = (
        f"https://api.github.com/repos/{owner}/{repo_name}/contents/"
        f"{urllib.parse.quote(github_path)}?ref={urllib.parse.quote(branch)}"
    )
    request = urllib.request.Request(api_url, headers={
        "Authorization": f"token {pat}",
        "Accept": "application/vnd.github.v3+json",
    })
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            contents = json.loads(response.read().decode('utf-8'))
        if isinstance(contents, dict) and contents.get('message'):
            return {"ok": False, "files": [], "error": f"GitHub API error: {contents.get('message')}"}
        if not isinstance(contents, list):
            # Requesting a single file (not a directory) yields one object.
            contents = [contents]
        pcap_files = []
        for item in contents:
            name = item.get('name', '')
            if not name.endswith(('.pcap', '.pcapng')):
                continue
            download_url = item.get('download_url')
            if not download_url:
                # Fall back to the raw-content URL when the API omits
                # download_url (e.g. for large files).
                file_path = item.get('path', f"{github_path}/{name}")
                download_url = f"https://raw.githubusercontent.com/{owner}/{repo_name}/{branch}/{file_path}"
            pcap_files.append({
                "name": name,
                "size": item.get('size', 0),
                "download_url": download_url,
                "path": item.get('path', '')
            })
        return {"ok": True, "files": pcap_files}
    except urllib.error.HTTPError as e:
        # GitHub error responses carry a JSON body with a 'message' field.
        try:
            detail = json.loads(e.read().decode('utf-8')).get('message', '')
        except Exception:
            detail = ''
        return {"ok": False, "files": [], "error": f"GitHub API error: {detail or e.reason}"}
    except urllib.error.URLError as e:
        return {"ok": False, "files": [], "error": f"GitHub API call failed: {e.reason}"}
    except json.JSONDecodeError as e:
        return {"ok": False, "files": [], "error": f"Failed to parse GitHub API response: {str(e)}"}
    except Exception as e:
        return {"ok": False, "files": [], "error": str(e)}
def list_pcaps(github_creds: Dict[str, Optional[str]]) -> Dict[str, Any]:
    """
    List PCAP files: both locally synced and available in the GitHub repo.

    Uses the GitHub username as the project name (per-user isolation).
    Shows which files are local (synced) and which are remote-only (not
    synced). A GitHub API failure is non-fatal: the local view is still
    returned, with 'github_error' describing the failure.

    Returns a dict with: ok, project_name, synced_pcaps, total_synced,
    available_pcaps, total_available, github_repo, github_path,
    github_error.
    """
    err = _validate_creds(github_creds)
    if err:
        return err
    project_name = github_creds['username']
    _ensure_workspace_exists()
    project_path = _get_project_path(project_name)
    # PCAP files currently present in the local project directory.
    local_files: Dict[str, Dict[str, Any]] = {}
    if os.path.exists(project_path):
        for fname in os.listdir(project_path):
            if fname.endswith(('.pcap', '.pcapng')):
                local_files[fname] = {
                    "file_size_bytes": os.path.getsize(os.path.join(project_path, fname)),
                }
    metadata = _load_metadata(project_name)
    # Remote PCAPs as reported by the GitHub contents API (best-effort).
    remote_result = _list_remote_pcaps(github_creds)
    remote_files: Dict[str, Dict[str, Any]] = {}
    if remote_result.get("ok"):
        for item in remote_result.get("files", []):
            remote_files[item["name"]] = {
                "file_size_bytes": item["size"],
                "github_path": item["path"],
            }
    # Merge the two views: local files count as "synced", remote-only
    # files as "available" (not yet synced).
    synced: List[Dict[str, Any]] = []
    not_synced: List[Dict[str, Any]] = []
    for name in sorted(set(local_files) | set(remote_files)):
        is_remote = name in remote_files
        local_info = local_files.get(name, {})
        remote_info = remote_files.get(name, {})
        synced_at = metadata.get("pcaps", {}).get(name, {}).get("synced_at", "")
        try:
            synced_at_human = datetime.fromisoformat(synced_at).strftime("%Y-%m-%d %H:%M:%S") if synced_at else ""
        except (ValueError, TypeError):
            # Keep the raw value if it isn't a valid ISO timestamp.
            synced_at_human = synced_at
        if name in local_files:
            synced.append({
                "pcap_name": name,
                "local_size_bytes": local_info.get("file_size_bytes", 0),
                "remote_size_bytes": remote_info.get("file_size_bytes", 0) if is_remote else None,
                "synced_at": synced_at_human or "Unknown",
                "in_github": is_remote,
            })
        else:
            not_synced.append({
                "pcap_name": name,
                "remote_size_bytes": remote_info.get("file_size_bytes", 0),
                "github_path": remote_info.get("github_path", ""),
            })
    return {
        "ok": True,
        "project_name": project_name,
        "synced_pcaps": synced,
        "total_synced": len(synced),
        "available_pcaps": not_synced,
        "total_available": len(not_synced),
        "github_repo": github_creds.get('repo', ''),
        "github_path": github_creds.get('path', ''),
        "github_error": remote_result.get("error") if not remote_result.get("ok") else None,
    }
def sync_pcap(github_creds: Dict[str, Optional[str]], pcap_name: str) -> Dict[str, Any]:
    """
    Sync a single PCAP file from GitHub to the local project workspace.

    The file is downloaded with urllib so the PAT is sent only as a
    request header — the previous curl command exposed the token on the
    process command line. A partially written file is removed when the
    download fails.

    Returns a dict: on success ok, project_name, pcap_name,
    file_size_bytes, synced_at, message; on failure ok=False with error.
    """
    err = _validate_creds(github_creds)
    if err:
        return err
    username = github_creds['username']
    pat = github_creds['pat']
    project_name = username
    _ensure_workspace_exists()
    project_path = _get_project_path(project_name)
    os.makedirs(project_path, exist_ok=True)
    # Locate the requested file in the remote repo listing.
    remote_result = _list_remote_pcaps(github_creds)
    if not remote_result.get("ok"):
        return {"ok": False, "error": remote_result.get("error", "Failed to list remote files")}
    target = next(
        (item for item in remote_result.get("files", []) if item["name"] == pcap_name),
        None,
    )
    if not target:
        available = [f["name"] for f in remote_result.get("files", [])]
        return {
            "ok": False,
            "error": f"PCAP '{pcap_name}' not found in GitHub repo. Available: {', '.join(available) if available else 'none'}"
        }
    # pcap_name matched a GitHub 'name' field, which is a plain basename,
    # so this join cannot escape the project directory.
    dest_path = os.path.join(project_path, pcap_name)
    request = urllib.request.Request(
        target["download_url"],
        headers={"Authorization": f"token {pat}"},
    )
    try:
        with urllib.request.urlopen(request, timeout=120) as response, open(dest_path, 'wb') as out:
            shutil.copyfileobj(response, out)
        if not os.path.exists(dest_path) or os.path.getsize(dest_path) == 0:
            return {"ok": False, "error": "Download completed but file is empty or missing"}
        file_size = os.path.getsize(dest_path)
        now = datetime.now().isoformat()
        logger.info(f"Synced: {pcap_name} ({file_size} bytes)")
        # Record the sync so list_pcaps can show timestamps and
        # cleanup_stale_projects can compute idle time.
        metadata = _load_metadata(project_name)
        metadata["project_name"] = project_name
        metadata["created_at"] = metadata.get("created_at") or now
        metadata["github_repo"] = github_creds.get('repo', '')
        metadata["github_path"] = github_creds.get('path', '')
        metadata["github_branch"] = github_creds.get('branch', 'main')
        metadata.setdefault("pcaps", {})[pcap_name] = {
            "source": "github",
            "synced_at": now,
            "file_size_bytes": file_size
        }
        _save_metadata(project_name, metadata)
        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "file_size_bytes": file_size,
            "synced_at": now,
            "message": f"Synced '{pcap_name}' ({file_size} bytes)"
        }
    except urllib.error.URLError as e:
        # Remove a partial download before reporting the failure.
        if os.path.exists(dest_path):
            try:
                os.unlink(dest_path)
            except OSError:
                pass
        reason = getattr(e, 'reason', None) or str(e)
        return {"ok": False, "error": f"Download failed: {reason}"}
    except Exception as e:
        logger.error(f"Error syncing PCAP: {e}", exc_info=True)
        return {"ok": False, "error": f"Sync failed: {str(e)}"}
def sync_all_pcaps(github_creds: Dict[str, Optional[str]]) -> Dict[str, Any]:
    """
    Sync all PCAP files from GitHub to the local project workspace.

    A file is skipped when a local copy already exists with a size equal
    to the remote size; everything else is fetched via sync_pcap.
    Per-file failures are collected rather than aborting the run.
    """
    err = _validate_creds(github_creds)
    if err:
        return err
    project_name = github_creds['username']
    _ensure_workspace_exists()
    project_path = _get_project_path(project_name)
    os.makedirs(project_path, exist_ok=True)
    remote_result = _list_remote_pcaps(github_creds)
    if not remote_result.get("ok"):
        return {"ok": False, "error": remote_result.get("error", "Failed to list remote files")}
    remote_files = remote_result.get("files", [])
    if not remote_files:
        return {
            "ok": True,
            "project_name": project_name,
            "synced": [],
            "skipped": [],
            "message": "No PCAP files found in GitHub repo"
        }
    synced, skipped, errors = [], [], []
    for entry in remote_files:
        name = entry["name"]
        local_path = os.path.join(project_path, name)
        # Already up to date when the sizes match.
        if os.path.exists(local_path) and os.path.getsize(local_path) == entry.get("size", 0):
            skipped.append(name)
            continue
        outcome = sync_pcap(github_creds, name)
        if outcome.get("ok"):
            synced.append(name)
        else:
            errors.append({"pcap_name": name, "error": outcome.get("error", "Unknown")})
    return {
        "ok": True,
        "project_name": project_name,
        "synced": synced,
        "skipped": skipped,
        "errors": errors if errors else None,
        "message": f"Synced {len(synced)}, skipped {len(skipped)} (already up to date)"
    }
def remove_pcap(github_creds: Dict[str, Optional[str]], pcap_name: str) -> Dict[str, Any]:
    """
    Remove a single PCAP file from the local project workspace.

    pcap_name is caller-supplied: anything that is not a plain file name
    (path separators, '.', '..') is rejected before the path join, so the
    unlink below cannot escape the project directory.
    """
    err = _validate_creds(github_creds)
    if err:
        return err
    project_name = github_creds['username']
    if (pcap_name != os.path.basename(pcap_name)
            or '\\' in pcap_name
            or pcap_name in ('', '.', '..')):
        return {"ok": False, "error": f"Invalid PCAP name: '{pcap_name}'"}
    project_path = _get_project_path(project_name)
    if not os.path.exists(project_path):
        return {"ok": False, "error": f"Project '{project_name}' not found"}
    pcap_path = os.path.join(project_path, pcap_name)
    if not os.path.exists(pcap_path):
        local_files = [f for f in os.listdir(project_path) if f.endswith(('.pcap', '.pcapng'))]
        return {
            "ok": False,
            "error": f"PCAP '{pcap_name}' not found locally. Local files: {', '.join(local_files) if local_files else 'none'}"
        }
    try:
        os.unlink(pcap_path)
        logger.info(f"Removed PCAP: {pcap_path}")
        # Keep metadata in sync so list_pcaps stops reporting the file.
        metadata = _load_metadata(project_name)
        if pcap_name in metadata.get("pcaps", {}):
            del metadata["pcaps"][pcap_name]
            _save_metadata(project_name, metadata)
        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "message": f"Removed '{pcap_name}' from project '{project_name}'"
        }
    except Exception as e:
        logger.error(f"Error removing PCAP: {e}", exc_info=True)
        return {"ok": False, "error": f"Failed to remove PCAP: {str(e)}"}
def get_pcap_path(project_name: str, pcap_name: str) -> Optional[str]:
    """Get the full path to a PCAP file in a project.

    Returns None when the file does not exist — and also when pcap_name
    would resolve outside the project directory (e.g. via '../'
    components), which previously allowed probing arbitrary paths.
    """
    project_root = os.path.normpath(_get_project_path(project_name))
    pcap_path = os.path.normpath(os.path.join(project_root, pcap_name))
    # Containment check: the resolved path must stay under the project dir.
    if not pcap_path.startswith(project_root + os.sep):
        return None
    return pcap_path if os.path.exists(pcap_path) else None
def clean_project(project_name: str) -> Dict[str, Any]:
    """Delete an entire project directory and report how many PCAPs it held."""
    project_path = _get_project_path(project_name)
    if not os.path.exists(project_path):
        return {"ok": False, "error": f"Project '{project_name}' not found"}
    try:
        pcap_count = sum(
            1 for entry in os.listdir(project_path)
            if entry.endswith(('.pcap', '.pcapng'))
        )
        shutil.rmtree(project_path)
        logger.info(f"Removed project: {project_path}")
        return {
            "ok": True,
            "project_name": project_name,
            "pcaps_removed": pcap_count,
            "message": f"Project '{project_name}' and {pcap_count} PCAP(s) removed"
        }
    except Exception as e:
        logger.error(f"Error cleaning project: {e}", exc_info=True)
        return {"ok": False, "error": f"Failed to clean project: {str(e)}"}
def _project_last_activity(metadata: Dict[str, Any]) -> Optional[datetime]:
    """Most recent per-PCAP synced_at timestamp, falling back to created_at."""
    latest: Optional[datetime] = None
    for pcap_info in metadata.get("pcaps", {}).values():
        synced_at = pcap_info.get("synced_at")
        if not synced_at:
            continue
        try:
            dt = datetime.fromisoformat(synced_at)
        except (ValueError, TypeError):
            continue
        if latest is None or dt > latest:
            latest = dt
    if latest is None:
        created_at = metadata.get("created_at")
        if created_at:
            try:
                latest = datetime.fromisoformat(created_at)
            except (ValueError, TypeError):
                pass
    return latest
def cleanup_stale_projects():
    """Remove projects that haven't been used within the TTL period.

    Projects with no parseable activity timestamp are left alone. Each
    project is handled independently: a failure removing one (e.g. a
    permission error) is logged and does not abort cleanup of the rest —
    previously the first rmtree failure aborted the whole sweep.
    """
    _ensure_workspace_exists()
    now = datetime.now()
    for proj_name in os.listdir(WIRESHARK_WORKSPACE):
        project_path = os.path.join(WIRESHARK_WORKSPACE, proj_name)
        if not os.path.isdir(project_path):
            continue
        last_activity = _project_last_activity(_load_metadata(proj_name))
        if not last_activity:
            continue
        elapsed = (now - last_activity).total_seconds()
        if elapsed <= PROJECT_TTL_SECONDS:
            continue
        try:
            shutil.rmtree(project_path)
            logger.info(f"Cleaned stale project '{proj_name}' (idle {elapsed:.0f}s)")
        except OSError as e:
            logger.warning(f"Failed to clean stale project '{proj_name}': {e}")