mcpcap

by danohn

analyze_tcp_retransmissions

Analyze TCP retransmission patterns in PCAP files to identify network performance issues and connection problems by calculating retransmission rates and highlighting threshold violations.

Instructions

Analyze TCP retransmission patterns.

Args:
    pcap_file: HTTP URL or absolute local file path to the PCAP file
    server_ip: Optional filter for server IP address
    threshold: Retransmission rate threshold, given as a fraction (default: 0.02, i.e. 2%)

Returns: A structured dictionary containing:
    - total_retransmissions: Total number of retransmissions
    - retransmission_rate: Overall retransmission rate
    - by_connection: Per-connection retransmission statistics
    - summary: Worst connections and threshold violations
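
As an illustration of how a caller might consume this dictionary, the sketch below lists the connections whose rate exceeds the threshold. The helper name `connections_over_threshold` and the sample values are hypothetical. The keys it reads (`threshold`, `by_connection`, `connection`, `retrans_rate`) are the ones shown in the implementation reference on this page, where `by_connection` holds at most the ten highest-rate connections.

```python
from typing import Any


def connections_over_threshold(result: dict[str, Any]) -> list[str]:
    """Return labels of connections whose retransmission rate exceeds the threshold.

    Hypothetical helper; it relies only on keys present in the tool's result.
    """
    threshold = result["threshold"]
    return [
        conn["connection"]
        for conn in result["by_connection"]
        if conn["retrans_rate"] > threshold
    ]


# Made-up sample shaped like the tool's output (values are illustrative only).
sample_result = {
    "threshold": 0.02,
    "by_connection": [
        {
            "connection": "10.0.0.1:50000 <-> 10.0.0.2:443",
            "retrans_count": 5,
            "total_packets": 100,
            "retrans_rate": 0.05,
        },
        {
            "connection": "10.0.0.1:50001 <-> 10.0.0.2:443",
            "retrans_count": 1,
            "total_packets": 100,
            "retrans_rate": 0.01,
        },
    ],
}

print(connections_over_threshold(sample_result))
```

Only the first sample connection is reported, because its rate of 0.05 is above the 0.02 threshold while the second sits below it.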

Input Schema

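Name       Required  Description                                            Default
pcap_file  Yes       HTTP URL or absolute local file path to the PCAP file
server_ip  No        Optional filter for server IP address                  None
threshold  No        Retransmission rate threshold, as a fraction           0.02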
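
A client may want to check its arguments before calling the tool. The sketch below is one possible pre-flight check, not part of mcpcap: the function name `check_arguments` is hypothetical, treating `https` as an acceptable "HTTP URL" is an assumption, and the 0 to 1 range check on `threshold` is an assumption based on the default being a fraction.

```python
import os
from typing import Any, Optional
from urllib.parse import urlparse


def check_arguments(
    pcap_file: str,
    server_ip: Optional[str] = None,
    threshold: float = 0.02,
) -> dict[str, Any]:
    """Hypothetical pre-flight check mirroring the documented argument rules."""
    # Assumption: both http:// and https:// count as an "HTTP URL".
    is_url = urlparse(pcap_file).scheme in ("http", "https")
    if not is_url and not os.path.isabs(pcap_file):
        raise ValueError("pcap_file must be an HTTP URL or an absolute local path")

    # Assumption: the threshold is a fraction of packets, so it lies in [0, 1].
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold is a fraction, e.g. 0.02 for 2%")

    args: dict[str, Any] = {"pcap_file": pcap_file, "threshold": threshold}
    if server_ip is not None:
        args["server_ip"] = server_ip  # optional filter, omitted when unset
    return args


print(check_arguments("https://example.com/capture.pcap", server_ip="10.0.0.2"))
```

Leaving `server_ip` out of the returned dictionary when it is unset keeps the payload limited to the one required field plus whatever the caller explicitly chose.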

Implementation Reference

  • Entry point handler for the 'analyze_tcp_retransmissions' tool. Sets analysis_type to 'retransmissions' and delegates to the generic analyze_packets method.
    def analyze_tcp_retransmissions(
        self,
        pcap_file: str,
        server_ip: Optional[str] = None,
        threshold: float = 0.02,
    ) -> dict[str, Any]:
        """
        Analyze TCP retransmission patterns.
    
        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
            server_ip: Optional filter for server IP address
            threshold: Retransmission rate threshold, given as a fraction (default: 0.02, i.e. 2%)
    
        Returns:
            A structured dictionary containing:
            - total_retransmissions: Total number of retransmissions
            - retransmission_rate: Overall retransmission rate
            - by_connection: Per-connection retransmission statistics (top 10 by rate)
            - summary: Worst connections and threshold violations
        """
        return self.analyze_packets(
            pcap_file,
            analysis_type="retransmissions",
            server_ip=server_ip,
            threshold=threshold,
        )
  • Registration of the analyze_tcp_retransmissions tool (and the related TCP tools) with the MCP server when the tcp module is loaded.
    self.mcp.tool(module.analyze_tcp_connections)
    self.mcp.tool(module.analyze_tcp_anomalies)
    self.mcp.tool(module.analyze_tcp_retransmissions)
    self.mcp.tool(module.analyze_traffic_flow)
  • Core helper function implementing the retransmission analysis logic, invoked when analysis_type='retransmissions'. It groups packets by connection, computes retransmission rates per connection and overall, and identifies the worst offenders.
    def _analyze_retrans(
        self, pcap_file: str, tcp_packets: list, all_packets: list
    ) -> dict[str, Any]:
        """Analyze TCP retransmissions."""
        threshold = self._analysis_kwargs.get("threshold", 0.02)
    
        # Group by connection
        connections = defaultdict(list)
        for pkt in tcp_packets:
            conn_key = self._get_connection_key(pkt)
            connections[conn_key].append(pkt)
    
        by_connection = []
        total_retrans = 0
        worst_rate = 0
        worst_conn = ""
    
        for conn_key, pkts in connections.items():
            src_ip, src_port, dst_ip, dst_port = conn_key
            conn_str = f"{src_ip}:{src_port} <-> {dst_ip}:{dst_port}"
    
            conn_info = self._analyze_single_connection(conn_key, pkts)
            retrans_count = conn_info["retransmissions"]
            total_retrans += retrans_count
    
            retrans_rate = retrans_count / len(pkts) if len(pkts) > 0 else 0
    
            by_connection.append(
                {
                    "connection": conn_str,
                    "retrans_count": retrans_count,
                    "total_packets": len(pkts),
                    "retrans_rate": retrans_rate,
                }
            )
    
            if retrans_rate > worst_rate:
                worst_rate = retrans_rate
                worst_conn = conn_str
    
        # Sort by retransmission rate
        by_connection.sort(key=lambda x: x["retrans_rate"], reverse=True)
    
        overall_rate = total_retrans / len(tcp_packets) if len(tcp_packets) > 0 else 0
        connections_above_threshold = sum(
            1 for c in by_connection if c["retrans_rate"] > threshold
        )
    
        return {
            "file": pcap_file,
            "analysis_timestamp": datetime.now().isoformat(),
            "total_packets": len(tcp_packets),
            "total_retransmissions": total_retrans,
            "retransmission_rate": overall_rate,
            "threshold": threshold,
            "exceeds_threshold": overall_rate > threshold,
            "by_connection": by_connection[:10],  # Top 10
            "summary": {
                "worst_connection": worst_conn,
                "worst_retrans_rate": worst_rate,
                "connections_above_threshold": connections_above_threshold,
            },
        }
  • Supporting helper that analyzes an individual TCP connection. Its retransmission detection is simplified: it counts payload-carrying segments whose sequence number has already been seen on the connection.
    def _analyze_single_connection(
        self, conn_key: tuple, packets: list
    ) -> dict[str, Any]:
        """Analyze a single TCP connection."""
        src_ip, src_port, dst_ip, dst_port = conn_key
    
        syn_count = 0
        syn_ack_count = 0
        ack_count = 0
        rst_count = 0
        fin_count = 0
        data_packets = 0
        retransmissions = 0
    
        seen_seqs = set()
        handshake_completed = False
    
        for pkt in packets:
            tcp = pkt[TCP]
            flags = tcp.flags
    
            # Count flags
            if flags & 0x02:  # SYN (a SYN-ACK also has this bit set, so it is counted here too)
                syn_count += 1
            if flags & 0x12 == 0x12:  # SYN-ACK
                syn_ack_count += 1
            if flags & 0x10:  # ACK
                ack_count += 1
            if flags & 0x04:  # RST
                rst_count += 1
            if flags & 0x01:  # FIN
                fin_count += 1
    
            # Check for data
            if len(tcp.payload) > 0:
                data_packets += 1
    
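            # Detect retransmissions (simplified): a payload-carrying segment whose
            # sequence number was already seen is counted. One set covers every
            # packet passed in, so the check is not split by direction.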
            seq = tcp.seq
            if seq in seen_seqs and len(tcp.payload) > 0:
                retransmissions += 1
            seen_seqs.add(seq)
    
        # Determine handshake completion (the ACK bit of a SYN-ACK also satisfies the ack_count check)
        if syn_count > 0 and syn_ack_count > 0 and ack_count > 0:
            handshake_completed = True
    
        # Determine close reason
        close_reason = "unknown"
        if rst_count > 0:
            close_reason = "reset"
        elif fin_count >= 2:
            close_reason = "normal"
        elif len(packets) > 3:
            close_reason = "active"
    
        return {
            "client": f"{src_ip}:{src_port}",
            "server": f"{dst_ip}:{dst_port}",
            "state": "closed" if close_reason in ["reset", "normal"] else "active",
            "handshake_completed": handshake_completed,
            "syn_count": syn_count,
            "syn_ack_count": syn_ack_count,
            "ack_count": ack_count,
            "rst_count": rst_count,
            "fin_count": fin_count,
            "data_packets": data_packets,
            "retransmissions": retransmissions,
            "close_reason": close_reason,
            "packet_count": len(packets),
        }
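
The duplicate-sequence heuristic above can be tried without a capture file. The sketch below is a variation rather than the mcpcap code: it uses a hypothetical `Segment` tuple in place of Scapy packets and keeps one set of seen sequence numbers per sender, so that equal sequence numbers travelling in opposite directions are not mistaken for repeats. The rate is then computed as in the helper above, as retransmissions divided by all packets in the connection.

```python
from collections import defaultdict
from typing import NamedTuple


class Segment(NamedTuple):
    """Hypothetical stand-in for a parsed TCP packet."""

    src: str          # sender endpoint, e.g. "10.0.0.1:50000"
    seq: int          # TCP sequence number
    payload_len: int  # bytes of TCP payload


def count_retransmissions(segments: list[Segment]) -> int:
    """Count payload-carrying segments that repeat a (sender, seq) pair."""
    seen: dict[str, set[int]] = defaultdict(set)
    retransmissions = 0
    for seg in segments:
        # Only segments with payload count, matching the simplified check.
        if seg.payload_len > 0 and seg.seq in seen[seg.src]:
            retransmissions += 1
        seen[seg.src].add(seg.seq)
    return retransmissions


segments = [
    Segment("10.0.0.1:50000", 1000, 100),
    Segment("10.0.0.2:443", 1000, 50),     # same seq, other direction: not a repeat
    Segment("10.0.0.1:50000", 1100, 100),
    Segment("10.0.0.1:50000", 1000, 100),  # repeats the first segment
    Segment("10.0.0.1:50000", 1200, 0),    # no payload, ignored by the check
]

retrans = count_retransmissions(segments)
rate = retrans / len(segments) if segments else 0
print(retrans, rate)  # 1 0.2
```

With a single set shared by both directions, the second sample segment would also be counted, which is the main trade-off of the simpler approach.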

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpcap'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.