mcpcap

by danohn

analyze_traffic_flow

Analyze network traffic flow characteristics from PCAP files to identify traffic direction, asymmetry, RST sources, and data transfer patterns between client and server.

Instructions

Analyze bidirectional traffic flow characteristics.

Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

Args:
  • pcap_file: HTTP URL or absolute local file path to PCAP file
  • server_ip: Server IP address (required)
  • server_port: Optional filter for server port

Returns: A structured dictionary containing:
  • client_to_server: Client-to-server traffic statistics
  • server_to_client: Server-to-client traffic statistics
  • analysis: Asymmetry analysis and interpretations
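
As a rough illustration of how a caller might consume the returned dictionary, the sketch below condenses it into one line. The helper name summarize_flow and the sample values are hypothetical; the keys it reads (packet_count, byte_count, asymmetry_ratio, primary_rst_source, and the error key) are the ones that appear in the implementation listed under Implementation Reference.

```python
from typing import Any


def summarize_flow(result: dict[str, Any]) -> str:
    """Return a one-line summary of a traffic-flow result (hypothetical helper)."""
    # The implementation returns {"error": ...} when server_ip is missing.
    if "error" in result:
        return f"error: {result['error']}"
    c2s = result["client_to_server"]
    s2c = result["server_to_client"]
    analysis = result["analysis"]
    return (
        f"c2s={c2s['packet_count']} pkts/{c2s['byte_count']} B, "
        f"s2c={s2c['packet_count']} pkts/{s2c['byte_count']} B, "
        f"ratio={analysis['asymmetry_ratio']:.2f}, "
        f"rst_source={analysis['primary_rst_source']}"
    )


# Hand-written sample data for illustration, not output from a real capture.
sample = {
    "client_to_server": {"packet_count": 12, "byte_count": 3400},
    "server_to_client": {"packet_count": 10, "byte_count": 9800},
    "analysis": {"asymmetry_ratio": 1.2, "primary_rst_source": "balanced"},
}
print(summarize_flow(sample))
```

Checking for the error key first matters because an error result carries none of the statistics keys.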

Input Schema

Name         Required  Description  Default
pcap_file    Yes
server_ip    Yes
server_port  No
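
To make the schema concrete, here is a minimal sketch of the JSON-RPC request an MCP client would send to invoke this tool. The tools/call method and the name/arguments layout follow the MCP specification; the helper name build_tool_call, the file path, and the IP address are placeholders chosen for illustration.

```python
import json
from typing import Any, Optional


def build_tool_call(
    pcap_file: str,
    server_ip: str,
    server_port: Optional[int] = None,
    request_id: int = 1,
) -> dict[str, Any]:
    """Build a tools/call request for analyze_traffic_flow (hypothetical helper)."""
    arguments: dict[str, Any] = {"pcap_file": pcap_file, "server_ip": server_ip}
    # server_port is optional in the schema, so only send it when it is set.
    if server_port is not None:
        arguments["server_port"] = server_port
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "analyze_traffic_flow", "arguments": arguments},
    }


# Placeholder path and documentation-range IP address.
request = build_tool_call("/captures/example.pcap", "192.0.2.10", server_port=443)
print(json.dumps(request, indent=2))
```

In practice an MCP client library would normally build and send this message itself; the sketch only shows which arguments are required and which may be omitted.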

Implementation Reference

  • The handler function implementing the analyze_traffic_flow tool. It sets the analysis_type to 'traffic_flow' and delegates to the shared packet analysis method.
    def analyze_traffic_flow(
        self,
        pcap_file: str,
        server_ip: str,
        server_port: Optional[int] = None,
    ) -> dict[str, Any]:
        """
        Analyze bidirectional traffic flow characteristics.

        Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
            server_ip: Server IP address (required)
            server_port: Optional filter for server port

        Returns:
            A structured dictionary containing:
            - client_to_server: Client-to-server traffic statistics
            - server_to_client: Server-to-client traffic statistics
            - analysis: Asymmetry analysis and interpretations
        """
        return self.analyze_packets(
            pcap_file,
            analysis_type="traffic_flow",
            server_ip=server_ip,
            server_port=server_port,
        )
  • Registration of the analyze_traffic_flow tool (line 62) as part of the TCP module tools in the MCP server.
    self.mcp.tool(module.analyze_tcp_connections)
    self.mcp.tool(module.analyze_tcp_anomalies)
    self.mcp.tool(module.analyze_tcp_retransmissions)
    self.mcp.tool(module.analyze_traffic_flow)
  • Core helper method implementing the traffic flow analysis logic, computing bidirectional statistics, asymmetry ratios, and RST source analysis.
    def _analyze_flow(
        self, pcap_file: str, tcp_packets: list, all_packets: list
    ) -> dict[str, Any]:
        """Analyze traffic flow."""
        server_ip = self._analysis_kwargs.get("server_ip")
        server_port = self._analysis_kwargs.get("server_port")

        if not server_ip:
            return {"error": "server_ip is required for traffic flow analysis"}

        client_to_server = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }
        server_to_client = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }

        client_seqs = set()
        server_seqs = set()

        for pkt in tcp_packets:
            src_ip, dst_ip = self._extract_ips(pkt)
            tcp = pkt[TCP]
            flags = tcp.flags

            # Determine direction
            is_client_to_server = dst_ip == server_ip
            if server_port:
                is_client_to_server = tcp.dport == server_port

            stats = client_to_server if is_client_to_server else server_to_client
            seqs = client_seqs if is_client_to_server else server_seqs

            stats["packet_count"] += 1
            stats["byte_count"] += len(pkt)

            if flags & 0x02:
                stats["syn_count"] += 1
            if flags & 0x04:
                stats["rst_count"] += 1
            if flags & 0x01:
                stats["fin_count"] += 1
            if len(tcp.payload) > 0:
                stats["data_packets"] += 1

            # Retransmissions
            seq = tcp.seq
            if seq in seqs and len(tcp.payload) > 0:
                stats["retransmissions"] += 1
            seqs.add(seq)

        # Analysis
        total_client = client_to_server["packet_count"]
        total_server = server_to_client["packet_count"]
        asymmetry_ratio = total_client / total_server if total_server > 0 else 0

        # Determine primary RST source
        client_rst = client_to_server["rst_count"]
        server_rst = server_to_client["rst_count"]

        if client_rst > server_rst:
            rst_source = "client"
            interpretation = f"Client sends all RST packets ({client_rst} vs {server_rst}). Server responds normally. Suggests client-side issue (possibly firewall)."
        elif server_rst > client_rst:
            rst_source = "server"
            interpretation = "Server sends more RST packets. Suggests server-side rejection or service issue."
        else:
            rst_source = "balanced"
            interpretation = "Balanced RST distribution."

        return {
            "file": pcap_file,
            "analysis_timestamp": datetime.now().isoformat(),
            "server": f"{server_ip}:{server_port or 'any'}",
            "client_to_server": client_to_server,
            "server_to_client": server_to_client,
            "analysis": {
                "asymmetry_ratio": asymmetry_ratio,
                "primary_rst_source": rst_source,
                "data_flow_direction": "client_heavy"
                if asymmetry_ratio > 1.2
                else "server_heavy"
                if asymmetry_ratio < 0.8
                else "balanced",
                "interpretation": interpretation,
            },
        }
  • Schema definition via docstring specifying input arguments and output structure for the tool.
    """
    Analyze bidirectional traffic flow characteristics.

    Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

    Args:
        pcap_file: HTTP URL or absolute local file path to PCAP file
        server_ip: Server IP address (required)
        server_port: Optional filter for server port

    Returns:
        A structured dictionary containing:
        - client_to_server: Client-to-server traffic statistics
        - server_to_client: Server-to-client traffic statistics
        - analysis: Asymmetry analysis and interpretations
    """

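The counting logic in the _analyze_flow listing can be tried without Scapy or a capture file. The sketch below is a simplified re-creation that runs on hand-made records, not the project's code: Segment, is_client_to_server, flow_direction, and analyze are hypothetical names. The flag masks (0x01 FIN, 0x02 SYN, 0x04 RST), the repeated-sequence-number retransmission heuristic, and the 1.2 / 0.8 thresholds are taken from the listing. One deliberate difference, labeled as an assumption in the code: when a port is given, the sketch requires both the destination address and the port to match, whereas the listing lets the port comparison replace the address comparison.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Optional

FIN, SYN, RST = 0x01, 0x02, 0x04  # TCP flag bits tested in the listing


@dataclass
class Segment:
    """Hypothetical stand-in for one parsed TCP packet."""
    dst_ip: str
    dport: int
    flags: int
    seq: int
    payload_len: int


def is_client_to_server(seg: Segment, server_ip: str, server_port: Optional[int]) -> bool:
    # Assumption: address AND port must both match when a port is supplied.
    if seg.dst_ip != server_ip:
        return False
    return server_port is None or seg.dport == server_port


def flow_direction(ratio: float) -> str:
    # Thresholds copied from the listing.
    if ratio > 1.2:
        return "client_heavy"
    if ratio < 0.8:
        return "server_heavy"
    return "balanced"


def analyze(
    segments: Iterable[Segment], server_ip: str, server_port: Optional[int] = None
) -> dict[str, Any]:
    keys = ("packet_count", "syn_count", "rst_count", "fin_count",
            "data_packets", "retransmissions")
    stats = {"c2s": dict.fromkeys(keys, 0), "s2c": dict.fromkeys(keys, 0)}
    seen: dict[str, set] = {"c2s": set(), "s2c": set()}
    for seg in segments:
        side = "c2s" if is_client_to_server(seg, server_ip, server_port) else "s2c"
        s = stats[side]
        s["packet_count"] += 1
        if seg.flags & SYN:
            s["syn_count"] += 1
        if seg.flags & RST:
            s["rst_count"] += 1
        if seg.flags & FIN:
            s["fin_count"] += 1
        if seg.payload_len > 0:
            s["data_packets"] += 1
            # Heuristic: a data segment whose sequence number was already
            # seen in this direction counts as a retransmission.
            if seg.seq in seen[side]:
                s["retransmissions"] += 1
        seen[side].add(seg.seq)
    c, s = stats["c2s"], stats["s2c"]
    ratio = c["packet_count"] / s["packet_count"] if s["packet_count"] > 0 else 0
    return {
        "client_to_server": c,
        "server_to_client": s,
        "asymmetry_ratio": ratio,
        "data_flow_direction": flow_direction(ratio),
    }


# Hand-made segments: handshake start, one repeated data segment, a server RST.
segments = [
    Segment("10.0.0.2", 443, SYN, 100, 0),
    Segment("10.0.0.1", 50000, SYN | 0x10, 900, 0),
    Segment("10.0.0.2", 443, 0x18, 101, 50),
    Segment("10.0.0.2", 443, 0x18, 101, 50),
    Segment("10.0.0.1", 50000, RST, 901, 0),
]
result = analyze(segments, "10.0.0.2", 443)
print(result["data_flow_direction"], result["asymmetry_ratio"])
```

As in the listing, a capture with no server-to-client packets yields a ratio of 0, which falls below 0.8 and is therefore labeled server_heavy. Callers may want to treat that case separately.
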
MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpcap'
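
The same request can be issued from Python's standard library. This is a minimal sketch: the function names are hypothetical, and it assumes the endpoint returns a JSON body, which the page does not state.

```python
import json
import urllib.request

API_URL = "https://glama.ai/api/mcp/v1/servers/danohn/mcpcap"


def build_request(url: str = API_URL) -> urllib.request.Request:
    """Build the GET request equivalent to the curl command (hypothetical helper)."""
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server_info(url: str = API_URL, timeout: float = 10.0) -> dict:
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as resp:
        return json.load(resp)
```

Calling fetch_server_info() performs the network request; build_request() alone can be inspected offline.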
