Skip to main content
Glama
danohn

mcpcap

by danohn

analyze_icmp_packets

Analyze ICMP packets in PCAP files to identify network issues and monitor connectivity. Use URL or local file paths for packet capture analysis.

Instructions

Analyze ICMP packets from a PCAP file and return comprehensive analysis results.

⚠️ FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path.

SUPPORTED INPUT FORMATS:

  • Remote files: "https://example.com/capture.pcap"

  • Local files: "/absolute/path/to/capture.pcap"

UNSUPPORTED:

  • Files uploaded through Claude's file upload feature

  • Base64 file content

  • Relative file paths

Args: pcap_file: HTTP URL or absolute local file path to PCAP file

Returns: A structured dictionary containing ICMP packet analysis results

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
pcap_fileYes

Implementation Reference

  • Primary MCP tool handler function for analyzing ICMP packets. Delegates to base analyze_packets for file handling.
    def analyze_icmp_packets(self, pcap_file: str) -> dict[str, Any]:
        """
        Analyze ICMP packets from a PCAP file and return comprehensive analysis results.
    
        ⚠️  FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through
        Claude's web interface. Files must be accessible via URL or local file path.
    
        SUPPORTED INPUT FORMATS:
        - Remote files: "https://example.com/capture.pcap"
        - Local files: "/absolute/path/to/capture.pcap"
    
        UNSUPPORTED:
        - Files uploaded through Claude's file upload feature
        - Base64 file content
        - Relative file paths
    
        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
    
        Returns:
            A structured dictionary containing ICMP packet analysis results
        """
        return self.analyze_packets(pcap_file)
  • Tool registration in MCPServer._register_tools method, specifically the line registering module.analyze_icmp_packets for the ICMP module.
    def _register_tools(self) -> None:
        """Register all available tools with the MCP server."""
        # Register tools for each loaded module
        for module_name, module in self.modules.items():
            if module_name == "dns":
                self.mcp.tool(module.analyze_dns_packets)
            elif module_name == "dhcp":
                self.mcp.tool(module.analyze_dhcp_packets)
            elif module_name == "icmp":
                self.mcp.tool(module.analyze_icmp_packets)
            elif module_name == "capinfos":
                self.mcp.tool(module.analyze_capinfos)
            elif module_name == "tcp":
                self.mcp.tool(module.analyze_tcp_connections)
                self.mcp.tool(module.analyze_tcp_anomalies)
                self.mcp.tool(module.analyze_tcp_retransmissions)
                self.mcp.tool(module.analyze_traffic_flow)
  • Core analysis logic: loads PCAP, filters ICMP packets, analyzes each, generates statistics.
    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        """Perform the actual ICMP packet analysis on a local PCAP file."""
        try:
            packets = rdpcap(pcap_file)
            icmp_packets = [pkt for pkt in packets if pkt.haslayer(ICMP)]
    
            if not icmp_packets:
                return {
                    "file": pcap_file,
                    "total_packets": len(packets),
                    "icmp_packets_found": 0,
                    "message": "No ICMP packets found in this capture",
                }
    
            # Apply max_packets limit if specified
            packets_to_analyze = icmp_packets
            limited = False
            if self.config.max_packets and len(icmp_packets) > self.config.max_packets:
                packets_to_analyze = icmp_packets[: self.config.max_packets]
                limited = True
    
            packet_details = []
            for i, pkt in enumerate(packets_to_analyze, 1):
                packet_info = self._analyze_icmp_packet(pkt, i)
                packet_details.append(packet_info)
    
            # Generate statistics
            stats = self._generate_statistics(packet_details)
    
            result = {
                "file": pcap_file,
                "analysis_timestamp": datetime.now().isoformat(),
                "total_packets": len(packets),
                "icmp_packets_found": len(icmp_packets),
                "icmp_packets_analyzed": len(packet_details),
                "statistics": stats,
                "packets": packet_details,
            }
    
            if limited:
                result["note"] = (
                    f"Analysis limited to first {self.config.max_packets} ICMP packets due to --max-packets setting"
                )
    
            return result
    
        except Exception as e:
            return {
                "error": f"Error reading PCAP file '{pcap_file}': {str(e)}",
                "file": pcap_file,
            }
  • Base class method called by the handler to dispatch local/remote PCAP analysis.
    def analyze_packets(self, pcap_file: str) -> dict[str, Any]:
        """Analyze packets from a PCAP file (local or remote).
    
        Args:
            pcap_file: Path to local PCAP file or HTTP URL to remote PCAP file
    
        Returns:
            A structured dictionary containing packet analysis results
        """
        # Check if this is a remote URL or local file
        if pcap_file.startswith(("http://", "https://")):
            return self._handle_remote_analysis(pcap_file)
        else:
            return self._handle_local_analysis(pcap_file)
  • Per-packet analysis extracting IP/ICMP details, type/code mappings, etc.
    def _analyze_icmp_packet(self, packet, packet_num: int) -> dict[str, Any]:
        """Analyze a single ICMP packet and extract relevant information."""
        info = {
            "packet_number": packet_num,
            "timestamp": datetime.fromtimestamp(float(packet.time)).isoformat(),
        }
    
        # Basic IP information
        if packet.haslayer(IP):
            ip_layer = packet[IP]
            info.update(
                {
                    "src_ip": ip_layer.src,
                    "dst_ip": ip_layer.dst,
                    "ip_version": 4,
                    "ttl": ip_layer.ttl,
                    "packet_size": len(packet),
                }
            )
        elif packet.haslayer(IPv6):
            ipv6_layer = packet[IPv6]
            info.update(
                {
                    "src_ip": ipv6_layer.src,
                    "dst_ip": ipv6_layer.dst,
                    "ip_version": 6,
                    "hop_limit": ipv6_layer.hlim,
                    "packet_size": len(packet),
                }
            )
    
        # ICMP information
        if packet.haslayer(ICMP):
            icmp_layer = packet[ICMP]
    
            # Map ICMP types to human-readable names
            icmp_types = {
                0: "Echo Reply",
                3: "Destination Unreachable",
                4: "Source Quench",
                5: "Redirect",
                8: "Echo Request",
                11: "Time Exceeded",
                12: "Parameter Problem",
                13: "Timestamp Request",
                14: "Timestamp Reply",
                15: "Information Request",
                16: "Information Reply",
            }
    
            # Map destination unreachable codes
            dest_unreach_codes = {
                0: "Network Unreachable",
                1: "Host Unreachable",
                2: "Protocol Unreachable",
                3: "Port Unreachable",
                4: "Fragmentation Required",
                5: "Source Route Failed",
            }
    
            # Map time exceeded codes
            time_exceeded_codes = {
                0: "TTL Exceeded in Transit",
                1: "Fragment Reassembly Time Exceeded",
            }
    
            icmp_type = icmp_layer.type
            icmp_code = icmp_layer.code
    
            info.update(
                {
                    "icmp_type": icmp_type,
                    "icmp_code": icmp_code,
                    "icmp_type_name": icmp_types.get(
                        icmp_type, f"Unknown Type ({icmp_type})"
                    ),
                    "icmp_id": getattr(icmp_layer, "id", None),
                    "icmp_seq": getattr(icmp_layer, "seq", None),
                    "checksum": icmp_layer.chksum,
                }
            )
    
            # Add code descriptions for specific types
            if icmp_type == 3:  # Destination Unreachable
                info["icmp_code_name"] = dest_unreach_codes.get(
                    icmp_code, f"Unknown Code ({icmp_code})"
                )
            elif icmp_type == 11:  # Time Exceeded
                info["icmp_code_name"] = time_exceeded_codes.get(
                    icmp_code, f"Unknown Code ({icmp_code})"
                )
            else:
                info["icmp_code_name"] = f"Code {icmp_code}"
    
        return info

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpcap'

If you have feedback or need assistance with the MCP directory API, please join our Discord server