Skip to main content
Glama
desvert
by desvert

parse_bacnet_pcap

Extract and decode BACnet/IP packets from PCAP files to analyze industrial network traffic and identify connected devices.

Instructions

Parse BACnet/IP traffic from a PCAP and return decoded packets plus a basic device inventory.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
pcap_pathYes
packet_limitNo

Implementation Reference

  • MCP tool registration point: The @mcp.tool() decorator registers parse_bacnet_pcap as an MCP tool. This async handler wraps the synchronous implementation using asyncio.to_thread.
    @mcp.tool()
    async def parse_bacnet_pcap(pcap_path: str, packet_limit: int = 500) -> dict:
        """Parse BACnet/IP traffic from a PCAP and return decoded packets plus a basic device inventory."""
        return await asyncio.to_thread(parse_bacnet_with_inventory, pcap_path=pcap_path, packet_limit=packet_limit)
  • Core implementation: The parse_bacnet_pcap function contains the main BACnet/IP parsing logic. It uses pyshark to read PCAP files, filters for BACnet/BVLC traffic, and extracts packet fields into BacnetPacket objects.
    def parse_bacnet_pcap(pcap_path: str, packet_limit: int = 500) -> BacnetParseResult:
        """Parse BACnet/IP traffic from a PCAP file."""
        ensure_tshark()
        path = validate_pcap_path(pcap_path)
    
        capture = pyshark.FileCapture(
            str(path),
            display_filter="bacnet or bvlc",
            keep_packets=False,
            use_json=True,
            include_raw=False,
            tshark_path="tshark",
        )
    
        packets: list[BacnetPacket] = []
        notes: list[str] = []
        packet_count = 0
    
        try:
            for packet in capture:
                packet_count += 1
                if packet_count > packet_limit:
                    notes.append(f"Stopped after packet_limit={packet_limit} matching packets.")
                    break
    
                bacnet = getattr(packet, "bacnet", None)
                bvlc = getattr(packet, "bvlc", None)
                ip = getattr(packet, "ip", None)
                udp = getattr(packet, "udp", None)
                if bacnet is None and bvlc is None:
                    continue
    
                packets.append(
                    BacnetPacket(
                        frame_number=int(packet.number),
                        timestamp=str(packet.sniff_time.isoformat()),
                        src_ip=safe_str(getattr(ip, "src", None)),
                        dst_ip=safe_str(getattr(ip, "dst", None)),
                        src_port=safe_int(getattr(udp, "srcport", None)),
                        dst_port=safe_int(getattr(udp, "dstport", None)),
                        bvlc_function=safe_str(_get_attr(bvlc, "function", "func")) or safe_str(_get_attr(bacnet, "bvlc_function")),
                        npdu_control=safe_str(_get_attr(bacnet, "control", "npdu_control")),
                        apdu_type=safe_str(_get_attr(bacnet, "apdu_type", "confirmed_service_request", "unconfirmed_service_request")),
                        service=safe_str(
                            _get_attr(
                                bacnet,
                                "confirmed_service_request",
                                "unconfirmed_service_request",
                                "confirmed_service_ack",
                                "service_choice",
                            )
                        ),
                        invoke_id=safe_int(_get_attr(bacnet, "invoke_id")),
                        object_type=safe_str(_get_attr(bacnet, "objectType", "object_type")),
                        object_instance=safe_int(_get_attr(bacnet, "instance_number", "object_instance")),
                        property_identifier=safe_str(_get_attr(bacnet, "propertyIdentifier", "property_identifier")),
                        raw_summary=safe_str(getattr(packet, "highest_layer", None)),
                    )
                )
        finally:
            capture.close()
    
        if not packets:
            notes.append("No BACnet packets were decoded by tshark. The capture may not contain BACnet/IP or tshark may lack the needed dissector.")
    
        return BacnetParseResult(
            metadata=ToolMetadata(pcap_path=str(path), packet_count=packet_count, notes=notes),
            packets=packets,
        )
  • Schema definitions: BacnetPacket (lines 38-53) defines the structure for individual BACnet packets with fields like frame_number, timestamps, IPs, ports, and BACnet-specific attributes. BacnetParseResult (lines 56-59) wraps packets with metadata.
    class BacnetPacket(BaseModel):
        frame_number: int
        timestamp: str
        src_ip: str | None = None
        dst_ip: str | None = None
        src_port: int | None = None
        dst_port: int | None = None
        bvlc_function: str | None = None
        npdu_control: str | None = None
        apdu_type: str | None = None
        service: str | None = None
        invoke_id: int | None = None
        object_type: str | None = None
        object_instance: int | None = None
        property_identifier: str | None = None
        raw_summary: str | None = None
    
    
    class BacnetParseResult(BaseModel):
        protocol: Literal["bacnet"] = "bacnet"
        metadata: ToolMetadata
        packets: list[BacnetPacket] = Field(default_factory=list)
  • Helper function: parse_bacnet_with_inventory wraps the core parser and adds device inventory analysis, returning both parsed results and discovered devices.
    def parse_bacnet_with_inventory(pcap_path: str, packet_limit: int = 500) -> dict:
        result = parse_bacnet_pcap(pcap_path=pcap_path, packet_limit=packet_limit)
        inventory = inventory_from_bacnet(result.packets)
        return {
            "result": result.model_dump(),
            "device_inventory": inventory.model_dump(),
        }
  • Device inventory helper: inventory_from_bacnet extracts device information from BACnet packets, identifying IPs, ports, and service hints for device discovery.
    def inventory_from_bacnet(packets: list[BacnetPacket]) -> DeviceInventory:
        table: dict[str, dict[str, set]] = defaultdict(lambda: {"protocols": set(), "ports": set(), "hints": set()})
        for pkt in packets:
            for ip, port, role in ((pkt.src_ip, pkt.src_port, "bacnet_sender"), (pkt.dst_ip, pkt.dst_port, "bacnet_listener")):
                if not ip:
                    continue
                row = table[ip]
                row["protocols"].add("bacnet")
                if port:
                    row["ports"].add(int(port))
                row["hints"].add(role)
                if pkt.service:
                    row["hints"].add(pkt.service)
        return DeviceInventory(
            devices=[
                DeviceRecord(
                    ip=ip,
                    protocols=sorted(info["protocols"]),
                    ports=sorted(info["ports"]),
                    hints=sorted(info["hints"]),
                )
                for ip, info in sorted(table.items())
            ]
        )
Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/desvert/otparse-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server