analyze_tcp_connections
Analyze TCP connection states from PCAP files to detect problems and summarize connection statistics.
Instructions
Analyze TCP connection states and lifecycle.
This is the core tool for TCP connection analysis, solving 80% of TCP-related issues.
FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path.
SUPPORTED INPUT FORMATS:
Remote files: "https://example.com/capture.pcap"
Local files: "/absolute/path/to/capture.pcap"
UNSUPPORTED:
Files uploaded through Claude's file upload feature
Base64 file content
Relative file paths
Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Optional filter for server IP address server_port: Optional filter for server port detailed: Whether to return detailed connection information
Returns: A structured dictionary containing TCP connection analysis results including: - summary: Overall connection statistics - connections: List of individual connections with states - issues: Detected problems
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| pcap_file | Yes | ||
| server_ip | No | ||
| server_port | No | ||
| detailed | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/mcpcap/modules/tcp.py:21-63 (handler)The main function that executes the 'analyze_tcp_connections' tool logic. It delegates to analyze_packets() which routes to _analyze_connections() for the actual connection analysis, then to _analyze_single_connection() for per-connection details.
def analyze_tcp_connections( self, pcap_file: str, server_ip: str | None = None, server_port: int | None = None, detailed: bool = False, ) -> dict[str, Any]: """ Analyze TCP connection states and lifecycle. This is the core tool for TCP connection analysis, solving 80% of TCP-related issues. FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path. SUPPORTED INPUT FORMATS: - Remote files: "https://example.com/capture.pcap" - Local files: "/absolute/path/to/capture.pcap" UNSUPPORTED: - Files uploaded through Claude's file upload feature - Base64 file content - Relative file paths Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Optional filter for server IP address server_port: Optional filter for server port detailed: Whether to return detailed connection information Returns: A structured dictionary containing TCP connection analysis results including: - summary: Overall connection statistics - connections: List of individual connections with states - issues: Detected problems """ return self.analyze_packets( pcap_file, analysis_type="connections", server_ip=server_ip, server_port=server_port, detailed=detailed, ) - src/mcpcap/modules/tcp.py:234-300 (helper)_analyze_connections() performs the core TCP connection analysis: groups packets by connection (4-tuple), calls _analyze_single_connection() per connection, and aggregates summary statistics (handshakes, resets, normal closes, issues).
def _analyze_connections( self, pcap_file: str, tcp_packets: list, all_packets: list ) -> dict[str, Any]: """Analyze TCP connections.""" # Group packets by connection (4-tuple) connections = defaultdict(list) for pkt in tcp_packets: conn_key = self._get_connection_key(pkt) connections[conn_key].append(pkt) # Analyze each connection connection_details = [] successful_handshakes = 0 failed_handshakes = 0 reset_connections = 0 normal_close = 0 issues = [] for conn_key, pkts in connections.items(): conn_info = self._analyze_single_connection(conn_key, pkts) connection_details.append(conn_info) if conn_info["handshake_completed"]: successful_handshakes += 1 else: failed_handshakes += 1 if conn_info["close_reason"] == "reset": reset_connections += 1 elif conn_info["close_reason"] == "normal": normal_close += 1 # Detect issues total_retrans = sum(c["retransmissions"] for c in connection_details) if reset_connections > 0: issues.append(f"{reset_connections} connections terminated by RST") if total_retrans > 0: issues.append(f"{total_retrans} retransmissions detected") if failed_handshakes > 0: issues.append(f"{failed_handshakes} failed handshakes") return { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "total_packets": len(all_packets), "tcp_packets_found": len(tcp_packets), "filter": { "server_ip": self._analysis_kwargs.get("server_ip"), "server_port": self._analysis_kwargs.get("server_port"), }, "summary": { "total_connections": len(connections), "successful_handshakes": successful_handshakes, "failed_handshakes": failed_handshakes, "established_connections": successful_handshakes, "reset_connections": reset_connections, "normal_close": normal_close, "active_connections": len(connections) - reset_connections - normal_close, }, "connections": connection_details if self._analysis_kwargs.get("detailed", False) else connection_details[:10], "issues": issues, } - src/mcpcap/modules/tcp.py:302-374 (helper)_analyze_single_connection() analyzes an individual TCP connection: counts SYN/SYN-ACK/ACK/RST/FIN flags, detects retransmissions, determines handshake completion and close reason.
def _analyze_single_connection( self, conn_key: tuple, packets: list ) -> dict[str, Any]: """Analyze a single TCP connection.""" client, server = self._identify_connection_roles(packets) syn_count = 0 syn_ack_count = 0 ack_count = 0 rst_count = 0 fin_count = 0 data_packets = 0 retransmissions = 0 seen_seqs_by_sender: dict[tuple[str, int], set[int]] = defaultdict(set) handshake_completed = False for pkt in packets: src_ip, _ = self._extract_ips(pkt) tcp = pkt[TCP] flags = tcp.flags # Count flags if flags & 0x02: # SYN syn_count += 1 if flags & 0x12 == 0x12: # SYN-ACK syn_ack_count += 1 if flags & 0x10: # ACK ack_count += 1 if flags & 0x04: # RST rst_count += 1 if flags & 0x01: # FIN fin_count += 1 # Check for data if len(tcp.payload) > 0: data_packets += 1 # Detect retransmissions (simplified) seq = tcp.seq sender = (src_ip, tcp.sport) if seq in seen_seqs_by_sender[sender] and len(tcp.payload) > 0: retransmissions += 1 seen_seqs_by_sender[sender].add(seq) # Determine handshake completion if syn_count > 0 and syn_ack_count > 0 and ack_count > 0: handshake_completed = True # Determine close reason close_reason = "unknown" if rst_count > 0: close_reason = "reset" elif fin_count >= 2: close_reason = "normal" elif len(packets) > 3: close_reason = "active" return { "client": f"{client[0]}:{client[1]}", "server": f"{server[0]}:{server[1]}", "state": "closed" if close_reason in ["reset", "normal"] else "active", "handshake_completed": handshake_completed, "syn_count": syn_count, "syn_ack_count": syn_ack_count, "ack_count": ack_count, "rst_count": rst_count, "fin_count": fin_count, "data_packets": data_packets, "retransmissions": retransmissions, "close_reason": close_reason, "packet_count": len(packets), } - src/mcpcap/core/server.py:49-67 (registration)The tool is registered with the MCP server in _register_tools() via self.mcp.tool(module.analyze_tcp_connections) when the 'tcp' module is enabled.
def _register_tools(self) -> None: """Register all available tools with the MCP server.""" # Register tools for each loaded module for module_name, module in self.modules.items(): if module_name == "dns": self.mcp.tool(module.analyze_dns_packets) elif module_name == "dhcp": self.mcp.tool(module.analyze_dhcp_packets) elif module_name == "icmp": self.mcp.tool(module.analyze_icmp_packets) elif module_name == "capinfos": self.mcp.tool(module.analyze_capinfos) elif module_name == "tcp": self.mcp.tool(module.analyze_tcp_connections) self.mcp.tool(module.analyze_tcp_anomalies) self.mcp.tool(module.analyze_tcp_retransmissions) self.mcp.tool(module.analyze_traffic_flow) elif module_name == "sip": self.mcp.tool(module.analyze_sip_packets) - src/mcpcap/modules/tcp.py:21-63 (schema)The analyze_tcp_connections function signature and docstring define the input schema: pcap_file (str), server_ip (optional str), server_port (optional int), detailed (bool). Returns a dict with summary, connections, and issues.
def analyze_tcp_connections( self, pcap_file: str, server_ip: str | None = None, server_port: int | None = None, detailed: bool = False, ) -> dict[str, Any]: """ Analyze TCP connection states and lifecycle. This is the core tool for TCP connection analysis, solving 80% of TCP-related issues. FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path. SUPPORTED INPUT FORMATS: - Remote files: "https://example.com/capture.pcap" - Local files: "/absolute/path/to/capture.pcap" UNSUPPORTED: - Files uploaded through Claude's file upload feature - Base64 file content - Relative file paths Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Optional filter for server IP address server_port: Optional filter for server port detailed: Whether to return detailed connection information Returns: A structured dictionary containing TCP connection analysis results including: - summary: Overall connection statistics - connections: List of individual connections with states - issues: Detected problems """ return self.analyze_packets( pcap_file, analysis_type="connections", server_ip=server_ip, server_port=server_port, detailed=detailed, )