Skip to main content
Glama
PreistlyPython

Wireshark MCP Server

advanced_tools_integration.py25.2 kB
#!/usr/bin/env python3 """ Advanced Tools Integration for Wireshark MCP Server ================================================== This module integrates the 10 advanced Wireshark tools into the MCP server. Tools included: 1. PCAP Time Slicer - Extract specific time windows from captures 2. PCAP Splitter - Split large captures by packets/time/size 3. PCAP Merger - Intelligently merge multiple captures 4. Hex-to-PCAP Converter - Convert hex dumps to analyzable format 5. HTTP Deep Analyzer - Deep HTTP/HTTPS transaction analysis 6. DNS Query Analyzer - DNS traffic intelligence and tunneling detection 7. SSL/TLS Inspector - Certificate analysis and traffic decryption 8. Latency Profiler - Network performance profiling 9. Threat Detector - AI-powered security threat detection 10. Remote Capture - Distributed packet capture via SSH """ from typing import List, Dict, Any from mcp.types import Tool # Import the advanced tools implementations from advanced_tools_implementation import ( WiresharkPCAPTimeSlicer, WiresharkPCAPSplitter, WiresharkPCAPMerger, WiresharkHexToPCAP, WiresharkHTTPAnalyzer, WiresharkDNSAnalyzer, WiresharkSSLInspector, WiresharkLatencyProfiler, WiresharkThreatDetector, WiresharkRemoteCapture ) # Initialize tool instances pcap_slicer = WiresharkPCAPTimeSlicer() pcap_splitter = WiresharkPCAPSplitter() pcap_merger = WiresharkPCAPMerger() hex_converter = WiresharkHexToPCAP() http_analyzer = WiresharkHTTPAnalyzer() dns_analyzer = WiresharkDNSAnalyzer() ssl_inspector = WiresharkSSLInspector() latency_profiler = WiresharkLatencyProfiler() threat_detector = WiresharkThreatDetector() remote_capture = WiresharkRemoteCapture() def get_advanced_tool_definitions() -> List[Tool]: """Get MCP tool definitions for all advanced tools.""" return [ # 1. PCAP Time Slicer Tool( name="wireshark_pcap_time_slice", description="Extract specific time windows from PCAP captures using editcap", inputSchema={ "type": "object", "properties": { "input_file": { "type": "string", "description": "Input PCAP file path" }, "start_time": { "type": "string", "description": "Start time (ISO format: YYYY-MM-DDThh:mm:ss or Unix epoch)" }, "end_time": { "type": "string", "description": "End time (ISO format: YYYY-MM-DDThh:mm:ss or Unix epoch)" }, "output_file": { "type": "string", "description": "Output file path (optional)", "default": None }, "mode": { "type": "string", "enum": ["time_range", "duration", "relative"], "description": "Slicing mode", "default": "time_range" }, "duration_seconds": { "type": "integer", "description": "Duration in seconds (for duration mode)", "default": 60 }, "start_offset": { "type": "number", "description": "Start offset in seconds (for relative mode)", "default": 0 }, "end_offset": { "type": "number", "description": "End offset in seconds (for relative mode)", "default": 60 } }, "required": ["input_file", "start_time"], "additionalProperties": False } ), # 2. PCAP Splitter Tool( name="wireshark_pcap_split", description="Split large PCAP files by packets, time, or size", inputSchema={ "type": "object", "properties": { "input_file": { "type": "string", "description": "Input PCAP file path" }, "split_by": { "type": "string", "enum": ["packets", "time", "size"], "description": "Split criteria", "default": "packets" }, "value": { "type": "integer", "description": "Split value (packets count, seconds, or MB)", "default": 1000 }, "output_prefix": { "type": "string", "description": "Output file prefix (optional)", "default": None } }, "required": ["input_file"], "additionalProperties": False } ), # 3. PCAP Merger Tool( name="wireshark_pcap_merge", description="Intelligently merge multiple PCAP files", inputSchema={ "type": "object", "properties": { "input_files": { "type": "array", "items": {"type": "string"}, "description": "List of PCAP files to merge" }, "output_file": { "type": "string", "description": "Output file path (optional)", "default": None }, "mode": { "type": "string", "enum": ["chronological", "append"], "description": "Merge mode", "default": "chronological" }, "remove_duplicates": { "type": "boolean", "description": "Remove duplicate packets", "default": False } }, "required": ["input_files"], "additionalProperties": False } ), # 4. Hex-to-PCAP Converter Tool( name="wireshark_hex_to_pcap", description="Convert hex dumps to analyzable PCAP format", inputSchema={ "type": "object", "properties": { "hex_input": { "type": "string", "description": "Hex dump string or file path" }, "output_file": { "type": "string", "description": "Output PCAP file path (optional)", "default": None }, "encapsulation": { "type": "string", "enum": ["ethernet", "ip", "udp", "tcp"], "description": "Encapsulation type", "default": "ethernet" }, "add_fake_headers": { "type": "boolean", "description": "Add fake headers for incomplete data", "default": True } }, "required": ["hex_input"], "additionalProperties": False } ), # 5. HTTP Deep Analyzer Tool( name="wireshark_http_analyze", description="Deep HTTP/HTTPS transaction analysis", inputSchema={ "type": "object", "properties": { "input_file": { "type": "string", "description": "Input PCAP file path" }, "analysis_type": { "type": "string", "enum": ["flows", "performance", "objects"], "description": "Type of HTTP analysis", "default": "flows" }, "include_bodies": { "type": "boolean", "description": "Include HTTP request/response bodies", "default": True }, "decode_gzip": { "type": "boolean", "description": "Decode gzip-compressed content", "default": True }, "output_dir": { "type": "string", "description": "Output directory for extracted objects", "default": None } }, "required": ["input_file"], "additionalProperties": False } ), # 6. DNS Query Analyzer Tool( name="wireshark_dns_analyze", description="DNS traffic intelligence and tunneling detection", inputSchema={ "type": "object", "properties": { "input_file": { "type": "string", "description": "Input PCAP file path" }, "analysis_type": { "type": "string", "enum": ["queries", "tunneling", "response"], "description": "Type of DNS analysis", "default": "queries" }, "group_by_domain": { "type": "boolean", "description": "Group results by domain", "default": True }, "entropy_threshold": { "type": "number", "description": "Entropy threshold for tunneling detection", "default": 3.5 } }, "required": ["input_file"], "additionalProperties": False } ), # 7. SSL/TLS Inspector Tool( name="wireshark_ssl_inspect", description="SSL/TLS certificate analysis and traffic decryption", inputSchema={ "type": "object", "properties": { "input_file": { "type": "string", "description": "Input PCAP file path" }, "analysis_type": { "type": "string", "enum": ["handshakes", "decrypt", "certificates"], "description": "Type of SSL/TLS analysis", "default": "handshakes" }, "keylog_file": { "type": "string", "description": "SSL keylog file for decryption", "default": None }, "rsa_key_file": { "type": "string", "description": "RSA private key file for decryption", "default": None } }, "required": ["input_file"], "additionalProperties": False } ), # 8. Latency Profiler Tool( name="wireshark_latency_profile", description="Network latency and performance profiling", inputSchema={ "type": "object", "properties": { "input_file": { "type": "string", "description": "Input PCAP file path" }, "analysis_type": { "type": "string", "enum": ["tcp", "application", "heatmap"], "description": "Type of latency analysis", "default": "tcp" }, "protocol": { "type": "string", "description": "Application protocol (for application analysis)", "default": "http" }, "percentiles": { "type": "array", "items": {"type": "integer"}, "description": "Percentiles to calculate", "default": [50, 90, 95, 99] }, "time_bucket_seconds": { "type": "integer", "description": "Time bucket size for heatmap", "default": 60 } }, "required": ["input_file"], "additionalProperties": False } ), # 9. Threat Detector Tool( name="wireshark_threat_detect", description="AI-powered network threat detection", inputSchema={ "type": "object", "properties": { "input_file": { "type": "string", "description": "Input PCAP file path" }, "threat_type": { "type": "string", "enum": ["port_scan", "ddos", "anomaly"], "description": "Type of threat to detect", "default": "port_scan" }, "threshold_ports": { "type": "integer", "description": "Port threshold for scan detection", "default": 10 }, "time_window": { "type": "integer", "description": "Time window in seconds", "default": 60 }, "model_path": { "type": "string", "description": "ML model path for anomaly detection", "default": None } }, "required": ["input_file"], "additionalProperties": False } ), # 10. Remote Capture Tool( name="wireshark_remote_capture", description="Distributed packet capture via SSH", inputSchema={ "type": "object", "properties": { "capture_mode": { "type": "string", "enum": ["single", "multi"], "description": "Single or multi-host capture", "default": "single" }, "host": { "type": "string", "description": "Remote host for single capture" }, "hosts": { "type": "array", "items": { "type": "object", "properties": { "host": {"type": "string"}, "username": {"type": "string"}, "interface": {"type": "string", "default": "any"} } }, "description": "Host list for multi capture" }, "username": { "type": "string", "description": "SSH username" }, "password": { "type": "string", "description": "SSH password (optional)", "default": None }, "key_file": { "type": "string", "description": "SSH key file path (optional)", "default": None }, "interface": { "type": "string", "description": "Network interface", "default": "any" }, "filter": { "type": "string", "description": "Capture filter", "default": "" }, "duration": { "type": "integer", "description": "Capture duration in seconds", "default": 60 }, "synchronized": { "type": "boolean", "description": "Synchronize multi-host capture", "default": True } }, "required": ["username"], "additionalProperties": False } ) ] async def handle_advanced_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle advanced tool calls.""" # 1. PCAP Time Slicer if tool_name == "wireshark_pcap_time_slice": mode = arguments.get("mode", "time_range") if mode == "time_range": return await pcap_slicer.slice_by_time_range( arguments["input_file"], arguments["start_time"], arguments.get("end_time", arguments["start_time"]), arguments.get("output_file") ) elif mode == "duration": return await pcap_slicer.slice_by_duration( arguments["input_file"], arguments["start_time"], arguments.get("duration_seconds", 60), arguments.get("output_file") ) elif mode == "relative": return await pcap_slicer.slice_relative_time( arguments["input_file"], arguments.get("start_offset", 0), arguments.get("end_offset", 60), arguments.get("output_file") ) # 2. PCAP Splitter elif tool_name == "wireshark_pcap_split": split_by = arguments.get("split_by", "packets") value = arguments.get("value", 1000) if split_by == "packets": return await pcap_splitter.split_by_packets( arguments["input_file"], value, arguments.get("output_prefix") ) elif split_by == "time": return await pcap_splitter.split_by_time( arguments["input_file"], value, arguments.get("output_prefix") ) elif split_by == "size": return await pcap_splitter.split_by_size( arguments["input_file"], value, arguments.get("output_prefix") ) # 3. PCAP Merger elif tool_name == "wireshark_pcap_merge": mode = arguments.get("mode", "chronological") if mode == "chronological": return await pcap_merger.merge_chronological( arguments["input_files"], arguments.get("output_file"), arguments.get("remove_duplicates", False) ) else: # append mode return await pcap_merger.merge_append( arguments["input_files"], arguments.get("output_file") ) # 4. Hex-to-PCAP Converter elif tool_name == "wireshark_hex_to_pcap": return await hex_converter.convert_hex_dump( arguments["hex_input"], arguments.get("output_file"), arguments.get("encapsulation", "ethernet"), arguments.get("add_fake_headers", True) ) # 5. HTTP Deep Analyzer elif tool_name == "wireshark_http_analyze": analysis_type = arguments.get("analysis_type", "flows") if analysis_type == "flows": return await http_analyzer.extract_http_flows( arguments["input_file"], arguments.get("include_bodies", True), arguments.get("decode_gzip", True) ) elif analysis_type == "performance": return await http_analyzer.analyze_http_performance( arguments["input_file"] ) elif analysis_type == "objects": return await http_analyzer.extract_http_objects( arguments["input_file"], arguments.get("output_dir") ) # 6. DNS Query Analyzer elif tool_name == "wireshark_dns_analyze": analysis_type = arguments.get("analysis_type", "queries") if analysis_type == "queries": return await dns_analyzer.analyze_dns_queries( arguments["input_file"], arguments.get("group_by_domain", True) ) elif analysis_type == "tunneling": return await dns_analyzer.detect_dns_tunneling( arguments["input_file"], arguments.get("entropy_threshold", 3.5) ) elif analysis_type == "response": return await dns_analyzer.dns_response_analysis( arguments["input_file"] ) # 7. SSL/TLS Inspector elif tool_name == "wireshark_ssl_inspect": analysis_type = arguments.get("analysis_type", "handshakes") if analysis_type == "handshakes": return await ssl_inspector.analyze_ssl_handshakes( arguments["input_file"] ) elif analysis_type == "decrypt": return await ssl_inspector.decrypt_ssl_traffic( arguments["input_file"], arguments.get("keylog_file"), arguments.get("rsa_key_file") ) elif analysis_type == "certificates": return await ssl_inspector.ssl_certificate_analysis( arguments["input_file"] ) # 8. Latency Profiler elif tool_name == "wireshark_latency_profile": analysis_type = arguments.get("analysis_type", "tcp") if analysis_type == "tcp": return await latency_profiler.analyze_tcp_latency( arguments["input_file"], arguments.get("percentiles", [50, 90, 95, 99]) ) elif analysis_type == "application": return await latency_profiler.analyze_application_latency( arguments["input_file"], arguments.get("protocol", "http") ) elif analysis_type == "heatmap": return await latency_profiler.generate_latency_heatmap( arguments["input_file"], arguments.get("time_bucket_seconds", 60) ) # 9. Threat Detector elif tool_name == "wireshark_threat_detect": threat_type = arguments.get("threat_type", "port_scan") if threat_type == "port_scan": return await threat_detector.detect_port_scans( arguments["input_file"], arguments.get("threshold_ports", 10), arguments.get("time_window", 60) ) elif threat_type == "ddos": return await threat_detector.detect_ddos_patterns( arguments["input_file"] ) elif threat_type == "anomaly": return await threat_detector.ml_anomaly_detection( arguments["input_file"], arguments.get("model_path") ) # 10. Remote Capture elif tool_name == "wireshark_remote_capture": capture_mode = arguments.get("capture_mode", "single") if capture_mode == "single": return await remote_capture.capture_single_host( arguments.get("host", "localhost"), arguments["username"], arguments.get("password"), arguments.get("key_file"), arguments.get("interface", "any"), arguments.get("filter", ""), arguments.get("duration", 60) ) else: # multi mode return await remote_capture.capture_multi_host( arguments.get("hosts", []), arguments.get("synchronized", True) ) return {"status": "❌ Error", "error": f"Unknown tool: {tool_name}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PreistlyPython/wireshark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server