Skip to main content
Glama

Security MCP Server

by nordeim
dig_tool.py (37.2 kB)
"""
DNS Dig query tool for authorized internal networks.

Features:
- DNS record queries (A, AAAA, MX, NS, TXT, SOA, CNAME, PTR, SRV, etc.)
- Custom DNS server queries (@server syntax)
- Reverse DNS lookups (-x flag)
- Query options (+short, +trace, +dnssec, etc.)
- Clean output formatting options
- Low-risk information gathering
- Network service discovery

Security Controls:
- Target must be .lab.internal domain or RFC1918 IP
- DNS servers (@server) must be RFC1918 or .lab.internal
- All standard DNS record types allowed (read-only operations)
- Query timeout enforcement (default 30s)
- Comprehensive logging and monitoring

Usage:
    tool = DigTool()

    # Basic A record query
    result = await tool.run(ToolInput(
        target="server.lab.internal",
        extra_args="A"
    ))

    # Query specific DNS server
    result = await tool.run(ToolInput(
        target="server.lab.internal",
        extra_args="@192.168.1.1 MX +short"
    ))

    # Reverse DNS lookup
    result = await tool.run(ToolInput(
        target="192.168.1.10",
        extra_args="-x"
    ))

    # Clean TXT record output
    result = await tool.run(ToolInput(
        target="company.lab.internal",
        extra_args="TXT +noall +answer"
    ))

    # Trace DNS delegation
    result = await tool.run(ToolInput(
        target="server.lab.internal",
        extra_args="A +trace"
    ))

Configuration:
    # config.yaml
    tool:
      default_timeout: 30  # 30 seconds for DNS queries
      default_concurrency: 5  # Higher for low-risk tool

    circuit_breaker:
      failure_threshold: 5
      recovery_timeout: 60.0

Environment Variables:
    MCP_DEFAULT_TIMEOUT_SEC: Override default timeout
    MCP_DEFAULT_CONCURRENCY: Override concurrency

Author: MCP Network Tools Team
Version: 2.0.0
"""

import ipaddress
import logging
import re
import shlex
from typing import ClassVar, Optional, Sequence, Dict, Any, List
from datetime import datetime, timezone

from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput, ToolErrorType, ErrorContext
from mcp_server.config import get_config

log = logging.getLogger(__name__)


