mcpcap

mcpacket

by mcpcap

analyze_traffic_flow

Analyze bidirectional traffic flow in a PCAP file to identify traffic direction, asymmetry, RST sources, and data transfer patterns. Requires the server IP; the server port filter is optional.

Instructions

Analyze bidirectional traffic flow characteristics.

Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.

Args:
    pcap_file: HTTP URL or absolute local file path to PCAP file
    server_ip: Server IP address (required)
    server_port: Optional filter for server port

Returns:
    A structured dictionary containing:
    - client_to_server: Client-to-server traffic statistics
    - server_to_client: Server-to-client traffic statistics
    - analysis: Asymmetry analysis and interpretations
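The asymmetry part of the result can be illustrated with a short sketch. It reuses the thresholds (1.2 and 0.8) and the zero fallback that appear in the implementation reference further down; the function name classify_flow is hypothetical and is not part of mcpcap.

```python
def classify_flow(client_packets: int, server_packets: int) -> dict:
    """Classify flow direction from per-direction packet counts.

    Hypothetical helper: mirrors the 1.2 / 0.8 thresholds quoted in the
    implementation reference, nothing more.
    """
    # As in the quoted source, the ratio falls back to 0 when the
    # server sent no packets at all.
    ratio = client_packets / server_packets if server_packets > 0 else 0
    if ratio > 1.2:
        direction = "client_heavy"
    elif ratio < 0.8:
        direction = "server_heavy"
    else:
        direction = "balanced"
    return {"asymmetry_ratio": ratio, "data_flow_direction": direction}


print(classify_flow(30, 10))
print(classify_flow(10, 10))
print(classify_flow(5, 0))
```

One consequence worth knowing when reading results: with these rules, a capture in which the server sent nothing yields a ratio of 0 and is therefore labeled server_heavy, so a zero ratio is best checked against the raw packet counts.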

Input Schema

Name         Required  Description  Default
pcap_file    Yes
server_ip    Yes
server_port  No
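Because the schema itself carries no descriptions, a caller may want to check arguments before invoking the tool. The following is a minimal sketch under stated assumptions: validate_args is a hypothetical helper (not part of mcpcap), it treats both http and https as acceptable URL schemes, and it assumes a valid port lies in 1..65535.

```python
from __future__ import annotations

import ipaddress
import os
from urllib.parse import urlparse


def validate_args(
    pcap_file: str, server_ip: str, server_port: int | None = None
) -> dict:
    """Hypothetical pre-flight check that builds the tool's argument dict."""
    parsed = urlparse(pcap_file)
    # Assumption: "HTTP URL" covers both http and https.
    is_url = parsed.scheme in ("http", "https") and bool(parsed.netloc)
    if not is_url and not os.path.isabs(pcap_file):
        raise ValueError("pcap_file must be an HTTP URL or an absolute path")

    # ip_address raises ValueError for malformed IPv4/IPv6 strings.
    ipaddress.ip_address(server_ip)

    if server_port is not None and not 0 < server_port < 65536:
        raise ValueError("server_port must be in 1..65535")

    args = {"pcap_file": pcap_file, "server_ip": server_ip}
    if server_port is not None:
        # Only include the optional filter when it was actually given.
        args["server_port"] = server_port
    return args


print(validate_args("/tmp/capture.pcap", "10.0.0.5", 443))
```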

Output Schema

No arguments

Implementation Reference

  • The main handler function for the 'analyze_traffic_flow' tool. It accepts pcap_file, server_ip, and optional server_port, then delegates to self.analyze_packets with analysis_type='traffic_flow'.
    def analyze_traffic_flow(
        self,
        pcap_file: str,
        server_ip: str,
        server_port: int | None = None,
    ) -> dict[str, Any]:
        """
        Analyze bidirectional traffic flow characteristics.
    
        Identifies traffic direction, asymmetry, RST sources, and data transfer patterns.
    
        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
            server_ip: Server IP address (required)
            server_port: Optional filter for server port
    
        Returns:
            A structured dictionary containing:
            - client_to_server: Client-to-server traffic statistics
            - server_to_client: Server-to-client traffic statistics
            - analysis: Asymmetry analysis and interpretations
        """
        return self.analyze_packets(
            pcap_file,
            analysis_type="traffic_flow",
            server_ip=server_ip,
            server_port=server_port,
        )
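The handler above only forwards its arguments, and the page does not show analyze_packets itself. The sketch below is one way such a delegation could work, assuming the extra keyword arguments are stored on the instance (the _analyze_flow helper reads them from self._analysis_kwargs). The class FlowModuleSketch, the handler table, and the omitted packet loading are all hypothetical.

```python
from typing import Any


class FlowModuleSketch:
    """Hypothetical stand-in for the module class; shows the dispatch shape only."""

    def analyze_packets(
        self, pcap_file: str, analysis_type: str, **kwargs: Any
    ) -> dict[str, Any]:
        # Stash the extra keyword arguments where private analyzers can read them.
        self._analysis_kwargs = kwargs
        handlers = {"traffic_flow": self._analyze_flow}
        handler = handlers.get(analysis_type)
        if handler is None:
            return {"error": f"unknown analysis_type: {analysis_type}"}
        # Packet loading is omitted; empty lists stand in for parsed packets.
        return handler(pcap_file, [], [])

    def _analyze_flow(
        self, pcap_file: str, tcp_packets: list, all_packets: list
    ) -> dict[str, Any]:
        server_ip = self._analysis_kwargs.get("server_ip")
        server_port = self._analysis_kwargs.get("server_port")
        if not server_ip:
            return {"error": "server_ip is required for traffic flow analysis"}
        return {"file": pcap_file, "server": f"{server_ip}:{server_port or 'any'}"}

    def analyze_traffic_flow(
        self, pcap_file: str, server_ip: str, server_port=None
    ) -> dict[str, Any]:
        return self.analyze_packets(
            pcap_file,
            analysis_type="traffic_flow",
            server_ip=server_ip,
            server_port=server_port,
        )


module = FlowModuleSketch()
print(module.analyze_traffic_flow("/tmp/capture.pcap", "10.0.0.5"))
```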
  • The private helper _analyze_flow, which implements the traffic flow analysis. It classifies each TCP packet as client-to-server or server-to-client (by destination IP, or by destination port alone when server_port is given), counts packets, bytes, SYN/RST/FIN flags, data packets, and retransmissions per direction, computes the client-to-server packet ratio as the asymmetry ratio, identifies the primary RST source, and returns structured results.
    def _analyze_flow(
        self, pcap_file: str, tcp_packets: list, all_packets: list
    ) -> dict[str, Any]:
        """Analyze traffic flow."""
        server_ip = self._analysis_kwargs.get("server_ip")
        server_port = self._analysis_kwargs.get("server_port")
    
        if not server_ip:
            return {"error": "server_ip is required for traffic flow analysis"}
    
        client_to_server = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }
    
        server_to_client = {
            "packet_count": 0,
            "byte_count": 0,
            "syn_count": 0,
            "rst_count": 0,
            "fin_count": 0,
            "data_packets": 0,
            "retransmissions": 0,
        }
    
        client_seqs = set()
        server_seqs = set()
    
        for pkt in tcp_packets:
            src_ip, dst_ip = self._extract_ips(pkt)
            tcp = pkt[TCP]
            flags = tcp.flags
    
            # Determine direction
            is_client_to_server = dst_ip == server_ip
            if server_port:
                is_client_to_server = tcp.dport == server_port
    
            stats = client_to_server if is_client_to_server else server_to_client
            seqs = client_seqs if is_client_to_server else server_seqs
    
            stats["packet_count"] += 1
            stats["byte_count"] += len(pkt)
    
            if flags & 0x02:  # SYN bit
                stats["syn_count"] += 1
            if flags & 0x04:  # RST bit
                stats["rst_count"] += 1
            if flags & 0x01:  # FIN bit
                stats["fin_count"] += 1
            if len(tcp.payload) > 0:  # packet carries application data
                stats["data_packets"] += 1
    
            # Retransmissions: a data packet whose sequence number was
            # already seen in the same direction
            seq = tcp.seq
            if seq in seqs and len(tcp.payload) > 0:
                stats["retransmissions"] += 1
            seqs.add(seq)
    
        # Analysis
        total_client = client_to_server["packet_count"]
        total_server = server_to_client["packet_count"]
        asymmetry_ratio = total_client / total_server if total_server > 0 else 0
    
        # Determine primary RST source
        client_rst = client_to_server["rst_count"]
        server_rst = server_to_client["rst_count"]
        if client_rst > server_rst:
            rst_source = "client"
            interpretation = f"Client sends all RST packets ({client_rst} vs {server_rst}). Server responds normally. Suggests client-side issue (possibly firewall)."
        elif server_rst > client_rst:
            rst_source = "server"
            interpretation = "Server sends more RST packets. Suggests server-side rejection or service issue."
        else:
            rst_source = "balanced"
            interpretation = "Balanced RST distribution."
    
        return {
            "file": pcap_file,
            "analysis_timestamp": datetime.now().isoformat(),
            "server": f"{server_ip}:{server_port or 'any'}",
            "client_to_server": client_to_server,
            "server_to_client": server_to_client,
            "analysis": {
                "asymmetry_ratio": asymmetry_ratio,
                "primary_rst_source": rst_source,
                "data_flow_direction": "client_heavy"
                if asymmetry_ratio > 1.2
                else "server_heavy"
                if asymmetry_ratio < 0.8
                else "balanced",
                "interpretation": interpretation,
            },
        }
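Two properties of the direction logic in the listing above are worth noting: when server_port is given, the destination-port test replaces the destination-IP test, and every packet not classified as client-to-server is counted as server-to-client, even if it does not involve the server. The sketch below shows a stricter variant that requires both IP and port to match and sets unrelated packets aside. It is not the mcpcap implementation; Pkt, direction, and count_rsts are hypothetical names, and plain tuples stand in for parsed packets.

```python
from typing import NamedTuple, Optional

SYN, RST, FIN = 0x02, 0x04, 0x01  # TCP flag bits, same masks as the listing


class Pkt(NamedTuple):
    """Hypothetical plain-Python stand-in for a parsed TCP packet."""
    src_ip: str
    dst_ip: str
    sport: int
    dport: int
    flags: int


def direction(pkt: Pkt, server_ip: str, server_port: Optional[int] = None) -> Optional[str]:
    """Return 'c2s', 's2c', or None when the packet does not involve the server."""
    to_server = pkt.dst_ip == server_ip and (
        server_port is None or pkt.dport == server_port
    )
    from_server = pkt.src_ip == server_ip and (
        server_port is None or pkt.sport == server_port
    )
    if to_server:
        return "c2s"
    if from_server:
        return "s2c"
    return None


def count_rsts(packets, server_ip: str, server_port: Optional[int] = None) -> dict:
    """Count RST packets per direction, tracking packets that were set aside."""
    counts = {"c2s": 0, "s2c": 0, "ignored": 0}
    for pkt in packets:
        d = direction(pkt, server_ip, server_port)
        if d is None:
            counts["ignored"] += 1
        elif pkt.flags & RST:
            counts[d] += 1
    return counts


packets = [
    Pkt("10.0.0.9", "10.0.0.5", 50000, 443, RST),         # client resets
    Pkt("10.0.0.5", "10.0.0.9", 443, 50000, RST | 0x10),  # server RST+ACK
    Pkt("10.0.0.9", "10.0.0.7", 50001, 443, RST),         # different server
    Pkt("10.0.0.5", "10.0.0.9", 443, 50000, 0x10),        # plain ACK
]
print(count_rsts(packets, "10.0.0.5", 443))
```

With the port-only rule from the listing, the third packet (sent to a different host on port 443) would be counted as client-to-server traffic; the stricter rule reports it as ignored instead.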
  • Registration of analyze_traffic_flow as an MCP tool via self.mcp.tool(module.analyze_traffic_flow) in the TCP module registration block.
    self.mcp.tool(module.analyze_traffic_flow)
  • The function signature and docstring define the input schema (pcap_file: str, server_ip: str, server_port: Optional[int]) and output schema (dict with client_to_server, server_to_client, analysis fields).
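How a signature can yield the required and optional columns of the input schema can be sketched with the standard inspect module. This only illustrates the idea that parameters without defaults become required; how the MCP framework used by mcpcap actually builds its schema is not shown on this page. The stub below copies the handler's parameters (without self), and required_and_optional is a hypothetical helper.

```python
from __future__ import annotations

import inspect
from typing import Any


def analyze_traffic_flow(
    pcap_file: str, server_ip: str, server_port: int | None = None
) -> dict[str, Any]:
    """Stub with the same parameters as the handler (self omitted)."""
    return {}


def required_and_optional(func) -> dict[str, list[str]]:
    """Split a function's parameters by whether they declare a default."""
    params = inspect.signature(func).parameters.values()
    required = [p.name for p in params if p.default is inspect.Parameter.empty]
    optional = [p.name for p in params if p.default is not inspect.Parameter.empty]
    return {"required": required, "optional": optional}


print(required_and_optional(analyze_traffic_flow))
```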
Behavior: 3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It explains the tool analyzes bidirectional traffic and returns structured statistics, but does not disclose behavioral traits like resource usage, permissions needed, or side effects. The return structure provides some transparency.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is structured with a brief opening, bullet points, and explicit Args/Returns sections. It is relatively concise and front-loaded with the purpose, though the bullet points could be integrated into a single paragraph for better flow.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the lack of annotations and moderate complexity (3 parameters, output schema exists), the description covers the purpose, parameters, and return structure adequately. It is missing usage guidelines and deeper behavioral context, but the output schema description compensates for return value understanding.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The schema has no descriptions (0% coverage), but the description adds meaning for all three parameters: pcap_file is a URL or local path, server_ip is required, server_port is an optional filter. This significantly enhances understanding beyond the raw schema types.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The name 'analyze_traffic_flow' and description clearly state it analyzes bidirectional traffic flow characteristics, including direction, asymmetry, RST sources, and data transfer patterns. This distinguishes it from sibling tools like analyze_tcp_connections or analyze_dns_packets, which focus on specific protocols or types of analysis.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description implies usage for analyzing traffic flow but does not provide explicit guidance on when to use this tool versus alternatives such as analyze_tcp_anomalies or analyze_dhcp_packets. No criteria for selection or exclusions are given.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mcpcap/mcpacket'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.