analyze_traffic_flow
Analyze network traffic flow characteristics from PCAP files to identify traffic direction, asymmetry, RST sources, and data transfer patterns between client and server.
Instructions
Analyze bidirectional traffic flow characteristics.
Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.
Args:
- pcap_file: HTTP URL or absolute local file path to PCAP file
- server_ip: Server IP address (required)
- server_port: Optional filter for server port

Returns: A structured dictionary containing:
- client_to_server: Client-to-server traffic statistics
- server_to_client: Server-to-client traffic statistics
- analysis: Asymmetry analysis and interpretations
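The `analysis` entry is derived from the two per-direction packet counters. As a minimal sketch of that derivation, mirroring the thresholds used in the `_analyze_flow` helper listed under Implementation Reference (1.2 and 0.8), the function below computes the ratio and its label from plain integers. `classify_flow` is a hypothetical name used only for illustration; it is not part of mcpcap.

```python
def classify_flow(client_packets: int, server_packets: int) -> tuple[float, str]:
    """Return (asymmetry_ratio, label) from per-direction packet counts."""
    # Same guard as the helper: the ratio is 0 when the server sent nothing.
    ratio = client_packets / server_packets if server_packets > 0 else 0
    if ratio > 1.2:
        label = "client_heavy"
    elif ratio < 0.8:
        label = "server_heavy"
    else:
        label = "balanced"
    return ratio, label


print(classify_flow(150, 100))  # (1.5, 'client_heavy')
print(classify_flow(100, 100))  # (1.0, 'balanced')
print(classify_flow(10, 0))     # (0, 'server_heavy')
```

With this guard, a capture that contains no server-to-client packets gets a ratio of 0 and is labeled `server_heavy`, so a ratio of 0 is worth checking against the raw `packet_count` values before drawing conclusions.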
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
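| pcap_file | Yes | HTTP URL or absolute local file path to PCAP file | |
| server_ip | Yes | Server IP address | |
| server_port | No | Optional filter for server port | None |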
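Before calling the tool, a client may want to check the three arguments locally. The sketch below is one hypothetical pre-flight check, not something mcpcap is known to perform: `validate_args` is an illustrative name, and treating `https` as an acceptable scheme and 1 to 65535 as the valid port range are assumptions of this example.

```python
import ipaddress
import os
from typing import Optional
from urllib.parse import urlparse


def validate_args(
    pcap_file: str, server_ip: str, server_port: Optional[int] = None
) -> list[str]:
    """Return a list of problems; an empty list means the arguments look usable."""
    problems = []

    # pcap_file: accept an http(s) URL or an absolute local path.
    is_url = urlparse(pcap_file).scheme in ("http", "https")
    if not is_url and not os.path.isabs(pcap_file):
        problems.append("pcap_file must be an HTTP URL or an absolute local path")

    # server_ip: must parse as an IPv4 or IPv6 address.
    try:
        ipaddress.ip_address(server_ip)
    except ValueError:
        problems.append("server_ip is not a valid IP address")

    # server_port: optional, but must be a usable TCP port when given.
    if server_port is not None and not 1 <= server_port <= 65535:
        problems.append("server_port must be between 1 and 65535")

    return problems


print(validate_args("/tmp/capture.pcap", "10.0.0.5", 443))
print(validate_args("capture.pcap", "not-an-ip", 70000))
```

Returning a list of messages rather than raising on the first failure lets the caller report every problem at once.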
Implementation Reference
- src/mcpcap/modules/tcp.py:134-161 (handler) The handler function implementing the analyze_traffic_flow tool. It sets the analysis_type to 'traffic_flow' and delegates to the shared packet analysis method.

```python
def analyze_traffic_flow(
    self,
    pcap_file: str,
    server_ip: str,
    server_port: Optional[int] = None,
) -> dict[str, Any]:
    """
    Analyze bidirectional traffic flow characteristics.

    Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

    Args:
        pcap_file: HTTP URL or absolute local file path to PCAP file
        server_ip: Server IP address (required)
        server_port: Optional filter for server port

    Returns:
        A structured dictionary containing:
        - client_to_server: Client-to-server traffic statistics
        - server_to_client: Server-to-client traffic statistics
        - analysis: Asymmetry analysis and interpretations
    """
    return self.analyze_packets(
        pcap_file,
        analysis_type="traffic_flow",
        server_ip=server_ip,
        server_port=server_port,
    )
```
- src/mcpcap/core/server.py:59-62 (registration) Registration of the analyze_traffic_flow tool (line 62) as part of the TCP module tools in the MCP server.

```python
self.mcp.tool(module.analyze_tcp_connections)
self.mcp.tool(module.analyze_tcp_anomalies)
self.mcp.tool(module.analyze_tcp_retransmissions)
self.mcp.tool(module.analyze_traffic_flow)
```
- src/mcpcap/modules/tcp.py:472-570 (helper) Core helper method implementing the traffic flow analysis logic, computing bidirectional statistics, asymmetry ratios, and RST source analysis.

```python
def _analyze_flow(
    self, pcap_file: str, tcp_packets: list, all_packets: list
) -> dict[str, Any]:
    """Analyze traffic flow."""
    server_ip = self._analysis_kwargs.get("server_ip")
    server_port = self._analysis_kwargs.get("server_port")

    if not server_ip:
        return {"error": "server_ip is required for traffic flow analysis"}

    client_to_server = {
        "packet_count": 0,
        "byte_count": 0,
        "syn_count": 0,
        "rst_count": 0,
        "fin_count": 0,
        "data_packets": 0,
        "retransmissions": 0,
    }

    server_to_client = {
        "packet_count": 0,
        "byte_count": 0,
        "syn_count": 0,
        "rst_count": 0,
        "fin_count": 0,
        "data_packets": 0,
        "retransmissions": 0,
    }

    client_seqs = set()
    server_seqs = set()

    for pkt in tcp_packets:
        src_ip, dst_ip = self._extract_ips(pkt)
        tcp = pkt[TCP]
        flags = tcp.flags

        # Determine direction
        is_client_to_server = dst_ip == server_ip
        if server_port:
            is_client_to_server = tcp.dport == server_port

        stats = client_to_server if is_client_to_server else server_to_client
        seqs = client_seqs if is_client_to_server else server_seqs

        stats["packet_count"] += 1
        stats["byte_count"] += len(pkt)

        if flags & 0x02:
            stats["syn_count"] += 1
        if flags & 0x04:
            stats["rst_count"] += 1
        if flags & 0x01:
            stats["fin_count"] += 1
        if len(tcp.payload) > 0:
            stats["data_packets"] += 1

        # Retransmissions
        seq = tcp.seq
        if seq in seqs and len(tcp.payload) > 0:
            stats["retransmissions"] += 1
        seqs.add(seq)

    # Analysis
    total_client = client_to_server["packet_count"]
    total_server = server_to_client["packet_count"]
    asymmetry_ratio = total_client / total_server if total_server > 0 else 0

    # Determine primary RST source
    client_rst = client_to_server["rst_count"]
    server_rst = server_to_client["rst_count"]

    if client_rst > server_rst:
        rst_source = "client"
        interpretation = (
            f"Client sends all RST packets ({client_rst} vs {server_rst}). "
            "Server responds normally. Suggests client-side issue (possibly firewall)."
        )
    elif server_rst > client_rst:
        rst_source = "server"
        interpretation = "Server sends more RST packets. Suggests server-side rejection or service issue."
    else:
        rst_source = "balanced"
        interpretation = "Balanced RST distribution."

    return {
        "file": pcap_file,
        "analysis_timestamp": datetime.now().isoformat(),
        "server": f"{server_ip}:{server_port or 'any'}",
        "client_to_server": client_to_server,
        "server_to_client": server_to_client,
        "analysis": {
            "asymmetry_ratio": asymmetry_ratio,
            "primary_rst_source": rst_source,
            "data_flow_direction": "client_heavy"
            if asymmetry_ratio > 1.2
            else "server_heavy"
            if asymmetry_ratio < 0.8
            else "balanced",
            "interpretation": interpretation,
        },
    }
```
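In the helper above, when `server_port` is supplied, the port comparison replaces the result of the IP comparison instead of refining it, so a packet counts as client-to-server whenever its destination port matches, whatever its destination IP. As a sketch of one stricter alternative (an assumption about a possible refinement, not mcpcap's behavior), the hypothetical function `is_client_to_server` below requires both conditions to hold. It takes plain values rather than Scapy packets so it runs without a capture file.

```python
from typing import Optional


def is_client_to_server(
    dst_ip: str,
    dst_port: int,
    server_ip: str,
    server_port: Optional[int] = None,
) -> bool:
    """Return True when a packet is addressed to the server endpoint."""
    # The destination IP must always match the server.
    if dst_ip != server_ip:
        return False
    # The port is only checked when a server_port filter was given.
    return server_port is None or dst_port == server_port


print(is_client_to_server("10.0.0.5", 443, "10.0.0.5", 443))   # True
print(is_client_to_server("10.0.0.9", 443, "10.0.0.5", 443))   # False
print(is_client_to_server("10.0.0.5", 8080, "10.0.0.5"))       # True
```

Under this variant, traffic to a different host that happens to use the same port is no longer attributed to the client-to-server counters. Packets that fail the check still fall into the other bucket, so a fuller treatment would also need to decide what to do with traffic that involves neither endpoint.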
- src/mcpcap/modules/tcp.py:140-155 (schema) Schema definition via docstring specifying input arguments and output structure for the tool.

```python
"""
Analyze bidirectional traffic flow characteristics.

Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

Args:
    pcap_file: HTTP URL or absolute local file path to PCAP file
    server_ip: Server IP address (required)
    server_port: Optional filter for server port

Returns:
    A structured dictionary containing:
    - client_to_server: Client-to-server traffic statistics
    - server_to_client: Server-to-client traffic statistics
    - analysis: Asymmetry analysis and interpretations
"""
```