Skip to main content
Glama
PreistlyPython

Wireshark MCP Server

async_long_capture.py11.8 kB
#!/usr/bin/env python3 """ Async Long-Duration Packet Capture for Wireshark MCP ==================================================== This script enables 5+ minute packet captures that run in the background and can be analyzed with MCP tools after completion. """ import asyncio import os import json import tempfile import time from typing import Dict, Any, Optional from pathlib import Path class AsyncPacketCapture: """Async packet capture manager for long-duration captures""" def __init__(self): self.capture_dir = Path("/tmp/mcp_captures") self.capture_dir.mkdir(exist_ok=True) self.active_captures = {} async def start_long_capture( self, interface: str = "lo", duration: int = 300, # 5 minutes default filter_expr: str = "", max_packets: int = 10000, capture_name: Optional[str] = None ) -> Dict[str, Any]: """Start a long-duration background capture""" if not capture_name: capture_name = f"capture_{int(time.time())}" pcap_file = self.capture_dir / f"{capture_name}.pcap" status_file = self.capture_dir / f"{capture_name}_status.json" # Build tcpdump command (uses capabilities, not sudo) cmd = [ 'timeout', str(duration), 'tcpdump', '-i', interface, '-w', str(pcap_file), '-c', str(max_packets), '-q' # Quiet mode ] if filter_expr: cmd.append(filter_expr) try: # Start background process process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Store capture info capture_info = { "capture_name": capture_name, "interface": interface, "duration": duration, "filter": filter_expr, "max_packets": max_packets, "pcap_file": str(pcap_file), "status_file": str(status_file), "pid": process.pid, "start_time": time.time(), "status": "running" } self.active_captures[capture_name] = { "process": process, "info": capture_info } # Save status to file with open(status_file, 'w') as f: json.dump(capture_info, f, indent=2) # Start monitoring task asyncio.create_task(self._monitor_capture(capture_name)) return { "status": "✅ Started", "capture_name": capture_name, "interface": interface, "duration": f"{duration} seconds", "filter": filter_expr, "max_packets": max_packets, "output_file": str(pcap_file), "status_file": str(status_file), "pid": process.pid, "note": "Capture running in background. Use check_capture() to monitor." } except Exception as e: return { "status": "❌ Failed", "error": str(e), "note": "Check that tcpdump has proper capabilities: cap_net_raw,cap_net_admin" } async def _monitor_capture(self, capture_name: str): """Monitor a running capture and update status""" if capture_name not in self.active_captures: return capture = self.active_captures[capture_name] process = capture["process"] info = capture["info"] try: # Wait for completion stdout, stderr = await process.communicate() # Update final status info["status"] = "completed" if process.returncode in [0, 124] else "failed" info["end_time"] = time.time() info["return_code"] = process.returncode info["stderr"] = stderr.decode() if stderr else "" # Get file size if successful pcap_file = Path(info["pcap_file"]) if pcap_file.exists(): info["file_size"] = pcap_file.stat().st_size info["packets_captured"] = await self._count_packets(pcap_file) # Update status file with open(info["status_file"], 'w') as f: json.dump(info, f, indent=2) except Exception as e: info["status"] = "error" info["error"] = str(e) info["end_time"] = time.time() finally: # Remove from active captures if capture_name in self.active_captures: del self.active_captures[capture_name] async def _count_packets(self, pcap_file: Path) -> int: """Count packets in PCAP file""" try: proc = await asyncio.create_subprocess_exec( 'tshark', '-r', str(pcap_file), '-q', '-z', 'io,stat,0', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await proc.communicate() if proc.returncode == 0: # Parse packet count from tshark output for line in stdout.decode().split('\n'): if 'frames' in line.lower(): parts = line.split('|') if len(parts) > 1: try: return int(parts[1].strip().split()[0]) except (ValueError, IndexError): pass except Exception: pass return 0 def check_capture(self, capture_name: str) -> Dict[str, Any]: """Check status of a running or completed capture""" status_file = self.capture_dir / f"{capture_name}_status.json" if not status_file.exists(): return { "status": "❌ Not Found", "capture_name": capture_name, "note": "Capture not found or never started" } try: with open(status_file, 'r') as f: info = json.load(f) # Add runtime info if info["status"] == "running": runtime = time.time() - info["start_time"] info["runtime_seconds"] = runtime info["runtime_formatted"] = f"{runtime:.1f}s" # Check if process is still alive if capture_name in self.active_captures: info["process_alive"] = True else: info["process_alive"] = False info["note"] = "Process may have completed, check file" return info except Exception as e: return { "status": "❌ Error", "error": str(e), "capture_name": capture_name } def list_captures(self) -> Dict[str, Any]: """List all captures (active and completed)""" captures = [] # Check for status files for status_file in self.capture_dir.glob("*_status.json"): try: with open(status_file, 'r') as f: info = json.load(f) captures.append({ "name": info["capture_name"], "status": info["status"], "interface": info["interface"], "duration": f"{info['duration']}s", "file_size": info.get("file_size", "unknown"), "packets": info.get("packets_captured", "unknown") }) except Exception as e: captures.append({ "name": status_file.stem.replace("_status", ""), "status": "error", "error": str(e) }) return { "active_captures": len(self.active_captures), "total_captures": len(captures), "captures": captures, "capture_directory": str(self.capture_dir) } async def stop_capture(self, capture_name: str) -> Dict[str, Any]: """Stop a running capture""" if capture_name not in self.active_captures: return { "status": "❌ Not Found", "capture_name": capture_name, "note": "Capture not found or already stopped" } try: capture = self.active_captures[capture_name] process = capture["process"] # Terminate process gracefully process.terminate() # Wait for completion try: await asyncio.wait_for(process.wait(), timeout=5.0) except asyncio.TimeoutError: # Force kill if needed process.kill() await process.wait() return { "status": "✅ Stopped", "capture_name": capture_name, "note": "Capture stopped successfully" } except Exception as e: return { "status": "❌ Error", "error": str(e), "capture_name": capture_name } # Global capture manager capture_manager = AsyncPacketCapture() async def start_5_minute_capture( interface: str = "lo", filter_ports: str = "port 3000 or port 8080 or port 7444" ) -> Dict[str, Any]: """Start a 5-minute capture on specified ports""" return await capture_manager.start_long_capture( interface=interface, duration=300, # 5 minutes filter_expr=filter_ports, max_packets=50000, # Allow more packets for long capture capture_name="five_minute_capture" ) async def start_background_capture( duration_minutes: int = 5, interface: str = "lo", filter_expr: str = "" ) -> Dict[str, Any]: """Start a background capture with custom parameters""" return await capture_manager.start_long_capture( interface=interface, duration=duration_minutes * 60, filter_expr=filter_expr, max_packets=duration_minutes * 10000, # Scale packets with duration capture_name=f"background_{duration_minutes}min" ) # Test function async def test_async_capture(): """Test the async capture functionality""" print("🧪 Testing Async Long Capture") print("=" * 40) # Start a short test capture (30 seconds) result = await capture_manager.start_long_capture( interface="lo", duration=30, filter_expr="port 3000 or port 8080 or port 7444", max_packets=100, capture_name="test_capture" ) print(json.dumps(result, indent=2)) if result["status"] == "✅ Started": print("\n⏳ Waiting 5 seconds then checking status...") await asyncio.sleep(5) status = capture_manager.check_capture("test_capture") print(json.dumps(status, indent=2)) print("\n📋 Listing all captures...") captures = capture_manager.list_captures() print(json.dumps(captures, indent=2)) if __name__ == "__main__": asyncio.run(test_async_capture())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PreistlyPython/wireshark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server