Glama
mcpcap

mcpacket

by mcpcap

analyze_sip_packets

Analyze SIP signaling from PCAP files, extracting structured call details for network troubleshooting.

Instructions

Analyze SIP packets from a PCAP file and return structured signaling details.

FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path.

SUPPORTED INPUT FORMATS:

  • Remote files: "https://example.com/capture.pcap"

  • Local files: "/absolute/path/to/capture.pcap"

UNSUPPORTED:

  • Files uploaded through Claude's file upload feature

  • Base64 file content

  • Relative file paths

Args: pcap_file: HTTP URL or absolute local file path to PCAP file

Returns: A structured dictionary containing SIP packet analysis results
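
A caller may want to check an argument against the supported and unsupported formats listed above before invoking the tool. The following is a minimal sketch, not part of mcpcap: `classify_pcap_source` and `PCAP_EXTENSIONS` are hypothetical names, and the extension list is an assumption taken from the local-file validation shown in the implementation reference on this page.

```python
import os

# Assumed from the local-file validation error message on this page.
PCAP_EXTENSIONS = (".pcap", ".pcapng", ".cap")


def classify_pcap_source(pcap_file: str) -> str:
    """Return 'remote', 'local', or 'unsupported' for a pcap_file argument.

    Hypothetical helper for illustration; it is not part of mcpcap.
    """
    # Remote captures are identified purely by their URL scheme.
    if pcap_file.startswith(("http://", "https://")):
        return "remote"
    # Local captures must be absolute paths with a capture-file extension.
    if os.path.isabs(pcap_file) and pcap_file.lower().endswith(PCAP_EXTENSIONS):
        return "local"
    # Relative paths, base64 blobs, and everything else fall through.
    return "unsupported"
```

Anything classified as 'unsupported' here (a relative path or base64 content, for example) would need to be saved to an absolute path or hosted at a URL first.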

Input Schema

Name         Required    Description    Default
pcap_file    Yes         -              -

Output Schema


No arguments

Implementation Reference

  • Main handler function for the 'analyze_sip_packets' tool. Accepts a pcap_file path/URL and delegates to BaseModule.analyze_packets() which handles remote download or local file validation, then calls _analyze_protocol_file() for the actual SIP analysis logic.
    def analyze_sip_packets(self, pcap_file: str) -> dict[str, Any]:
        """
        Analyze SIP packets from a PCAP file and return structured signaling details.
    
        FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through
        Claude's web interface. Files must be accessible via URL or local file path.
    
        SUPPORTED INPUT FORMATS:
        - Remote files: "https://example.com/capture.pcap"
        - Local files: "/absolute/path/to/capture.pcap"
    
        UNSUPPORTED:
        - Files uploaded through Claude's file upload feature
        - Base64 file content
        - Relative file paths
    
        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file
    
        Returns:
            A structured dictionary containing SIP packet analysis results
        """
        return self.analyze_packets(pcap_file)
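
The delegation described in this entry resembles a template-method arrangement: a thin public handler, a shared entry point in the base class, and a protocol-specific hook. A minimal sketch of that shape follows. The class names `SketchBaseModule` and `SketchSIPModule` and the `source` key are hypothetical and exist only for illustration; they are not mcpcap's classes or output.

```python
from typing import Any


class SketchBaseModule:
    """Hypothetical stand-in for BaseModule; names are illustrative only."""

    def analyze_packets(self, pcap_file: str) -> dict[str, Any]:
        # Shared entry point: decide how the input is sourced, then hand
        # off to the protocol-specific hook implemented by the subclass.
        if pcap_file.startswith(("http://", "https://")):
            source = "remote"
        else:
            source = "local"
        result = self._analyze_protocol_file(pcap_file)
        result["source"] = source  # illustrative key, not in the real output
        return result

    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        raise NotImplementedError


class SketchSIPModule(SketchBaseModule):
    def analyze_sip_packets(self, pcap_file: str) -> dict[str, Any]:
        # Thin public handler: all shared work lives in the base class.
        return self.analyze_packets(pcap_file)

    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        # Placeholder for the real SIP analysis.
        return {"file": pcap_file, "protocol": "SIP"}
```

With this layout, another protocol module would only need to supply its own hook and a one-line handler.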
  • _analyze_protocol_file performs the actual SIP analysis: reads PCAP via Scapy's rdpcap, filters SIP packets using _is_sip_packet, respects max_packets config, analyzes each packet via _analyze_sip_packet, generates statistics via _generate_statistics, and returns a structured result dictionary.
    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        """Perform the actual SIP packet analysis on a local PCAP file."""
        try:
            packets = rdpcap(pcap_file)
            sip_packets = [pkt for pkt in packets if self._is_sip_packet(pkt)]
    
            if not sip_packets:
                return {
                    "file": pcap_file,
                    "total_packets_in_file": len(packets),
                    "sip_packets_found": 0,
                    "message": "No SIP packets found in this capture",
                }
    
            packets_to_analyze = sip_packets
            limited = False
            if self.config.max_packets and len(sip_packets) > self.config.max_packets:
                packets_to_analyze = sip_packets[: self.config.max_packets]
                limited = True
    
            packet_details = [
                self._analyze_sip_packet(pkt, packet_number)
                for packet_number, pkt in enumerate(packets_to_analyze, 1)
            ]
            stats = self._generate_statistics(packet_details)
    
            result = {
                "file": pcap_file,
                "analysis_timestamp": datetime.now().isoformat(),
                "total_packets_in_file": len(packets),
                "sip_packets_found": len(sip_packets),
                "sip_packets_analyzed": len(packet_details),
                "statistics": stats,
                "packets": packet_details,
            }
    
            if limited:
                result["note"] = (
                    f"Analysis limited to first {self.config.max_packets} SIP packets due to --max-packets setting"
                )
    
            return result
    
        except Exception as e:
            return {
                "error": f"Error reading PCAP file '{pcap_file}': {str(e)}",
                "file": pcap_file,
            }
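
The listing above can return three different shapes: an error dictionary, a "no SIP packets" dictionary, or a full result that may carry a truncation note. A caller would probably want to branch on those cases. The sketch below assumes only the keys visible in that listing; `summarize_sip_result` is a hypothetical helper, not part of the tool.

```python
from typing import Any


def summarize_sip_result(result: dict[str, Any]) -> str:
    """Turn an analyze_sip_packets result into a one-line summary.

    Hypothetical helper; it reads only keys shown in the
    _analyze_protocol_file listing.
    """
    # Shape 1: reading or parsing the capture failed.
    if "error" in result:
        return f"error: {result['error']}"
    # Shape 2: the capture was readable but contained no SIP traffic.
    if result.get("sip_packets_found", 0) == 0:
        return result.get("message", "no SIP packets")
    # Shape 3: full analysis, possibly limited by the max_packets setting.
    summary = (
        f"{result['sip_packets_analyzed']} of {result['sip_packets_found']} "
        "SIP packets analyzed"
    )
    if "note" in result:
        summary += " (truncated)"
    return summary
```

Checking for `error` first matters because the error dictionary carries none of the counting keys.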
  • The SIP module (SIPModule class) defines the protocol constants (SIP_METHODS, SIP_PORTS) and all helper methods for packet detection, parsing, and statistics generation used by the tool. Lines 12-28 of the module listing below define the SIP_METHODS and SIP_PORTS constants used for SIP method detection and port matching.
    """SIP analysis module."""
    
    from collections import Counter
    from datetime import datetime
    from typing import Any
    
    from fastmcp import FastMCP
    from scapy.all import IP, TCP, UDP, IPv6, Raw, rdpcap
    
    from .base import BaseModule
    
    SIP_METHODS = {
        "ACK",
        "BYE",
        "CANCEL",
        "INFO",
        "INVITE",
        "MESSAGE",
        "NOTIFY",
        "OPTIONS",
        "PRACK",
        "PUBLISH",
        "REFER",
        "REGISTER",
        "SUBSCRIBE",
        "UPDATE",
    }
    SIP_PORTS = {5060, 5061}
    
    
    class SIPModule(BaseModule):
        """Module for analyzing SIP packets in PCAP files."""
    
        @property
        def protocol_name(self) -> str:
            """Return the name of the protocol this module analyzes."""
            return "SIP"
    
        def analyze_sip_packets(self, pcap_file: str) -> dict[str, Any]:
            """
            Analyze SIP packets from a PCAP file and return structured signaling details.
    
            FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through
            Claude's web interface. Files must be accessible via URL or local file path.
    
            SUPPORTED INPUT FORMATS:
            - Remote files: "https://example.com/capture.pcap"
            - Local files: "/absolute/path/to/capture.pcap"
    
            UNSUPPORTED:
            - Files uploaded through Claude's file upload feature
            - Base64 file content
            - Relative file paths
    
            Args:
                pcap_file: HTTP URL or absolute local file path to PCAP file
    
            Returns:
                A structured dictionary containing SIP packet analysis results
            """
            return self.analyze_packets(pcap_file)
    
        def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
            """Perform the actual SIP packet analysis on a local PCAP file."""
            try:
                packets = rdpcap(pcap_file)
                sip_packets = [pkt for pkt in packets if self._is_sip_packet(pkt)]
    
                if not sip_packets:
                    return {
                        "file": pcap_file,
                        "total_packets_in_file": len(packets),
                        "sip_packets_found": 0,
                        "message": "No SIP packets found in this capture",
                    }
    
                packets_to_analyze = sip_packets
                limited = False
                if self.config.max_packets and len(sip_packets) > self.config.max_packets:
                    packets_to_analyze = sip_packets[: self.config.max_packets]
                    limited = True
    
                packet_details = [
                    self._analyze_sip_packet(pkt, packet_number)
                    for packet_number, pkt in enumerate(packets_to_analyze, 1)
                ]
                stats = self._generate_statistics(packet_details)
    
                result = {
                    "file": pcap_file,
                    "analysis_timestamp": datetime.now().isoformat(),
                    "total_packets_in_file": len(packets),
                    "sip_packets_found": len(sip_packets),
                    "sip_packets_analyzed": len(packet_details),
                    "statistics": stats,
                    "packets": packet_details,
                }
    
                if limited:
                    result["note"] = (
                        f"Analysis limited to first {self.config.max_packets} SIP packets due to --max-packets setting"
                    )
    
                return result
    
            except Exception as e:
                return {
                    "error": f"Error reading PCAP file '{pcap_file}': {str(e)}",
                    "file": pcap_file,
                }
    
        def _is_sip_packet(self, pkt: Any) -> bool:
            """Check whether a packet contains SIP payload data."""
            if not pkt.haslayer(Raw):
                return False
    
            if not pkt.haslayer(UDP) and not pkt.haslayer(TCP):
                return False
    
            payload = bytes(pkt[Raw].load)
            if not payload:
                return False
    
            return self._is_sip_payload(payload)
    
        def _is_sip_payload(self, payload: bytes) -> bool:
            """Check whether payload bytes look like a SIP message."""
            try:
                first_line = (
                    payload.decode("utf-8", errors="ignore").splitlines()[0].strip()
                )
            except IndexError:
                return False
    
            if first_line.startswith("SIP/2.0 "):
                return True
    
            method = first_line.split(" ", 1)[0].upper()
            return method in SIP_METHODS
    
        def _analyze_sip_packet(self, pkt: Any, packet_number: int) -> dict[str, Any]:
            """Analyze a single SIP packet."""
            payload = bytes(pkt[Raw].load).decode("utf-8", errors="replace")
            start_line, headers, body = self._parse_sip_message(payload)
            src_ip, dst_ip = self._extract_ips(pkt)
            transport, src_port, dst_port = self._extract_transport(pkt)
            message_type, parsed_message = self._parse_start_line(start_line)
    
            via_header = headers.get("via", "")
            content_length = self._safe_int(headers.get("content-length"))
            known_port_match = src_port in SIP_PORTS or dst_port in SIP_PORTS
    
            packet_info = {
                "packet_number": packet_number,
                "timestamp": datetime.fromtimestamp(float(pkt.time)).isoformat(),
                "source_ip": src_ip,
                "destination_ip": dst_ip,
                "source_port": src_port,
                "destination_port": dst_port,
                "transport": transport,
                "message_type": message_type,
                "start_line": start_line,
                "call_id": headers.get("call-id", ""),
                "cseq": headers.get("cseq", ""),
                "from": headers.get("from", ""),
                "to": headers.get("to", ""),
                "contact": headers.get("contact", ""),
                "user_agent": headers.get("user-agent", ""),
                "server": headers.get("server", ""),
                "via": via_header,
                "content_type": headers.get("content-type", ""),
                "content_length": content_length,
                "body_length": len(body.encode("utf-8")),
                "known_sip_port": known_port_match,
                "headers": headers,
                "summary": pkt.summary(),
            }
            packet_info.update(parsed_message)
            return packet_info
    
        def _parse_sip_message(self, payload: str) -> tuple[str, dict[str, str], str]:
            """Parse a SIP message into start line, headers, and body."""
            normalized = payload.replace("\r\n", "\n").replace("\r", "\n")
            header_part, _, body = normalized.partition("\n\n")
            header_lines = [line for line in header_part.split("\n") if line.strip()]
            start_line = header_lines[0].strip() if header_lines else ""
    
            headers: dict[str, str] = {}
            current_header: str | None = None
            for line in header_lines[1:]:
                if line.startswith((" ", "\t")) and current_header:
                    headers[current_header] = f"{headers[current_header]} {line.strip()}"
                    continue
    
                if ":" not in line:
                    continue
    
                key, value = line.split(":", 1)
                normalized_key = key.strip().lower()
                headers[normalized_key] = value.strip()
                current_header = normalized_key
    
            return start_line, headers, body
    
        def _parse_start_line(self, start_line: str) -> tuple[str, dict[str, Any]]:
            """Parse the SIP start line into either request or response data."""
            if start_line.startswith("SIP/2.0 "):
                parts = start_line.split(" ", 2)
                status_code = self._safe_int(parts[1]) if len(parts) > 1 else None
                return "response", {
                    "status_code": status_code,
                    "reason_phrase": parts[2] if len(parts) > 2 else "",
                }
    
            parts = start_line.split(" ", 2)
            method = parts[0].upper() if parts else ""
            return "request", {
                "method": method,
                "request_uri": parts[1] if len(parts) > 1 else "",
                "sip_version": parts[2] if len(parts) > 2 else "",
            }
    
        def _extract_ips(self, pkt: Any) -> tuple[str, str]:
            """Extract source and destination IP addresses."""
            if pkt.haslayer(IP):
                return pkt[IP].src, pkt[IP].dst
            if pkt.haslayer(IPv6):
                return pkt[IPv6].src, pkt[IPv6].dst
            return "unknown", "unknown"
    
        def _extract_transport(self, pkt: Any) -> tuple[str, int | None, int | None]:
            """Extract transport protocol and ports."""
            if pkt.haslayer(UDP):
                return "UDP", pkt[UDP].sport, pkt[UDP].dport
            if pkt.haslayer(TCP):
                return "TCP", pkt[TCP].sport, pkt[TCP].dport
            return "unknown", None, None
    
        def _safe_int(self, value: str | None) -> int | None:
            """Convert a header value to int when possible."""
            if value is None:
                return None
            try:
                return int(value.strip().split(" ", 1)[0])
            except (TypeError, ValueError, AttributeError):
                return None
    
        def _generate_statistics(self, packets: list[dict[str, Any]]) -> dict[str, Any]:
            """Generate SIP-specific statistics from analyzed packets."""
            requests = [packet for packet in packets if packet["message_type"] == "request"]
            responses = [
                packet for packet in packets if packet["message_type"] == "response"
            ]
    
            method_counts = Counter(packet.get("method", "") for packet in requests)
            response_classes = Counter()
            for packet in responses:
                status_code = packet.get("status_code")
                if isinstance(status_code, int):
                    response_classes[f"{status_code // 100}xx"] += 1
    
            call_ids = {packet["call_id"] for packet in packets if packet.get("call_id")}
            transports = Counter(packet["transport"] for packet in packets)
            user_agents = sorted(
                {packet["user_agent"] for packet in packets if packet.get("user_agent")}
            )
    
            return {
                "requests": len(requests),
                "responses": len(responses),
                "methods": dict(sorted(method_counts.items())),
                "response_classes": dict(sorted(response_classes.items())),
                "unique_call_ids": len(call_ids),
                "call_ids": sorted(call_ids),
                "transports": dict(sorted(transports.items())),
                "user_agents": user_agents,
            }
    
        def setup_prompts(self, mcp: FastMCP) -> None:
            """Set up SIP-specific prompts for the MCP server."""
    
            @mcp.prompt
            def sip_security_analysis():
                """Prompt for reviewing SIP traffic from a security perspective."""
                return """You are analyzing SIP signaling traffic for security issues. Focus on:
    
    1. Authentication failures, brute-force registration attempts, or credential misuse.
    2. Unusual call setup patterns, suspicious destinations, or unexpected SIP methods.
    3. Indicators of toll fraud, rogue endpoints, or malformed signaling.
    4. Exposure of internal addressing, software banners, or topology information.
    5. Concrete packet-level evidence and any missing context needed for confidence."""
    
            @mcp.prompt
            def sip_troubleshooting_analysis():
                """Prompt for troubleshooting SIP signaling behavior."""
                return """You are troubleshooting SIP signaling. Focus on:
    
    1. Call setup progression across INVITE, provisional responses, final responses, ACK, BYE, and CANCEL.
    2. Registration success or failure, including CSeq progression and response codes.
    3. Transport or addressing mismatches visible in Via, Contact, From, and To headers.
    4. Error response patterns such as 4xx, 5xx, or 6xx classes and the point where signaling fails.
    5. Concise next-step hypotheses grounded only in the capture contents."""
    
            @mcp.prompt
            def sip_forensic_investigation():
                """Prompt for reconstructing SIP activity for forensic work."""
                return """You are reconstructing SIP activity for a forensic investigation. Focus on:
    
    1. Building a timeline of registrations, call attempts, responses, and terminations.
    2. Correlating traffic by Call-ID, CSeq, source/destination IP, and transport.
    3. Identifying the apparent user agents, servers, and contacted SIP URIs.
    4. Highlighting failed calls, repeated attempts, and notable response codes.
    5. Preserving uncertainty explicitly when fields are missing or ambiguous."""
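
The header parsing and response-class bucketing in the module above can be tried without Scapy on a plain string. The following is a simplified sketch under stated assumptions: `SAMPLE` is an invented message using documentation addresses, `parse_headers` and `response_class` are hypothetical names, and folded (continuation) header lines, which the module does handle, are omitted here for brevity.

```python
from typing import Optional

# Invented sample response; addresses come from the documentation range.
SAMPLE = (
    "SIP/2.0 486 Busy Here\r\n"
    "Via: SIP/2.0/UDP 192.0.2.10:5060;branch=z9hG4bK776asdhds\r\n"
    "Call-ID: a84b4c76e66710@192.0.2.10\r\n"
    "CSeq: 314159 INVITE\r\n"
    "Content-Length: 0\r\n"
    "\r\n"
)


def parse_headers(message: str) -> tuple[str, dict[str, str]]:
    """Split a SIP message into its start line and lower-cased headers."""
    # Normalize line endings, then cut at the blank line before the body.
    head, _, _body = message.replace("\r\n", "\n").partition("\n\n")
    lines = [line for line in head.split("\n") if line.strip()]
    start_line = lines[0].strip() if lines else ""
    headers: dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            # Split on the first colon only; values may contain colons.
            key, value = line.split(":", 1)
            headers[key.strip().lower()] = value.strip()
    return start_line, headers


def response_class(start_line: str) -> Optional[str]:
    """Return a '4xx'-style class for a response start line, else None."""
    if not start_line.startswith("SIP/2.0 "):
        return None  # requests start with a method, not the version
    parts = start_line.split(" ", 2)
    try:
        return f"{int(parts[1]) // 100}xx"
    except (IndexError, ValueError):
        return None


start_line, headers = parse_headers(SAMPLE)
```

Here `response_class(start_line)` places the 486 response in the 4xx bucket, which is the same grouping the module's statistics report under `response_classes`.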
  • Registration of the tool: when the 'sip' module is loaded, 'self.mcp.tool(module.analyze_sip_packets)' registers the analyze_sip_packets function as an MCP tool.
    elif module_name == "sip":
        self.mcp.tool(module.analyze_sip_packets)
  • BaseModule.analyze_packets is the helper that analyze_sip_packets delegates to. It handles remote URLs (downloads to temp file) and local files (validates existence/extension), then calls the protocol-specific _analyze_protocol_file.
    def analyze_packets(self, pcap_file: str) -> dict[str, Any]:
        """Analyze packets from a PCAP file (local or remote).
    
        Args:
            pcap_file: Path to local PCAP file or HTTP URL to remote PCAP file
    
        Returns:
            A structured dictionary containing packet analysis results
        """
        # Check if this is a remote URL or local file
        if pcap_file.startswith(("http://", "https://")):
            return self._handle_remote_analysis(pcap_file)
        else:
            return self._handle_local_analysis(pcap_file)
    
    def _handle_remote_analysis(self, pcap_url: str) -> dict[str, Any]:
        """Handle remote PCAP file analysis."""
        try:
            # Download remote file to temporary location
            with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as tmp_file:
                temp_path = tmp_file.name
    
            local_path = self._download_pcap_file(pcap_url, temp_path)
            result = self._analyze_protocol_file(local_path)
    
            # Clean up temporary file
            try:
                os.unlink(local_path)
            except OSError:
                pass  # Ignore cleanup errors
    
            return result
    
        except Exception as e:
            return {
                "error": f"Failed to download PCAP file '{pcap_url}': {str(e)}",
                "pcap_url": pcap_url,
            }
    
    def _handle_local_analysis(self, pcap_file: str) -> dict[str, Any]:
        """Handle local PCAP file analysis."""
        # Validate file exists
        if not os.path.exists(pcap_file):
            return {
                "error": f"PCAP file not found: {pcap_file}",
                "pcap_file": pcap_file,
            }
    
        # Validate file extension
        if not pcap_file.lower().endswith((".pcap", ".pcapng", ".cap")):
            return {
                "error": f"File '{pcap_file}' is not a supported PCAP file (.pcap/.pcapng/.cap)",
                "pcap_file": pcap_file,
            }
    
        try:
            return self._analyze_protocol_file(pcap_file)
        except Exception as e:
            return {
                "error": f"Failed to analyze PCAP file '{pcap_file}': {str(e)}",
                "pcap_file": pcap_file,
            }
    
    def _download_pcap_file(self, pcap_url: str, local_path: str) -> str:
        """Download a remote PCAP file to local storage.
    
        Args:
            pcap_url: URL of the PCAP file to download
            local_path: Local path to save the file
    
        Returns:
            Local path to the downloaded file
        """
        import requests
    
        try:
            response = requests.get(pcap_url, timeout=60, stream=True)
            response.raise_for_status()
    
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
    
            with open(local_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
    
            return local_path
    
        except requests.RequestException as e:
            raise ValueError(
                f"Failed to download PCAP file '{pcap_url}': {str(e)}"
            ) from e
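
In the `_handle_remote_analysis` listing above, the temporary file is unlinked only after the download and analysis both return. If the download raises, control passes to the `except` branch and the file created with `delete=False` appears to remain on disk. One possible way to make cleanup unconditional is a `try`/`finally` block, sketched below. `analyze_via_temp_file`, `fetch`, and `analyze` are hypothetical names standing in for the download and protocol-analysis steps; this is not the library's code.

```python
import os
import tempfile
from typing import Any, Callable


def analyze_via_temp_file(
    fetch: Callable[[str], None],
    analyze: Callable[[str], dict[str, Any]],
) -> dict[str, Any]:
    """Fetch into a temporary .pcap file, analyze it, and always clean up.

    `fetch` and `analyze` are hypothetical callables used for illustration.
    """
    # Reserve a path; delete=False keeps the file after the handle closes.
    with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as tmp_file:
        temp_path = tmp_file.name
    try:
        fetch(temp_path)
        return analyze(temp_path)
    finally:
        # Runs on success and on failure, so a failed download
        # leaves no temporary file behind.
        try:
            os.unlink(temp_path)
        except OSError:
            pass  # Ignore cleanup errors, as the original listing does
```

Exceptions from `fetch` still propagate to the caller, so an outer handler can keep returning an error dictionary as before.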
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations exist, so the description carries the full burden. It discloses the critical limitation that Claude-uploaded files are not supported and specifies the allowed input sources, which is a significant behavioral trait beyond what the schema reveals. The score is slightly reduced because the description does not mention network access or performance implications.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well-organized with clear sections (limitations, supported formats, args, returns). Every sentence adds value, though it is slightly verbose. The 'FILE UPLOAD LIMITATION' and 'UNSUPPORTED' sections overlap and could be merged.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's simplicity (one parameter, a single purpose, and an output schema), the description covers purpose, input constraints, parameter semantics, and return type. Sibling differentiation is clear through protocol specificity. A perfect score would require more detail on output structure or error handling, but the output schema covers the former.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema coverage is 0%, and the description adds meaningful semantics: it explains the pcap_file parameter accepts HTTP URLs or absolute local paths, and lists unsupported formats. This compensates for the bare schema. However, details like file size limits or encoding expectations are absent.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description explicitly states 'Analyze SIP packets from a PCAP file and return structured signaling details,' which clearly identifies the specific verb, resource, and scope. This distinguishes it from sibling tools like analyze_dns_packets or analyze_dhcp_packets.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description states explicitly when to use the tool (SIP analysis) and when not to (files uploaded via Claude's web interface, base64 content, relative paths). It also lists supported formats; alternatives are only implied, in the form of the sibling protocol analyzers.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
