Skip to main content
Glama
danohn

mcpacket

by danohn
dhcp.py13.2 kB
"""DHCP analysis module.""" from typing import Any from fastmcp import FastMCP from scapy.all import BOOTP, DHCP, IP, UDP, rdpcap from .base import BaseModule class DHCPModule(BaseModule): """Module for analyzing DHCP packets in PCAP files.""" @property def protocol_name(self) -> str: """Return the name of the protocol this module analyzes.""" return "DHCP" def analyze_dhcp_packets(self, pcap_file: str) -> dict[str, Any]: """ Analyze DHCP packets from a PCAP file and return comprehensive analysis results. ⚠️ FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path. SUPPORTED INPUT FORMATS: - Remote files: "https://example.com/capture.pcap" - Local files: "/absolute/path/to/capture.pcap" UNSUPPORTED: - Files uploaded through Claude's file upload feature - Base64 file content - Relative file paths Args: pcap_file: HTTP URL or absolute local file path to PCAP file Returns: A structured dictionary containing DHCP packet analysis results """ return self.analyze_packets(pcap_file) def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]: """Perform the actual DHCP packet analysis on a local PCAP file.""" try: packets = rdpcap(pcap_file) dhcp_packets = [pkt for pkt in packets if pkt.haslayer(BOOTP)] if not dhcp_packets: return { "file": pcap_file, "total_packets": len(packets), "dhcp_packets_found": 0, "message": "No DHCP packets found in this capture", } # Apply max_packets limit if specified packets_to_analyze = dhcp_packets limited = False if self.config.max_packets and len(dhcp_packets) > self.config.max_packets: packets_to_analyze = dhcp_packets[: self.config.max_packets] limited = True packet_details = [] for i, pkt in enumerate(packets_to_analyze, 1): packet_info = self._analyze_dhcp_packet(pkt, i) packet_details.append(packet_info) # Generate statistics stats = self._generate_statistics(packet_details) result = { "file": pcap_file, "total_packets": len(packets), "dhcp_packets_found": len(dhcp_packets), "dhcp_packets_analyzed": len(packet_details), "statistics": stats, "packets": packet_details, } # Add information about packet limiting if limited: result["note"] = ( f"Analysis limited to first {self.config.max_packets} DHCP packets due to --max-packets setting" ) return result except Exception as e: return { "error": f"Error reading PCAP file '{pcap_file}': {str(e)}", "file": pcap_file, } def _analyze_dhcp_packet(self, packet, packet_num: int) -> dict[str, Any]: """Analyze a single DHCP packet and extract relevant information.""" info = { "packet_number": packet_num, "timestamp": packet.time, } # Basic IP information if packet.haslayer(IP): ip_layer = packet[IP] info.update( { "src_ip": ip_layer.src, "dst_ip": ip_layer.dst, } ) # UDP information if packet.haslayer(UDP): udp_layer = packet[UDP] info.update( { "src_port": udp_layer.sport, "dst_port": udp_layer.dport, } ) # BOOTP/DHCP information if packet.haslayer(BOOTP): bootp_layer = packet[BOOTP] info.update( { "op": "Request" if bootp_layer.op == 1 else "Reply", "transaction_id": f"0x{bootp_layer.xid:08x}", "client_ip": str(bootp_layer.ciaddr), "your_ip": str(bootp_layer.yiaddr), "server_ip": str(bootp_layer.siaddr), "relay_ip": str(bootp_layer.giaddr), "client_mac": bootp_layer.chaddr[:6].hex(":"), } ) # DHCP options analysis if packet.haslayer(DHCP): dhcp_layer = packet[DHCP] dhcp_info = self._parse_dhcp_options(dhcp_layer.options) info.update(dhcp_info) return info def _parse_dhcp_options(self, options) -> dict[str, Any]: """Parse DHCP options and return structured information.""" dhcp_info = {"options": {}} # DHCP message type mappings message_types = { 1: "DISCOVER", 2: "OFFER", 3: "REQUEST", 4: "DECLINE", 5: "ACK", 6: "NAK", 7: "RELEASE", 8: "INFORM", } for opt in options: if isinstance(opt, tuple) and len(opt) == 2: key, value = opt if key == "message-type": dhcp_info["message_type"] = message_types.get( value, f"Unknown({value})" ) dhcp_info["message_type_code"] = value elif key == "lease_time": dhcp_info["lease_time"] = value dhcp_info["options"]["lease_time"] = f"{value} seconds" elif key == "renewal_time": dhcp_info["renewal_time"] = value dhcp_info["options"]["renewal_time"] = f"{value} seconds" elif key == "rebinding_time": dhcp_info["rebinding_time"] = value dhcp_info["options"]["rebinding_time"] = f"{value} seconds" elif key == "server_id": dhcp_info["server_id"] = str(value) elif key == "subnet_mask": dhcp_info["options"]["subnet_mask"] = str(value) elif key == "router": dhcp_info["options"]["router"] = str(value) elif key == "name_server": dhcp_info["options"]["dns_servers"] = str(value) elif key == "requested_addr": dhcp_info["requested_ip"] = str(value) elif key == "client_id": if isinstance(value, bytes): dhcp_info["client_id"] = value.hex(":") else: dhcp_info["client_id"] = str(value) elif key == "param_req_list": dhcp_info["options"]["parameter_request_list"] = list(value) else: # Store other options as strings dhcp_info["options"][key] = str(value) return dhcp_info def _generate_statistics(self, packets: list[dict[str, Any]]) -> dict[str, Any]: """Generate statistics from analyzed DHCP packets.""" stats = { "unique_clients": set(), "unique_servers": set(), "message_types": {}, "transactions": {}, } for pkt in packets: # Count unique clients and servers if "client_mac" in pkt: stats["unique_clients"].add(pkt["client_mac"]) if "server_id" in pkt: stats["unique_servers"].add(pkt["server_id"]) # Count message types if "message_type" in pkt: msg_type = pkt["message_type"] stats["message_types"][msg_type] = ( stats["message_types"].get(msg_type, 0) + 1 ) # Track transactions if "transaction_id" in pkt: tx_id = pkt["transaction_id"] if tx_id not in stats["transactions"]: stats["transactions"][tx_id] = [] stats["transactions"][tx_id].append( { "packet_number": pkt["packet_number"], "message_type": pkt.get("message_type", "Unknown"), "timestamp": pkt["timestamp"], } ) # Convert sets to counts for JSON serialization return { "unique_clients_count": len(stats["unique_clients"]), "unique_servers_count": len(stats["unique_servers"]), "unique_clients": list(stats["unique_clients"]), "unique_servers": list(stats["unique_servers"]), "message_type_counts": stats["message_types"], "transaction_count": len(stats["transactions"]), "transactions": stats["transactions"], } def setup_prompts(self, mcp: FastMCP) -> None: """Set up DHCP-specific analysis prompts for the MCP server. Args: mcp: FastMCP server instance """ @mcp.prompt def dhcp_network_analysis(): """Prompt for analyzing DHCP traffic from a network perspective""" return """You are a network administrator analyzing DHCP traffic. Focus your analysis on: 1. **IP Address Management:** - Track DHCP lease assignments and renewals - Identify IP address pool usage and availability - Look for lease conflicts or duplicate IP assignments - Monitor lease duration patterns and optimization 2. **Client Behavior Analysis:** - Identify DHCP client types and operating systems - Track client renewal patterns and timing - Look for unusual client behavior (rapid requests, failures) - Monitor mobile devices vs. static clients 3. **Server Performance:** - Analyze DHCP server response times - Look for server failures or timeouts - Check for multiple DHCP servers (potential conflicts) - Monitor server load and capacity planning 4. **Network Troubleshooting:** - Identify DHCP DISCOVER floods or storms - Look for relay agent issues - Check for scope exhaustion problems - Find misconfigurations in options delivery Provide specific recommendations for network optimization and problem resolution.""" @mcp.prompt def dhcp_security_analysis(): """Prompt for analyzing DHCP traffic from a security perspective""" return """You are a security analyst examining DHCP traffic for threats. Focus on: 1. **Rogue DHCP Detection:** - Identify unauthorized DHCP servers on the network - Look for conflicting server responses or options - Check for suspicious server_id values - Monitor for DHCP server impersonation attempts 2. **DHCP Attacks:** - Detect DHCP starvation attacks (rapid lease consumption) - Look for DHCP spoofing or man-in-the-middle attempts - Identify abnormal request patterns or frequencies - Check for MAC address spoofing in client requests 3. **Client Anomalies:** - Find clients with suspicious hostnames or identifiers - Look for rapid MAC address changes from single sources - Detect clients requesting unusual or dangerous options - Monitor for clients bypassing expected DHCP flow 4. **Data Exfiltration Risks:** - Check for unusual data in DHCP option fields - Look for DNS server redirection attempts - Monitor for suspicious router or gateway assignments - Identify potential DNS tunneling via DHCP options Provide threat assessment and recommended security controls.""" @mcp.prompt def dhcp_forensic_investigation(): """Prompt for forensic analysis of DHCP traffic""" return """You are conducting a digital forensics investigation involving DHCP traffic. Approach systematically: 1. **Timeline Reconstruction:** - Create chronological sequence of DHCP transactions - Map client activity periods and network presence - Correlate DHCP assignments with incident timeframes - Track device mobility through IP address changes 2. **Device Attribution:** - Link MAC addresses to specific devices or users - Track device behavior patterns across time - Identify device types through DHCP fingerprinting - Document lease history for evidence correlation 3. **Network Mapping:** - Reconstruct network topology from DHCP data - Identify network segments and VLAN assignments - Map DHCP server infrastructure and relationships - Document network configuration at incident time 4. **Evidence Collection:** - Extract client identifiers and hostnames - Document IP address assignments for legal proceedings - Preserve transaction timing for event correlation - Collect option data that may contain forensic artifacts Present findings with precise timestamps, evidence preservation notes, and clear documentation suitable for legal proceedings."""

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danohn/mcpacket'

If you have feedback or need assistance with the MCP directory API, please join our Discord server