class DigTool(MCPBaseTool):
    """
    DNS query tool for authorized internal networks.

    Dig (Domain Information Groper) is a DNS lookup utility for querying DNS
    nameservers. This wrapper provides safe DNS enumeration and
    troubleshooting for internal networks.

    Command Structure:
        dig [@SERVER] [OPTIONS] [DOMAIN] [TYPE] [+QUERY_OPTIONS]

        @SERVER:        DNS server to query (optional, e.g., @192.168.1.1)
        OPTIONS:        Standard flags (-x, -4, -6, -t, -p, etc.)
        DOMAIN:         Domain name (from target parameter)
        TYPE:           Record type (A, MX, TXT, etc.)
        +QUERY_OPTIONS: Dig-specific options (+short, +trace, etc.)

    Security Model:
        - Target validated by base class (RFC1918/.lab.internal)
        - Every @SERVER must be RFC1918/.lab.internal
        - Positional names and -q values must equal the validated target
        - Record types and query options are whitelisted
        - Low-risk tool suitable for higher concurrency

    Attributes:
        command_name: System command (dig)
        allowed_flags: Whitelist of permitted flags
        default_timeout_sec: Default timeout (30s)
        concurrency: Max concurrent executions (5 - low-risk)

    Example:
        >>> tool = DigTool()
        >>> result = await tool.run(ToolInput(
        ...     target="server.lab.internal",
        ...     extra_args="@192.168.1.1 MX +short"
        ... ))
    """

    command_name: ClassVar[str] = "dig"

    # Allowed DNS record types (all standard types - all safe for read-only)
    ALLOWED_RECORD_TYPES = frozenset([
        # Common types
        'A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA', 'CNAME', 'PTR',
        # Service discovery
        'SRV', 'NAPTR',
        # Security
        'CAA', 'DNSKEY', 'DS', 'RRSIG', 'NSEC', 'NSEC3', 'TLSA',
        # Legacy/Special
        'SPF', 'HINFO', 'RP', 'LOC', 'SSHFP',
        # Meta
        'ANY', 'AXFR', 'IXFR',
    ])

    # Allowed query options (+ prefix) - all safe for information gathering
    ALLOWED_QUERY_OPTIONS = frozenset([
        # Output format control
        'short', 'noall', 'answer', 'authority', 'additional', 'question',
        'stats', 'nostats', 'comments', 'nocomments', 'cmd', 'nocmd',
        'rrcomments', 'norrcomments',
        # Query behavior
        'trace', 'notrace', 'recurse', 'norecurse', 'dnssec', 'nodnssec',
        'nsid', 'nonsid', 'besteffort', 'nobesteffort',
        # Protocol options
        'tcp', 'notcp', 'vc', 'novc', 'ignore', 'noignore',
        'fail', 'nofail', 'keepopen', 'nokeepopen',
        # DNS flags
        'aaflag', 'noaaflag', 'adflag', 'noadflag', 'cdflag', 'nocdflag',
        'qr', 'noqr', 'raflag', 'noraflag', 'tcflag', 'notcflag',
        # Timing and retries
        'time', 'tries', 'retry', 'ndots',
        # EDNS options
        'bufsize', 'edns', 'noedns', 'ednsflags', 'ednsopt',
        # Display options
        'multiline', 'nomultiline', 'onesoa', 'noonesoa',
        'identify', 'noidentify', 'split',
        # Advanced
        'subnet', 'expire', 'sit', 'cookie',
        'ttlid', 'nottlid', 'ttlunits', 'nottlunits',
    ])

    # Query options whose "=value" part must be a non-negative integer
    _NUMERIC_QUERY_OPTIONS = frozenset({'time', 'tries', 'retry', 'ndots', 'bufsize'})

    # DNS classes accepted for -c and as a positional class token
    _ALLOWED_CLASSES = frozenset({'IN', 'CH', 'HS', 'ANY'})

    # Standard flags
    allowed_flags: ClassVar[Sequence[str]] = [
        # Lookup options
        '-x',  # Reverse lookup
        '-4',  # IPv4 only
        '-6',  # IPv6 only
        # Query specification
        '-t',  # Type specification (alternative to positional)
        '-c',  # Class specification (IN, CH, HS)
        '-q',  # Query name (alternative to positional)
        # Connection options
        '-p',  # Port number (default 53)
        '-b',  # Bind to source address
        # Batch mode
        # NOTE(review): -f makes dig read queries from a local file whose
        # contents are not validated here - confirm this is intended.
        '-f',  # Read queries from file
        # Output control
        '-v',  # Verbose output
        '-V',  # Print version
    ]

    # Flags that require values
    _FLAGS_REQUIRE_VALUE = frozenset({
        '-t', '-c', '-q', '-p', '-b', '-f'
    })

    # Timeouts (DNS queries are quick)
    default_timeout_sec: ClassVar[float] = 30.0

    # Concurrency (DNS queries are low-impact, allow higher concurrency)
    concurrency: ClassVar[int] = 5

    # Circuit breaker (lenient for low-risk tool)
    circuit_breaker_failure_threshold: ClassVar[int] = 5
    circuit_breaker_recovery_timeout: ClassVar[float] = 60.0
    circuit_breaker_expected_exception: ClassVar[tuple] = (Exception,)

    # The only IPv4 ranges accepted for @SERVER. ipaddress' ``is_private`` is
    # deliberately not used: it also covers loopback, link-local and other
    # special-purpose ranges that are not RFC1918.
    _RFC1918_NETWORKS: ClassVar[tuple] = (
        ipaddress.ip_network("10.0.0.0/8"),
        ipaddress.ip_network("172.16.0.0/12"),
        ipaddress.ip_network("192.168.0.0/16"),
    )

    # Compiled patterns for parsing
    _DNS_SERVER_PATTERN = re.compile(r'^@([\w\.\-]+)$')
    # name, ttl, class, type, rdata. The type may contain digits (NSEC3,
    # TYPE65) and the class may be CH/HS as well as IN.
    _ANSWER_SECTION_PATTERN = re.compile(
        r'^([^\s]+)\s+(\d+)\s+(?:IN|CH|HS)\s+([A-Z0-9]+)\s+(.+)$',
        re.MULTILINE
    )
    _QUERY_TIME_PATTERN = re.compile(r'Query time:\s+(\d+)\s+msec')
    _SERVER_PATTERN = re.compile(r'SERVER:\s+([\d\.a-fA-F:]+)#(\d+)')
    _FLAGS_PATTERN = re.compile(r'flags:\s+([^;]+);')
    _STATUS_PATTERN = re.compile(r'status:\s+(\w+)')
    _MSG_SIZE_PATTERN = re.compile(r'MSG SIZE\s+rcvd:\s+(\d+)')

    def __init__(self):
        """Initialize Dig tool with configuration."""
        super().__init__()
        self.config = get_config()
        self._apply_config()

        log.info("dig_tool.initialized timeout=%.1f concurrency=%d",
                 self.default_timeout_sec, self.concurrency)
        log.debug("dig_tool.info low_risk_tool read_only_operations")

    def _apply_config(self):
        """
        Apply configuration settings with safe clamping.

        Reads ``circuit_breaker`` and ``tool`` sections from ``self.config``
        when present and clamps each value into a fixed range. Any exception
        while reading the configuration resets all four settings to the
        built-in defaults.
        """
        try:
            # Apply circuit breaker config
            if hasattr(self.config, 'circuit_breaker') and self.config.circuit_breaker:
                cb = self.config.circuit_breaker
                if hasattr(cb, 'failure_threshold'):
                    original = self.circuit_breaker_failure_threshold
                    self.circuit_breaker_failure_threshold = max(1, min(10, int(cb.failure_threshold)))
                    if self.circuit_breaker_failure_threshold != original:
                        log.info("dig.config_clamped param=failure_threshold original=%d new=%d",
                                 original, self.circuit_breaker_failure_threshold)

                if hasattr(cb, 'recovery_timeout'):
                    original = self.circuit_breaker_recovery_timeout
                    self.circuit_breaker_recovery_timeout = max(30.0, min(300.0, float(cb.recovery_timeout)))
                    if self.circuit_breaker_recovery_timeout != original:
                        log.info("dig.config_clamped param=recovery_timeout original=%.1f new=%.1f",
                                 original, self.circuit_breaker_recovery_timeout)

            # Apply tool config
            if hasattr(self.config, 'tool') and self.config.tool:
                tool = self.config.tool
                if hasattr(tool, 'default_timeout'):
                    original = self.default_timeout_sec
                    self.default_timeout_sec = max(5.0, min(300.0, float(tool.default_timeout)))
                    if self.default_timeout_sec != original:
                        log.info("dig.config_clamped param=default_timeout original=%.1f new=%.1f",
                                 original, self.default_timeout_sec)

                if hasattr(tool, 'default_concurrency'):
                    original = self.concurrency
                    self.concurrency = max(1, min(10, int(tool.default_concurrency)))
                    if self.concurrency != original:
                        log.info("dig.config_clamped param=concurrency original=%d new=%d",
                                 original, self.concurrency)

            log.debug("dig.config_applied timeout=%.1f concurrency=%d",
                      self.default_timeout_sec, self.concurrency)

        except Exception as e:
            log.error("dig.config_apply_failed error=%s using_safe_defaults", str(e))
            # Reset to safe defaults
            self.circuit_breaker_failure_threshold = 5
            self.circuit_breaker_recovery_timeout = 60.0
            self.default_timeout_sec = 30.0
            self.concurrency = 5

    async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput:
        """
        Execute dig with DNS-specific validation.

        Args:
            inp: Tool input (target, extra_args, timeout, correlation id)
            timeout_sec: Optional timeout override; falls back to
                ``inp.timeout_sec`` and then ``self.default_timeout_sec``

        Returns:
            The base-class execution result, with parsed DNS information
            attached to its metadata on success, or an error output when
            validation fails.
        """
        # Step 1: Dig-specific validation
        validation_error = self._validate_dig_requirements(inp)
        if validation_error:
            return validation_error

        # Step 2: Parse and validate arguments
        try:
            validated_args = self._parse_and_validate_args(inp.extra_args or "", inp.target)
        except ValueError as e:
            error_context = ErrorContext(
                error_type=ToolErrorType.VALIDATION_ERROR,
                message=f"Invalid arguments: {str(e)}",
                recovery_suggestion="Check DNS server (@), record type, and query options (+)",
                timestamp=self._get_timestamp(),
                tool_name=self.tool_name,
                target=inp.target,
                metadata={"error": str(e), "provided_args": inp.extra_args}
            )
            return self._create_error_output(error_context, inp.correlation_id or "")

        # Step 3: Optimize arguments
        optimized_args = self._optimize_dig_args(validated_args)

        # Step 4: Create enhanced input
        enhanced_input = ToolInput(
            target=inp.target,
            extra_args=optimized_args,
            timeout_sec=timeout_sec or inp.timeout_sec or self.default_timeout_sec,
            correlation_id=inp.correlation_id
        )

        log.info("dig.executing target=%s args=%s", inp.target, optimized_args)

        # Step 5: Execute with base class
        result = await super()._execute_tool(enhanced_input, enhanced_input.timeout_sec)

        # Step 6: Parse output
        if result.returncode == 0 and result.stdout:
            try:
                parsed = self._parse_dig_output(result.stdout)
                result.ensure_metadata()
                result.metadata['parsed'] = parsed
                result.metadata['answers_found'] = len(parsed.get('answers', []))
                result.metadata['query_time'] = parsed.get('query_time')
                result.metadata['status'] = parsed.get('status')

                log.info("dig.execution_completed target=%s answers=%d query_time=%s status=%s",
                         inp.target,
                         len(parsed.get('answers', [])),
                         parsed.get('query_time'),
                         parsed.get('status'))
            except Exception as e:
                log.warning("dig.parse_failed error=%s", str(e))
                # Don't fail on parse errors

        return result

    def _validate_dig_requirements(self, inp: ToolInput) -> Optional[ToolOutput]:
        """
        Validate dig-specific requirements.

        Checks:
        - DNS server authorization for every @SERVER specified
        - Reverse lookup format if -x specified

        Args:
            inp: Tool input

        Returns:
            ToolOutput with error if validation fails, None otherwise
        """
        extra_args = inp.extra_args or ""

        # Validate every @SERVER token, not just the first one: a later
        # @server on the command line must not escape the policy.
        for dns_server in self._extract_dns_servers(extra_args):
            if not self._is_authorized_dns_server(dns_server):
                error_context = ErrorContext(
                    error_type=ToolErrorType.VALIDATION_ERROR,
                    message=f"DNS server not authorized: {dns_server}",
                    recovery_suggestion=(
                        "Use RFC1918 DNS server or .lab.internal DNS server:\n"
                        "  @192.168.1.1\n"
                        "  @10.0.0.1\n"
                        "  @dns.lab.internal\n"
                        "Do not use public DNS servers (8.8.8.8, 1.1.1.1, etc.)"
                    ),
                    timestamp=self._get_timestamp(),
                    tool_name=self.tool_name,
                    target=inp.target,
                    metadata={
                        "dns_server": dns_server,
                        "examples": ["@192.168.1.1", "@10.0.0.53", "@ns1.lab.internal"]
                    }
                )
                return self._create_error_output(error_context, inp.correlation_id or "")

            log.debug("dig.dns_server_validated server=%s", dns_server)

        # Validate reverse lookup format if -x specified
        if self._has_reverse_flag(extra_args):
            # Target should be an IP address for reverse lookup
            # Base class already validates RFC1918
            try:
                ipaddress.ip_address(inp.target)
                log.debug("dig.reverse_lookup target=%s", inp.target)
            except ValueError:
                error_context = ErrorContext(
                    error_type=ToolErrorType.VALIDATION_ERROR,
                    message=f"Reverse lookup (-x) requires IP address target, got: {inp.target}",
                    recovery_suggestion=(
                        "For reverse lookups, use IP address as target:\n"
                        "  target='192.168.1.10'\n"
                        "  extra_args='-x'"
                    ),
                    timestamp=self._get_timestamp(),
                    tool_name=self.tool_name,
                    target=inp.target,
                    metadata={"flag": "-x", "target": inp.target}
                )
                return self._create_error_output(error_context, inp.correlation_id or "")

        return None

    @staticmethod
    def _has_reverse_flag(extra_args: str) -> bool:
        """
        Return True when ``-x`` appears as its own argument token.

        A token-level check is used so that ``-x`` embedded in another token
        (for example ``@dns-x.lab.internal``) is not mistaken for the flag.
        """
        try:
            tokens = shlex.split(extra_args)
        except ValueError:
            # Unbalanced quotes: fall back to whitespace splitting; the
            # argument parser reports the quoting error itself.
            tokens = extra_args.split()
        return "-x" in tokens

    def _extract_dns_servers(self, extra_args: str) -> List[str]:
        """
        Extract every @SERVER from arguments.

        Args:
            extra_args: Extra arguments string

        Returns:
            Server hostnames/IPs in order of appearance; empty when none are
            given or the string cannot be tokenized.
        """
        try:
            tokens = shlex.split(extra_args)
        except ValueError as e:
            log.debug("dig.dns_server_extraction_failed error=%s", str(e))
            return []

        matches = (self._DNS_SERVER_PATTERN.match(token) for token in tokens)
        return [match.group(1) for match in matches if match]

    def _extract_dns_server(self, extra_args: str) -> Optional[str]:
        """
        Extract the first @SERVER from arguments.

        Args:
            extra_args: Extra arguments string

        Returns:
            DNS server hostname/IP or None if not specified
        """
        servers = self._extract_dns_servers(extra_args)
        return servers[0] if servers else None

    def _is_authorized_dns_server(self, server: str) -> bool:
        """
        Validate DNS server is RFC1918 or .lab.internal.

        Args:
            server: DNS server hostname or IP

        Returns:
            True if authorized, False otherwise
        """
        # Check .lab.internal (DNS names are case-insensitive)
        if server.lower().endswith('.lab.internal'):
            return True

        # Check RFC1918
        try:
            ip = ipaddress.ip_address(server)
        except ValueError:
            # Not an IP address, must be .lab.internal
            return False

        return ip.version == 4 and any(ip in network for network in self._RFC1918_NETWORKS)

    @staticmethod
    def _matches_target(token: str, target: str) -> bool:
        """Return True when *token* names the validated target (case and trailing dot ignored)."""
        normalized = token.rstrip(".").lower()
        return bool(normalized) and normalized == (target or "").rstrip(".").lower()

    def _validate_query_option(self, token: str) -> None:
        """
        Validate one ``+option`` or ``+option=value`` token.

        Raises:
            ValueError: If the option is not whitelisted, or a numeric option
                carries a non-numeric or negative value.
        """
        option_name, has_value, option_value = token[1:].partition("=")

        if option_name not in self.ALLOWED_QUERY_OPTIONS:
            if has_value:
                raise ValueError(
                    f"Query option not allowed: +{option_name}\n"
                    f"Allowed options: +short, +trace, +dnssec, +noall, +answer, etc."
                )
            raise ValueError(
                f"Query option not allowed: +{option_name}\n"
                f"Allowed: +short, +trace, +dnssec, +tcp, +noall, +answer, etc.\n"
                f"See: {sorted(self.ALLOWED_QUERY_OPTIONS)[:20]}"
            )

        if has_value and option_name in self._NUMERIC_QUERY_OPTIONS:
            try:
                int_value = int(option_value)
            except ValueError:
                raise ValueError(f"+{option_name} requires numeric value") from None
            if int_value < 0:
                raise ValueError(f"+{option_name} must be non-negative")

    def _validate_flag_value(self, flag_base: str, value: str, target: str) -> None:
        """
        Validate the value that follows a value-taking flag.

        Args:
            flag_base: The flag (e.g. ``-p``)
            value: The token following the flag
            target: The validated target, used to check ``-q``

        Raises:
            ValueError: If the value is not acceptable for the flag.
        """
        if flag_base == "-p":
            # Port validation
            try:
                port = int(value)
            except ValueError:
                raise ValueError(f"Invalid port number: {value}") from None
            if not (1 <= port <= 65535):
                raise ValueError(f"Port must be 1-65535, got: {port}")

        elif flag_base == "-t":
            # Record type validation
            if value.upper() not in self.ALLOWED_RECORD_TYPES:
                raise ValueError(
                    f"Record type not allowed: {value}\n"
                    f"Allowed: {', '.join(sorted(self.ALLOWED_RECORD_TYPES)[:20])}"
                )

        elif flag_base == "-c":
            # Class validation
            if value.upper() not in self._ALLOWED_CLASSES:
                raise ValueError(f"Invalid class: {value} (use: IN, CH, HS, or ANY)")

        elif flag_base == "-q":
            # -q names the query; it must not introduce a name that skipped
            # target validation.
            if not self._matches_target(value, target):
                raise ValueError(f"Query name (-q) must match target: {value}")

    def _parse_and_validate_args(self, extra_args: str, target: str) -> str:
        """
        Parse and validate dig arguments.

        Handles these syntax patterns:
        1. @SERVER - DNS server specification (each one must be authorized)
        2. +OPTION - Query options (dig-specific)
        3. -FLAG [VALUE] - Standard flags
        4. Positional record type, DNS class, or the target name itself

        Args:
            extra_args: Extra arguments string
            target: Target domain/IP for context

        Returns:
            Validated arguments string

        Raises:
            ValueError: If validation fails
        """
        if not extra_args:
            # Default to A record query
            return "A"

        try:
            tokens = shlex.split(extra_args)
        except ValueError as e:
            raise ValueError(f"Failed to parse arguments: {str(e)}") from e

        validated = []
        remaining = iter(tokens)

        for token in remaining:
            # The result is re-joined with single spaces and tokenized again
            # downstream, so a quoted token containing whitespace would split
            # into new, unvalidated tokens. Reject it outright.
            if any(ch.isspace() for ch in token):
                raise ValueError(f"Argument must not contain whitespace: {token!r}")

            # Handle @SERVER (DNS server specification)
            if token.startswith("@"):
                match = self._DNS_SERVER_PATTERN.match(token)
                if not match:
                    raise ValueError(
                        f"Invalid DNS server format: {token}\n"
                        f"Use: @192.168.1.1 or @dns.lab.internal"
                    )
                if not self._is_authorized_dns_server(match.group(1)):
                    raise ValueError(f"DNS server not authorized: {match.group(1)}")
                validated.append(token)
                continue

            # Handle +OPTION (query options)
            if token.startswith("+"):
                self._validate_query_option(token)
                validated.append(token)
                continue

            # Handle standard flags
            if token.startswith("-"):
                flag_base = token.split("=")[0]
                if flag_base not in self.allowed_flags:
                    raise ValueError(f"Flag not allowed: {token}")
                validated.append(token)

                # Check if flag requires value
                if flag_base in self._FLAGS_REQUIRE_VALUE:
                    value = next(remaining, None)
                    if value is None:
                        raise ValueError(f"{flag_base} requires a value")
                    if any(ch.isspace() for ch in value):
                        raise ValueError(f"Argument must not contain whitespace: {value!r}")
                    self._validate_flag_value(flag_base, value, target)
                    validated.append(value)
                continue

            # Non-flag token: record type, DNS class, or the target name.
            if token.upper() in self.ALLOWED_RECORD_TYPES:
                validated.append(token.upper())
                continue

            if token.upper() in self._ALLOWED_CLASSES:
                validated.append(token)
                continue

            if self._matches_target(token, target):
                log.debug("dig.additional_token token=%s context=domain_or_value", token)
                validated.append(token)
                continue

            # Anything else would be treated by dig as an additional query
            # name that never went through target validation.
            raise ValueError(
                f"Unexpected argument: {token}\n"
                f"Expected a record type, DNS class, or the target name"
            )

        result = " ".join(validated)
        log.debug("dig.args_validated args_count=%d", len(validated))
        return result

    def _optimize_dig_args(self, validated_args: str) -> str:
        """
        Add helpful defaults for dig queries.

        Adds default record type (A) if not specified. Does not add output
        format options - allows dig's default verbose output which is
        helpful for troubleshooting.

        Args:
            validated_args: Validated arguments string

        Returns:
            Optimized arguments string
        """
        try:
            tokens = shlex.split(validated_args) if validated_args else []
        except ValueError:
            tokens = validated_args.split() if validated_args else []

        optimized = []

        # Check what's present
        has_record_type = any(
            t.upper() in self.ALLOWED_RECORD_TYPES
            for t in tokens
            if not t.startswith(('-', '@', '+'))
        )

        # Add default record type if missing
        if not has_record_type and "-x" not in tokens:
            # Default to A record (reverse lookups don't need type)
            optimized.append("A")
            log.debug("dig.optimization added=record_type value=A")

        # Add original arguments
        optimized.extend(tokens)

        result = " ".join(optimized)

        if len(optimized) != len(tokens):
            log.info("dig.arguments_optimized original=%d optimized=%d",
                     len(tokens), len(optimized))

        return result

    def _parse_dig_output(self, output: str) -> Dict[str, Any]:
        """
        Parse dig output for DNS records and query information.

        Dig output format:
            ; <<>> DiG 9.16.1 <<>> example.com A
            ;; global options: +cmd
            ;; Got answer:
            ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 12345
            ;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1

            ;; ANSWER SECTION:
            example.com.        300    IN    A    93.184.216.34

            ;; Query time: 10 msec
            ;; SERVER: 192.168.1.1#53(192.168.1.1)
            ;; WHEN: Mon Jan 01 12:00:00 UTC 2024
            ;; MSG SIZE  rcvd: 56

        Args:
            output: Raw dig output

        Returns:
            Dictionary with parsed DNS information: ``answers``,
            ``authority`` and ``additional`` record lists plus
            ``query_time``, ``server``, ``flags``, ``status`` and
            ``message_size``.
        """
        results = {
            "answers": [],
            "authority": [],
            "additional": [],
            "query_time": None,
            "server": None,
            "flags": [],
            "status": None,
            "message_size": None,
        }

        # Extract status
        status_match = self._STATUS_PATTERN.search(output)
        if status_match:
            results["status"] = status_match.group(1)

        # Extract flags
        flags_match = self._FLAGS_PATTERN.search(output)
        if flags_match:
            results["flags"] = flags_match.group(1).strip().split()

        # Parse sections
        current_section = None

        for line in output.split('\n'):
            # Detect section headers
            if ";; ANSWER SECTION:" in line:
                current_section = "answers"
                continue
            elif ";; AUTHORITY SECTION:" in line:
                current_section = "authority"
                continue
            elif ";; ADDITIONAL SECTION:" in line:
                current_section = "additional"
                continue

            # End of section
            if line.startswith(";;") or line.strip() == "":
                if current_section and not line.startswith(";; "):
                    current_section = None
                continue

            # Parse record lines in current section
            if current_section:
                match = self._ANSWER_SECTION_PATTERN.match(line)
                if match:
                    record = {
                        "name": match.group(1),
                        "ttl": int(match.group(2)),
                        "type": match.group(3),
                        "value": match.group(4).strip()
                    }
                    results[current_section].append(record)

        # Extract query time
        time_match = self._QUERY_TIME_PATTERN.search(output)
        if time_match:
            results["query_time"] = int(time_match.group(1))

        # Extract server
        server_match = self._SERVER_PATTERN.search(output)
        if server_match:
            results["server"] = {
                "ip": server_match.group(1),
                "port": int(server_match.group(2))
            }

        # Extract message size
        msg_match = self._MSG_SIZE_PATTERN.search(output)
        if msg_match:
            results["message_size"] = int(msg_match.group(1))

        log.debug("dig.output_parsed answers=%d authority=%d additional=%d query_time=%s status=%s",
                  len(results["answers"]),
                  len(results["authority"]),
                  len(results["additional"]),
                  results["query_time"],
                  results["status"])

        return results

    def _get_timestamp(self) -> datetime:
        """
        Get current timestamp with timezone.

        Returns:
            Current UTC timestamp
        """
        return datetime.now(timezone.utc)

    def get_tool_info(self) -> Dict[str, Any]:
        """
        Get comprehensive tool information.

        Returns:
            The base-class tool info extended with a ``dig_specific`` entry
            describing DNS capabilities and usage examples
        """
        base_info = super().get_tool_info()

        dig_info = {
            "dig_specific": {
                "capabilities": {
                    "record_types": sorted(self.ALLOWED_RECORD_TYPES),
                    "record_types_count": len(self.ALLOWED_RECORD_TYPES),
                    "query_options": sorted(self.ALLOWED_QUERY_OPTIONS),
                    "query_options_count": len(self.ALLOWED_QUERY_OPTIONS),
                },
                "common_record_types": {
                    "A": "IPv4 address",
                    "AAAA": "IPv6 address",
                    "MX": "Mail exchange",
                    "NS": "Name server",
                    "TXT": "Text records",
                    "SOA": "Start of authority",
                    "CNAME": "Canonical name",
                    "PTR": "Pointer (reverse DNS)",
                    "SRV": "Service locator",
                    "CAA": "Certificate authority authorization"
                },
                "common_query_options": {
                    "+short": "Short output (answers only)",
                    "+noall +answer": "Clean answer section only",
                    "+trace": "Trace DNS delegation path",
                    "+dnssec": "Request DNSSEC records",
                    "+tcp": "Use TCP instead of UDP",
                    "+multiline": "Verbose multi-line output"
                },
                "safety_features": [
                    "DNS server authorization (RFC1918/.lab.internal)",
                    "Target validation (base class)",
                    "Record type whitelist (all standard types)",
                    "Query option whitelist (all safe options)",
                    "Read-only operations (no modifications)",
                    "Low-risk information gathering",
                    "Higher concurrency (5 simultaneous queries)",
                    "Quick timeouts (30s default)"
                ],
                "usage_examples": [
                    {
                        "description": "Basic A record query",
                        "input": {
                            "target": "server.lab.internal",
                            "extra_args": "A"
                        },
                        "command": "dig server.lab.internal A"
                    },
                    {
                        "description": "Query specific DNS server",
                        "input": {
                            "target": "server.lab.internal",
                            "extra_args": "@192.168.1.1 A"
                        },
                        "command": "dig @192.168.1.1 server.lab.internal A"
                    },
                    {
                        "description": "MX records with short output",
                        "input": {
                            "target": "company.lab.internal",
                            "extra_args": "MX +short"
                        },
                        "command": "dig company.lab.internal MX +short"
                    },
                    {
                        "description": "Reverse DNS lookup",
                        "input": {
                            "target": "192.168.1.10",
                            "extra_args": "-x"
                        },
                        "command": "dig -x 192.168.1.10"
                    },
                    {
                        "description": "TXT records (clean output)",
                        "input": {
                            "target": "server.lab.internal",
                            "extra_args": "TXT +noall +answer"
                        },
                        "command": "dig server.lab.internal TXT +noall +answer"
                    },
                    {
                        "description": "Trace DNS delegation",
                        "input": {
                            "target": "server.lab.internal",
                            "extra_args": "A +trace"
                        },
                        "command": "dig server.lab.internal A +trace"
                    },
                    {
                        "description": "DNSSEC validation",
                        "input": {
                            "target": "server.lab.internal",
                            "extra_args": "A +dnssec +multiline"
                        },
                        "command": "dig server.lab.internal A +dnssec +multiline"
                    },
                    {
                        "description": "Service locator (SRV) records",
                        "input": {
                            "target": "_ldap._tcp.company.lab.internal",
                            "extra_args": "SRV +short"
                        },
                        "command": "dig _ldap._tcp.company.lab.internal SRV +short"
                    },
                    {
                        "description": "All records (ANY query)",
                        "input": {
                            "target": "server.lab.internal",
                            "extra_args": "ANY +noall +answer"
                        },
                        "command": "dig server.lab.internal ANY +noall +answer"
                    }
                ],
                "use_cases": [
                    "DNS troubleshooting and validation",
                    "Service discovery (SRV records)",
                    "Mail server enumeration (MX records)",
                    "Name server identification (NS records)",
                    "Reverse DNS lookups (PTR records)",
                    "SPF/DMARC record verification (TXT records)",
                    "DNSSEC validation",
                    "Network reconnaissance (authorized)"
                ],
                "complementary_tools": {
                    "before_nmap": "Discover hosts via DNS before port scanning",
                    "before_masscan": "Enumerate services via SRV records",
                    "troubleshooting": "Verify DNS configuration for targets"
                }
            }
        }

        base_info.update(dig_info)
        return base_info

    def validate_configuration(self) -> Dict[str, Any]:
        """
        Validate current tool configuration.

        Checks:
        - Command availability
        - Configuration settings
        - Circuit breaker state

        Returns:
            Dictionary with ``valid``, ``issues``, ``warnings`` and a
            ``configuration`` summary
        """
        issues = []
        warnings = []

        # Check if dig is available (resolve once and reuse the result)
        resolved_command = self._resolve_command()
        if not resolved_command:
            issues.append("dig command not found in PATH")

        # Check circuit breaker state
        if self._circuit_breaker:
            try:
                from mcp_server.circuit_breaker import CircuitBreakerState
                if self._circuit_breaker.state == CircuitBreakerState.OPEN:
                    warnings.append("Circuit breaker is currently OPEN")
            except Exception as e:
                # Best-effort check: record why it was skipped instead of
                # hiding the failure completely.
                log.debug("dig.circuit_breaker_check_failed error=%s", str(e))

        # Check timeout
        if self.default_timeout_sec < 5:
            warnings.append(f"Very low timeout: {self.default_timeout_sec}s")

        return {
            "valid": len(issues) == 0,
            "issues": issues,
            "warnings": warnings,
            "configuration": {
                "command_available": resolved_command is not None,
                "timeout": self.default_timeout_sec,
                "concurrency": self.concurrency,
                "record_types_count": len(self.ALLOWED_RECORD_TYPES),
                "query_options_count": len(self.ALLOWED_QUERY_OPTIONS),
                "risk_level": "low",
                "operation_type": "read-only"
            }
        }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nordeim/Security-MCP-Server-v3'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.