parse_bacnet_pcap
Extract and decode BACnet/IP packets from PCAP files to analyze industrial network traffic and identify connected devices.
Instructions
Parse BACnet/IP traffic from a PCAP and return decoded packets plus a basic device inventory.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| pcap_path | Yes | | |
| packet_limit | No | | 500 |
Implementation Reference
- otparse/server.py:19-22 (registration) MCP tool registration point: The @mcp.tool() decorator registers parse_bacnet_pcap as an MCP tool. This async handler runs the synchronous implementation (parse_bacnet_with_inventory) in a worker thread via asyncio.to_thread.
@mcp.tool()
async def parse_bacnet_pcap(pcap_path: str, packet_limit: int = 500) -> dict:
    """Parse BACnet/IP traffic from a PCAP and return decoded packets plus a basic device inventory."""
    # NOTE(review): the docstring above is presumably exposed as the MCP tool
    # description, so it is treated as runtime text and left untouched.
    #
    # parse_bacnet_with_inventory is a plain (blocking) function, so it is
    # handed to asyncio.to_thread rather than called directly in the coroutine.
    worker = asyncio.to_thread(
        parse_bacnet_with_inventory,
        pcap_path=pcap_path,
        packet_limit=packet_limit,
    )
    return await worker


# - otparse/parsers/bacnet.py:17-85 (handler) Core implementation: The parse_bacnet_pcap function contains the main BACnet/IP parsing logic. It uses pyshark to read PCAP files, filters for BACnet/BVLC traffic, and extracts packet fields into BacnetPacket objects.
def parse_bacnet_pcap(pcap_path: str, packet_limit: int = 500) -> BacnetParseResult:
    """Parse BACnet/IP traffic from a PCAP file.

    Args:
        pcap_path: Path to the capture file; passed through
            ``validate_pcap_path`` before being opened.
        packet_limit: Maximum number of packets matching the
            ``bacnet or bvlc`` display filter to process.

    Returns:
        A ``BacnetParseResult`` whose ``packets`` are the decoded packets and
        whose ``metadata`` carries the validated path, the number of matching
        packets actually processed, and any notes (truncation, nothing
        decoded).

    Raises:
        Whatever ``ensure_tshark`` / ``validate_pcap_path`` raise on failure
        (defined elsewhere -- see those helpers).
    """
    ensure_tshark()
    path = validate_pcap_path(pcap_path)

    capture = pyshark.FileCapture(
        str(path),
        display_filter="bacnet or bvlc",
        keep_packets=False,
        use_json=True,
        include_raw=False,
        tshark_path="tshark",
    )

    packets: list[BacnetPacket] = []
    notes: list[str] = []
    packet_count = 0

    try:
        for packet in capture:
            # Check the limit BEFORE counting: the packet that trips the limit
            # is never processed, so it must not be included in packet_count
            # (previously the reported count was packet_limit + 1).
            if packet_count >= packet_limit:
                notes.append(f"Stopped after packet_limit={packet_limit} matching packets.")
                break
            packet_count += 1

            bacnet = getattr(packet, "bacnet", None)
            bvlc = getattr(packet, "bvlc", None)
            ip = getattr(packet, "ip", None)
            udp = getattr(packet, "udp", None)

            # Matched the display filter but exposes neither layer: nothing to decode.
            if bacnet is None and bvlc is None:
                continue

            packets.append(
                BacnetPacket(
                    frame_number=int(packet.number),
                    timestamp=str(packet.sniff_time.isoformat()),
                    src_ip=safe_str(getattr(ip, "src", None)),
                    dst_ip=safe_str(getattr(ip, "dst", None)),
                    src_port=safe_int(getattr(udp, "srcport", None)),
                    dst_port=safe_int(getattr(udp, "dstport", None)),
                    # Prefer the BVLC layer's own field; fall back to the
                    # BACnet layer when the first lookup yields nothing.
                    bvlc_function=safe_str(_get_attr(bvlc, "function", "func"))
                    or safe_str(_get_attr(bacnet, "bvlc_function")),
                    npdu_control=safe_str(_get_attr(bacnet, "control", "npdu_control")),
                    apdu_type=safe_str(
                        _get_attr(
                            bacnet,
                            "apdu_type",
                            "confirmed_service_request",
                            "unconfirmed_service_request",
                        )
                    ),
                    service=safe_str(
                        _get_attr(
                            bacnet,
                            "confirmed_service_request",
                            "unconfirmed_service_request",
                            "confirmed_service_ack",
                            "service_choice",
                        )
                    ),
                    invoke_id=safe_int(_get_attr(bacnet, "invoke_id")),
                    object_type=safe_str(_get_attr(bacnet, "objectType", "object_type")),
                    object_instance=safe_int(_get_attr(bacnet, "instance_number", "object_instance")),
                    property_identifier=safe_str(
                        _get_attr(bacnet, "propertyIdentifier", "property_identifier")
                    ),
                    raw_summary=safe_str(getattr(packet, "highest_layer", None)),
                )
            )
    finally:
        # Always release the capture, even if decoding raised.
        capture.close()

    if not packets:
        notes.append(
            "No BACnet packets were decoded by tshark. The capture may not contain BACnet/IP or tshark may lack the needed dissector."
        )

    return BacnetParseResult(
        metadata=ToolMetadata(pcap_path=str(path), packet_count=packet_count, notes=notes),
        packets=packets,
    )


