#!/usr/bin/env python3
"""
Enhanced Wireshark MCP Server - Core Features Implementation
===========================================================
Implements:
1. Real-time JSON Packet Capture
2. Protocol Statistics & Conversations
3. Enhanced PCAP File Analysis
4. PCAP Time Slicing (editcap)
No external API dependencies - pure Wireshark/TShark functionality.
"""
import asyncio
import json
import logging
import os
import re
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, AsyncGenerator
from datetime import datetime
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Initialize the MCP server
server = Server("wireshark-mcp-enhanced")
# Global state for active captures
ACTIVE_CAPTURES = {}
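# Each entry maps a capture_id to its runtime state, for example:
#   ACTIVE_CAPTURES["capture_1700000000"] = {
#       "process": <asyncio.subprocess.Process>,
#       "interface": "eth0",
#       "started_at": "2024-01-01T12:00:00",
#       "packets": [...decoded JSON packet dicts...],
#   }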
# ===== ADVANCED TOOL: PCAP TIME SLICER =====
class WiresharkPCAPTimeSlicer:
"""Extract specific time windows from PCAP files using editcap"""
def __init__(self):
self.tool = "editcap"
self.supported_formats = ["pcap", "pcapng"]
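    # Illustrative command shape produced by slice_by_time_range below
    # (placeholder paths and times):
    #   editcap -A "2024-01-01 10:00:00" -B "2024-01-01 10:05:00" in.pcap out.pcap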
async def slice_by_time_range(
self,
input_file: str,
start_time: str,
end_time: str,
        output_file: Optional[str] = None,
preserve_comments: bool = True
) -> Dict[str, Any]:
"""Extract packets within specific time range"""
try:
# Validate input file
if not Path(input_file).exists():
return {"status": "❌ Error", "error": "Input file not found"}
# Generate output filename if not provided
if not output_file:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"/tmp/time_slice_{Path(input_file).stem}_{timestamp}.pcap"
# Convert time formats
start_str = self._format_time(start_time)
end_str = self._format_time(end_time)
# Build command
cmd = [self.tool]
if start_str:
cmd.extend(["-A", start_str])
if end_str:
cmd.extend(["-B", end_str])
if not preserve_comments:
cmd.append("--discard-packet-comments")
cmd.extend([input_file, output_file])
# Execute
result = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await result.communicate()
if result.returncode == 0:
# Get basic statistics
if Path(output_file).exists():
file_size = Path(output_file).stat().st_size
stats = {"file_size_bytes": file_size, "output_exists": True}
else:
stats = {"output_exists": False}
return {
"status": "✅ Success",
"input_file": input_file,
"output_file": output_file,
"time_range": {"start": start_str, "end": end_str},
"statistics": stats,
"message": stdout.decode().strip()
}
else:
return {
"status": "❌ Error",
"error": stderr.decode().strip()
}
except Exception as e:
return {"status": "❌ Error", "error": str(e)}
def _format_time(self, time_input: str) -> str:
"""Format time for editcap"""
if not time_input:
return ""
# If it looks like ISO format, convert it
if "T" in str(time_input) or "-" in str(time_input):
try:
dt = datetime.fromisoformat(time_input.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M:%S")
            except ValueError:
                return str(time_input)
return str(time_input)
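    # Example _format_time conversions (illustrative):
    #   "2024-01-01T10:00:00Z" -> "2024-01-01 10:00:00"  (ISO 8601 normalized)
    #   "1704103200"           -> "1704103200"           (epoch passed through)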
@server.list_tools()
async def list_tools() -> List[Tool]:
"""List all available enhanced Wireshark MCP tools."""
logger.info("📋 Listing Enhanced Wireshark MCP tools")
return [
# Original tools
Tool(
name="wireshark_system_info",
description="Get system information and available network interfaces",
inputSchema={
"type": "object",
"properties": {
"info_type": {
"type": "string",
"enum": ["interfaces", "capabilities", "system", "all"],
"description": "Type of system information to retrieve",
"default": "all"
}
}
}
),
Tool(
name="wireshark_validate_setup",
description="Validate Wireshark installation and dependencies",
inputSchema={
"type": "object",
"properties": {
"full_check": {
"type": "boolean",
"description": "Perform comprehensive validation",
"default": False
}
}
}
),
Tool(
name="wireshark_generate_filter",
description="Generate Wireshark display filters from natural language descriptions",
inputSchema={
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "Natural language description of desired traffic"
},
"complexity": {
"type": "string",
"enum": ["simple", "intermediate", "advanced"],
"description": "Desired filter complexity level",
"default": "intermediate"
}
},
"required": ["description"]
}
),
Tool(
name="wireshark_live_capture",
description="Capture live network traffic with intelligent filtering",
inputSchema={
"type": "object",
"properties": {
"interface": {
"type": "string",
"description": "Network interface to capture from (e.g., 'eth0', 'any')"
},
"duration": {
"type": "integer",
"description": "Capture duration in seconds",
"default": 60
},
"filter": {
"type": "string",
"description": "Wireshark display filter",
"default": ""
},
"max_packets": {
"type": "integer",
"description": "Maximum number of packets to capture",
"default": 1000
}
},
"required": ["interface"]
}
),
Tool(
name="wireshark_analyze_pcap",
description="Analyze existing PCAP files with comprehensive reporting",
inputSchema={
"type": "object",
"properties": {
"filepath": {
"type": "string",
"description": "Path to PCAP/PCAPNG file"
},
"analysis_type": {
"type": "string",
"enum": ["quick", "comprehensive", "security", "performance"],
"description": "Type of analysis to perform",
"default": "comprehensive"
}
},
"required": ["filepath"]
}
),
# NEW: Real-time JSON Packet Capture
Tool(
name="wireshark_realtime_json_capture",
description="Capture live network traffic in real-time JSON format with streaming support",
inputSchema={
"type": "object",
"properties": {
"interface": {
"type": "string",
"description": "Network interface to capture from (e.g., 'eth0', 'any')"
},
"duration": {
"type": "integer",
"description": "Capture duration in seconds",
"default": 30
},
"filter": {
"type": "string",
"description": "BPF capture filter",
"default": ""
},
"max_packets": {
"type": "integer",
"description": "Maximum number of packets to capture",
"default": 1000
},
"json_format": {
"type": "string",
"enum": ["ek", "json", "jsonraw"],
"description": "JSON output format (ek=Elasticsearch, json=standard, jsonraw=raw)",
"default": "ek"
}
},
"required": ["interface"]
}
),
# NEW: Protocol Statistics & Conversations
Tool(
name="wireshark_protocol_statistics",
description="Generate comprehensive protocol statistics and conversation analysis",
inputSchema={
"type": "object",
"properties": {
"source": {
"type": "string",
"description": "Source: 'live' for interface or path to PCAP file"
},
"analysis_type": {
"type": "string",
"enum": ["protocol_hierarchy", "conversations", "endpoints", "io_stats", "all"],
"description": "Type of statistical analysis",
"default": "all"
},
"protocol": {
"type": "string",
"enum": ["tcp", "udp", "ip", "all"],
"description": "Protocol to analyze for conversations",
"default": "all"
},
"time_interval": {
"type": "integer",
"description": "Time interval for I/O statistics (seconds)",
"default": 60
}
},
"required": ["source"]
}
),
# ENHANCED: PCAP File Analysis with more features
Tool(
name="wireshark_analyze_pcap_enhanced",
description="Enhanced PCAP file analysis with streaming support for large files",
inputSchema={
"type": "object",
"properties": {
"filepath": {
"type": "string",
"description": "Path to PCAP/PCAPNG file"
},
"analysis_type": {
"type": "string",
"enum": ["quick", "comprehensive", "security", "performance", "conversations", "statistics"],
"description": "Type of analysis to perform",
"default": "comprehensive"
},
"chunk_size": {
"type": "integer",
"description": "Number of packets to process at once (for large files)",
"default": 10000
},
"output_format": {
"type": "string",
"enum": ["text", "json", "summary"],
"description": "Output format for analysis results",
"default": "json"
}
},
"required": ["filepath"]
}
),
# ADVANCED TOOL: PCAP Time Slicer
Tool(
name="wireshark_pcap_time_slice",
description="Extract specific time windows from PCAP captures using editcap",
inputSchema={
"type": "object",
"properties": {
"input_file": {
"type": "string",
"description": "Path to input PCAP file"
},
"start_time": {
"type": "string",
"description": "Start time (ISO format: YYYY-MM-DDThh:mm:ss or Unix epoch)"
},
"end_time": {
"type": "string",
"description": "End time (ISO format: YYYY-MM-DDThh:mm:ss or Unix epoch)"
},
"output_file": {
"type": "string",
"description": "Output file path (optional)",
"default": None
}
},
"required": ["input_file", "start_time", "end_time"]
}
)
]
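# Example tool invocation, as dispatched by call_tool below (illustrative
# arguments):
#   await call_tool("wireshark_pcap_time_slice", {
#       "input_file": "/tmp/capture.pcap",
#       "start_time": "2024-01-01T10:00:00",
#       "end_time": "2024-01-01T10:05:00",
#   })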
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool calls for enhanced Wireshark MCP operations."""
logger.info(f"🔧 Calling tool: {name} with args: {arguments}")
try:
# Original tools
if name == "wireshark_system_info":
return await handle_system_info(arguments)
elif name == "wireshark_validate_setup":
return await handle_validate_setup(arguments)
elif name == "wireshark_generate_filter":
return await handle_generate_filter(arguments)
elif name == "wireshark_live_capture":
return await handle_live_capture(arguments)
elif name == "wireshark_analyze_pcap":
return await handle_analyze_pcap(arguments)
# New enhanced tools
elif name == "wireshark_realtime_json_capture":
return await handle_realtime_json_capture(arguments)
elif name == "wireshark_protocol_statistics":
return await handle_protocol_statistics(arguments)
elif name == "wireshark_analyze_pcap_enhanced":
return await handle_analyze_pcap_enhanced(arguments)
elif name == "wireshark_pcap_time_slice":
return await handle_pcap_time_slice(arguments)
else:
return [TextContent(type="text", text=f"❌ Unknown tool: {name}")]
except Exception as e:
logger.error(f"❌ Error calling tool {name}: {e}")
return [TextContent(type="text", text=f"❌ Error: {str(e)}")]
async def handle_realtime_json_capture(args: Dict[str, Any]) -> List[TextContent]:
"""Handle real-time JSON packet capture with streaming support."""
interface = args.get("interface", "any")
duration = args.get("duration", 30)
filter_expr = args.get("filter", "")
max_packets = args.get("max_packets", 1000)
json_format = args.get("json_format", "ek")
capture_id = f"capture_{int(time.time())}"
try:
# Create temporary file for capture
temp_file = tempfile.NamedTemporaryFile(suffix='.pcap', delete=False)
temp_file.close()
# Build TShark command for real-time JSON output
tshark_cmd = [
"tshark",
"-i", interface,
"-T", json_format, # JSON output format
"-l", # Line buffering for real-time output
"-c", str(max_packets),
"-a", f"duration:{duration}"
]
if filter_expr:
tshark_cmd.extend(["-f", filter_expr])
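        # With -T ek, tshark emits newline-delimited JSON: each packet is a
        # single-line document (illustrative, heavily truncated):
        #   {"timestamp":"1700000000000","layers":{"frame":{...},"ip":{...}}}
        # interleaved with Elasticsearch bulk-index action lines such as
        #   {"index":{"_index":"packets-2024-01-01"}}
        # Both are valid JSON, so the line-by-line parse below handles them.
        # The -T json/jsonraw formats pretty-print one array instead, so they
        # are a poor fit for this streaming loop.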
logger.info(f"Starting real-time JSON capture: {' '.join(tshark_cmd)}")
# Start capture process
process = await asyncio.create_subprocess_exec(
*tshark_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Store active capture info
ACTIVE_CAPTURES[capture_id] = {
"process": process,
"interface": interface,
"started_at": datetime.now().isoformat(),
"packets": []
}
# Stream packets
packets_captured = 0
packet_buffer = []
start_time = time.time()
while True:
try:
# Read line with timeout
line = await asyncio.wait_for(
process.stdout.readline(),
timeout=1.0
)
if not line:
break
                # Parse JSON packet, skipping ek bulk-index action lines
                try:
                    packet_json = json.loads(line.decode())
                    if isinstance(packet_json, dict) and set(packet_json) == {"index"}:
                        continue
                    packet_buffer.append(packet_json)
                    packets_captured += 1
# Batch packets for efficiency
if len(packet_buffer) >= 10:
ACTIVE_CAPTURES[capture_id]["packets"].extend(packet_buffer)
packet_buffer.clear()
except json.JSONDecodeError:
continue
# Check limits
if packets_captured >= max_packets:
break
if time.time() - start_time >= duration:
break
except asyncio.TimeoutError:
# Check if process is still running
if process.returncode is not None:
break
continue
# Flush remaining packets
if packet_buffer:
ACTIVE_CAPTURES[capture_id]["packets"].extend(packet_buffer)
# Terminate process if still running
if process.returncode is None:
process.terminate()
await process.wait()
# Get capture statistics
capture_stats = {
"capture_id": capture_id,
"status": "✅ Capture Complete",
"interface": interface,
"duration": f"{time.time() - start_time:.2f} seconds",
"filter": filter_expr or "none",
"packets_captured": packets_captured,
"json_format": json_format,
"sample_packets": ACTIVE_CAPTURES[capture_id]["packets"][:5], # First 5 packets as sample
"total_packets_stored": len(ACTIVE_CAPTURES[capture_id]["packets"])
}
# Generate summary statistics
if packets_captured > 0:
protocol_summary = {}
for packet in ACTIVE_CAPTURES[capture_id]["packets"]:
# Extract protocol info based on format
if json_format == "ek":
layers = packet.get("layers", {})
for layer in layers:
protocol_summary[layer] = protocol_summary.get(layer, 0) + 1
elif json_format == "json":
# Handle standard JSON format
source = packet.get("_source", {})
layers = source.get("layers", {})
for layer in layers:
protocol_summary[layer] = protocol_summary.get(layer, 0) + 1
capture_stats["protocol_summary"] = protocol_summary
return [TextContent(
type="text",
text=f"📡 **Real-time JSON Capture Results**\n\n```json\n{json.dumps(capture_stats, indent=2)}\n```\n\n**Note**: Full packet data stored in memory. Use capture_id '{capture_id}' to retrieve all packets."
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ **Real-time Capture Failed**\n\nError: {str(e)}\n\nTroubleshooting:\n- Verify interface with: ip link show\n- Check permissions: groups $USER\n- Ensure TShark supports JSON: tshark -T ek -h"
)]
async def handle_protocol_statistics(args: Dict[str, Any]) -> List[TextContent]:
"""Generate comprehensive protocol statistics and conversation analysis."""
source = args.get("source", "")
analysis_type = args.get("analysis_type", "all")
protocol = args.get("protocol", "all")
time_interval = args.get("time_interval", 60)
if not source:
return [TextContent(type="text", text="❌ Error: No source specified")]
statistics_results = {}
try:
# Protocol Hierarchy Statistics
if analysis_type in ["protocol_hierarchy", "all"]:
logger.info("Generating protocol hierarchy statistics...")
cmd = ["tshark", "-q", "-z", "io,phs"]
if source != "live":
cmd.extend(["-r", source])
else:
                # For live sources, take a short 10-second sample on the 'any' interface
cmd.extend(["-i", "any", "-a", "duration:10"])
result = await run_tshark_command(cmd)
statistics_results["protocol_hierarchy"] = parse_protocol_hierarchy(result.stdout)
# Conversation Analysis
if analysis_type in ["conversations", "all"]:
logger.info("Analyzing network conversations...")
conversations = {}
protocols_to_analyze = ["tcp", "udp", "ip"] if protocol == "all" else [protocol]
for proto in protocols_to_analyze:
cmd = ["tshark", "-q", "-z", f"conv,{proto}"]
if source != "live":
cmd.extend(["-r", source])
else:
cmd.extend(["-i", "any", "-a", "duration:10"])
result = await run_tshark_command(cmd)
conversations[proto] = parse_conversations(result.stdout)
statistics_results["conversations"] = conversations
# Endpoint Analysis
if analysis_type in ["endpoints", "all"]:
logger.info("Analyzing network endpoints...")
endpoints = {}
protocols_to_analyze = ["tcp", "udp", "ip"] if protocol == "all" else [protocol]
for proto in protocols_to_analyze:
cmd = ["tshark", "-q", "-z", f"endpoints,{proto}"]
if source != "live":
cmd.extend(["-r", source])
else:
cmd.extend(["-i", "any", "-a", "duration:10"])
result = await run_tshark_command(cmd)
endpoints[proto] = parse_endpoints(result.stdout)
statistics_results["endpoints"] = endpoints
# I/O Statistics (Time-based)
if analysis_type in ["io_stats", "all"]:
logger.info("Generating I/O statistics...")
cmd = ["tshark", "-q", "-z", f"io,stat,{time_interval}"]
if source != "live":
cmd.extend(["-r", source])
else:
cmd.extend(["-i", "any", "-a", "duration:60"])
result = await run_tshark_command(cmd)
statistics_results["io_statistics"] = parse_io_statistics(result.stdout)
# Generate summary
summary = {
"source": source,
"analysis_type": analysis_type,
"timestamp": datetime.now().isoformat(),
"statistics": statistics_results
}
# Add insights
if "protocol_hierarchy" in statistics_results:
ph = statistics_results["protocol_hierarchy"]
if ph.get("total_packets"):
summary["insights"] = {
"total_packets": ph["total_packets"],
"total_bytes": ph.get("total_bytes", "unknown"),
"dominant_protocol": max(ph.get("protocols", {}), key=lambda k: ph["protocols"].get(k, {}).get("packets", 0)) if ph.get("protocols") else "none"
}
return [TextContent(
type="text",
text=f"📊 **Protocol Statistics & Conversation Analysis**\n\n```json\n{json.dumps(summary, indent=2)}\n```"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ **Statistics Generation Failed**\n\nError: {str(e)}"
)]
async def handle_analyze_pcap_enhanced(args: Dict[str, Any]) -> List[TextContent]:
"""Enhanced PCAP file analysis with streaming support for large files."""
filepath = args.get("filepath", "")
analysis_type = args.get("analysis_type", "comprehensive")
chunk_size = args.get("chunk_size", 10000)
output_format = args.get("output_format", "json")
if not filepath or not os.path.exists(filepath):
return [TextContent(type="text", text="❌ Error: File not found")]
file_size = os.path.getsize(filepath)
analysis_results = {
"file_info": {
"path": filepath,
"size": f"{file_size:,} bytes",
"size_mb": f"{file_size / 1024 / 1024:.2f} MB"
}
}
try:
# Get file info using capinfos
logger.info("Getting PCAP file information...")
cmd = ["capinfos", "-M", filepath] # Machine-readable output
result = await run_tshark_command(cmd)
analysis_results["file_metadata"] = parse_capinfos_machine_output(result.stdout)
# Perform analysis based on type
if analysis_type == "quick":
# Quick packet count and basic stats
logger.info("Performing quick analysis...")
cmd = ["tshark", "-r", filepath, "-q", "-z", "io,phs"]
result = await run_tshark_command(cmd)
analysis_results["quick_stats"] = parse_protocol_hierarchy(result.stdout)
elif analysis_type == "conversations":
# Detailed conversation analysis
logger.info("Analyzing conversations...")
conversations = {}
for proto in ["tcp", "udp", "ip"]:
cmd = ["tshark", "-r", filepath, "-q", "-z", f"conv,{proto}"]
result = await run_tshark_command(cmd)
conversations[proto] = parse_conversations(result.stdout)
analysis_results["conversations"] = conversations
elif analysis_type == "statistics":
# Comprehensive statistics
logger.info("Generating comprehensive statistics...")
# Protocol hierarchy
cmd = ["tshark", "-r", filepath, "-q", "-z", "io,phs"]
result = await run_tshark_command(cmd)
analysis_results["protocol_hierarchy"] = parse_protocol_hierarchy(result.stdout)
# Expert info (warnings, errors, etc.)
cmd = ["tshark", "-r", filepath, "-q", "-z", "expert"]
result = await run_tshark_command(cmd)
analysis_results["expert_info"] = parse_expert_info(result.stdout)
# HTTP statistics if present
cmd = ["tshark", "-r", filepath, "-q", "-z", "http,tree"]
result = await run_tshark_command(cmd, ignore_errors=True)
if result.returncode == 0:
analysis_results["http_stats"] = parse_http_stats(result.stdout)
elif analysis_type == "security":
# Security-focused analysis
logger.info("Performing security analysis...")
security_findings = await perform_security_analysis(filepath)
analysis_results["security_analysis"] = security_findings
elif analysis_type == "performance":
# Performance analysis
logger.info("Performing performance analysis...")
performance_metrics = await perform_performance_analysis(filepath)
analysis_results["performance_analysis"] = performance_metrics
else: # comprehensive
# Run all analyses
logger.info("Performing comprehensive analysis...")
# Basic statistics
cmd = ["tshark", "-r", filepath, "-q", "-z", "io,phs"]
result = await run_tshark_command(cmd)
analysis_results["protocol_hierarchy"] = parse_protocol_hierarchy(result.stdout)
# Conversations
conversations = {}
for proto in ["tcp", "udp"]:
cmd = ["tshark", "-r", filepath, "-q", "-z", f"conv,{proto}"]
result = await run_tshark_command(cmd)
conversations[proto] = parse_conversations(result.stdout)
analysis_results["conversations"] = conversations
# Expert info
cmd = ["tshark", "-r", filepath, "-q", "-z", "expert"]
result = await run_tshark_command(cmd)
analysis_results["expert_info"] = parse_expert_info(result.stdout)
# Security checks
analysis_results["security_analysis"] = await perform_security_analysis(filepath)
# Performance metrics
analysis_results["performance_analysis"] = await perform_performance_analysis(filepath)
# Format output
if output_format == "json":
return [TextContent(
type="text",
text=f"📊 **Enhanced PCAP Analysis Results**\n\n```json\n{json.dumps(analysis_results, indent=2)}\n```"
)]
elif output_format == "summary":
summary = generate_analysis_summary(analysis_results)
return [TextContent(type="text", text=summary)]
else: # text
text_output = generate_text_report(analysis_results)
return [TextContent(type="text", text=text_output)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ **PCAP Analysis Failed**\n\nError: {str(e)}"
)]
async def handle_pcap_time_slice(args: Dict[str, Any]) -> List[TextContent]:
"""Handle PCAP time slicing using the WiresharkPCAPTimeSlicer."""
try:
slicer = WiresharkPCAPTimeSlicer()
result = await slicer.slice_by_time_range(
input_file=args.get("input_file"),
start_time=args.get("start_time"),
end_time=args.get("end_time"),
output_file=args.get("output_file")
)
# Format the result for display
if result["status"] == "✅ Success":
formatted_result = f"""🦈 **PCAP Time Slice Results**
✅ **Operation Successful**
📁 **Files:**
- Input: {result['input_file']}
- Output: {result['output_file']}
⏰ **Time Range:**
- Start: {result['time_range']['start']}
- End: {result['time_range']['end']}
📊 **Statistics:**
```json
{json.dumps(result['statistics'], indent=2)}
```"""
else:
formatted_result = f"❌ **PCAP Time Slice Failed**\n\nError: {result.get('error', 'Unknown error')}"
return [TextContent(type="text", text=formatted_result)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ **PCAP Time Slice Failed**\n\nError: {str(e)}"
)]
# Helper functions
async def run_tshark_command(cmd: List[str], timeout: int = 300, ignore_errors: bool = False) -> subprocess.CompletedProcess:
    """Run a TShark command with timeout and error handling."""
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    try:
        # Apply the timeout to command execution, not just process creation
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        raise RuntimeError(f"Command timed out after {timeout} seconds: {' '.join(cmd)}")
    if process.returncode != 0 and not ignore_errors:
        raise RuntimeError(f"Command failed: {stderr.decode()}")
    return subprocess.CompletedProcess(
        cmd, process.returncode, stdout.decode(), stderr.decode()
    )
def parse_protocol_hierarchy(output: str) -> Dict[str, Any]:
"""Parse TShark protocol hierarchy statistics output."""
lines = output.strip().split('\n')
stats = {
"protocols": {},
"total_packets": 0,
"total_bytes": 0
}
for line in lines:
if "frames:" in line and "bytes:" in line:
# Parse protocol line
match = re.search(r'(\S+)\s+frames:(\d+)\s+bytes:(\d+)', line)
if match:
protocol = match.group(1).strip()
frames = int(match.group(2))
bytes_count = int(match.group(3))
stats["protocols"][protocol] = {
"packets": frames,
"bytes": bytes_count
}
# Update totals for top-level protocols
if not line.startswith(' '):
stats["total_packets"] += frames
stats["total_bytes"] += bytes_count
return stats
def parse_conversations(output: str) -> Dict[str, Any]:
"""Parse TShark conversation statistics output."""
    conversations = []
    for line in output.strip().split('\n'):
        if '<->' not in line or line.startswith('='):
            continue
        parts = line.split()
        if len(parts) < 7 or parts[1] != '<->':
            continue
        addr_a, addr_b = parts[0], parts[2]
        # TCP/UDP rows carry the port after the last colon of each address
        port_a = addr_a.rsplit(':', 1)[1] if ':' in addr_a else "N/A"
        port_b = addr_b.rsplit(':', 1)[1] if ':' in addr_b else "N/A"
        # Numeric columns (raw counts assumed): <- frames/bytes, -> frames/bytes, totals
        numbers = [int(p) for p in parts[3:] if p.isdigit()]
        conversations.append({
            "address_a": addr_a,
            "port_a": port_a,
            "address_b": addr_b,
            "port_b": port_b,
            "packets_b_to_a": numbers[0] if len(numbers) > 0 else 0,
            "bytes_b_to_a": numbers[1] if len(numbers) > 1 else 0,
            "packets_a_to_b": numbers[2] if len(numbers) > 2 else 0,
            "bytes_a_to_b": numbers[3] if len(numbers) > 3 else 0
        })
return {
"count": len(conversations),
"conversations": conversations[:10], # Top 10 conversations
"total_listed": len(conversations)
}
def parse_endpoints(output: str) -> Dict[str, Any]:
"""Parse TShark endpoint statistics output."""
lines = output.strip().split('\n')
endpoints = []
    for line in lines:
        # Match rows that start with an IPv4 or hex/colon (IPv6/MAC) address
        if re.match(r'^\d+\.\d+\.\d+\.\d+', line) or re.match(r'^[0-9a-fA-F:]+\s', line):
            parts = line.split()
            # Require a numeric column so header/filter lines are skipped
            if len(parts) >= 3 and (parts[1].isdigit() or parts[2].isdigit()):
                endpoints.append({
                    "address": parts[0],
                    "packets": int(parts[1]) if parts[1].isdigit() else 0,
                    "bytes": int(parts[2]) if parts[2].isdigit() else 0
                })
# Sort by packets
endpoints.sort(key=lambda x: x["packets"], reverse=True)
return {
"count": len(endpoints),
"top_talkers": endpoints[:10], # Top 10 endpoints
"total_endpoints": len(endpoints)
}
def parse_io_statistics(output: str) -> Dict[str, Any]:
"""Parse TShark I/O statistics output."""
lines = output.strip().split('\n')
intervals = []
    for line in lines:
        # Interval rows; see the sample layout above
        if "|" in line and ("-" in line or "<>" in line):
            match = re.search(r'(\d+\.?\d*)\s*(?:-|<>)\s*(\d+\.?\d*)\s*\|\s*(\d+)', line)
            if match:
                intervals.append({
                    "start": float(match.group(1)),
                    "end": float(match.group(2)),
                    "packets": int(match.group(3))
                })
return {
"interval_count": len(intervals),
"intervals": intervals
}
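# Illustrative capinfos report lines consumed below:
#   File name:           /tmp/capture.pcap
#   Number of packets:   1000
#   Capture duration:    59.8 seconds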
def parse_capinfos_machine_output(output: str) -> Dict[str, Any]:
    """Parse capinfos output into key/value pairs.

    Accepts both the tab-separated table layout and the default
    "key: value" report layout.
    """
    info = {}
    for line in output.strip().split('\n'):
        if '\t' in line:
            key, value = line.split('\t', 1)
        elif ':' in line:
            key, value = line.split(':', 1)
        else:
            continue
        info[key.strip()] = value.strip()
    return info
def parse_expert_info(output: str) -> Dict[str, Any]:
"""Parse TShark expert info output."""
expert_info = {
"errors": 0,
"warnings": 0,
"notes": 0,
"chats": 0
}
for line in output.split('\n'):
if "Errors" in line:
match = re.search(r'Errors\s*\((\d+)\)', line)
if match:
expert_info["errors"] = int(match.group(1))
elif "Warnings" in line:
match = re.search(r'Warnings\s*\((\d+)\)', line)
if match:
expert_info["warnings"] = int(match.group(1))
elif "Notes" in line:
match = re.search(r'Notes\s*\((\d+)\)', line)
if match:
expert_info["notes"] = int(match.group(1))
elif "Chats" in line:
match = re.search(r'Chats\s*\((\d+)\)', line)
if match:
expert_info["chats"] = int(match.group(1))
return expert_info
def parse_http_stats(output: str) -> Dict[str, Any]:
"""Parse HTTP statistics from TShark output."""
http_stats = {
"requests": 0,
"responses": 0,
"methods": {},
"status_codes": {}
}
# Simple parsing - can be enhanced
for line in output.split('\n'):
if "GET" in line:
http_stats["methods"]["GET"] = http_stats["methods"].get("GET", 0) + 1
elif "POST" in line:
http_stats["methods"]["POST"] = http_stats["methods"].get("POST", 0) + 1
# Add more parsing as needed
return http_stats
async def perform_security_analysis(filepath: str) -> Dict[str, Any]:
"""Perform security-focused analysis on PCAP file."""
security_findings = {
"suspicious_patterns": {},
"threat_indicators": 0
}
# Security checks
security_checks = [
("TCP SYN Flood", "tcp.flags.syn==1 and tcp.flags.ack==0"),
("Port Scan", "tcp.flags.syn==1 and tcp.flags.ack==0 and tcp.window_size <= 1024"),
("DNS Tunneling", "dns and frame.len > 512"),
("Suspicious HTTP", "http.request.method == POST and frame.len > 8192"),
("Non-standard Ports", "tcp.port > 49151 or udp.port > 49151"),
("ICMP Tunneling", "icmp and data.len > 48"),
("ARP Spoofing", "arp.opcode == 2")
]
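    # Each check is a plain display-filter query; reproduced manually it looks
    # like (illustrative):
    #   tshark -r capture.pcap -Y 'tcp.flags.syn==1 and tcp.flags.ack==0' \
    #       -T fields -e frame.number | wc -l
    # Note: the "Non-standard Ports" heuristic also matches ordinary ephemeral
    # client ports, so expect false positives on busy captures.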
for check_name, filter_expr in security_checks:
try:
cmd = ["tshark", "-r", filepath, "-Y", filter_expr, "-T", "fields", "-e", "frame.number"]
result = await run_tshark_command(cmd, timeout=60)
if result.stdout.strip():
count = len(result.stdout.strip().split('\n'))
security_findings["suspicious_patterns"][check_name] = count
security_findings["threat_indicators"] += 1
except Exception as e:
logger.warning(f"Security check '{check_name}' failed: {e}")
return security_findings
async def perform_performance_analysis(filepath: str) -> Dict[str, Any]:
"""Perform performance-focused analysis on PCAP file."""
performance_metrics = {
"tcp_issues": {},
"overall_health": "Unknown"
}
# TCP performance checks
tcp_checks = [
("Retransmissions", "tcp.analysis.retransmission"),
("Duplicate ACKs", "tcp.analysis.duplicate_ack"),
("Zero Window", "tcp.analysis.zero_window"),
("Window Full", "tcp.analysis.window_full"),
("Out of Order", "tcp.analysis.out_of_order"),
("Fast Retransmission", "tcp.analysis.fast_retransmission")
]
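    # tcp.analysis.* fields are built-in Wireshark expert-analysis flags, so
    # each check is a plain display-filter query, e.g. (illustrative):
    #   tshark -r capture.pcap -Y tcp.analysis.retransmission -T fields -e frame.number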
total_issues = 0
for check_name, filter_expr in tcp_checks:
try:
cmd = ["tshark", "-r", filepath, "-Y", filter_expr, "-T", "fields", "-e", "frame.number"]
result = await run_tshark_command(cmd, timeout=60)
if result.stdout.strip():
count = len(result.stdout.strip().split('\n'))
performance_metrics["tcp_issues"][check_name] = count
total_issues += count
except Exception as e:
logger.warning(f"Performance check '{check_name}' failed: {e}")
# Determine overall health
if total_issues == 0:
performance_metrics["overall_health"] = "Excellent"
elif total_issues < 100:
performance_metrics["overall_health"] = "Good"
elif total_issues < 1000:
performance_metrics["overall_health"] = "Fair"
else:
performance_metrics["overall_health"] = "Poor"
performance_metrics["total_issues"] = total_issues
return performance_metrics
def generate_analysis_summary(results: Dict[str, Any]) -> str:
"""Generate a human-readable summary of analysis results."""
summary = "📊 **PCAP Analysis Summary**\n\n"
# File info
if "file_info" in results:
summary += f"**File**: {results['file_info']['path']}\n"
summary += f"**Size**: {results['file_info']['size_mb']}\n\n"
# Metadata
if "file_metadata" in results:
meta = results["file_metadata"]
if "Number of packets" in meta:
summary += f"**Total Packets**: {meta['Number of packets']}\n"
if "Capture duration" in meta:
summary += f"**Duration**: {meta['Capture duration']}\n\n"
# Protocol summary
if "protocol_hierarchy" in results:
ph = results["protocol_hierarchy"]
summary += "**Top Protocols**:\n"
sorted_protos = sorted(
ph["protocols"].items(),
key=lambda x: x[1]["packets"],
reverse=True
)[:5]
for proto, stats in sorted_protos:
summary += f"- {proto}: {stats['packets']:,} packets\n"
summary += "\n"
# Security findings
if "security_analysis" in results:
sec = results["security_analysis"]
if sec["threat_indicators"] > 0:
summary += f"⚠️ **Security Alerts**: {sec['threat_indicators']} suspicious patterns detected\n"
for pattern, count in sec["suspicious_patterns"].items():
summary += f"- {pattern}: {count} occurrences\n"
summary += "\n"
# Performance issues
if "performance_analysis" in results:
perf = results["performance_analysis"]
summary += f"**Network Health**: {perf['overall_health']}\n"
if perf["total_issues"] > 0:
summary += f"**Performance Issues**: {perf['total_issues']} total\n"
for issue, count in perf["tcp_issues"].items():
if count > 0:
summary += f"- {issue}: {count}\n"
return summary
def generate_text_report(results: Dict[str, Any]) -> str:
"""Generate a detailed text report of analysis results."""
report = "=" * 60 + "\n"
report += "ENHANCED PCAP ANALYSIS REPORT\n"
report += "=" * 60 + "\n\n"
# Recursively format the results
def format_dict(d: Dict, indent: int = 0) -> str:
output = ""
prefix = " " * indent
for key, value in d.items():
if isinstance(value, dict):
output += f"{prefix}{key}:\n"
output += format_dict(value, indent + 1)
elif isinstance(value, list):
output += f"{prefix}{key}: [{len(value)} items]\n"
for i, item in enumerate(value[:3]): # Show first 3 items
if isinstance(item, dict):
output += f"{prefix} [{i}]:\n"
output += format_dict(item, indent + 2)
else:
output += f"{prefix} - {item}\n"
if len(value) > 3:
output += f"{prefix} ... and {len(value) - 3} more\n"
else:
output += f"{prefix}{key}: {value}\n"
return output
report += format_dict(results)
return report
# Original handler functions from server.py
async def handle_system_info(args: Dict[str, Any]) -> List[TextContent]:
"""Handle system information requests."""
info_type = args.get("info_type", "all")
result = {
"wireshark_mcp_server": "2.0.0 Enhanced",
"status": "✅ Active",
"capabilities": ["Live Capture", "PCAP Analysis", "Filter Generation", "Security Analysis", "JSON Streaming", "Protocol Statistics"]
}
if info_type in ["interfaces", "all"]:
try:
# Get network interfaces using ip command
interfaces_result = subprocess.run(
["ip", "link", "show"],
capture_output=True,
text=True,
timeout=10
)
if interfaces_result.returncode == 0:
interfaces = []
for line in interfaces_result.stdout.split('\n'):
if ': ' in line and 'state' in line:
interface_name = line.split(':')[1].strip().split('@')[0]
interfaces.append(interface_name)
result["network_interfaces"] = interfaces
else:
result["network_interfaces"] = ["Unable to retrieve interfaces"]
except Exception as e:
result["network_interfaces"] = [f"Error: {str(e)}"]
if info_type in ["capabilities", "all"]:
result["tools_available"] = [
"System Information",
"Setup Validation",
"Filter Generation",
"Live Capture",
"PCAP Analysis",
"Real-time JSON Capture",
"Protocol Statistics",
"Enhanced Analysis",
"PCAP Time Slicer"
]
return [TextContent(
type="text",
text=f"🦈 **Wireshark MCP System Information**\n\n{json.dumps(result, indent=2)}"
)]
async def handle_validate_setup(args: Dict[str, Any]) -> List[TextContent]:
"""Validate Wireshark installation and setup."""
full_check = args.get("full_check", False)
validation_results = {
"wireshark_mcp_server": "✅ Running (Enhanced Version)",
"python_version": f"✅ {os.sys.version.split()[0]}",
"dependencies": {}
}
# Check for required tools
tools_to_check = ["tshark", "tcpdump", "dumpcap", "capinfos"]
for tool in tools_to_check:
try:
result = subprocess.run(
["which", tool],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
validation_results["dependencies"][tool] = f"✅ {result.stdout.strip()}"
else:
validation_results["dependencies"][tool] = "❌ Not found"
except Exception as e:
validation_results["dependencies"][tool] = f"❌ Error: {str(e)}"
# Check permissions
try:
# Check if we can access network interfaces
result = subprocess.run(
["ip", "link", "show"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
validation_results["network_access"] = "✅ Available"
else:
validation_results["network_access"] = "❌ Limited"
except Exception:
validation_results["network_access"] = "❌ Error checking access"
# Check enhanced features
validation_results["enhanced_features"] = {
"json_capture": "✅ Available",
"protocol_statistics": "✅ Available",
"streaming_analysis": "✅ Available"
}
return [TextContent(
type="text",
text=f"🔍 **Wireshark MCP Setup Validation**\n\n{json.dumps(validation_results, indent=2)}"
)]
async def handle_generate_filter(args: Dict[str, Any]) -> List[TextContent]:
"""Generate Wireshark filters from natural language with advanced parsing."""
description = args.get("description", "")
complexity = args.get("complexity", "intermediate")
# Enhanced filter generation with regex patterns and subnet support
generated_filter = await advanced_filter_generation(description, complexity)
result = {
"description": description,
"generated_filter": generated_filter["filter"],
"complexity": complexity,
"suggestions": generated_filter["suggestions"],
"matched_patterns": generated_filter["matched_patterns"],
"parsing_notes": generated_filter.get("notes", [])
}
return [TextContent(
type="text",
text=f"🎯 **Generated Wireshark Filter**\n\n**Input**: {description}\n\n**Filter**: `{generated_filter['filter']}`\n\n**Details**:\n{json.dumps(result, indent=2)}"
)]
async def handle_live_capture(args: Dict[str, Any]) -> List[TextContent]:
"""Handle live packet capture with automatic permissions detection."""
interface = args.get("interface", "any")
duration = args.get("duration", 60)
filter_expr = args.get("filter", "")
max_packets = args.get("max_packets", 1000)
# Check if we have capture capabilities
# Enhanced implementation - try capture regardless of permission check
# This allows fallback methods to work even if primary permissions are missing
try:
capture_result = await perform_live_capture_enhanced(interface, duration, filter_expr, max_packets)
return [TextContent(
type="text",
text=f"📡 **Live Packet Capture Results**\n\n{json.dumps(capture_result, indent=2)}"
)]
except Exception as e:
error_result = {
"status": "❌ Capture Failed",
"interface": interface,
"error": str(e),
"troubleshooting": [
"Verify interface name with: ip link show",
"Check permissions with: ./test_capture_permissions.py",
"Ensure you're in wireshark group: groups $USER"
]
}
return [TextContent(
type="text",
text=f"❌ **Live Capture Failed**\n\n{json.dumps(error_result, indent=2)}"
)]
async def handle_analyze_pcap(args: Dict[str, Any]) -> List[TextContent]:
"""Handle PCAP file analysis with real packet inspection."""
filepath = args.get("filepath", "")
analysis_type = args.get("analysis_type", "comprehensive")
if not filepath:
return [TextContent(type="text", text="❌ Error: No filepath provided")]
# Check if file exists
if not os.path.exists(filepath):
return [TextContent(type="text", text=f"❌ Error: File not found: {filepath}")]
# Check file permissions
if not os.access(filepath, os.R_OK):
return [TextContent(type="text", text=f"❌ Error: Cannot read file: {filepath}")]
try:
analysis_result = await analyze_pcap_file(filepath, analysis_type)
return [TextContent(
type="text",
text=f"📊 **PCAP Analysis Results**\n\n{json.dumps(analysis_result, indent=2)}"
)]
except Exception as e:
error_result = {
"status": "❌ Analysis Failed",
"file": filepath,
"error": str(e),
"troubleshooting": [
"Verify file is a valid PCAP/PCAPNG file",
"Check file permissions: ls -la",
"Try with tshark: tshark -r filename.pcap"
]
}
return [TextContent(
type="text",
text=f"❌ **PCAP Analysis Failed**\n\n{json.dumps(error_result, indent=2)}"
)]
# Helper functions from original server.py
async def check_capture_permissions() -> bool:
"""Check if we have permissions to capture packets without sudo."""
try:
# Test dumpcap first (preferred method)
result = subprocess.run(
["dumpcap", "-D"], # List interfaces
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0 and "Capture interface" in result.stdout:
return True
except Exception:
pass
try:
# Test tshark as fallback
result = subprocess.run(
["tshark", "-D"], # List interfaces
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0 and len(result.stdout.strip()) > 0:
return True
except Exception:
pass
return False
async def perform_live_capture(interface: str, duration: int, filter_expr: str, max_packets: int) -> Dict[str, Any]:
"""Perform actual live packet capture using available tools."""
# Create temporary file for capture
temp_file = tempfile.NamedTemporaryFile(suffix='.pcap', delete=False)
temp_file.close()
try:
# Try dumpcap first (most secure)
capture_cmd = [
"dumpcap",
"-i", interface,
"-c", str(max_packets),
"-a", f"duration:{duration}",
"-w", temp_file.name
]
if filter_expr:
capture_cmd.extend(["-f", filter_expr])
logger.info(f"Starting packet capture: {' '.join(capture_cmd)}")
# Run capture with timeout
result = subprocess.run(
capture_cmd,
capture_output=True,
text=True,
timeout=duration + 10 # Extra time for cleanup
)
if result.returncode != 0:
# Try tshark as fallback
capture_cmd = [
"tshark",
"-i", interface,
"-c", str(max_packets),
"-a", f"duration:{duration}",
"-w", temp_file.name
]
if filter_expr:
capture_cmd.extend(["-f", filter_expr])
result = subprocess.run(
capture_cmd,
capture_output=True,
text=True,
timeout=duration + 10
)
# Analyze the captured file
file_size = os.path.getsize(temp_file.name) if os.path.exists(temp_file.name) else 0
if file_size > 0:
# Get packet count using tshark
try:
count_result = subprocess.run(
["tshark", "-r", temp_file.name, "-T", "fields", "-e", "frame.number"],
capture_output=True,
text=True,
timeout=10
)
packet_count = len(count_result.stdout.strip().split('\n')) if count_result.stdout.strip() else 0
except Exception:
packet_count = "unknown"
capture_result = {
"status": "✅ Capture Successful",
"interface": interface,
"duration": f"{duration} seconds",
"filter": filter_expr or "none",
"packets_captured": packet_count,
"file_size": f"{file_size} bytes",
"capture_file": temp_file.name,
"note": "Capture file saved temporarily - analyze quickly before cleanup"
}
else:
capture_result = {
"status": "⚠️ No Packets Captured",
"interface": interface,
"duration": f"{duration} seconds",
"filter": filter_expr or "none",
"possible_reasons": [
"Interface has no traffic",
"Filter too restrictive",
"Interface not active",
"Permission issues"
]
}
return capture_result
except subprocess.TimeoutExpired:
return {
"status": "⚠️ Capture Timeout",
"interface": interface,
"note": "Capture took longer than expected - may have succeeded partially"
}
except Exception as e:
return {
"status": "❌ Capture Error",
"interface": interface,
"error": str(e)
}
finally:
# Clean up temp file after a delay (allow time for analysis)
try:
if os.path.exists(temp_file.name):
# Schedule cleanup after 5 minutes
asyncio.create_task(delayed_cleanup(temp_file.name, 300))
except Exception:
pass
async def perform_live_capture_enhanced(interface: str, duration: int, filter_expr: str, max_packets: int) -> Dict[str, Any]:
"""Enhanced live capture with multiple fallback methods for extended duration support."""
capture_start_time = asyncio.get_event_loop().time()
# Method 1: Try traditional tshark first
try:
cmd = [
'tshark',
'-i', interface,
'-c', str(max_packets),
'-a', f'duration:{duration}',
'-T', 'json'
]
if filter_expr:
cmd.extend(['-f', filter_expr])
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=duration + 30)
if proc.returncode == 0:
try:
packets = json.loads(stdout.decode()) if stdout.decode().strip() else []
capture_time = asyncio.get_event_loop().time() - capture_start_time
return {
"status": "✅ Success",
"method_used": "tshark_direct",
"interface": interface,
"duration": duration,
"filter": filter_expr,
"packets_captured": len(packets),
"max_packets": max_packets,
"packets": packets[:10], # Return first 10 for display
"capture_time_seconds": round(capture_time, 2),
"note": "Direct tshark capture successful"
}
except json.JSONDecodeError:
return {
"status": "⚠️ Partial Success",
"method_used": "tshark_direct",
"interface": interface,
"raw_output": stdout.decode()[:1000],
"error": "Could not parse JSON output"
}
else:
# Try fallback methods
logger.info(f"tshark direct failed ({stderr.decode()[:100]}...), trying fallback methods")
return await _enhanced_capture_fallback(interface, duration, filter_expr, max_packets, capture_start_time)
except asyncio.TimeoutError:
# Try fallback methods even on timeout
logger.info(f"tshark timed out after {duration}s, trying fallback methods")
return await _enhanced_capture_fallback(interface, duration, filter_expr, max_packets, capture_start_time)
except Exception as e:
# Try fallback methods on any exception
logger.info(f"tshark failed with exception ({str(e)}), trying fallback methods")
return await _enhanced_capture_fallback(interface, duration, filter_expr, max_packets, capture_start_time)
async def _enhanced_capture_fallback(
interface: str,
duration: int,
filter_expr: str,
max_packets: int,
start_time: float
) -> Dict[str, Any]:
"""Enhanced fallback capture methods for when tshark fails due to permissions."""
# Method 2: Try tcpdump + tshark analysis (most reliable fallback)
try:
with tempfile.NamedTemporaryFile(suffix='.pcap', delete=False) as tmp:
pcap_file = tmp.name
# Build tcpdump command
cmd = [
'timeout', str(duration + 10), # Add buffer time
'tcpdump', '-i', interface,
'-w', pcap_file,
'-c', str(max_packets),
'-q'
]
if filter_expr:
cmd.append(filter_expr)
# Capture with tcpdump
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=duration + 15)
if proc.returncode in [0, 124]: # Success or timeout (expected)
# Parse with tshark
parse_cmd = ['tshark', '-r', pcap_file, '-T', 'json', '-c', str(min(10, max_packets))]
parse_proc = await asyncio.create_subprocess_exec(
*parse_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
parse_stdout, parse_stderr = await parse_proc.communicate()
if parse_proc.returncode == 0:
try:
packets = json.loads(parse_stdout.decode()) if parse_stdout.decode().strip() else []
capture_time = asyncio.get_event_loop().time() - start_time
# Clean up temp file
os.unlink(pcap_file)
return {
"status": "✅ Success",
"method_used": "tcpdump + tshark analysis",
"interface": interface,
"duration": duration,
"filter": filter_expr,
"packets_captured": len(packets),
"max_packets": max_packets,
"packets": packets,
"capture_time_seconds": round(capture_time, 2),
"note": "Enhanced capture with permission workaround - supports extended duration (5+ minutes)",
"recommendation": "Original tshark failed, used tcpdump fallback successfully"
}
                except json.JSONDecodeError:
                    # Fall through to the next capture method
                    if os.path.exists(pcap_file):
                        os.unlink(pcap_file)
            else:
                if os.path.exists(pcap_file):
                    os.unlink(pcap_file)
                logger.info(f"tshark parse failed: {parse_stderr.decode()[:100]}")
        else:
            if os.path.exists(pcap_file):
                os.unlink(pcap_file)
            logger.info(f"tcpdump failed: {stderr.decode()[:100]}")
except Exception as e:
logger.info(f"tcpdump fallback failed: {str(e)}")
if 'pcap_file' in locals() and os.path.exists(pcap_file):
os.unlink(pcap_file)
# Method 3: Try sg wireshark (group switching)
try:
cmd = [
'sg', 'wireshark', '-c',
f'timeout {duration + 5} tshark -i {interface} -c {max_packets} -T json'
]
if filter_expr:
cmd[-1] += f' -f "{filter_expr}"'
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=duration + 10)
if proc.returncode in [0, 124]:
try:
packets = json.loads(stdout.decode()) if stdout.decode().strip() else []
capture_time = asyncio.get_event_loop().time() - start_time
return {
"status": "✅ Success",
"method_used": "sg wireshark + tshark",
"interface": interface,
"duration": duration,
"filter": filter_expr,
"packets_captured": len(packets),
"max_packets": max_packets,
"packets": packets[:10],
"capture_time_seconds": round(capture_time, 2),
"note": "Used group switching to access wireshark group",
"recommendation": "Consider restarting Claude Desktop to activate wireshark group permanently"
}
except json.JSONDecodeError:
logger.info("sg wireshark: JSON decode failed")
else:
logger.info(f"sg wireshark failed: {stderr.decode()[:100]}")
except Exception as e:
logger.info(f"sg wireshark fallback failed: {str(e)}")
# All methods failed
capture_time = asyncio.get_event_loop().time() - start_time
return {
"status": "❌ All capture methods failed",
"method_used": "none - all fallbacks exhausted",
"interface": interface,
"duration": duration,
"filter": filter_expr,
"max_packets": max_packets,
"capture_time_seconds": round(capture_time, 2),
"error": "Permission denied - tshark, tcpdump, and sg wireshark all failed",
"recommendations": [
"1. Run: sudo setcap cap_net_raw,cap_net_admin=eip /usr/bin/tcpdump",
"2. Run: sudo usermod -a -G wireshark $USER",
"3. Restart Claude Desktop to activate wireshark group",
"4. Or use async background capture with: async_long_capture.py"
]
}
async def delayed_cleanup(filepath: str, delay_seconds: int):
"""Clean up temporary files after a delay."""
try:
await asyncio.sleep(delay_seconds)
if os.path.exists(filepath):
os.unlink(filepath)
logger.info(f"Cleaned up temporary file: {filepath}")
except Exception as e:
logger.warning(f"Failed to clean up {filepath}: {e}")
async def analyze_pcap_file(filepath: str, analysis_type: str) -> Dict[str, Any]:
"""Analyze PCAP file using tshark for comprehensive packet inspection."""
file_info = {
"file": filepath,
"file_size": f"{os.path.getsize(filepath)} bytes",
"analysis_type": analysis_type
}
try:
# Basic file info using capinfos
try:
capinfos_result = subprocess.run(
["capinfos", filepath],
capture_output=True,
text=True,
timeout=30
)
if capinfos_result.returncode == 0:
file_info["file_details"] = parse_capinfos_output(capinfos_result.stdout)
except Exception:
# Fallback to tshark for basic info
pass
# Packet analysis based on type
if analysis_type == "quick":
analysis_result = await quick_pcap_analysis(filepath)
elif analysis_type == "security":
analysis_result = await security_pcap_analysis(filepath)
elif analysis_type == "performance":
analysis_result = await performance_pcap_analysis(filepath)
else: # comprehensive
analysis_result = await comprehensive_pcap_analysis(filepath)
return {
"status": "✅ Analysis Complete",
"file_info": file_info,
**analysis_result
}
except Exception as e:
return {
"status": "❌ Analysis Error",
"file_info": file_info,
"error": str(e)
}
def parse_capinfos_output(output: str) -> Dict[str, str]:
"""Parse capinfos output into structured data."""
info = {}
for line in output.split('\n'):
if ':' in line:
key, value = line.split(':', 1)
info[key.strip()] = value.strip()
return info
async def quick_pcap_analysis(filepath: str) -> Dict[str, Any]:
"""Quick analysis - basic packet counts and protocols."""
try:
# Get total packet count
count_result = subprocess.run(
["tshark", "-r", filepath, "-T", "fields", "-e", "frame.number"],
capture_output=True,
text=True,
timeout=30
)
total_packets = len(count_result.stdout.strip().split('\n')) if count_result.stdout.strip() else 0
# Get protocol statistics
proto_result = subprocess.run(
["tshark", "-r", filepath, "-q", "-z", "ptype,tree"],
capture_output=True,
text=True,
timeout=30
)
protocols = {}
if proto_result.returncode == 0:
for line in proto_result.stdout.split('\n'):
if 'frames:' in line and '%' in line:
parts = line.split()
if len(parts) >= 3:
protocol = parts[-1]
count = parts[0]
protocols[protocol] = count
return {
"analysis_summary": {
"total_packets": total_packets,
"protocols": protocols,
"analysis_duration": "< 30 seconds"
}
}
except Exception as e:
return {"quick_analysis_error": str(e)}
async def security_pcap_analysis(filepath: str) -> Dict[str, Any]:
"""Security-focused analysis - suspicious patterns and threats."""
try:
security_findings = {}
# Check for suspicious traffic patterns
suspicious_patterns = [
("TCP SYN flood", "tcp.flags.syn==1 and tcp.flags.ack==0"),
("Port scan indicators", "tcp.flags.syn==1 and tcp.flags.ack==0 and tcp.window_size <= 1024"),
("DNS tunneling", "dns.qry.name contains \".\" and frame.len > 512"),
("Large HTTP requests", "http.request and frame.len > 8192"),
("Non-standard ports", "tcp.port > 49151 or udp.port > 49151")
]
for pattern_name, filter_expr in suspicious_patterns:
try:
result = subprocess.run(
["tshark", "-r", filepath, "-Y", filter_expr, "-T", "fields", "-e", "frame.number"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0 and result.stdout.strip():
matches = len(result.stdout.strip().split('\n'))
security_findings[pattern_name] = f"{matches} occurrences"
except Exception:
security_findings[pattern_name] = "Analysis failed"
return {
"security_analysis": security_findings,
"threat_indicators": len([v for v in security_findings.values() if "occurrences" in v])
}
except Exception as e:
return {"security_analysis_error": str(e)}
async def performance_pcap_analysis(filepath: str) -> Dict[str, Any]:
"""Performance analysis - latency, throughput, errors."""
try:
performance_metrics = {}
# Analyze TCP performance issues
tcp_issues = [
("Retransmissions", "tcp.analysis.retransmission"),
("Duplicate ACKs", "tcp.analysis.duplicate_ack"),
("Zero Window", "tcp.analysis.zero_window"),
("Keep Alive", "tcp.analysis.keep_alive")
]
for issue_name, filter_expr in tcp_issues:
try:
result = subprocess.run(
["tshark", "-r", filepath, "-Y", filter_expr, "-T", "fields", "-e", "frame.number"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0 and result.stdout.strip():
count = len(result.stdout.strip().split('\n'))
performance_metrics[issue_name] = count
else:
performance_metrics[issue_name] = 0
except Exception:
performance_metrics[issue_name] = "Analysis failed"
return {
"performance_analysis": performance_metrics,
"network_health": "Good" if sum(v for v in performance_metrics.values() if isinstance(v, int)) == 0 else "Issues detected"
}
except Exception as e:
return {"performance_analysis_error": str(e)}
async def comprehensive_pcap_analysis(filepath: str) -> Dict[str, Any]:
"""Comprehensive analysis combining all analysis types."""
try:
# Run all analysis types
quick_results = await quick_pcap_analysis(filepath)
security_results = await security_pcap_analysis(filepath)
performance_results = await performance_pcap_analysis(filepath)
# Additional comprehensive statistics
comprehensive_stats = {}
# Get conversation statistics
try:
conv_result = subprocess.run(
["tshark", "-r", filepath, "-q", "-z", "conv,tcp"],
capture_output=True,
text=True,
timeout=30
)
if conv_result.returncode == 0:
lines = conv_result.stdout.split('\n')
tcp_conversations = len([l for l in lines if '<->' in l])
comprehensive_stats["tcp_conversations"] = tcp_conversations
except Exception:
comprehensive_stats["tcp_conversations"] = "Analysis failed"
return {
"comprehensive_analysis": {
**quick_results,
**security_results,
**performance_results,
"additional_stats": comprehensive_stats
}
}
except Exception as e:
return {"comprehensive_analysis_error": str(e)}
async def advanced_filter_generation(description: str, complexity: str) -> Dict[str, Any]:
"""Advanced filter generation with regex patterns, subnet support, and smart parsing."""
description_lower = description.lower()
matched_patterns = []
filter_parts = []
notes = []
# Enhanced pattern matching with regex
patterns = {
# Network patterns with subnet support
"ip_address": (r'\b(?:\d{1,3}\.){3}\d{1,3}\b', lambda m: f'ip.addr == {m.group()}'),
"subnet_cidr": (r'\b(?:\d{1,3}\.){3}\d{1,3}/\d{1,2}\b', lambda m: f'ip.addr == {m.group()}'),
"ip_range": (r'\b(?:\d{1,3}\.){3}\d{1,3}\s*-\s*(?:\d{1,3}\.){3}\d{1,3}\b',
lambda m: parse_ip_range(m.group())),
# Port patterns
"port_number": (r'\bport\s+(\d+)\b', lambda m: f'tcp.port == {m.group(1)} or udp.port == {m.group(1)}'),
"port_range": (r'\bports?\s+(\d+)\s*-\s*(\d+)\b',
lambda m: f'(tcp.port >= {m.group(1)} and tcp.port <= {m.group(2)}) or (udp.port >= {m.group(1)} and udp.port <= {m.group(2)})'),
# Protocol patterns
"http_traffic": (r'\bhttp\b(?!\w)', lambda m: 'http'),
"https_traffic": (r'\bhttps?\s+secure|ssl|tls\b', lambda m: 'tls'),
"dns_queries": (r'\bdns\b(?!\w)', lambda m: 'dns'),
"email_traffic": (r'\bemail|smtp|pop|imap\b', lambda m: 'smtp or pop or imap'),
"ssh_traffic": (r'\bssh\b(?!\w)', lambda m: 'ssh'),
"ftp_traffic": (r'\bftp\b(?!\w)', lambda m: 'ftp'),
# Advanced patterns
"slow_connections": (r'\bslow|high\s+latency|delayed?\b', lambda m: 'tcp.analysis.ack_rtt > 0.1'),
"error_traffic": (r'\berrors?|retransmission|failed?\b',
lambda m: 'tcp.analysis.retransmission or tcp.analysis.duplicate_ack'),
"large_packets": (r'\blarge\s+packets?|big\s+frames?\b', lambda m: 'frame.len > 1500'),
# Host patterns
"hostname": (r'\b(?:host|server|domain)\s+([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
lambda m: f'ip.host == "{m.group(1)}"'),
"contains_domain": (r'\b(?:contains?|includes?)\s+([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
lambda m: f'http.host contains "{m.group(1)}"'),
}
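    # Illustrative end-to-end example:
    #   "show http traffic from 192.168.1.5"
    #   -> matches "ip_address" and "http_traffic"
    #   -> combined filter: "(ip.addr == 192.168.1.5) and (http)"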
# Protocol keyword mapping (fallback)
protocol_keywords = {
"web": "http or tls",
"tcp": "tcp",
"udp": "udp",
"icmp": "icmp",
"ping": "icmp",
"arp": "arp",
"broadcast": "eth.dst == ff:ff:ff:ff:ff:ff",
"multicast": "ip.dst[0:4] == 0x0e"
}
# Apply regex patterns
for pattern_name, (regex, formatter) in patterns.items():
matches = re.finditer(regex, description_lower)
for match in matches:
try:
filter_expr = formatter(match)
filter_parts.append(filter_expr)
matched_patterns.append(pattern_name)
notes.append(f"Detected {pattern_name}: {match.group()}")
except Exception as e:
notes.append(f"Error parsing {pattern_name}: {str(e)}")
# Apply keyword matching if no regex matches
if not filter_parts:
for keyword, filter_expr in protocol_keywords.items():
if keyword in description_lower:
filter_parts.append(filter_expr)
matched_patterns.append(f"keyword_{keyword}")
# Generate final filter
if filter_parts:
if len(filter_parts) == 1:
final_filter = filter_parts[0]
else:
# Intelligently combine filters
final_filter = combine_filters_intelligently(filter_parts, description_lower)
else:
# Ultimate fallback
if any(word in description_lower for word in ["traffic", "packets", "network"]):
final_filter = "tcp or udp"
else:
final_filter = "tcp"
notes.append("Used fallback filter - consider more specific description")
# Generate contextual suggestions
suggestions = generate_contextual_suggestions(description_lower, matched_patterns)
return {
"filter": final_filter,
"matched_patterns": matched_patterns,
"suggestions": suggestions,
"notes": notes
}
def parse_ip_range(range_str: str) -> str:
"""Parse IP range like '192.168.1.1 - 192.168.1.100' into Wireshark filter."""
try:
start_ip, end_ip = [ip.strip() for ip in range_str.split('-')]
return f'ip.addr >= {start_ip} and ip.addr <= {end_ip}'
except Exception:
return f'ip.addr == {range_str}' # Fallback
def combine_filters_intelligently(filter_parts: List[str], description: str) -> str:
"""Intelligently combine multiple filter parts based on context."""
# Check for exclusion words
if any(word in description for word in ["not", "except", "exclude", "without"]):
# Use NOT logic for exclusions
if len(filter_parts) >= 2:
return f"({filter_parts[0]}) and not ({filter_parts[1]})"
# Check for OR logic indicators
if any(word in description for word in ["or", "either", "any"]):
return " or ".join(f"({part})" for part in filter_parts)
# Default to AND logic
return " and ".join(f"({part})" for part in filter_parts)
def generate_contextual_suggestions(description: str, matched_patterns: List[str]) -> List[str]:
"""Generate contextual suggestions based on what was detected."""
suggestions = []
# Basic suggestions
suggestions.extend([
"Use specific IP addresses: ip.addr == 192.168.1.1",
"Filter by port: tcp.port == 80 or udp.port == 53",
"Combine with operators: (http) and (ip.addr == 192.168.1.0/24)"
])
# Context-specific suggestions
if any("ip" in pattern for pattern in matched_patterns):
suggestions.append("Try subnet notation: ip.addr == 192.168.1.0/24")
suggestions.append("Use IP ranges: ip.addr >= 10.0.0.1 and ip.addr <= 10.0.0.100")
if any("port" in pattern for pattern in matched_patterns):
suggestions.append("Specific protocols: tcp.port == 443 (HTTPS) or udp.port == 53 (DNS)")
if any("http" in pattern for pattern in matched_patterns):
suggestions.append("HTTP methods: http.request.method == GET")
suggestions.append("HTTP hosts: http.host == \"example.com\"")
if "slow" in description or "latency" in description:
suggestions.append("RTT analysis: tcp.analysis.ack_rtt > 0.05")
suggestions.append("Window scaling: tcp.window_size_scalefactor > 1")
return suggestions[:6] # Limit to 6 suggestions
async def main():
"""Main server entry point."""
logger.info("🦈 Starting Enhanced Wireshark MCP Server v2.0")
logger.info("✨ Features: JSON Capture, Protocol Statistics, Enhanced Analysis")
logger.info("📊 Total Tools Available: 8")
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())