analyze_dns_packets
Analyze DNS traffic in PCAP files to extract structured insights about domain queries, responses, and network behavior from local or remote packet captures.
Instructions
Analyze DNS packets from a PCAP file and return comprehensive analysis results.
Args: pcap_file: Path to local PCAP file or HTTP URL to remote PCAP file
Returns: A structured dictionary containing DNS packet analysis results
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| pcap_file | Yes | Path to local PCAP file or HTTP URL to remote PCAP file | |
Implementation Reference
# src/mcpcap/modules/dns.py:20-42 (handler)
# The primary handler function for the 'analyze_dns_packets' tool. Defines input
# (pcap_file: str) and output (dict[str, Any]), includes detailed documentation on
# supported formats, and delegates to the base analyze_packets method for execution.
def analyze_dns_packets(self, pcap_file: str) -> dict[str, Any]:
    """
    Analyze DNS packets from a PCAP file and return comprehensive analysis results.

    ⚠️ FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through
    Claude's web interface. Files must be accessible via URL or local file path.

    SUPPORTED INPUT FORMATS:
    - Remote files: "https://example.com/capture.pcap"
    - Local files: "/absolute/path/to/capture.pcap"

    UNSUPPORTED:
    - Files uploaded through Claude's file upload feature
    - Base64 file content
    - Relative file paths

    Args:
        pcap_file: HTTP URL or absolute local file path to PCAP file

    Returns:
        A structured dictionary containing DNS packet analysis results
    """
    # NOTE(review): the docstring wording is kept verbatim on purpose; this
    # method is handed to mcp.tool(), which presumably surfaces the docstring
    # to clients as the tool description -- confirm before rewording it.
    # All real work (remote vs. local dispatch) lives in analyze_packets.
    analysis = self.analyze_packets(pcap_file)
    return analysis
# src/mcpcap/core/server.py:43-55 (registration)
# The registration block where the analyze_dns_packets method from the DNSModule is
# registered as an MCP tool using self.mcp.tool().
def _register_tools(self) -> None:
    """Expose the analysis method of every loaded module as an MCP tool.

    Walks ``self.modules`` in its own iteration order and passes the matching
    analysis method to ``self.mcp.tool``. Module names that have no entry in
    the table below are skipped without an error.
    """
    # Module key -> name of the method on that module to register.
    handler_attr_by_module = {
        "dns": "analyze_dns_packets",
        "dhcp": "analyze_dhcp_packets",
        "icmp": "analyze_icmp_packets",
        "capinfos": "analyze_capinfos",
    }

    for key, loaded in self.modules.items():
        attr_name = handler_attr_by_module.get(key)
        if attr_name is None:
            # Unknown module: nothing to register for it.
            continue
        self.mcp.tool(getattr(loaded, attr_name))
# src/mcpcap/modules/dns.py:44-96 (helper)
# Key helper method implementing the core DNS analysis logic: loads PCAP with scapy,
# filters DNS packets, limits if needed, analyzes each packet, generates stats, and
# structures the output.
def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
    """Perform the actual DNS packet analysis on a local PCAP file.

    Args:
        pcap_file: Path of the capture file handed to ``rdpcap``.

    Returns:
        One of three dictionary shapes:

        * Full analysis: ``file``, ``analysis_timestamp``,
          ``total_packets_in_file``, ``dns_packets_found``,
          ``dns_packets_analyzed``, ``statistics``, ``packets`` and, when the
          ``max_packets`` limit truncated the analysis, ``note``.
        * No DNS traffic: ``file``, ``total_packets``,
          ``total_packets_in_file``, ``dns_packets_found`` (0) and ``message``.
        * Failure: ``error`` and ``file``. Any exception raised while reading
          or analyzing is reported this way instead of propagating.
    """
    try:
        packets = rdpcap(pcap_file)
        total_packets = len(packets)
        dns_packets = [pkt for pkt in packets if pkt.haslayer(DNS)]

        if not dns_packets:
            return {
                "file": pcap_file,
                # Legacy key, kept so existing consumers keep working.
                "total_packets": total_packets,
                # Same key the full-analysis result uses, so callers can read
                # the packet count without special-casing empty captures.
                "total_packets_in_file": total_packets,
                "dns_packets_found": 0,
                "message": "No DNS packets found in this capture",
            }

        # Apply max_packets limit if specified (a falsy limit means "no limit")
        max_packets = self.config.max_packets
        limited = bool(max_packets) and len(dns_packets) > max_packets
        packets_to_analyze = dns_packets[:max_packets] if limited else dns_packets

        # Packet numbers are 1-based positions within the analyzed DNS packets.
        packet_details = [
            self._analyze_dns_packet(pkt, number)
            for number, pkt in enumerate(packets_to_analyze, 1)
        ]

        # Generate statistics
        stats = self._generate_statistics(packet_details)

        result = {
            "file": pcap_file,
            "analysis_timestamp": datetime.now().isoformat(),
            "total_packets_in_file": total_packets,
            "dns_packets_found": len(dns_packets),
            "dns_packets_analyzed": len(packet_details),
            "statistics": stats,
            "packets": packet_details,
        }

        # Add information about packet limiting
        if limited:
            result["note"] = (
                f"Analysis limited to first {max_packets} DNS packets due to --max-packets setting"
            )

        return result

    except Exception as e:
        # Deliberate catch-all at the tool boundary: the caller gets a
        # structured error dictionary rather than an exception.
        return {
            "error": f"Error reading PCAP file '{pcap_file}': {e}",
            "file": pcap_file,
        }
# src/mcpcap/modules/base.py:43-56 (helper)
# Base class helper called by the handler; handles downloading remote PCAPs to temp
# files and validates/calls protocol-specific analysis for local files.
def analyze_packets(self, pcap_file: str) -> dict[str, Any]:
    """Analyze packets from a PCAP file (local or remote).

    Args:
        pcap_file: Path to local PCAP file or HTTP URL to remote PCAP file

    Returns:
        A structured dictionary containing packet analysis results: whatever
        ``_handle_remote_analysis`` returns for ``http://`` / ``https://``
        inputs, otherwise whatever ``_handle_local_analysis`` returns.
    """
    # URI schemes are case-insensitive (RFC 3986, section 3.1), so
    # "HTTPS://host/x.pcap" must be treated as remote too. Only the prefix is
    # lowercased for the check ("https://" is 8 characters); the handlers
    # receive the caller's original string untouched.
    is_remote = pcap_file[:8].lower().startswith(("http://", "https://"))
    if is_remote:
        return self._handle_remote_analysis(pcap_file)
    return self._handle_local_analysis(pcap_file)
# src/mcpcap/modules/dns.py:97-223 (helper)
# Helper function that parses individual DNS packets, extracting IP info, protocol,
# questions, answers (with type-specific handling for A, AAAA, CNAME, MX), flags,
# and timestamps.
def _analyze_dns_packet(self, pkt: Any, packet_number: int) -> dict[str, Any]:
    """Analyze a single DNS packet.

    Args:
        pkt: Scapy packet object (its DNS layer is read via ``pkt[DNS]``)
        packet_number: Packet sequence number

    Returns:
        Dictionary containing packet analysis: ``packet_number``,
        ``timestamp``, ``source_ip``, ``destination_ip``, ``protocol``,
        ``dns_id``, ``flags``, ``questions``, ``answers`` and ``summary``.
        Records that cannot be parsed are kept in the output with a
        ``<parsing_error: ...>`` placeholder name rather than dropped.
    """

    def _name_text(raw: Any) -> str:
        """Render a DNS name as text without its trailing root dot.

        May raise UnicodeDecodeError for undecodable bytes; the callers below
        turn that into a placeholder record.
        """
        text = raw.decode() if hasattr(raw, "decode") else str(raw)
        return text.rstrip(".")

    def _rdata_text(raw: Any) -> str:
        """Render record data as text.

        Bytes values are decoded so the output holds ``www.example.com.``
        instead of the repr ``b'www.example.com.'``. Bytes that are not valid
        text fall back to ``str(raw)``, as do all non-bytes values.
        """
        if isinstance(raw, bytes):
            try:
                return raw.decode()
            except UnicodeDecodeError:
                return str(raw)
        return str(raw)

    dns_layer = pkt[DNS]

    # Extract IP information (IPv4 is checked first, then IPv6)
    src_ip = dst_ip = "unknown"
    if pkt.haslayer(IP):
        src_ip = pkt[IP].src
        dst_ip = pkt[IP].dst
    elif pkt.haslayer(IPv6):
        src_ip = pkt[IPv6].src
        dst_ip = pkt[IPv6].dst

    # Extract protocol (UDP/TCP)
    protocol = "unknown"
    if pkt.haslayer(UDP):
        protocol = "UDP"
    elif pkt.haslayer(TCP):
        protocol = "TCP"

    # Extract DNS questions
    questions = []
    if dns_layer.qd:
        for q in dns_layer.qd:
            try:
                questions.append(
                    {
                        "name": _name_text(q.qname),
                        "type": getattr(q, "qtype", 0),
                        "class": getattr(q, "qclass", 0),
                    }
                )
            except (AttributeError, UnicodeDecodeError) as e:
                # Keep malformed questions in the output, flagged as such
                questions.append(
                    {
                        "name": f"<parsing_error: {str(e)}>",
                        "type": getattr(q, "qtype", 0),
                        "class": getattr(q, "qclass", 0),
                    }
                )

    # Extract DNS answers
    answers = []
    if dns_layer.an:
        for a in dns_layer.an:
            try:
                # Safely extract the resource record name
                if hasattr(a, "rrname"):
                    name = _name_text(a.rrname)
                else:
                    name = "<unknown>"

                answer_data = {
                    "name": name,
                    "type": getattr(a, "type", 0),
                    "class": getattr(a, "rclass", 0),
                    "ttl": getattr(a, "ttl", 0),
                }

                # Handle different answer types
                if hasattr(a, "rdata"):
                    try:
                        if a.type in (1, 28):  # A / AAAA record
                            answer_data["address"] = _rdata_text(a.rdata)
                        elif a.type == 5:  # CNAME
                            answer_data["cname"] = _rdata_text(a.rdata).rstrip(".")
                        elif a.type == 15:  # MX
                            answer_data["mx"] = _rdata_text(a.rdata)
                        else:
                            answer_data["data"] = _rdata_text(a.rdata)
                    except Exception as rdata_error:
                        # Best effort: a bad rdata must not lose the record
                        answer_data["data"] = (
                            f"<rdata_parsing_error: {str(rdata_error)}>"
                        )

                answers.append(answer_data)
            except (AttributeError, UnicodeDecodeError) as e:
                # Keep malformed answers in the output with error info
                answers.append(
                    {
                        "name": f"<parsing_error: {str(e)}>",
                        "type": getattr(a, "type", 0),
                        "class": getattr(a, "rclass", 0),
                        "ttl": getattr(a, "ttl", 0),
                        "data": "<malformed_record>",
                    }
                )

    return {
        "packet_number": packet_number,
        # NOTE(review): fromtimestamp() without a tz argument yields a naive
        # datetime in the host's local time zone -- confirm that is intended.
        "timestamp": datetime.fromtimestamp(float(pkt.time)).isoformat(),
        "source_ip": src_ip,
        "destination_ip": dst_ip,
        "protocol": protocol,
        "dns_id": dns_layer.id,
        "flags": {
            "is_response": bool(dns_layer.qr),
            "authoritative": bool(dns_layer.aa),
            "truncated": bool(dns_layer.tc),
            "recursion_desired": bool(dns_layer.rd),
            "recursion_available": bool(dns_layer.ra),
        },
        "questions": questions,
        "answers": answers,
        "summary": pkt.summary(),
    }