"""
Wireshark Python MCP Server
Main server implementation using Python's fastmcp library and tshark.
"""
import logging
import os
import sys
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
from fastmcp import Context, FastMCP
load_dotenv(override=True)
try:
from .utilities import configure_auth, get_github_credentials, validate_github_credentials, log_user_access
except ImportError:
from utilities import configure_auth, get_github_credentials, validate_github_credentials, log_user_access
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
try:
logger.info("Trying relative imports...")
try:
from .tools import (
analyze_pcap_file_execute,
get_protocol_hierarchy_execute,
get_conversations_execute,
extract_credentials_execute,
check_threats_execute,
check_ip_threat_execute,
display_filter_execute,
follow_stream_execute,
pcap_triage_execute,
tcp_health_execute,
dns_analysis_execute,
http_summary_execute,
tls_analysis_execute,
top_talkers_execute,
list_pcaps,
sync_pcap,
sync_all_pcaps,
remove_pcap,
get_pcap_path,
clean_project,
cleanup_stale_projects,
)
logger.info("Successfully imported tools with relative imports")
except ImportError as e:
logger.warning(f"Error importing with relative imports: {e}")
raise
except ImportError:
logger.info("Falling back to absolute imports...")
from tools import (
analyze_pcap_file_execute,
get_protocol_hierarchy_execute,
get_conversations_execute,
extract_credentials_execute,
check_threats_execute,
check_ip_threat_execute,
display_filter_execute,
follow_stream_execute,
pcap_triage_execute,
tcp_health_execute,
dns_analysis_execute,
http_summary_execute,
tls_analysis_execute,
top_talkers_execute,
list_pcaps,
sync_pcap,
sync_all_pcaps,
remove_pcap,
get_pcap_path,
clean_project,
cleanup_stale_projects,
)
logger.info("Successfully imported tools with absolute imports")
def _get_validated_credentials() -> Dict[str, str]:
"""Validate and return GitHub credentials. Raises ValueError if missing."""
return validate_github_credentials()
def _get_project_name() -> str:
"""Derive project name from GitHub username. Raises ValueError if not set."""
creds = _get_validated_credentials()
return creds['username']
def create_server() -> FastMCP:
"""Create and configure the FastMCP server."""
auth_provider = configure_auth()
mcp = FastMCP(
name="Wireshark Python MCP Server", version="1.0.0", auth=auth_provider
)
# ========== PCAP Management Tools ==========
@mcp.tool(
name="wireshark_list_pcaps",
description="List PCAP files available for analysis. Shows two categories: (1) locally synced PCAPs ready for analysis with file sizes and sync timestamps, and (2) PCAPs available in the GitHub repo that have not been synced yet. Uses GitHub credentials from MCP headers (X-GitHub-Username, X-GitHub-PAT, X-GitHub-Repo, X-GitHub-Path, X-GitHub-Branch). No parameters required.",
)
def wireshark_list_pcaps(
ctx: Context = None,
) -> Dict[str, Any]:
"""List local and remote PCAP files."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_list_pcaps")
creds = _get_validated_credentials()
cleanup_stale_projects()
return list_pcaps(creds)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_sync_pcap",
description="Sync a single PCAP file from GitHub to the local workspace for analysis. Downloads the specified file from the configured GitHub repo. Use wireshark_list_pcaps first to see available files. Required: pcap_name (filename like 'capture.pcap'). Uses GitHub credentials from MCP headers.",
)
def wireshark_sync_pcap(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Sync a single PCAP file from GitHub."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_sync_pcap")
creds = _get_validated_credentials()
return sync_pcap(creds, pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_sync_all_pcaps",
description="Sync all PCAP files from GitHub to the local workspace. Downloads all .pcap/.pcapng files from the configured repo path. Skips files that are already synced with matching size. Uses GitHub credentials from MCP headers.",
)
def wireshark_sync_all_pcaps(
ctx: Context = None,
) -> Dict[str, Any]:
"""Sync all PCAP files from GitHub."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_sync_all_pcaps")
creds = _get_validated_credentials()
return sync_all_pcaps(creds)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_remove_pcap",
description="Remove a single PCAP file from the local workspace. Does not affect the GitHub repo - only removes the local copy. File can be re-synced later. Required: pcap_name. Uses GitHub credentials from MCP headers.",
)
def wireshark_remove_pcap(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Remove a single PCAP from local workspace."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_remove_pcap")
creds = _get_validated_credentials()
return remove_pcap(creds, pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_clean_project",
description="Remove entire project directory and all locally synced PCAP files. Use for full cleanup when analysis is complete. Does not affect GitHub repo. Uses GitHub credentials from MCP headers to determine project name, or accepts explicit project_name. Optional: project_name.",
)
def wireshark_clean_project(
project_name: Optional[str] = None,
ctx: Context = None,
) -> Dict[str, Any]:
"""Clean entire project."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_clean_project")
if not project_name:
project_name = _get_project_name()
return clean_project(project_name=project_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
# ========== Analysis Tools ==========
# These operate on locally synced PCAPs. Sync files first using the management tools above.
@mcp.tool(
name="wireshark_analyze_pcap",
description="Comprehensive analysis of a locally synced PCAP file. Returns protocol distribution, packet statistics (top talkers, top ports, packet sizes), security findings (port scans, suspicious ports, unencrypted protocols), flow analysis (conversation tracking, significant flows), and sample packets for LLM inspection. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name. Optional: max_packets (default: 100), focus_protocols (list like ['HTTP','DNS']), include_statistics (default: true), include_security_analysis (default: true), include_flow_analysis (default: true).",
)
def wireshark_analyze_pcap(
pcap_name: str,
max_packets: int = 100,
focus_protocols: Optional[List[str]] = None,
include_statistics: bool = True,
include_security_analysis: bool = True,
include_flow_analysis: bool = True,
ctx: Context = None,
) -> Dict[str, Any]:
"""Comprehensive analysis of a locally synced PCAP file."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_analyze_pcap")
project_name = _get_project_name()
pcap_path = get_pcap_path(project_name, pcap_name)
if not pcap_path:
return {"ok": False, "error": f"PCAP '{pcap_name}' not found in project '{project_name}'. Sync it first with wireshark_sync_pcap."}
result = analyze_pcap_file_execute(
pcap_path=pcap_path,
max_packets=max_packets,
focus_protocols=focus_protocols,
include_statistics=include_statistics,
include_security_analysis=include_security_analysis,
include_flow_analysis=include_flow_analysis,
)
result["project_name"] = project_name
result["pcap_name"] = pcap_name
return result
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_protocol_hierarchy",
description="Get protocol hierarchy statistics from a locally synced PCAP file. Uses tshark to provide detailed protocol distribution breakdown showing packet counts and bytes per protocol in encapsulation layers (eth->ip->tcp->http). Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name (filename like 'capture.pcap').",
)
def wireshark_protocol_hierarchy(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Get protocol hierarchy statistics from PCAP file."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_protocol_hierarchy")
project_name = _get_project_name()
return get_protocol_hierarchy_execute(project_name=project_name, pcap_name=pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_conversations",
description="Get conversation/flow statistics from a locally synced PCAP file. Tracks TCP/UDP/IP conversations showing source/destination pairs, packet counts, byte counts, and most active connections. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name. Optional: conversation_type (tcp/udp/ip, default: tcp).",
)
def wireshark_conversations(
pcap_name: str,
conversation_type: str = "tcp",
ctx: Context = None,
) -> Dict[str, Any]:
"""Get conversation statistics from PCAP file."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_conversations")
project_name = _get_project_name()
return get_conversations_execute(project_name=project_name, pcap_name=pcap_name, conversation_type=conversation_type)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_display_filter",
description="Apply a Wireshark display filter to a locally synced PCAP and return only matching packets as structured JSON. Useful for drilling into specific traffic patterns during troubleshooting. Supports all standard Wireshark filter expressions. Examples: 'tcp.port == 80' (HTTP), 'dns' (DNS queries), 'ip.addr == 10.0.0.1' (specific host), 'tcp.flags.syn == 1' (SYN packets), 'http.request.method == \"GET\"' (HTTP GETs), 'tcp.analysis.retransmission' (retransmits). Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name, display_filter. Optional: max_packets (default: 100).",
)
def wireshark_display_filter(
pcap_name: str,
display_filter: str,
max_packets: int = 100,
ctx: Context = None,
) -> Dict[str, Any]:
"""Apply a display filter and return matching packets."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_display_filter")
project_name = _get_project_name()
return display_filter_execute(
project_name=project_name,
pcap_name=pcap_name,
display_filter=display_filter,
max_packets=max_packets,
)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_follow_stream",
description="Follow a TCP or UDP stream in a locally synced PCAP and extract the full payload exchange. Reconstructs the application-layer conversation showing the complete back-and-forth (e.g., HTTP request/response, FTP login, DNS exchange). Returns both ASCII and hex representations. Use wireshark_conversations first to identify interesting stream IDs. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name. Optional: stream_id (default: 0), stream_type ('tcp' or 'udp', default: 'tcp').",
)
def wireshark_follow_stream(
pcap_name: str,
stream_id: int = 0,
stream_type: str = "tcp",
ctx: Context = None,
) -> Dict[str, Any]:
"""Follow and extract a TCP/UDP stream payload."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_follow_stream")
project_name = _get_project_name()
return follow_stream_execute(
project_name=project_name,
pcap_name=pcap_name,
stream_id=stream_id,
stream_type=stream_type,
)
except ValueError as e:
return {"ok": False, "error": str(e)}
# ========== Triage & Health Tools ==========
@mcp.tool(
name="wireshark_pcap_triage",
description="Automated first-pass triage of a PCAP file. Produces a single structured report combining: capture overview (packets, bytes, duration, rates), protocol breakdown, top talkers (sources, destinations, conversations, ports), TCP health summary (retransmissions, dup ACKs, zero windows, out-of-order, RSTs with health score), DNS summary (queries, NXDOMAIN rate, top domains), TLS summary (SNI, versions), anomaly indicators, and recommended next tools to run. This should be the FIRST tool called on any new PCAP. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name.",
)
def wireshark_pcap_triage(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Automated triage producing a full structured report."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_pcap_triage")
project_name = _get_project_name()
return pcap_triage_execute(project_name=project_name, pcap_name=pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_tcp_health",
description="Deep TCP health analysis of a locally synced PCAP. Counts and categorizes: retransmissions, fast retransmissions, duplicate ACKs, zero window events, out-of-order packets, window updates, RSTs, and SYN retransmissions. Returns per-category counts with percentage of total TCP traffic, affected stream IDs, sample packets, an overall health score (0-100), and human-readable findings. Use after wireshark_pcap_triage flags TCP issues. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name. Optional: include_details (default: true), detail_limit (sample packets per category, default: 10).",
)
def wireshark_tcp_health(
pcap_name: str,
include_details: bool = True,
detail_limit: int = 10,
ctx: Context = None,
) -> Dict[str, Any]:
"""Deep TCP health analysis."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_tcp_health")
project_name = _get_project_name()
return tcp_health_execute(
project_name=project_name,
pcap_name=pcap_name,
include_details=include_details,
detail_limit=detail_limit,
)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_dns_analysis",
description="Deep DNS traffic analysis of a locally synced PCAP. Returns: top queried domains, query type distribution (A, AAAA, MX, TXT, etc.), response code breakdown (NOERROR, NXDOMAIN, SERVFAIL), NXDOMAIN domains, DNS client/server IPs, resolved IPs, response time statistics (min/max/avg/median/p95), TTL stats, and anomaly detection (high NXDOMAIN rates, long domain names suggesting tunneling, high subdomain variety, excessive TXT queries, slow resolution). Use after wireshark_pcap_triage flags DNS issues. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name.",
)
def wireshark_dns_analysis(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Deep DNS traffic analysis."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_dns_analysis")
project_name = _get_project_name()
return dns_analysis_execute(project_name=project_name, pcap_name=pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_http_summary",
description="Deep HTTP traffic analysis of a locally synced PCAP. Returns: request methods, status codes with category breakdown (2xx/3xx/4xx/5xx), error rate, per-host breakdown with sample URIs, top URIs, user agents, content types, client/server IPs, response time statistics (min/max/avg/median/p95 in ms), response size statistics, and error details with frame references. Use after wireshark_pcap_triage flags HTTP traffic. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name.",
)
def wireshark_http_summary(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Deep HTTP traffic analysis."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_http_summary")
project_name = _get_project_name()
return http_summary_execute(project_name=project_name, pcap_name=pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_tls_analysis",
description="Deep TLS/SSL analysis of a locally synced PCAP. Returns: SNI values (server names), TLS versions offered and negotiated, cipher suites selected, ALPN protocols, certificate subject/SAN info, TLS alerts, client/server IPs, and security findings (deprecated versions, weak ciphers, missing TLS 1.3). Use after wireshark_pcap_triage flags TLS traffic. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name.",
)
def wireshark_tls_analysis(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Deep TLS/SSL traffic analysis."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_tls_analysis")
project_name = _get_project_name()
return tls_analysis_execute(project_name=project_name, pcap_name=pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_top_talkers",
description="Identify highest-volume traffic sources in a locally synced PCAP. Returns ranked lists of: top source IPs, top destination IPs, top conversations (src->dst pairs), top destination ports, top protocols, and top 5-tuple flows - each with packet count, byte count, and percentage of total. Use to quickly identify what is generating the most traffic. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name. Optional: top_n (entries per category, default: 15).",
)
def wireshark_top_talkers(
pcap_name: str,
top_n: int = 15,
ctx: Context = None,
) -> Dict[str, Any]:
"""Identify top traffic sources and destinations."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_top_talkers")
project_name = _get_project_name()
return top_talkers_execute(project_name=project_name, pcap_name=pcap_name, top_n=top_n)
except ValueError as e:
return {"ok": False, "error": str(e)}
# ========== Security Tools ==========
@mcp.tool(
name="wireshark_extract_credentials",
description="Extract credentials from a locally synced PCAP file for security audits. Scans for HTTP Basic Auth, FTP, Telnet plaintext credentials, and Kerberos hashes. Returns plaintext credentials immediately usable and encrypted credentials with offline cracking commands. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name.",
)
def wireshark_extract_credentials(
pcap_name: str,
ctx: Context = None,
) -> Dict[str, Any]:
"""Extract credentials from PCAP file."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_extract_credentials")
project_name = _get_project_name()
return extract_credentials_execute(project_name=project_name, pcap_name=pcap_name)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_check_threats",
description="Check IP addresses from a locally synced PCAP against threat intelligence feeds. Extracts all unique IPs from the capture and checks against URLhaus malware blacklist. Returns list of threatening IPs with severity and context. Sync the PCAP first using wireshark_sync_pcap. Required: pcap_name. Optional: check_urlhaus (default: true).",
)
async def wireshark_check_threats(
pcap_name: str,
check_urlhaus: bool = True,
ctx: Context = None,
) -> Dict[str, Any]:
"""Check threats from PCAP file."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_check_threats")
project_name = _get_project_name()
return await check_threats_execute(project_name=project_name, pcap_name=pcap_name, check_urlhaus=check_urlhaus)
except ValueError as e:
return {"ok": False, "error": str(e)}
@mcp.tool(
name="wireshark_check_ip_threat",
description="Check a specific IP address against threat intelligence feeds. Queries URLhaus malware database for IP reputation. Returns threat status, severity, and recommendations. Does not require a PCAP file. Required: ip_address (e.g., '192.0.2.1'). Optional: check_urlhaus (default: true).",
)
async def wireshark_check_ip_threat(
ip_address: str,
check_urlhaus: bool = True,
ctx: Context = None,
) -> Dict[str, Any]:
"""Check IP address threat status."""
try:
if ctx and hasattr(ctx, "request"):
log_user_access(ctx.request, "wireshark_check_ip_threat")
_get_validated_credentials()
return await check_ip_threat_execute(ip_address=ip_address, check_urlhaus=check_urlhaus)
except ValueError as e:
return {"ok": False, "error": str(e)}
return mcp
mcp = create_server()
def main():
"""Main entry point for the Wireshark MCP server."""
transport = os.getenv("TRANSPORT", "streamable-http").lower()
port = int(os.getenv("PORT", "3020"))
host = os.getenv("HOST", "0.0.0.0")
mcp_server = create_server()
logger.info(f"Starting Wireshark MCP Server on {host}:{port} with {transport} transport")
try:
if transport == "stdio":
mcp_server.run(transport="stdio")
else:
mcp_server.run(transport="streamable-http", host=host, port=port)
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error(f"Server error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()