by danohn

analyze_traffic_flow

Analyzes bidirectional network traffic from PCAP files to identify traffic direction, asymmetry, RST sources, and data transfer patterns for network security and troubleshooting.

Instructions

Analyze bidirectional traffic flow characteristics.

Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

Args:
  pcap_file: HTTP URL or absolute local file path to the PCAP file
  server_ip: Server IP address (required)
  server_port: Optional filter for server port

Returns: A structured dictionary containing:
  - client_to_server: Client-to-server traffic statistics
  - server_to_client: Server-to-client traffic statistics
  - analysis: Asymmetry analysis and interpretations
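
A caller might consume the returned dictionary roughly as follows. This is a minimal sketch: `summarize_flow` is a hypothetical helper that is not part of the server, the sample numbers are made up, and the nested keys (`packet_count`, `asymmetry_ratio`, `data_flow_direction`, `primary_rst_source`) are taken from the implementation reference on this page.

```python
from typing import Any


def summarize_flow(result: dict[str, Any]) -> str:
    """Return a one-line summary of a traffic-flow result (hypothetical helper)."""
    # The implementation reports problems as {"error": "..."} instead of raising.
    if "error" in result:
        return f"error: {result['error']}"
    # Captures without TCP packets carry a "message" and no flow statistics.
    if "client_to_server" not in result:
        return result.get("message", "no flow statistics")

    c2s = result["client_to_server"]
    s2c = result["server_to_client"]
    analysis = result["analysis"]
    return (
        f"{c2s['packet_count']} pkts client->server, "
        f"{s2c['packet_count']} pkts server->client, "
        f"ratio {analysis['asymmetry_ratio']:.2f} "
        f"({analysis['data_flow_direction']}), "
        f"RST source: {analysis['primary_rst_source']}"
    )


# Made-up values, only to illustrate the shape of the dictionary.
example = {
    "client_to_server": {"packet_count": 30, "rst_count": 2},
    "server_to_client": {"packet_count": 20, "rst_count": 0},
    "analysis": {
        "asymmetry_ratio": 1.5,
        "primary_rst_source": "client",
        "data_flow_direction": "client_heavy",
        "interpretation": "illustrative only",
    },
}
print(summarize_flow(example))
```

Checking for the `error` and `message` cases first keeps the summary from failing with a `KeyError` on results that carry no statistics.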

Input Schema

Name         Required  Description  Default
pcap_file    Yes
server_ip    Yes
server_port  No
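
The schema lists no descriptions or defaults, so a client may want to check its arguments before calling the tool. The sketch below is one way to do that with the standard library. `validate_args` is a hypothetical helper, and the specific checks (accepting both `http://` and `https://` URLs, the 1 to 65535 port range) are assumptions rather than rules stated by the server.

```python
import ipaddress
import os
from typing import Any, Optional


def validate_args(
    pcap_file: str, server_ip: str, server_port: Optional[int] = None
) -> dict[str, Any]:
    """Build an argument dict for analyze_traffic_flow (hypothetical helper)."""
    # pcap_file: documented as an HTTP URL or an absolute local path.
    is_url = pcap_file.startswith(("http://", "https://"))  # https is an assumption
    if not is_url and not os.path.isabs(pcap_file):
        raise ValueError("pcap_file must be an HTTP URL or an absolute path")

    # server_ip: required; ip_address() raises ValueError on malformed input.
    ipaddress.ip_address(server_ip)

    # server_port: optional; the valid range here is an assumption.
    if server_port is not None and not 1 <= server_port <= 65535:
        raise ValueError("server_port must be between 1 and 65535")

    args: dict[str, Any] = {"pcap_file": pcap_file, "server_ip": server_ip}
    if server_port is not None:
        args["server_port"] = server_port
    return args


print(validate_args("/captures/web.pcap", "10.0.0.5", 443))
```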

Implementation Reference

  • The main handler function for the 'analyze_traffic_flow' tool. It accepts pcap_file, server_ip, and optional server_port, then delegates to analyze_packets with analysis_type='traffic_flow' to perform the analysis.
    def analyze_traffic_flow(
        self,
        pcap_file: str,
        server_ip: str,
        server_port: Optional[int] = None,
    ) -> dict[str, Any]:
        """
        Analyze bidirectional traffic flow characteristics.

        Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
            server_ip: Server IP address (required)
            server_port: Optional filter for server port

        Returns:
            A structured dictionary containing:
            - client_to_server: Client-to-server traffic statistics
            - server_to_client: Server-to-client traffic statistics
            - analysis: Asymmetry analysis and interpretations
        """
        return self.analyze_packets(
            pcap_file,
            analysis_type="traffic_flow",
            server_ip=server_ip,
            server_port=server_port,
        )
  • Registration of the 'analyze_traffic_flow' tool in the MCP server within the TCP module registration block.
    self.mcp.tool(module.analyze_tcp_retransmissions)
    self.mcp.tool(module.analyze_traffic_flow)
  • Core helper function implementing the traffic flow analysis logic, calculating bidirectional statistics, asymmetry, and RST source analysis.
    def _analyze_flow(
        self, pcap_file: str, tcp_packets: list, all_packets: list
    ) -> dict[str, Any]:
        """Analyze traffic flow."""
        server_ip = self._analysis_kwargs.get("server_ip")
        server_port = self._analysis_kwargs.get("server_port")

        if not server_ip:
            return {"error": "server_ip is required for traffic flow analysis"}

        client_to_server = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }
        server_to_client = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }

        client_seqs = set()
        server_seqs = set()

        for pkt in tcp_packets:
            src_ip, dst_ip = self._extract_ips(pkt)
            tcp = pkt[TCP]
            flags = tcp.flags

            # Determine direction
            is_client_to_server = dst_ip == server_ip
            if server_port:
                is_client_to_server = tcp.dport == server_port

            stats = client_to_server if is_client_to_server else server_to_client
            seqs = client_seqs if is_client_to_server else server_seqs

            stats["packet_count"] += 1
            stats["byte_count"] += len(pkt)

            if flags & 0x02:
                stats["syn_count"] += 1
            if flags & 0x04:
                stats["rst_count"] += 1
            if flags & 0x01:
                stats["fin_count"] += 1
            if len(tcp.payload) > 0:
                stats["data_packets"] += 1

            # Retransmissions
            seq = tcp.seq
            if seq in seqs and len(tcp.payload) > 0:
                stats["retransmissions"] += 1
            seqs.add(seq)

        # Analysis
        total_client = client_to_server["packet_count"]
        total_server = server_to_client["packet_count"]
        asymmetry_ratio = total_client / total_server if total_server > 0 else 0

        # Determine primary RST source
        client_rst = client_to_server["rst_count"]
        server_rst = server_to_client["rst_count"]

        if client_rst > server_rst:
            rst_source = "client"
            interpretation = f"Client sends all RST packets ({client_rst} vs {server_rst}). Server responds normally. Suggests client-side issue (possibly firewall)."
        elif server_rst > client_rst:
            rst_source = "server"
            interpretation = "Server sends more RST packets. Suggests server-side rejection or service issue."
        else:
            rst_source = "balanced"
            interpretation = "Balanced RST distribution."

        return {
            "file": pcap_file,
            "analysis_timestamp": datetime.now().isoformat(),
            "server": f"{server_ip}:{server_port or 'any'}",
            "client_to_server": client_to_server,
            "server_to_client": server_to_client,
            "analysis": {
                "asymmetry_ratio": asymmetry_ratio,
                "primary_rst_source": rst_source,
                "data_flow_direction": "client_heavy"
                if asymmetry_ratio > 1.2
                else "server_heavy"
                if asymmetry_ratio < 0.8
                else "balanced",
                "interpretation": interpretation,
            },
        }
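
The direction test, flag bitmasks, and asymmetry thresholds from the listing above can be tried without Scapy. The sketch below is a simplified stand-in, not the project's code: `Segment`, `tally`, and `flow_direction` are hypothetical names, and it deliberately varies from the listing in two places, both marked in comments. It requires the destination IP and the port to match together, and it treats a capture with no server-to-client packets as `client_heavy` rather than letting a ratio of 0 fall into `server_heavy`.

```python
from typing import NamedTuple, Optional

# TCP flag bits, matching the masks tested in the listing.
FIN, SYN, RST = 0x01, 0x02, 0x04


class Segment(NamedTuple):
    """Hypothetical stand-in for a parsed TCP packet."""
    dst_ip: str
    dport: int
    flags: int


def is_client_to_server(seg: Segment, server_ip: str, server_port: Optional[int]) -> bool:
    # Variation: the IP must match AND, if a port is given, the port must match too.
    if seg.dst_ip != server_ip:
        return False
    return server_port is None or seg.dport == server_port


def tally(segments: list[Segment], server_ip: str, server_port: Optional[int] = None) -> dict:
    """Count packets and SYN/RST/FIN flags per direction."""
    keys = ("packet_count", "syn_count", "rst_count", "fin_count")
    stats = {side: dict.fromkeys(keys, 0) for side in ("client_to_server", "server_to_client")}
    for seg in segments:
        to_server = is_client_to_server(seg, server_ip, server_port)
        s = stats["client_to_server" if to_server else "server_to_client"]
        s["packet_count"] += 1
        s["syn_count"] += bool(seg.flags & SYN)
        s["rst_count"] += bool(seg.flags & RST)
        s["fin_count"] += bool(seg.flags & FIN)
    return stats


def flow_direction(client_packets: int, server_packets: int) -> str:
    """Classify asymmetry with the 1.2 / 0.8 thresholds used in the listing."""
    if server_packets == 0:
        # Variation: avoid reporting "server_heavy" when the server sent nothing.
        return "client_heavy" if client_packets > 0 else "balanced"
    ratio = client_packets / server_packets
    if ratio > 1.2:
        return "client_heavy"
    if ratio < 0.8:
        return "server_heavy"
    return "balanced"


segments = [
    Segment("10.0.0.5", 443, 0x02),    # SYN, client -> server
    Segment("10.0.0.9", 51000, 0x12),  # SYN+ACK, server -> client
    Segment("10.0.0.5", 443, 0x18),    # PSH+ACK, client -> server
    Segment("10.0.0.5", 443, 0x04),    # RST, client -> server
]
stats = tally(segments, "10.0.0.5", 443)
print(stats["client_to_server"], stats["server_to_client"])
print(flow_direction(stats["client_to_server"]["packet_count"],
                     stats["server_to_client"]["packet_count"]))
```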
  • Helper function that reads the PCAP file, filters TCP packets, and routes 'traffic_flow' analysis to the _analyze_flow method.
    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        """Perform the actual TCP packet analysis on a local PCAP file."""
        try:
            packets = rdpcap(pcap_file)
            tcp_packets = [pkt for pkt in packets if pkt.haslayer(TCP)]

            if not tcp_packets:
                return {
                    "file": pcap_file,
                    "total_packets": len(packets),
                    "tcp_packets_found": 0,
                    "message": "No TCP packets found in this capture",
                }

            # Apply filtering if specified
            filtered_packets = self._apply_filters(
                tcp_packets,
                self._analysis_kwargs.get("server_ip"),
                self._analysis_kwargs.get("server_port"),
            )

            # Route to appropriate analysis method
            if self._analysis_type == "connections":
                return self._analyze_connections(pcap_file, filtered_packets, packets)
            elif self._analysis_type == "anomalies":
                return self._analyze_anomalies(pcap_file, filtered_packets, packets)
            elif self._analysis_type == "retransmissions":
                return self._analyze_retrans(pcap_file, filtered_packets, packets)
            elif self._analysis_type == "traffic_flow":
                return self._analyze_flow(pcap_file, filtered_packets, packets)
            else:
                return {"error": f"Unknown analysis type: {self._analysis_type}"}
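
The routing step above is an if/elif chain keyed on the analysis type. The same idea can be expressed as a lookup table, sketched below with stub handlers. This is an illustrative alternative, not how the project is written: `route_analysis` and the stub lambdas are hypothetical, and only the analysis type names and the error message format come from the listing.

```python
from typing import Any, Callable

Handler = Callable[[str], dict[str, Any]]


def route_analysis(
    analysis_type: str, handlers: dict[str, Handler], pcap_file: str
) -> dict[str, Any]:
    """Look up the handler for analysis_type, or return an error dict."""
    handler = handlers.get(analysis_type)
    if handler is None:
        # Same error shape as the else branch in the listing.
        return {"error": f"Unknown analysis type: {analysis_type}"}
    return handler(pcap_file)


# Stub handlers standing in for the real analysis methods.
handlers: dict[str, Handler] = {
    "connections": lambda f: {"type": "connections", "file": f},
    "anomalies": lambda f: {"type": "anomalies", "file": f},
    "retransmissions": lambda f: {"type": "retransmissions", "file": f},
    "traffic_flow": lambda f: {"type": "traffic_flow", "file": f},
}

print(route_analysis("traffic_flow", handlers, "/captures/web.pcap"))
print(route_analysis("bogus", handlers, "/captures/web.pcap"))
```

A table keeps the set of supported analysis types in one place, at the cost of requiring every handler to share one signature.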

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpacket'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.