implementation_plan_dig_tool.md•25.4 kB
# DNS Dig Tool - Deep Analysis & Meticulous Implementation Plan
## Executive Summary
**Tool:** DNS Dig (Domain Information Groper)
**Purpose:** DNS query and enumeration tool for authorized internal networks
**Risk Level:** 🟢 **LOW** (read-only information gathering)
**Complexity:** 🟡 **MEDIUM** (unique syntax patterns)
**Estimated LOC:** ~650 lines
**Key Insight:** Dig is fundamentally different from attack tools (nmap/hydra/sqlmap) - it's read-only DNS information gathering, making it lower risk and suitable for higher concurrency.
---
## Phase 1: Deep Architecture Analysis
### 1.1 Dig Command Structure Understanding
```bash
# Basic syntax
dig [OPTIONS] [@SERVER] [DOMAIN] [TYPE]
# Examples:
dig example.lab.internal # Query default DNS
dig @192.168.1.1 example.lab.internal # Query specific DNS server
dig example.lab.internal A # Query A records
dig example.lab.internal MX +short # Mail servers, short output
dig -x 192.168.1.10 # Reverse DNS lookup
dig @192.168.1.1 example.lab.internal ANY +trace # Trace with all records
dig example.lab.internal TXT +noall +answer # Clean TXT output
```
**Component Breakdown:**
1. **@SERVER** - DNS server to query (optional, special syntax)
2. **OPTIONS** - Flags like `-x`, `-4`, `-6`, `-t`, `-p`
3. **+QUERY_OPTIONS** - Dig-specific options like `+short`, `+trace`, `+dnssec`
4. **DOMAIN** - Domain name to query
5. **TYPE** - Record type (A, MX, TXT, etc.)
### 1.2 Unique Syntax Challenges
**Challenge 1: @ DNS Server Prefix**
```python
# Input: "@192.168.1.1 example.lab.internal"
# @192.168.1.1 is NOT a flag, it's a server specification
# Must parse separately from flags
```
**Challenge 2: + Query Options**
```python
# Input: "+short +noall +answer"
# +option is dig-specific, not standard flags
# Must parse separately and validate against whitelist
```
**Challenge 3: Positional Arguments**
```python
# Input: "example.lab.internal MX"
# Both are non-flag tokens
# Domain is target (already validated by base class)
# MX is record type (must validate)
```
### 1.3 Target Validation Strategy
**Scenario 1: Forward Lookup**
```python
ToolInput(
target="server.lab.internal", # Domain (base validates .lab.internal)
extra_args="A +short" # Record type and options
)
# ✅ Base class validates .lab.internal
```
**Scenario 2: Reverse Lookup**
```python
ToolInput(
target="192.168.1.10", # IP address (base validates RFC1918)
extra_args="-x" # Reverse lookup flag
)
# ✅ Base class validates RFC1918
```
**Scenario 3: Custom DNS Server**
```python
ToolInput(
target="server.lab.internal",
extra_args="@192.168.1.1 MX" # Query specific DNS server
)
# ✅ Base validates target
# ✅ Tool validates @192.168.1.1 is RFC1918 or .lab.internal
```
**Decision:** Use base class validation for target, add custom validation for @SERVER specification.
### 1.4 Security Model
**Low Risk Justification:**
- DNS queries are read-only
- Cannot modify target systems
- Cannot cause DoS (with reasonable timeout)
- No code execution
- Standard information gathering
**Security Controls:**
1. **DNS Server Restriction:** Only query RFC1918 or .lab.internal DNS servers
2. **Target Restriction:** Only query .lab.internal domains or RFC1918 IPs (PTR)
3. **Record Type Validation:** Whitelist of safe record types (all standard types)
4. **Query Option Validation:** Whitelist of safe +options
5. **Timeout Enforcement:** Prevent hanging queries
**No Restrictions Needed:**
- All record types are safe (A, MX, TXT, etc.)
- All query options are safe (+short, +trace, etc.)
- All standard flags are safe (-x, -4, -6, etc.)
---
## Phase 2: Detailed Component Design
### 2.1 Class Structure
```python
class DigTool(MCPBaseTool):
"""
DNS query tool for authorized internal networks.
Dig is a DNS lookup utility for querying DNS nameservers.
This wrapper provides safe DNS enumeration for internal networks.
Security Model:
- Target must be .lab.internal domain or RFC1918 IP
- DNS servers (if specified with @) must be RFC1918 or .lab.internal
- All standard DNS record types allowed (read-only)
- All dig query options allowed (information gathering)
"""
command_name: ClassVar[str] = "dig"
# Allowed DNS record types (all standard types - all safe)
ALLOWED_RECORD_TYPES = frozenset([
'A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA', 'CNAME', 'PTR',
'SRV', 'CAA', 'ANY', 'DNSKEY', 'DS', 'RRSIG', 'NSEC',
'NSEC3', 'TLSA', 'SPF', 'NAPTR', 'HINFO', 'RP'
])
# Allowed query options (+ prefix)
ALLOWED_QUERY_OPTIONS = frozenset([
# Output format
'short', 'noall', 'answer', 'authority', 'additional',
'question', 'stats', 'nostats', 'comments', 'nocomments',
# Query behavior
'trace', 'notrace', 'recurse', 'norecurse',
'dnssec', 'nodnssec', 'nsid', 'nonsid',
# Protocol
'tcp', 'notcp', 'vc', 'novc', 'ignore', 'noignore',
# Advanced
'aaflag', 'noaaflag', 'adflag', 'noadflag',
'cdflag', 'nocdflag', 'qr', 'noqr',
# Timing
'time', 'retry', 'ndots', 'bufsize', 'edns',
# Misc
'multiline', 'nomultiline', 'onesoa', 'noonesoa'
])
# Standard flags
allowed_flags: ClassVar[Sequence[str]] = [
'-x', # Reverse lookup
'-4', # IPv4 only
'-6', # IPv6 only
'-t', # Type specification
'-c', # Class specification
'-p', # Port number
'-b', # Bind to source address
'-f', # Read from file (batch mode)
'-q', # Query name
'-v', # Verbose
'-V', # Version
]
_FLAGS_REQUIRE_VALUE = frozenset({
'-t', '-c', '-p', '-b', '-f', '-q'
})
# Timeouts (DNS queries are quick)
default_timeout_sec: ClassVar[float] = 30.0
# Concurrency (DNS queries are low-impact, allow higher concurrency)
concurrency: ClassVar[int] = 5
# Circuit breaker (lenient for low-risk tool)
circuit_breaker_failure_threshold: ClassVar[int] = 5
circuit_breaker_recovery_timeout: ClassVar[float] = 60.0
# Compiled patterns for parsing
_DNS_SERVER_PATTERN = re.compile(r'^@([\w\.\-]+)$')
_ANSWER_SECTION_PATTERN = re.compile(
r'^([^\s]+)\s+(\d+)\s+IN\s+([A-Z]+)\s+(.+)$',
re.MULTILINE
)
_QUERY_TIME_PATTERN = re.compile(r'Query time:\s+(\d+)\s+msec')
_SERVER_PATTERN = re.compile(r'SERVER:\s+([\d\.]+)#(\d+)')
```
### 2.2 Initialization Pattern
```python
def __init__(self):
"""Initialize Dig tool with configuration."""
super().__init__()
self.config = get_config()
self._apply_config()
log.info("dig_tool.initialized timeout=%.1f concurrency=%d",
self.default_timeout_sec, self.concurrency)
def _apply_config(self):
"""Apply configuration with safe clamping."""
try:
# Circuit breaker
if hasattr(self.config, 'circuit_breaker') and self.config.circuit_breaker:
cb = self.config.circuit_breaker
if hasattr(cb, 'failure_threshold'):
self.circuit_breaker_failure_threshold = max(1, min(10, int(cb.failure_threshold)))
if hasattr(cb, 'recovery_timeout'):
self.circuit_breaker_recovery_timeout = max(30.0, min(300.0, float(cb.recovery_timeout)))
# Tool config
if hasattr(self.config, 'tool') and self.config.tool:
if hasattr(self.config.tool, 'default_timeout'):
self.default_timeout_sec = max(5.0, min(300.0, float(self.config.tool.default_timeout)))
if hasattr(self.config.tool, 'default_concurrency'):
self.concurrency = max(1, min(10, int(self.config.tool.default_concurrency)))
log.debug("dig.config_applied timeout=%.1f concurrency=%d",
self.default_timeout_sec, self.concurrency)
except Exception as e:
log.error("dig.config_failed error=%s using_defaults", str(e))
# Safe defaults
self.circuit_breaker_failure_threshold = 5
self.circuit_breaker_recovery_timeout = 60.0
self.default_timeout_sec = 30.0
self.concurrency = 5
```
### 2.3 Execution Flow
```python
async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput:
"""Execute dig with DNS-specific validation."""
# Step 1: Dig-specific validation
validation_error = self._validate_dig_requirements(inp)
if validation_error:
return validation_error
# Step 2: Parse and validate arguments
try:
validated_args = self._parse_and_validate_args(inp.extra_args or "", inp.target)
except ValueError as e:
error_context = ErrorContext(...)
return self._create_error_output(error_context, inp.correlation_id or "")
# Step 3: Optimize arguments
optimized_args = self._optimize_dig_args(validated_args)
# Step 4: Execute
enhanced_input = ToolInput(
target=inp.target,
extra_args=optimized_args,
timeout_sec=timeout_sec or inp.timeout_sec or self.default_timeout_sec,
correlation_id=inp.correlation_id
)
log.info("dig.executing target=%s args=%s", inp.target, optimized_args)
result = await super()._execute_tool(enhanced_input, enhanced_input.timeout_sec)
# Step 5: Parse output
if result.returncode == 0 and result.stdout:
try:
parsed = self._parse_dig_output(result.stdout)
result.ensure_metadata()
result.metadata['parsed'] = parsed
result.metadata['answers_found'] = len(parsed.get('answers', []))
log.info("dig.completed target=%s answers=%d query_time=%s",
inp.target,
len(parsed.get('answers', [])),
parsed.get('query_time'))
except Exception as e:
log.warning("dig.parse_failed error=%s", str(e))
return result
```
### 2.4 Validation Methods
```python
def _validate_dig_requirements(self, inp: ToolInput) -> Optional[ToolOutput]:
"""Validate dig-specific requirements."""
# Extract and validate DNS server if specified
dns_server = self._extract_dns_server(inp.extra_args or "")
if dns_server:
if not self._is_authorized_dns_server(dns_server):
return self._create_error_output(
ErrorContext(
error_type=ToolErrorType.VALIDATION_ERROR,
message=f"DNS server not authorized: {dns_server}",
recovery_suggestion=(
"Use RFC1918 DNS server or .lab.internal DNS server:\n"
" @192.168.1.1\n"
" @10.0.0.1\n"
" @dns.lab.internal"
),
timestamp=self._get_timestamp(),
tool_name=self.tool_name,
target=inp.target,
metadata={"dns_server": dns_server}
)
)
log.debug("dig.dns_server_validated server=%s", dns_server)
# Validate reverse lookup format if -x specified
if "-x" in (inp.extra_args or ""):
# Target should be an IP address for reverse lookup
# Base class already validates RFC1918
log.debug("dig.reverse_lookup target=%s", inp.target)
return None
def _extract_dns_server(self, extra_args: str) -> Optional[str]:
"""Extract @SERVER from arguments."""
try:
tokens = shlex.split(extra_args)
for token in tokens:
match = self._DNS_SERVER_PATTERN.match(token)
if match:
return match.group(1)
except ValueError:
pass
return None
def _is_authorized_dns_server(self, server: str) -> bool:
"""Validate DNS server is RFC1918 or .lab.internal."""
# Check .lab.internal
if server.endswith('.lab.internal'):
return True
# Check RFC1918
try:
import ipaddress
ip = ipaddress.ip_address(server)
return ip.version == 4 and ip.is_private
except ValueError:
# Not an IP, must be .lab.internal
return False
```
### 2.5 Argument Parsing (Handle @ and +)
```python
def _parse_and_validate_args(self, extra_args: str, target: str) -> str:
"""
Parse and validate dig arguments.
Handles three special cases:
1. @SERVER - DNS server specification
2. +OPTION - Query options
3. RECORD_TYPE - DNS record type (positional)
"""
if not extra_args:
# Default to A record query
return "A"
tokens = shlex.split(extra_args)
validated = []
for token in tokens:
# Handle @SERVER
if token.startswith("@"):
match = self._DNS_SERVER_PATTERN.match(token)
if not match:
raise ValueError(f"Invalid DNS server format: {token}")
# Already validated in _validate_dig_requirements
validated.append(token)
continue
# Handle +OPTION
if token.startswith("+"):
option_name = token[1:].split("=")[0] # Handle +time=5
if option_name not in self.ALLOWED_QUERY_OPTIONS:
raise ValueError(
f"Query option not allowed: {token}\n"
f"Allowed: +short, +trace, +dnssec, +noall, +answer, etc."
)
validated.append(token)
continue
# Handle standard flags
if token.startswith("-"):
flag_base = token.split("=")[0]
if flag_base not in self.allowed_flags:
raise ValueError(f"Flag not allowed: {token}")
validated.append(token)
# Check if flag requires value (next token)
if flag_base in self._FLAGS_REQUIRE_VALUE:
# Value validation happens in next iteration or below
pass
continue
# Non-flag token - could be:
# 1. Record type (A, MX, TXT, etc.)
# 2. Value for previous flag
# 3. Domain name (but target is already validated by base)
# Check if it's a record type
if token.upper() in self.ALLOWED_RECORD_TYPES:
validated.append(token.upper())
continue
# Check if it's a value for previous flag
if validated and validated[-1].startswith("-"):
flag = validated[-1]
if flag in self._FLAGS_REQUIRE_VALUE:
# Validate specific flag values
if flag == "-p":
port = int(token)
if not (1 <= port <= 65535):
raise ValueError(f"Port must be 1-65535: {port}")
validated.append(token)
continue
# Could be domain name (same as target)
# Allow it but log
log.debug("dig.additional_token token=%s", token)
validated.append(token)
return " ".join(validated)
```
### 2.6 Optimization
```python
def _optimize_dig_args(self, validated_args: str) -> str:
"""Add helpful defaults for cleaner output."""
tokens = shlex.split(validated_args) if validated_args else []
optimized = []
# Check what's present
has_output_format = any(
t in ('+short', '+noall', '+answer')
for t in tokens
)
has_record_type = any(
t.upper() in self.ALLOWED_RECORD_TYPES
for t in tokens
)
# Add defaults if missing
# Don't add output format by default - let dig use default verbose output
# This is helpful for troubleshooting
if not has_record_type:
# Default to A record
optimized.append("A")
log.debug("dig.optimization added=record_type value=A")
# Add original arguments
optimized.extend(tokens)
result = " ".join(optimized)
if len(optimized) != len(tokens):
log.info("dig.arguments_optimized original=%d optimized=%d",
len(tokens), len(optimized))
return result
```
### 2.7 Output Parsing
```python
def _parse_dig_output(self, output: str) -> Dict[str, Any]:
"""
Parse dig output for DNS records.
Dig output format:
; <<>> DiG 9.16.1 <<>> example.com A
;; ANSWER SECTION:
example.com. 300 IN A 93.184.216.34
;; Query time: 10 msec
;; SERVER: 8.8.8.8#53(8.8.8.8)
"""
results = {
"answers": [],
"query_time": None,
"server": None,
"flags": [],
"status": None,
}
# Extract status
if "status: NOERROR" in output:
results["status"] = "NOERROR"
elif "status: NXDOMAIN" in output:
results["status"] = "NXDOMAIN"
elif "status: SERVFAIL" in output:
results["status"] = "SERVFAIL"
# Extract flags
flags_pattern = re.compile(r'flags:\s+([^;]+);')
flags_match = flags_pattern.search(output)
if flags_match:
results["flags"] = flags_match.group(1).strip().split()
# Extract answer section
in_answer_section = False
for line in output.split('\n'):
if ";; ANSWER SECTION:" in line:
in_answer_section = True
continue
if in_answer_section:
if line.startswith(";;") or line.strip() == "":
in_answer_section = False
continue
match = self._ANSWER_SECTION_PATTERN.match(line)
if match:
results["answers"].append({
"name": match.group(1),
"ttl": int(match.group(2)),
"type": match.group(3),
"value": match.group(4)
})
# Extract query time
time_match = self._QUERY_TIME_PATTERN.search(output)
if time_match:
results["query_time"] = int(time_match.group(1))
# Extract server
server_match = self._SERVER_PATTERN.search(output)
if server_match:
results["server"] = {
"ip": server_match.group(1),
"port": int(server_match.group(2))
}
log.debug("dig.output_parsed answers=%d query_time=%s status=%s",
len(results["answers"]), results["query_time"], results["status"])
return results
```
---
## Phase 3: Implementation Validation Checklist
### 3.1 Framework Compliance
- [ ] Extends MCPBaseTool correctly
- [ ] Uses ClassVar for class variables
- [ ] Calls super().__init__() first
- [ ] Uses ErrorContext for all errors
- [ ] Returns ToolOutput from all error paths
- [ ] Structured logging (key=value)
- [ ] Complete type hints
- [ ] Comprehensive docstrings
### 3.2 Security Validation
- [ ] No private variable access
- [ ] DNS server authorization (@SERVER)
- [ ] Target validation (base class)
- [ ] Record type whitelist
- [ ] Query option whitelist
- [ ] Timeout enforcement
- [ ] No shell injection vulnerabilities
### 3.3 Dig-Specific Features
- [ ] @SERVER parsing and validation
- [ ] +OPTION parsing and validation
- [ ] Record type validation
- [ ] Reverse lookup support (-x)
- [ ] Output parsing (answers, time, server)
- [ ] Multiple record type support
- [ ] IPv4/IPv6 support
---
## Phase 4: Test Cases
```python
# Test 1: Basic A record query
ToolInput(
target="server.lab.internal",
extra_args="A"
)
# Expected: Success, returns A records
# Test 2: MX records with specific DNS server
ToolInput(
target="company.lab.internal",
extra_args="@192.168.1.1 MX"
)
# Expected: Success, queries 192.168.1.1 for MX records
# Test 3: Reverse DNS lookup
ToolInput(
target="192.168.1.10",
extra_args="-x"
)
# Expected: Success, returns PTR record
# Test 4: Short output format
ToolInput(
target="server.lab.internal",
extra_args="A +short"
)
# Expected: Success, clean output with just IP
# Test 5: Clean answer section
ToolInput(
target="server.lab.internal",
extra_args="TXT +noall +answer"
)
# Expected: Success, only answer section
# Test 6: Trace query
ToolInput(
target="server.lab.internal",
extra_args="A +trace"
)
# Expected: Success, shows delegation path
# Test 7: Unauthorized DNS server
ToolInput(
target="server.lab.internal",
extra_args="@8.8.8.8 A"
)
# Expected: Error "DNS server not authorized"
# Test 8: Invalid record type
ToolInput(
target="server.lab.internal",
extra_args="INVALID_TYPE"
)
# Expected: Error "Record type not allowed"
# Test 9: Multiple query options
ToolInput(
target="server.lab.internal",
extra_args="A +short +tcp +dnssec"
)
# Expected: Success with all options applied
# Test 10: ANY record query
ToolInput(
target="server.lab.internal",
extra_args="ANY +noall +answer"
)
# Expected: Success, returns all record types
```
---
## Phase 5: Documentation Requirements
### 5.1 Module Docstring
```python
"""
DNS Dig query tool for authorized internal networks.
Features:
- DNS record queries (A, AAAA, MX, NS, TXT, SOA, etc.)
- Custom DNS server queries
- Reverse DNS lookups
- Query options (+short, +trace, +dnssec)
- Clean output formatting
- Low-risk information gathering
Security Controls:
- Target must be .lab.internal domain or RFC1918 IP
- DNS servers must be RFC1918 or .lab.internal
- All standard record types allowed (read-only)
- Query timeout enforcement
- Comprehensive logging
Usage:
tool = DigTool()
# Basic A record query
result = await tool.run(ToolInput(
target="server.lab.internal",
extra_args="A"
))
# Query specific DNS server
result = await tool.run(ToolInput(
target="server.lab.internal",
extra_args="@192.168.1.1 MX"
))
# Reverse DNS lookup
result = await tool.run(ToolInput(
target="192.168.1.10",
extra_args="-x"
))
Configuration:
# config.yaml
tool:
default_timeout: 30
default_concurrency: 5
circuit_breaker:
failure_threshold: 5
recovery_timeout: 60.0
Author: MCP Network Tools Team
Version: 2.0.0
"""
```
---
## Pre-Implementation Validation
### Validation Questions
1. **Is the target handling correct?**
- ✅ Yes: Base class validates RFC1918/.lab.internal
- ✅ Tool validates @SERVER if specified
2. **Are @ and + syntax handled?**
- ✅ Yes: Special parsing for @SERVER and +OPTION
3. **Are all security controls in place?**
- ✅ Yes: DNS server authorization, record type whitelist
4. **Does it match framework patterns?**
- ✅ Yes: Follows Hydra/Masscan/SQLmap refactored patterns
5. **Is output parsing robust?**
- ✅ Yes: Regex patterns for structured dig output
6. **Is concurrency appropriate?**
- ✅ Yes: Higher (5) for low-risk read-only tool
7. **Are timeout values appropriate?**
- ✅ Yes: Short (30s) for DNS queries
---
## Implementation Readiness Assessment
### Checklist
- [x] Architecture validated against actual dig syntax
- [x] All unique syntax patterns addressed (@, +)
- [x] Security model appropriate for risk level
- [x] Target validation strategy clear
- [x] DNS server authorization designed
- [x] Record type validation designed
- [x] Output parsing strategy defined
- [x] Test cases comprehensive
- [x] Documentation structure planned
- [x] Pattern matching validated
### Risk Assessment
**Risks:** ✅ **VERY LOW**
- Read-only information gathering
- No attack capabilities
- Established framework patterns
- Clear validation layers
- Comprehensive error handling
### Unique Value Proposition
**Why This Tool Complements Existing Collection:**
1. **Information Gathering:** DNS enumeration before attacks
2. **Low Risk:** Safe for frequent use
3. **Network Discovery:** Find services via DNS records
4. **Troubleshooting:** Verify DNS configuration
5. **Complementary:** Works with nmap/masscan (discover, then scan)
**Tool Synergies:**
```
dig → nmap → masscan → hydra/sqlmap
↓ ↓ ↓ ↓
DNS Ports Fast Exploit
Enum Scan Scan Validate
```
### Estimated Complexity
**Lines of Code:** ~650 lines
- Class definition: ~80
- Validation: ~200
- Parsing (@ and +): ~150
- Output parsing: ~100
- Documentation: ~120
**Development Time:** 3-4 hours for complete implementation
---
## Final Validation
**APPROVED FOR IMPLEMENTATION ✅**
The plan is:
1. ✅ **Architecturally sound** - Handles dig's unique syntax (@, +)
2. ✅ **Security-appropriate** - Low-risk controls for read-only tool
3. ✅ **Framework-compliant** - Follows proven patterns
4. ✅ **Comprehensive** - All dig features covered
5. ✅ **Testable** - Clear test cases defined
6. ✅ **Complementary** - Adds value to tool collection
**Key Differentiators:**
- Unique syntax handling (@SERVER, +OPTION)
- Higher concurrency (low risk)
- Shorter timeout (quick queries)
- Read-only operations (safe)
- Network discovery focus
**Ready to proceed with meticulous implementation of DigTool.**