analyze_traffic_flow
Analyze bidirectional traffic flow from a PCAP file to identify traffic direction, asymmetry, RST sources, and data transfer patterns. Requires server IP and optional port filter.
Instructions
Analyze bidirectional traffic flow characteristics.
Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.
Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Server IP address (required) server_port: Optional filter for server port
Returns: A structured dictionary containing: - client_to_server: Client-to-server traffic statistics - server_to_client: Server-to-client traffic statistics - analysis: Asymmetry analysis and interpretations
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| pcap_file | Yes | ||
| server_ip | Yes | ||
| server_port | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/mcpcap/modules/tcp.py:134-161 (handler)The main handler function for the 'analyze_traffic_flow' tool. It accepts pcap_file, server_ip, and optional server_port, then delegates to self.analyze_packets with analysis_type='traffic_flow'.
def analyze_traffic_flow( self, pcap_file: str, server_ip: str, server_port: int | None = None, ) -> dict[str, Any]: """ Analyze bidirectional traffic flow characteristics. Identifies traffic direction, asymmetry, RST sources, and data transfer patterns. Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Server IP address (required) server_port: Optional filter for server port Returns: A structured dictionary containing: - client_to_server: Client-to-server traffic statistics - server_to_client: Server-to-client traffic statistics - analysis: Asymmetry analysis and interpretations """ return self.analyze_packets( pcap_file, analysis_type="traffic_flow", server_ip=server_ip, server_port=server_port, ) - src/mcpcap/modules/tcp.py:473-571 (handler)The private helper _analyze_flow that implements the actual traffic flow analysis logic. It categorizes packets by direction (client-to-server vs server-to-client), counts packets/bytes/flags/RST/FIN/retransmissions, computes asymmetry ratio, identifies primary RST source, and returns structured results.
def _analyze_flow( self, pcap_file: str, tcp_packets: list, all_packets: list ) -> dict[str, Any]: """Analyze traffic flow.""" server_ip = self._analysis_kwargs.get("server_ip") server_port = self._analysis_kwargs.get("server_port") if not server_ip: return {"error": "server_ip is required for traffic flow analysis"} client_to_server = { "packet_count": 0, "byte_count": 0, "syn_count": 0, "rst_count": 0, "fin_count": 0, "data_packets": 0, "retransmissions": 0, } server_to_client = { "packet_count": 0, "byte_count": 0, "syn_count": 0, "rst_count": 0, "fin_count": 0, "data_packets": 0, "retransmissions": 0, } client_seqs = set() server_seqs = set() for pkt in tcp_packets: src_ip, dst_ip = self._extract_ips(pkt) tcp = pkt[TCP] flags = tcp.flags # Determine direction is_client_to_server = dst_ip == server_ip if server_port: is_client_to_server = tcp.dport == server_port stats = client_to_server if is_client_to_server else server_to_client seqs = client_seqs if is_client_to_server else server_seqs stats["packet_count"] += 1 stats["byte_count"] += len(pkt) if flags & 0x02: stats["syn_count"] += 1 if flags & 0x04: stats["rst_count"] += 1 if flags & 0x01: stats["fin_count"] += 1 if len(tcp.payload) > 0: stats["data_packets"] += 1 # Retransmissions seq = tcp.seq if seq in seqs and len(tcp.payload) > 0: stats["retransmissions"] += 1 seqs.add(seq) # Analysis total_client = client_to_server["packet_count"] total_server = server_to_client["packet_count"] asymmetry_ratio = total_client / total_server if total_server > 0 else 0 # Determine primary RST source client_rst = client_to_server["rst_count"] server_rst = server_to_client["rst_count"] if client_rst > server_rst: rst_source = "client" interpretation = f"Client sends all RST packets ({client_rst} vs {server_rst}). Server responds normally. Suggests client-side issue (possibly firewall)." elif server_rst > client_rst: rst_source = "server" interpretation = "Server sends more RST packets. Suggests server-side rejection or service issue." else: rst_source = "balanced" interpretation = "Balanced RST distribution." return { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "server": f"{server_ip}:{server_port or 'any'}", "client_to_server": client_to_server, "server_to_client": server_to_client, "analysis": { "asymmetry_ratio": asymmetry_ratio, "primary_rst_source": rst_source, "data_flow_direction": "client_heavy" if asymmetry_ratio > 1.2 else "server_heavy" if asymmetry_ratio < 0.8 else "balanced", "interpretation": interpretation, }, } - src/mcpcap/core/server.py:65-65 (registration)Registration of analyze_traffic_flow as an MCP tool via self.mcp.tool(module.analyze_traffic_flow) in the TCP module registration block.
self.mcp.tool(module.analyze_traffic_flow) - src/mcpcap/modules/tcp.py:134-161 (schema)The function signature and docstring define the input schema (pcap_file: str, server_ip: str, server_port: Optional[int]) and output schema (dict with client_to_server, server_to_client, analysis fields).
def analyze_traffic_flow( self, pcap_file: str, server_ip: str, server_port: int | None = None, ) -> dict[str, Any]: """ Analyze bidirectional traffic flow characteristics. Identifies traffic direction, asymmetry, RST sources, and data transfer patterns. Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Server IP address (required) server_port: Optional filter for server port Returns: A structured dictionary containing: - client_to_server: Client-to-server traffic statistics - server_to_client: Server-to-client traffic statistics - analysis: Asymmetry analysis and interpretations """ return self.analyze_packets( pcap_file, analysis_type="traffic_flow", server_ip=server_ip, server_port=server_port, )