mcpacket

by danohn

analyze_traffic_flow

Analyzes bidirectional network traffic from PCAP files to identify traffic direction, asymmetry, RST sources, and data transfer patterns for network security and troubleshooting.

Instructions

Analyze bidirectional traffic flow characteristics.

Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

Args:
  pcap_file: HTTP URL or absolute local file path to PCAP file
  server_ip: Server IP address (required)
  server_port: Optional filter for server port

Returns:
  A structured dictionary containing:
  - client_to_server: Client-to-server traffic statistics
  - server_to_client: Server-to-client traffic statistics
  - analysis: Asymmetry analysis and interpretations
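
To illustrate the return structure described above, the sketch below summarizes a result dictionary in one line. The key names follow the implementation reference on this page; the `summarize_flow` helper and every sample value are hypothetical and do not come from a real capture.

```python
from typing import Any


def summarize_flow(result: dict[str, Any]) -> str:
    """Build a one-line summary of a traffic flow result (hypothetical helper)."""
    # The handler returns {"error": ...} when server_ip is missing.
    if "error" in result:
        return f"error: {result['error']}"
    c2s = result["client_to_server"]
    s2c = result["server_to_client"]
    analysis = result["analysis"]
    return (
        f"c2s={c2s['packet_count']} pkts, s2c={s2c['packet_count']} pkts, "
        f"direction={analysis['data_flow_direction']}, "
        f"rst_source={analysis['primary_rst_source']}"
    )


# Hand-written sample values, trimmed to the keys the helper reads.
sample = {
    "client_to_server": {"packet_count": 12, "rst_count": 3},
    "server_to_client": {"packet_count": 8, "rst_count": 0},
    "analysis": {
        "asymmetry_ratio": 1.5,
        "primary_rst_source": "client",
        "data_flow_direction": "client_heavy",
    },
}
print(summarize_flow(sample))
```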

Input Schema

Name         Required  Description                                          Default
pcap_file    Yes       HTTP URL or absolute local file path to PCAP file
server_ip    Yes       Server IP address
server_port  No        Optional filter for server port                      None
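
As a rough illustration of the input schema, the sketch below assembles an arguments dictionary and performs basic sanity checks before the tool is called. The `build_arguments` helper is hypothetical and not part of mcpacket, the file path and addresses are made up, and the tool itself may validate its inputs differently or not at all.

```python
import ipaddress
from typing import Any, Optional


def build_arguments(
    pcap_file: str, server_ip: str, server_port: Optional[int] = None
) -> dict[str, Any]:
    """Validate and assemble tool arguments (hypothetical helper)."""
    # ip_address raises ValueError for a malformed IPv4 or IPv6 address.
    ipaddress.ip_address(server_ip)
    if server_port is not None and not 1 <= server_port <= 65535:
        raise ValueError(f"server_port out of range: {server_port}")
    args: dict[str, Any] = {"pcap_file": pcap_file, "server_ip": server_ip}
    # server_port is optional, so it is only included when provided.
    if server_port is not None:
        args["server_port"] = server_port
    return args


print(build_arguments("/tmp/capture.pcap", "10.0.0.5", 443))
```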

Implementation Reference

  • The main handler function for the 'analyze_traffic_flow' tool. It accepts pcap_file, server_ip, and optional server_port, then delegates to analyze_packets with analysis_type='traffic_flow' to perform the analysis.
    def analyze_traffic_flow(
        self,
        pcap_file: str,
        server_ip: str,
        server_port: Optional[int] = None,
    ) -> dict[str, Any]:
        """
        Analyze bidirectional traffic flow characteristics.
    
        Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.
    
        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
            server_ip: Server IP address (required)
            server_port: Optional filter for server port
    
        Returns:
            A structured dictionary containing:
            - client_to_server: Client-to-server traffic statistics
            - server_to_client: Server-to-client traffic statistics
            - analysis: Asymmetry analysis and interpretations
        """
        return self.analyze_packets(
            pcap_file,
            analysis_type="traffic_flow",
            server_ip=server_ip,
            server_port=server_port,
        )
  • Registration of the 'analyze_traffic_flow' tool in the MCP server within the TCP module registration block.
    self.mcp.tool(module.analyze_tcp_retransmissions)
    self.mcp.tool(module.analyze_traffic_flow)
  • Core helper function implementing the traffic flow analysis logic, calculating bidirectional statistics, asymmetry, and RST source analysis.
    def _analyze_flow(
        self, pcap_file: str, tcp_packets: list, all_packets: list
    ) -> dict[str, Any]:
        """Analyze traffic flow."""
        server_ip = self._analysis_kwargs.get("server_ip")
        server_port = self._analysis_kwargs.get("server_port")
    
        if not server_ip:
            return {"error": "server_ip is required for traffic flow analysis"}
    
        client_to_server = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }
    
        server_to_client = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }
    
        client_seqs = set()
        server_seqs = set()
    
        for pkt in tcp_packets:
            src_ip, dst_ip = self._extract_ips(pkt)
            tcp = pkt[TCP]
            flags = tcp.flags
    
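            # Determine direction: a packet is client-to-server when it is
            # addressed to the server IP and, if given, the server port
            is_client_to_server = dst_ip == server_ip
            if server_port:
                is_client_to_server = is_client_to_server and tcp.dport == server_port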
    
            stats = client_to_server if is_client_to_server else server_to_client
            seqs = client_seqs if is_client_to_server else server_seqs
    
            stats["packet_count"] += 1
            stats["byte_count"] += len(pkt)
    
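            # Count TCP flags by bit mask: SYN = 0x02, RST = 0x04, FIN = 0x01
            if flags & 0x02:
                stats["syn_count"] += 1
            if flags & 0x04:
                stats["rst_count"] += 1
            if flags & 0x01:
                stats["fin_count"] += 1
            # Any non-empty TCP payload counts as a data packet
            if len(tcp.payload) > 0:
                stats["data_packets"] += 1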
    
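            # Retransmissions (heuristic): a data packet whose sequence number
            # was already seen in this direction. Sequence numbers are pooled
            # per direction, not tracked per connection.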
            seq = tcp.seq
            if seq in seqs and len(tcp.payload) > 0:
                stats["retransmissions"] += 1
            seqs.add(seq)
    
        # Analysis
        total_client = client_to_server["packet_count"]
        total_server = server_to_client["packet_count"]
        asymmetry_ratio = total_client / total_server if total_server > 0 else 0
    
        # Determine primary RST source
        client_rst = client_to_server["rst_count"]
        server_rst = server_to_client["rst_count"]
        if client_rst > server_rst:
            rst_source = "client"
            interpretation = (
                f"Client sends more RST packets ({client_rst} vs {server_rst}). "
                "Suggests a client-side issue (possibly a firewall)."
            )
        elif server_rst > client_rst:
            rst_source = "server"
            interpretation = "Server sends more RST packets. Suggests server-side rejection or service issue."
        else:
            rst_source = "balanced"
            interpretation = "Balanced RST distribution."
    
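        # asymmetry_ratio is 0 when the server sent nothing, so that case is
        # checked explicitly instead of being reported as server_heavy
        if asymmetry_ratio > 1.2 or (total_server == 0 and total_client > 0):
            data_flow_direction = "client_heavy"
        elif asymmetry_ratio < 0.8 and total_server > 0:
            data_flow_direction = "server_heavy"
        else:
            data_flow_direction = "balanced"
    
        return {
            "file": pcap_file,
            "analysis_timestamp": datetime.now().isoformat(),
            "server": f"{server_ip}:{server_port or 'any'}",
            "client_to_server": client_to_server,
            "server_to_client": server_to_client,
            "analysis": {
                "asymmetry_ratio": asymmetry_ratio,
                "primary_rst_source": rst_source,
                "data_flow_direction": data_flow_direction,
                "interpretation": interpretation,
            },
        }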
  • Helper function that reads the PCAP file, filters TCP packets, and routes 'traffic_flow' analysis to the _analyze_flow method.
    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        """Perform the actual TCP packet analysis on a local PCAP file."""
        try:
            packets = rdpcap(pcap_file)
            tcp_packets = [pkt for pkt in packets if pkt.haslayer(TCP)]
    
            if not tcp_packets:
                return {
                    "file": pcap_file,
                    "total_packets": len(packets),
                    "tcp_packets_found": 0,
                    "message": "No TCP packets found in this capture",
                }
    
            # Apply filtering if specified
            filtered_packets = self._apply_filters(
                tcp_packets,
                self._analysis_kwargs.get("server_ip"),
                self._analysis_kwargs.get("server_port"),
            )
    
            # Route to appropriate analysis method
            if self._analysis_type == "connections":
                return self._analyze_connections(pcap_file, filtered_packets, packets)
            elif self._analysis_type == "anomalies":
                return self._analyze_anomalies(pcap_file, filtered_packets, packets)
            elif self._analysis_type == "retransmissions":
                return self._analyze_retrans(pcap_file, filtered_packets, packets)
            elif self._analysis_type == "traffic_flow":
                return self._analyze_flow(pcap_file, filtered_packets, packets)
            else:
                return {"error": f"Unknown analysis type: {self._analysis_type}"}
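
The flow analysis above can be tried without a capture file. The following is a minimal sketch of the same idea (direction classification, RST counting, and the packet-count asymmetry ratio), assuming a simplified `Pkt` record in place of scapy packets. `Pkt`, `flow_summary`, and the sample addresses are hypothetical; only the 1.2 and 0.8 thresholds and the flag masks are taken from the code on this page.

```python
from typing import Any, NamedTuple, Optional

# TCP flag bit masks
FIN, SYN, RST, ACK = 0x01, 0x02, 0x04, 0x10


class Pkt(NamedTuple):
    """Simplified stand-in for a parsed TCP packet (hypothetical, not a scapy type)."""
    dst_ip: str
    dport: int
    flags: int


def flow_summary(
    packets: list[Pkt], server_ip: str, server_port: Optional[int] = None
) -> dict[str, Any]:
    counts = {"client": {"packets": 0, "rst": 0}, "server": {"packets": 0, "rst": 0}}
    for pkt in packets:
        # Client-to-server means addressed to the server IP (and port, if given).
        to_server = pkt.dst_ip == server_ip and (
            server_port is None or pkt.dport == server_port
        )
        side = counts["client" if to_server else "server"]
        side["packets"] += 1
        if pkt.flags & RST:
            side["rst"] += 1

    client, server = counts["client"]["packets"], counts["server"]["packets"]
    ratio = client / server if server > 0 else 0
    if ratio > 1.2 or (server == 0 and client > 0):
        direction = "client_heavy"
    elif ratio < 0.8 and server > 0:
        direction = "server_heavy"
    else:
        direction = "balanced"

    client_rst, server_rst = counts["client"]["rst"], counts["server"]["rst"]
    if client_rst > server_rst:
        rst_source = "client"
    elif server_rst > client_rst:
        rst_source = "server"
    else:
        rst_source = "balanced"

    return {
        "asymmetry_ratio": ratio,
        "data_flow_direction": direction,
        "primary_rst_source": rst_source,
    }


# Made-up exchange: handshake, one ACK from the server, then a client reset.
sample_packets = [
    Pkt("10.0.0.5", 443, SYN),
    Pkt("192.168.1.10", 51000, SYN | ACK),
    Pkt("10.0.0.5", 443, ACK),
    Pkt("192.168.1.10", 51000, ACK),
    Pkt("10.0.0.5", 443, RST),
]
print(flow_summary(sample_packets, "10.0.0.5", 443))
```

Three client packets against two server packets give a ratio of 1.5, which crosses the 1.2 threshold, so the sample is classified as client-heavy with the client as the RST source.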

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpacket'