Skip to main content
Glama

Security MCP Server

by nordeim
implementation_plan_dig_tool.md25.4 kB
# DNS Dig Tool - Deep Analysis & Meticulous Implementation Plan ## Executive Summary **Tool:** DNS Dig (Domain Information Groper) **Purpose:** DNS query and enumeration tool for authorized internal networks **Risk Level:** 🟢 **LOW** (read-only information gathering) **Complexity:** 🟡 **MEDIUM** (unique syntax patterns) **Estimated LOC:** ~650 lines **Key Insight:** Dig is fundamentally different from attack tools (nmap/hydra/sqlmap) - it's read-only DNS information gathering, making it lower risk and suitable for higher concurrency. --- ## Phase 1: Deep Architecture Analysis ### 1.1 Dig Command Structure Understanding ```bash # Basic syntax dig [OPTIONS] [@SERVER] [DOMAIN] [TYPE] # Examples: dig example.lab.internal # Query default DNS dig @192.168.1.1 example.lab.internal # Query specific DNS server dig example.lab.internal A # Query A records dig example.lab.internal MX +short # Mail servers, short output dig -x 192.168.1.10 # Reverse DNS lookup dig @192.168.1.1 example.lab.internal ANY +trace # Trace with all records dig example.lab.internal TXT +noall +answer # Clean TXT output ``` **Component Breakdown:** 1. **@SERVER** - DNS server to query (optional, special syntax) 2. **OPTIONS** - Flags like `-x`, `-4`, `-6`, `-t`, `-p` 3. **+QUERY_OPTIONS** - Dig-specific options like `+short`, `+trace`, `+dnssec` 4. **DOMAIN** - Domain name to query 5. **TYPE** - Record type (A, MX, TXT, etc.) ### 1.2 Unique Syntax Challenges **Challenge 1: @ DNS Server Prefix** ```python # Input: "@192.168.1.1 example.lab.internal" # @192.168.1.1 is NOT a flag, it's a server specification # Must parse separately from flags ``` **Challenge 2: + Query Options** ```python # Input: "+short +noall +answer" # +option is dig-specific, not standard flags # Must parse separately and validate against whitelist ``` **Challenge 3: Positional Arguments** ```python # Input: "example.lab.internal MX" # Both are non-flag tokens # Domain is target (already validated by base class) # MX is record type (must validate) ``` ### 1.3 Target Validation Strategy **Scenario 1: Forward Lookup** ```python ToolInput( target="server.lab.internal", # Domain (base validates .lab.internal) extra_args="A +short" # Record type and options ) # ✅ Base class validates .lab.internal ``` **Scenario 2: Reverse Lookup** ```python ToolInput( target="192.168.1.10", # IP address (base validates RFC1918) extra_args="-x" # Reverse lookup flag ) # ✅ Base class validates RFC1918 ``` **Scenario 3: Custom DNS Server** ```python ToolInput( target="server.lab.internal", extra_args="@192.168.1.1 MX" # Query specific DNS server ) # ✅ Base validates target # ✅ Tool validates @192.168.1.1 is RFC1918 or .lab.internal ``` **Decision:** Use base class validation for target, add custom validation for @SERVER specification. ### 1.4 Security Model **Low Risk Justification:** - DNS queries are read-only - Cannot modify target systems - Cannot cause DoS (with reasonable timeout) - No code execution - Standard information gathering **Security Controls:** 1. **DNS Server Restriction:** Only query RFC1918 or .lab.internal DNS servers 2. **Target Restriction:** Only query .lab.internal domains or RFC1918 IPs (PTR) 3. **Record Type Validation:** Whitelist of safe record types (all standard types) 4. **Query Option Validation:** Whitelist of safe +options 5. **Timeout Enforcement:** Prevent hanging queries **No Restrictions Needed:** - All record types are safe (A, MX, TXT, etc.) - All query options are safe (+short, +trace, etc.) - All standard flags are safe (-x, -4, -6, etc.) --- ## Phase 2: Detailed Component Design ### 2.1 Class Structure ```python class DigTool(MCPBaseTool): """ DNS query tool for authorized internal networks. Dig is a DNS lookup utility for querying DNS nameservers. This wrapper provides safe DNS enumeration for internal networks. Security Model: - Target must be .lab.internal domain or RFC1918 IP - DNS servers (if specified with @) must be RFC1918 or .lab.internal - All standard DNS record types allowed (read-only) - All dig query options allowed (information gathering) """ command_name: ClassVar[str] = "dig" # Allowed DNS record types (all standard types - all safe) ALLOWED_RECORD_TYPES = frozenset([ 'A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA', 'CNAME', 'PTR', 'SRV', 'CAA', 'ANY', 'DNSKEY', 'DS', 'RRSIG', 'NSEC', 'NSEC3', 'TLSA', 'SPF', 'NAPTR', 'HINFO', 'RP' ]) # Allowed query options (+ prefix) ALLOWED_QUERY_OPTIONS = frozenset([ # Output format 'short', 'noall', 'answer', 'authority', 'additional', 'question', 'stats', 'nostats', 'comments', 'nocomments', # Query behavior 'trace', 'notrace', 'recurse', 'norecurse', 'dnssec', 'nodnssec', 'nsid', 'nonsid', # Protocol 'tcp', 'notcp', 'vc', 'novc', 'ignore', 'noignore', # Advanced 'aaflag', 'noaaflag', 'adflag', 'noadflag', 'cdflag', 'nocdflag', 'qr', 'noqr', # Timing 'time', 'retry', 'ndots', 'bufsize', 'edns', # Misc 'multiline', 'nomultiline', 'onesoa', 'noonesoa' ]) # Standard flags allowed_flags: ClassVar[Sequence[str]] = [ '-x', # Reverse lookup '-4', # IPv4 only '-6', # IPv6 only '-t', # Type specification '-c', # Class specification '-p', # Port number '-b', # Bind to source address '-f', # Read from file (batch mode) '-q', # Query name '-v', # Verbose '-V', # Version ] _FLAGS_REQUIRE_VALUE = frozenset({ '-t', '-c', '-p', '-b', '-f', '-q' }) # Timeouts (DNS queries are quick) default_timeout_sec: ClassVar[float] = 30.0 # Concurrency (DNS queries are low-impact, allow higher concurrency) concurrency: ClassVar[int] = 5 # Circuit breaker (lenient for low-risk tool) circuit_breaker_failure_threshold: ClassVar[int] = 5 circuit_breaker_recovery_timeout: ClassVar[float] = 60.0 # Compiled patterns for parsing _DNS_SERVER_PATTERN = re.compile(r'^@([\w\.\-]+)$') _ANSWER_SECTION_PATTERN = re.compile( r'^([^\s]+)\s+(\d+)\s+IN\s+([A-Z]+)\s+(.+)$', re.MULTILINE ) _QUERY_TIME_PATTERN = re.compile(r'Query time:\s+(\d+)\s+msec') _SERVER_PATTERN = re.compile(r'SERVER:\s+([\d\.]+)#(\d+)') ``` ### 2.2 Initialization Pattern ```python def __init__(self): """Initialize Dig tool with configuration.""" super().__init__() self.config = get_config() self._apply_config() log.info("dig_tool.initialized timeout=%.1f concurrency=%d", self.default_timeout_sec, self.concurrency) def _apply_config(self): """Apply configuration with safe clamping.""" try: # Circuit breaker if hasattr(self.config, 'circuit_breaker') and self.config.circuit_breaker: cb = self.config.circuit_breaker if hasattr(cb, 'failure_threshold'): self.circuit_breaker_failure_threshold = max(1, min(10, int(cb.failure_threshold))) if hasattr(cb, 'recovery_timeout'): self.circuit_breaker_recovery_timeout = max(30.0, min(300.0, float(cb.recovery_timeout))) # Tool config if hasattr(self.config, 'tool') and self.config.tool: if hasattr(self.config.tool, 'default_timeout'): self.default_timeout_sec = max(5.0, min(300.0, float(self.config.tool.default_timeout))) if hasattr(self.config.tool, 'default_concurrency'): self.concurrency = max(1, min(10, int(self.config.tool.default_concurrency))) log.debug("dig.config_applied timeout=%.1f concurrency=%d", self.default_timeout_sec, self.concurrency) except Exception as e: log.error("dig.config_failed error=%s using_defaults", str(e)) # Safe defaults self.circuit_breaker_failure_threshold = 5 self.circuit_breaker_recovery_timeout = 60.0 self.default_timeout_sec = 30.0 self.concurrency = 5 ``` ### 2.3 Execution Flow ```python async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: """Execute dig with DNS-specific validation.""" # Step 1: Dig-specific validation validation_error = self._validate_dig_requirements(inp) if validation_error: return validation_error # Step 2: Parse and validate arguments try: validated_args = self._parse_and_validate_args(inp.extra_args or "", inp.target) except ValueError as e: error_context = ErrorContext(...) return self._create_error_output(error_context, inp.correlation_id or "") # Step 3: Optimize arguments optimized_args = self._optimize_dig_args(validated_args) # Step 4: Execute enhanced_input = ToolInput( target=inp.target, extra_args=optimized_args, timeout_sec=timeout_sec or inp.timeout_sec or self.default_timeout_sec, correlation_id=inp.correlation_id ) log.info("dig.executing target=%s args=%s", inp.target, optimized_args) result = await super()._execute_tool(enhanced_input, enhanced_input.timeout_sec) # Step 5: Parse output if result.returncode == 0 and result.stdout: try: parsed = self._parse_dig_output(result.stdout) result.ensure_metadata() result.metadata['parsed'] = parsed result.metadata['answers_found'] = len(parsed.get('answers', [])) log.info("dig.completed target=%s answers=%d query_time=%s", inp.target, len(parsed.get('answers', [])), parsed.get('query_time')) except Exception as e: log.warning("dig.parse_failed error=%s", str(e)) return result ``` ### 2.4 Validation Methods ```python def _validate_dig_requirements(self, inp: ToolInput) -> Optional[ToolOutput]: """Validate dig-specific requirements.""" # Extract and validate DNS server if specified dns_server = self._extract_dns_server(inp.extra_args or "") if dns_server: if not self._is_authorized_dns_server(dns_server): return self._create_error_output( ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message=f"DNS server not authorized: {dns_server}", recovery_suggestion=( "Use RFC1918 DNS server or .lab.internal DNS server:\n" " @192.168.1.1\n" " @10.0.0.1\n" " @dns.lab.internal" ), timestamp=self._get_timestamp(), tool_name=self.tool_name, target=inp.target, metadata={"dns_server": dns_server} ) ) log.debug("dig.dns_server_validated server=%s", dns_server) # Validate reverse lookup format if -x specified if "-x" in (inp.extra_args or ""): # Target should be an IP address for reverse lookup # Base class already validates RFC1918 log.debug("dig.reverse_lookup target=%s", inp.target) return None def _extract_dns_server(self, extra_args: str) -> Optional[str]: """Extract @SERVER from arguments.""" try: tokens = shlex.split(extra_args) for token in tokens: match = self._DNS_SERVER_PATTERN.match(token) if match: return match.group(1) except ValueError: pass return None def _is_authorized_dns_server(self, server: str) -> bool: """Validate DNS server is RFC1918 or .lab.internal.""" # Check .lab.internal if server.endswith('.lab.internal'): return True # Check RFC1918 try: import ipaddress ip = ipaddress.ip_address(server) return ip.version == 4 and ip.is_private except ValueError: # Not an IP, must be .lab.internal return False ``` ### 2.5 Argument Parsing (Handle @ and +) ```python def _parse_and_validate_args(self, extra_args: str, target: str) -> str: """ Parse and validate dig arguments. Handles three special cases: 1. @SERVER - DNS server specification 2. +OPTION - Query options 3. RECORD_TYPE - DNS record type (positional) """ if not extra_args: # Default to A record query return "A" tokens = shlex.split(extra_args) validated = [] for token in tokens: # Handle @SERVER if token.startswith("@"): match = self._DNS_SERVER_PATTERN.match(token) if not match: raise ValueError(f"Invalid DNS server format: {token}") # Already validated in _validate_dig_requirements validated.append(token) continue # Handle +OPTION if token.startswith("+"): option_name = token[1:].split("=")[0] # Handle +time=5 if option_name not in self.ALLOWED_QUERY_OPTIONS: raise ValueError( f"Query option not allowed: {token}\n" f"Allowed: +short, +trace, +dnssec, +noall, +answer, etc." ) validated.append(token) continue # Handle standard flags if token.startswith("-"): flag_base = token.split("=")[0] if flag_base not in self.allowed_flags: raise ValueError(f"Flag not allowed: {token}") validated.append(token) # Check if flag requires value (next token) if flag_base in self._FLAGS_REQUIRE_VALUE: # Value validation happens in next iteration or below pass continue # Non-flag token - could be: # 1. Record type (A, MX, TXT, etc.) # 2. Value for previous flag # 3. Domain name (but target is already validated by base) # Check if it's a record type if token.upper() in self.ALLOWED_RECORD_TYPES: validated.append(token.upper()) continue # Check if it's a value for previous flag if validated and validated[-1].startswith("-"): flag = validated[-1] if flag in self._FLAGS_REQUIRE_VALUE: # Validate specific flag values if flag == "-p": port = int(token) if not (1 <= port <= 65535): raise ValueError(f"Port must be 1-65535: {port}") validated.append(token) continue # Could be domain name (same as target) # Allow it but log log.debug("dig.additional_token token=%s", token) validated.append(token) return " ".join(validated) ``` ### 2.6 Optimization ```python def _optimize_dig_args(self, validated_args: str) -> str: """Add helpful defaults for cleaner output.""" tokens = shlex.split(validated_args) if validated_args else [] optimized = [] # Check what's present has_output_format = any( t in ('+short', '+noall', '+answer') for t in tokens ) has_record_type = any( t.upper() in self.ALLOWED_RECORD_TYPES for t in tokens ) # Add defaults if missing # Don't add output format by default - let dig use default verbose output # This is helpful for troubleshooting if not has_record_type: # Default to A record optimized.append("A") log.debug("dig.optimization added=record_type value=A") # Add original arguments optimized.extend(tokens) result = " ".join(optimized) if len(optimized) != len(tokens): log.info("dig.arguments_optimized original=%d optimized=%d", len(tokens), len(optimized)) return result ``` ### 2.7 Output Parsing ```python def _parse_dig_output(self, output: str) -> Dict[str, Any]: """ Parse dig output for DNS records. Dig output format: ; <<>> DiG 9.16.1 <<>> example.com A ;; ANSWER SECTION: example.com. 300 IN A 93.184.216.34 ;; Query time: 10 msec ;; SERVER: 8.8.8.8#53(8.8.8.8) """ results = { "answers": [], "query_time": None, "server": None, "flags": [], "status": None, } # Extract status if "status: NOERROR" in output: results["status"] = "NOERROR" elif "status: NXDOMAIN" in output: results["status"] = "NXDOMAIN" elif "status: SERVFAIL" in output: results["status"] = "SERVFAIL" # Extract flags flags_pattern = re.compile(r'flags:\s+([^;]+);') flags_match = flags_pattern.search(output) if flags_match: results["flags"] = flags_match.group(1).strip().split() # Extract answer section in_answer_section = False for line in output.split('\n'): if ";; ANSWER SECTION:" in line: in_answer_section = True continue if in_answer_section: if line.startswith(";;") or line.strip() == "": in_answer_section = False continue match = self._ANSWER_SECTION_PATTERN.match(line) if match: results["answers"].append({ "name": match.group(1), "ttl": int(match.group(2)), "type": match.group(3), "value": match.group(4) }) # Extract query time time_match = self._QUERY_TIME_PATTERN.search(output) if time_match: results["query_time"] = int(time_match.group(1)) # Extract server server_match = self._SERVER_PATTERN.search(output) if server_match: results["server"] = { "ip": server_match.group(1), "port": int(server_match.group(2)) } log.debug("dig.output_parsed answers=%d query_time=%s status=%s", len(results["answers"]), results["query_time"], results["status"]) return results ``` --- ## Phase 3: Implementation Validation Checklist ### 3.1 Framework Compliance - [ ] Extends MCPBaseTool correctly - [ ] Uses ClassVar for class variables - [ ] Calls super().__init__() first - [ ] Uses ErrorContext for all errors - [ ] Returns ToolOutput from all error paths - [ ] Structured logging (key=value) - [ ] Complete type hints - [ ] Comprehensive docstrings ### 3.2 Security Validation - [ ] No private variable access - [ ] DNS server authorization (@SERVER) - [ ] Target validation (base class) - [ ] Record type whitelist - [ ] Query option whitelist - [ ] Timeout enforcement - [ ] No shell injection vulnerabilities ### 3.3 Dig-Specific Features - [ ] @SERVER parsing and validation - [ ] +OPTION parsing and validation - [ ] Record type validation - [ ] Reverse lookup support (-x) - [ ] Output parsing (answers, time, server) - [ ] Multiple record type support - [ ] IPv4/IPv6 support --- ## Phase 4: Test Cases ```python # Test 1: Basic A record query ToolInput( target="server.lab.internal", extra_args="A" ) # Expected: Success, returns A records # Test 2: MX records with specific DNS server ToolInput( target="company.lab.internal", extra_args="@192.168.1.1 MX" ) # Expected: Success, queries 192.168.1.1 for MX records # Test 3: Reverse DNS lookup ToolInput( target="192.168.1.10", extra_args="-x" ) # Expected: Success, returns PTR record # Test 4: Short output format ToolInput( target="server.lab.internal", extra_args="A +short" ) # Expected: Success, clean output with just IP # Test 5: Clean answer section ToolInput( target="server.lab.internal", extra_args="TXT +noall +answer" ) # Expected: Success, only answer section # Test 6: Trace query ToolInput( target="server.lab.internal", extra_args="A +trace" ) # Expected: Success, shows delegation path # Test 7: Unauthorized DNS server ToolInput( target="server.lab.internal", extra_args="@8.8.8.8 A" ) # Expected: Error "DNS server not authorized" # Test 8: Invalid record type ToolInput( target="server.lab.internal", extra_args="INVALID_TYPE" ) # Expected: Error "Record type not allowed" # Test 9: Multiple query options ToolInput( target="server.lab.internal", extra_args="A +short +tcp +dnssec" ) # Expected: Success with all options applied # Test 10: ANY record query ToolInput( target="server.lab.internal", extra_args="ANY +noall +answer" ) # Expected: Success, returns all record types ``` --- ## Phase 5: Documentation Requirements ### 5.1 Module Docstring ```python """ DNS Dig query tool for authorized internal networks. Features: - DNS record queries (A, AAAA, MX, NS, TXT, SOA, etc.) - Custom DNS server queries - Reverse DNS lookups - Query options (+short, +trace, +dnssec) - Clean output formatting - Low-risk information gathering Security Controls: - Target must be .lab.internal domain or RFC1918 IP - DNS servers must be RFC1918 or .lab.internal - All standard record types allowed (read-only) - Query timeout enforcement - Comprehensive logging Usage: tool = DigTool() # Basic A record query result = await tool.run(ToolInput( target="server.lab.internal", extra_args="A" )) # Query specific DNS server result = await tool.run(ToolInput( target="server.lab.internal", extra_args="@192.168.1.1 MX" )) # Reverse DNS lookup result = await tool.run(ToolInput( target="192.168.1.10", extra_args="-x" )) Configuration: # config.yaml tool: default_timeout: 30 default_concurrency: 5 circuit_breaker: failure_threshold: 5 recovery_timeout: 60.0 Author: MCP Network Tools Team Version: 2.0.0 """ ``` --- ## Pre-Implementation Validation ### Validation Questions 1. **Is the target handling correct?** - ✅ Yes: Base class validates RFC1918/.lab.internal - ✅ Tool validates @SERVER if specified 2. **Are @ and + syntax handled?** - ✅ Yes: Special parsing for @SERVER and +OPTION 3. **Are all security controls in place?** - ✅ Yes: DNS server authorization, record type whitelist 4. **Does it match framework patterns?** - ✅ Yes: Follows Hydra/Masscan/SQLmap refactored patterns 5. **Is output parsing robust?** - ✅ Yes: Regex patterns for structured dig output 6. **Is concurrency appropriate?** - ✅ Yes: Higher (5) for low-risk read-only tool 7. **Are timeout values appropriate?** - ✅ Yes: Short (30s) for DNS queries --- ## Implementation Readiness Assessment ### Checklist - [x] Architecture validated against actual dig syntax - [x] All unique syntax patterns addressed (@, +) - [x] Security model appropriate for risk level - [x] Target validation strategy clear - [x] DNS server authorization designed - [x] Record type validation designed - [x] Output parsing strategy defined - [x] Test cases comprehensive - [x] Documentation structure planned - [x] Pattern matching validated ### Risk Assessment **Risks:** ✅ **VERY LOW** - Read-only information gathering - No attack capabilities - Established framework patterns - Clear validation layers - Comprehensive error handling ### Unique Value Proposition **Why This Tool Complements Existing Collection:** 1. **Information Gathering:** DNS enumeration before attacks 2. **Low Risk:** Safe for frequent use 3. **Network Discovery:** Find services via DNS records 4. **Troubleshooting:** Verify DNS configuration 5. **Complementary:** Works with nmap/masscan (discover, then scan) **Tool Synergies:** ``` dig → nmap → masscan → hydra/sqlmap ↓ ↓ ↓ ↓ DNS Ports Fast Exploit Enum Scan Scan Validate ``` ### Estimated Complexity **Lines of Code:** ~650 lines - Class definition: ~80 - Validation: ~200 - Parsing (@ and +): ~150 - Output parsing: ~100 - Documentation: ~120 **Development Time:** 3-4 hours for complete implementation --- ## Final Validation **APPROVED FOR IMPLEMENTATION ✅** The plan is: 1. ✅ **Architecturally sound** - Handles dig's unique syntax (@, +) 2. ✅ **Security-appropriate** - Low-risk controls for read-only tool 3. ✅ **Framework-compliant** - Follows proven patterns 4. ✅ **Comprehensive** - All dig features covered 5. ✅ **Testable** - Clear test cases defined 6. ✅ **Complementary** - Adds value to tool collection **Key Differentiators:** - Unique syntax handling (@SERVER, +OPTION) - Higher concurrency (low risk) - Shorter timeout (quick queries) - Read-only operations (safe) - Network discovery focus **Ready to proceed with meticulous implementation of DigTool.**

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nordeim/Security-MCP-Server-v3'

If you have feedback or need assistance with the MCP directory API, please join our Discord server