Skip to main content
Glama
danohn

mcpacket

by danohn

analyze_icmp_packets

Analyze ICMP packets in PCAP files to identify network issues and security anomalies. Process files via URL or local path for comprehensive traffic analysis.

Instructions

Analyze ICMP packets from a PCAP file and return comprehensive analysis results.

⚠️ FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path.

SUPPORTED INPUT FORMATS:

  • Remote files: "https://example.com/capture.pcap"

  • Local files: "/absolute/path/to/capture.pcap"

UNSUPPORTED:

  • Files uploaded through Claude's file upload feature

  • Base64 file content

  • Relative file paths

Args: pcap_file: HTTP URL or absolute local file path to PCAP file

Returns: A structured dictionary containing ICMP packet analysis results

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
pcap_fileYes

Implementation Reference

  • The primary handler function for the analyze_icmp_packets tool. Includes detailed documentation on input formats and delegates to the base class's analyze_packets method for execution.
    def analyze_icmp_packets(self, pcap_file: str) -> dict[str, Any]:
        """
        Analyze ICMP packets from a PCAP file and return comprehensive analysis results.
    
        ⚠️  FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through
        Claude's web interface. Files must be accessible via URL or local file path.
    
        SUPPORTED INPUT FORMATS:
        - Remote files: "https://example.com/capture.pcap"
        - Local files: "/absolute/path/to/capture.pcap"
    
        UNSUPPORTED:
        - Files uploaded through Claude's file upload feature
        - Base64 file content
        - Relative file paths
    
        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
    
        Returns:
            A structured dictionary containing ICMP packet analysis results
        """
        return self.analyze_packets(pcap_file)
  • Registers the analyze_icmp_packets tool from the ICMPModule instance with the FastMCP server.
    elif module_name == "icmp":
        self.mcp.tool(module.analyze_icmp_packets)
  • Core analysis logic: reads PCAP file using scapy.rdpcap, filters ICMP packets, applies limits, analyzes each packet, generates statistics, and structures the output.
    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        """Perform the actual ICMP packet analysis on a local PCAP file."""
        try:
            packets = rdpcap(pcap_file)
            icmp_packets = [pkt for pkt in packets if pkt.haslayer(ICMP)]
    
            if not icmp_packets:
                return {
                    "file": pcap_file,
                    "total_packets": len(packets),
                    "icmp_packets_found": 0,
                    "message": "No ICMP packets found in this capture",
                }
    
            # Apply max_packets limit if specified
            packets_to_analyze = icmp_packets
            limited = False
            if self.config.max_packets and len(icmp_packets) > self.config.max_packets:
                packets_to_analyze = icmp_packets[: self.config.max_packets]
                limited = True
    
            packet_details = []
            for i, pkt in enumerate(packets_to_analyze, 1):
                packet_info = self._analyze_icmp_packet(pkt, i)
                packet_details.append(packet_info)
    
            # Generate statistics
            stats = self._generate_statistics(packet_details)
    
            result = {
                "file": pcap_file,
                "analysis_timestamp": datetime.now().isoformat(),
                "total_packets": len(packets),
                "icmp_packets_found": len(icmp_packets),
                "icmp_packets_analyzed": len(packet_details),
                "statistics": stats,
                "packets": packet_details,
            }
    
            if limited:
                result["note"] = (
                    f"Analysis limited to first {self.config.max_packets} ICMP packets due to --max-packets setting"
                )
    
            return result
    
        except Exception as e:
            return {
                "error": f"Error reading PCAP file '{pcap_file}': {str(e)}",
                "file": pcap_file,
            }
  • Detailed single ICMP packet analyzer: extracts timestamps, IP layer info (IPv4/IPv6), ICMP type/code with human-readable names, and specific code mappings.
    def _analyze_icmp_packet(self, packet, packet_num: int) -> dict[str, Any]:
        """Analyze a single ICMP packet and extract relevant information."""
        info = {
            "packet_number": packet_num,
            "timestamp": datetime.fromtimestamp(float(packet.time)).isoformat(),
        }
    
        # Basic IP information
        if packet.haslayer(IP):
            ip_layer = packet[IP]
            info.update(
                {
                    "src_ip": ip_layer.src,
                    "dst_ip": ip_layer.dst,
                    "ip_version": 4,
                    "ttl": ip_layer.ttl,
                    "packet_size": len(packet),
                }
            )
        elif packet.haslayer(IPv6):
            ipv6_layer = packet[IPv6]
            info.update(
                {
                    "src_ip": ipv6_layer.src,
                    "dst_ip": ipv6_layer.dst,
                    "ip_version": 6,
                    "hop_limit": ipv6_layer.hlim,
                    "packet_size": len(packet),
                }
            )
    
        # ICMP information
        if packet.haslayer(ICMP):
            icmp_layer = packet[ICMP]
    
            # Map ICMP types to human-readable names
            icmp_types = {
                0: "Echo Reply",
                3: "Destination Unreachable",
                4: "Source Quench",
                5: "Redirect",
                8: "Echo Request",
                11: "Time Exceeded",
                12: "Parameter Problem",
                13: "Timestamp Request",
                14: "Timestamp Reply",
                15: "Information Request",
                16: "Information Reply",
            }
    
            # Map destination unreachable codes
            dest_unreach_codes = {
                0: "Network Unreachable",
                1: "Host Unreachable",
                2: "Protocol Unreachable",
                3: "Port Unreachable",
                4: "Fragmentation Required",
                5: "Source Route Failed",
            }
    
            # Map time exceeded codes
            time_exceeded_codes = {
                0: "TTL Exceeded in Transit",
                1: "Fragment Reassembly Time Exceeded",
            }
    
            icmp_type = icmp_layer.type
            icmp_code = icmp_layer.code
    
            info.update(
                {
                    "icmp_type": icmp_type,
                    "icmp_code": icmp_code,
                    "icmp_type_name": icmp_types.get(
                        icmp_type, f"Unknown Type ({icmp_type})"
                    ),
                    "icmp_id": getattr(icmp_layer, "id", None),
                    "icmp_seq": getattr(icmp_layer, "seq", None),
                    "checksum": icmp_layer.chksum,
                }
            )
    
            # Add code descriptions for specific types
            if icmp_type == 3:  # Destination Unreachable
                info["icmp_code_name"] = dest_unreach_codes.get(
                    icmp_code, f"Unknown Code ({icmp_code})"
                )
            elif icmp_type == 11:  # Time Exceeded
                info["icmp_code_name"] = time_exceeded_codes.get(
                    icmp_code, f"Unknown Code ({icmp_code})"
                )
            else:
                info["icmp_code_name"] = f"Code {icmp_code}"
    
        return info
  • Generates comprehensive statistics including ICMP type counts, unique IPs, echo request/reply pairs, and unreachable destinations.
    def _generate_statistics(self, packets: list[dict[str, Any]]) -> dict[str, Any]:
        """Generate statistics from analyzed ICMP packets."""
        stats = {
            "icmp_types": {},
            "unique_sources": set(),
            "unique_destinations": set(),
            "echo_pairs": {},
            "unreachable_destinations": set(),
        }
    
        for pkt in packets:
            # Count ICMP types
            if "icmp_type_name" in pkt:
                type_name = pkt["icmp_type_name"]
                stats["icmp_types"][type_name] = (
                    stats["icmp_types"].get(type_name, 0) + 1
                )
    
            # Track unique IPs
            if "src_ip" in pkt:
                stats["unique_sources"].add(pkt["src_ip"])
            if "dst_ip" in pkt:
                stats["unique_destinations"].add(pkt["dst_ip"])
    
            # Track echo request/reply pairs
            if pkt.get("icmp_type") in [0, 8] and pkt.get("icmp_id") is not None:
                echo_id = pkt["icmp_id"]
                if echo_id not in stats["echo_pairs"]:
                    stats["echo_pairs"][echo_id] = {"requests": 0, "replies": 0}
    
                if pkt["icmp_type"] == 8:  # Echo Request
                    stats["echo_pairs"][echo_id]["requests"] += 1
                elif pkt["icmp_type"] == 0:  # Echo Reply
                    stats["echo_pairs"][echo_id]["replies"] += 1
    
            # Track unreachable destinations
            if pkt.get("icmp_type") == 3:  # Destination Unreachable
                if "dst_ip" in pkt:
                    stats["unreachable_destinations"].add(pkt["dst_ip"])
    
        # Convert sets to lists for JSON serialization
        return {
            "icmp_type_counts": stats["icmp_types"],
            "unique_sources_count": len(stats["unique_sources"]),
            "unique_destinations_count": len(stats["unique_destinations"]),
            "unique_sources": list(stats["unique_sources"]),
            "unique_destinations": list(stats["unique_destinations"]),
            "echo_sessions": len(stats["echo_pairs"]),
            "echo_pairs": stats["echo_pairs"],
            "unreachable_destinations_count": len(stats["unreachable_destinations"]),
            "unreachable_destinations": list(stats["unreachable_destinations"]),
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpacket'

If you have feedback or need assistance with the MCP directory API, please join our Discord server