# - otparse/models.py:38-59 (schema) Schema definitions: BacnetPacket (lines 38-53) defines the structure for individual BACnet packets with fields like frame_number, timestamps, IPs, ports, and BACnet-specific attributes. BacnetParseResult (lines 56-59) wraps packets with metadata.
# One decoded BACnet/IP packet as produced by parse_bacnet_pcap.
# Only frame_number and timestamp are required; every other field defaults to
# None when the corresponding value could not be read from the packet.
class BacnetPacket(BaseModel):
    frame_number: int  # built from int(packet.number) by the parser
    timestamp: str  # packet.sniff_time rendered with isoformat()
    src_ip: str | None = None  # "src" of the packet's ip layer
    dst_ip: str | None = None  # "dst" of the packet's ip layer
    src_port: int | None = None  # "srcport" of the packet's udp layer
    dst_port: int | None = None  # "dstport" of the packet's udp layer
    bvlc_function: str | None = None
    npdu_control: str | None = None
    apdu_type: str | None = None
    service: str | None = None  # also used as a hint by inventory_from_bacnet
    invoke_id: int | None = None
    object_type: str | None = None
    object_instance: int | None = None
    property_identifier: str | None = None
    raw_summary: str | None = None  # the packet's highest_layer name


# Result envelope for a BACnet parse: a fixed protocol tag, tool metadata, and
# the list of decoded packets (a fresh empty list by default).
class BacnetParseResult(BaseModel):
    protocol: Literal["bacnet"] = "bacnet"
    metadata: ToolMetadata
    packets: list[BacnetPacket] = Field(default_factory=list)


# - otparse/parsers/bacnet.py:88-94 (helper) Helper function: parse_bacnet_with_inventory wraps the core parser and adds device inventory analysis, returning both parsed results and discovered devices.
def parse_bacnet_with_inventory(pcap_path: str, packet_limit: int = 500) -> dict:
    """Parse a PCAP for BACnet traffic and attach a device inventory.

    Runs ``parse_bacnet_pcap`` and feeds its packets to
    ``inventory_from_bacnet``.

    Returns:
        A dict with two keys: ``"result"`` (the dumped parse result) and
        ``"device_inventory"`` (the dumped inventory).
    """
    parsed = parse_bacnet_pcap(pcap_path=pcap_path, packet_limit=packet_limit)
    devices = inventory_from_bacnet(parsed.packets)

    payload = {"result": parsed.model_dump()}
    payload["device_inventory"] = devices.model_dump()
    return payload


# - otparse/analyzers/devices.py:32-55 (helper) Device inventory helper: inventory_from_bacnet extracts device information from BACnet packets, identifying IPs, ports, and service hints for device discovery.
def inventory_from_bacnet(packets: list[BacnetPacket]) -> DeviceInventory:
    """Build a per-IP device inventory from decoded BACnet packets.

    Each packet contributes up to two endpoints (source and destination).
    For every endpoint with a non-empty IP, the inventory records the
    "bacnet" protocol, the port (when truthy), a role hint
    ("bacnet_sender" / "bacnet_listener"), and the packet's service when set.

    Returns:
        A ``DeviceInventory`` whose records are ordered by IP string, with
        protocols, ports and hints each sorted.
    """
    seen: dict[str, dict[str, set]] = defaultdict(
        lambda: {"protocols": set(), "ports": set(), "hints": set()}
    )

    for packet in packets:
        endpoints = (
            (packet.src_ip, packet.src_port, "bacnet_sender"),
            (packet.dst_ip, packet.dst_port, "bacnet_listener"),
        )
        for address, port, role in endpoints:
            # Only endpoints with an IP get an inventory entry.
            if address:
                entry = seen[address]
                entry["protocols"].add("bacnet")
                entry["hints"].add(role)
                if port:
                    entry["ports"].add(int(port))
                if packet.service:
                    entry["hints"].add(packet.service)

    devices = []
    for address in sorted(seen):
        entry = seen[address]
        devices.append(
            DeviceRecord(
                ip=address,
                protocols=sorted(entry["protocols"]),
                ports=sorted(entry["ports"]),
                hints=sorted(entry["hints"]),
            )
        )
    return DeviceInventory(devices=devices)