Skip to main content
Glama
danohn

mcpacket

by danohn
base.py4.16 kB
"""Base module interface for protocol analyzers.""" import os import tempfile from abc import ABC, abstractmethod from typing import Any from ..core.config import Config class BaseModule(ABC): """Base class for protocol analysis modules.""" def __init__(self, config: Config): """Initialize the module. Args: config: Configuration instance """ self.config = config @property @abstractmethod def protocol_name(self) -> str: """Return the name of the protocol this module analyzes.""" pass @abstractmethod def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]: """Analyze a local PCAP file for this protocol. This method should be implemented by each module to perform the actual protocol-specific analysis. Args: pcap_file: Path to local PCAP file Returns: Analysis results dictionary """ pass def analyze_packets(self, pcap_file: str) -> dict[str, Any]: """Analyze packets from a PCAP file (local or remote). Args: pcap_file: Path to local PCAP file or HTTP URL to remote PCAP file Returns: A structured dictionary containing packet analysis results """ # Check if this is a remote URL or local file if pcap_file.startswith(("http://", "https://")): return self._handle_remote_analysis(pcap_file) else: return self._handle_local_analysis(pcap_file) def _handle_remote_analysis(self, pcap_url: str) -> dict[str, Any]: """Handle remote PCAP file analysis.""" try: # Download remote file to temporary location with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as tmp_file: temp_path = tmp_file.name local_path = self._download_pcap_file(pcap_url, temp_path) result = self._analyze_protocol_file(local_path) # Clean up temporary file try: os.unlink(local_path) except OSError: pass # Ignore cleanup errors return result except Exception as e: return { "error": f"Failed to download PCAP file '{pcap_url}': {str(e)}", "pcap_url": pcap_url, } def _handle_local_analysis(self, pcap_file: str) -> dict[str, Any]: """Handle local PCAP file analysis.""" # Validate file exists if not os.path.exists(pcap_file): return { "error": f"PCAP file not found: {pcap_file}", "pcap_file": pcap_file, } # Validate file extension if not pcap_file.lower().endswith((".pcap", ".pcapng", ".cap")): return { "error": f"File '{pcap_file}' is not a supported PCAP file (.pcap/.pcapng/.cap)", "pcap_file": pcap_file, } try: return self._analyze_protocol_file(pcap_file) except Exception as e: return { "error": f"Failed to analyze PCAP file '{pcap_file}': {str(e)}", "pcap_file": pcap_file, } def _download_pcap_file(self, pcap_url: str, local_path: str) -> str: """Download a remote PCAP file to local storage. Args: pcap_url: URL of the PCAP file to download local_path: Local path to save the file Returns: Local path to the downloaded file """ import requests try: response = requests.get(pcap_url, timeout=60, stream=True) response.raise_for_status() os.makedirs(os.path.dirname(local_path), exist_ok=True) with open(local_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return local_path except requests.RequestException as e: raise ValueError( f"Failed to download PCAP file '{pcap_url}': {str(e)}" ) from e

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpacket'

If you have feedback or need assistance with the MCP directory API, please join our Discord server