Skip to main content
Glama
SlanyCukr

Bug Bounty MCP Server

by SlanyCukr

amass_scan

Execute subdomain enumeration on target domains using Amass to identify attack surfaces for security assessments and reconnaissance.

Instructions

Execute Amass for subdomain enumeration with enhanced logging.

Args: domain: Target domain for enumeration mode: Amass mode (enum, intel, viz) additional_args: Additional Amass arguments

Returns: Subdomain enumeration results

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
additional_argsNo
domainYes
modeNoenum

Implementation Reference

  • MCP handler and registration for the 'amass_scan' tool, which proxies requests to the REST API endpoint '/api/amass'.
    @mcp.tool()
    def amass_scan(
        domain: str, mode: str = "enum", additional_args: str = ""
    ) -> dict[str, Any]:
        """Execute Amass for subdomain enumeration with enhanced logging.
    
        Args:
            domain: Target domain for enumeration
            mode: Amass mode (enum, intel, viz)
            additional_args: Additional Amass arguments
    
        Returns:
            Subdomain enumeration results
        """
        data = {"domain": domain, "mode": mode, "additional_args": additional_args}
    
        logger.info(f"🔍 Starting Amass subdomain enumeration for {domain}")
        result = api_client.safe_post("api/amass", data)
    
        if result.get("success"):
            logger.info(f"✅ Amass enumeration completed for {domain}")
        else:
            logger.error("❌ Amass enumeration failed")
    
        return result
  • Primary handler function 'execute_amass' for the REST API /api/amass endpoint that executes the amass command and processes output.
    @tool(required_fields=["target"])
    def execute_amass():
        """Execute Amass for advanced subdomain enumeration."""
        data = request.get_json()
        params = extract_amass_params(data)
    
        logger.info(f"Executing Amass on {params['domain']}")
    
        started_at = datetime.now()
        command = build_amass_command(params)
        timeout_minutes = params.get("timeout_minutes")
        if timeout_minutes is None:
            timeout = 1800
        else:
            timeout = int(timeout_minutes) * 60
        execution_result = execute_command(command, timeout=timeout)
        ended_at = datetime.now()
    
        return parse_amass_output(execution_result, params, command, started_at, ended_at)
  • Helper function that constructs the full amass CLI command based on input parameters.
    def build_amass_command(params: dict) -> str:
        """Build amass command from parameters."""
        cmd_parts = ["amass"]
    
        cmd_parts.append(params["mode"])
        cmd_parts.extend(["-d", params["domain"]])
    
        # Core enumeration parameters
        if params["active"]:
            cmd_parts.append("-active")
        if params["brute"]:
            cmd_parts.append("-brute")
        if params["passive"]:
            cmd_parts.append("-passive")
    
        # Wordlist and dictionary parameters
        if params["wordlist"]:
            cmd_parts.extend(["-w", params["wordlist"]])
        if params["wordlist_mask"]:
            cmd_parts.extend(["-wm", params["wordlist_mask"]])
        if params["alterations"]:
            cmd_parts.append("-alts")
    
        # Output and information parameters
        if params["show_sources"]:
            cmd_parts.append("-src")
        if params["show_ips"]:
            cmd_parts.append("-ip")
        if params["include_unresolved"]:
            cmd_parts.append("-include-unresolvable")
    
        # Data source parameters
        if params["data_sources"]:
            cmd_parts.extend(["-include", params["data_sources"]])
        if params["exclude_sources"]:
            cmd_parts.extend(["-exclude", params["exclude_sources"]])
    
        # Certificate transparency configuration
        for source in params.get("ct_sources", []):
            cmd_parts.extend(["-src", source])
        if params.get("ct_timeout"):
            cmd_parts.extend(["-timeout", str(params["ct_timeout"])])
    
        # Performance and rate limiting parameters
        if params["timeout_minutes"]:
            timeout_seconds = int(params["timeout_minutes"]) * 60
            cmd_parts.extend(["-timeout", str(timeout_seconds)])
        if params["max_depth"]:
            cmd_parts.extend(["-max-depth", str(params["max_depth"])])
        if params["dns_qps"]:
            cmd_parts.extend(["-dns-qps", str(params["dns_qps"])])
        if params["resolvers_qps"]:
            cmd_parts.extend(["-resolvers-qps", str(params["resolvers_qps"])])
        if params["min_recursive"]:
            cmd_parts.extend(["-min-recursive", str(params["min_recursive"])])
        if params["max_dns_queries"]:
            cmd_parts.extend(["-max-dns-queries", str(params["max_dns_queries"])])
    
        # Network configuration parameters
        if params["resolvers_file"]:
            cmd_parts.extend(["-r", params["resolvers_file"]])
        if params["trusted_resolvers"]:
            cmd_parts.extend(["-tr", params["trusted_resolvers"]])
        if params["blacklist_file"]:
            cmd_parts.extend(["-bl", params["blacklist_file"]])
        if params["no_dns"]:
            cmd_parts.append("-no-dns")
    
        # Configuration and output parameters
        if params["config_file"]:
            cmd_parts.extend(["-config", params["config_file"]])
        if params["output_file"]:
            cmd_parts.extend(["-o", params["output_file"]])
        if params["log_file"]:
            cmd_parts.extend(["-log", params["log_file"]])
    
        # Verbosity parameters
        if params["verbose"]:
            cmd_parts.append("-v")
        if params["silent"]:
            cmd_parts.append("-silent")
        if params["debug"]:
            cmd_parts.append("-debug")
    
        # Intel mode specific parameters
        if params["mode"] == "intel":
            if params["whois"]:
                cmd_parts.append("-whois")
            if params["asn"]:
                cmd_parts.append("-asn")
            if params["cidr"]:
                cmd_parts.append("-cidr")
            if params["org"]:
                cmd_parts.append("-org")
    
        # Advanced parameters
        if params["exclude_disabled"]:
            cmd_parts.append("-exclude-disabled")
        if params["scripts_only"]:
            cmd_parts.append("-scripts-only")
    
        # Visualization mode parameters
        if params["mode"] == "viz":
            if params["viz_input_file"]:
                cmd_parts.extend(["-i", params["viz_input_file"]])
            if params["viz_output_file"]:
                cmd_parts.extend(["-o", params["viz_output_file"]])
    
        # Handle additional arguments
        if params["additional_args"]:
            additional_parts = params["additional_args"].split()
            cmd_parts.extend(additional_parts)
    
        return " ".join(cmd_parts)
  • Helper function that parses the raw amass output into structured JSON findings list.
    def parse_amass_output(
        execution_result: dict[str, Any],
        params: dict,
        command: str,
        started_at: datetime,
        ended_at: datetime,
    ) -> dict[str, Any]:
        """Parse amass text output into structured findings."""
        duration_ms = int((ended_at - started_at).total_seconds() * 1000)
    
        if not execution_result["success"]:
            return {
                "success": False,
                "tool": "amass",
                "params": params,
                "command": command,
                "started_at": started_at.isoformat(),
                "ended_at": ended_at.isoformat(),
                "duration_ms": duration_ms,
                "error": execution_result.get("error", "Command execution failed"),
                "findings": [],
                "stats": {"findings": 0, "dupes": 0, "payload_bytes": 0},
            }
    
        stdout = execution_result.get("stdout", "")
        with open("/tmp/amass_raw_output.log", "w") as f:
            f.write(stdout)
        logger.info("Amass raw stdout logged to /tmp/amass_raw_output.log")
        findings = []
        seen_subdomains = set()
    
        # Parse text output - each line is a subdomain
        for line in stdout.strip().split("\n"):
            line = line.strip()
            if not line:
                continue
    
            # Basic subdomain validation
            if "." in line and not line.startswith("."):
                subdomain = line
                if subdomain not in seen_subdomains:
                    seen_subdomains.add(subdomain)
    
                    finding = {
                        "type": "subdomain",
                        "target": subdomain,
                        "evidence": {
                            "subdomain": subdomain,
                            "domain": params.get("domain", ""),
                            "discovered_by": "amass",
                        },
                        "severity": "info",
                        "confidence": "medium",
                        "tags": ["subdomain", "text_output"],
                        "raw_ref": line,
                    }
                    findings.append(finding)
    
        payload_bytes = len(stdout.encode("utf-8"))
    
        return {
            "success": True,
            "tool": "amass",
            "params": params,
            "command": command,
            "started_at": started_at.isoformat(),
            "ended_at": ended_at.isoformat(),
            "duration_ms": duration_ms,
            "findings": findings,
            "stats": {
                "findings": len(findings),
                "dupes": 0,
                "payload_bytes": payload_bytes,
            },
        }
  • Helper function to extract and default input parameters for amass from the API request.
    def extract_amass_params(data: dict) -> dict:
        """Extract amass parameters from request data."""
        return {
            "domain": data.get("target", ""),
            "mode": data.get("mode", "enum"),
            "active": data.get("active", False),
            "brute": data.get("brute", False),
            "passive": data.get("passive", True),
            "wordlist": data.get("wordlist"),
            "wordlist_mask": data.get("wordlist_mask"),
            "alterations": data.get("alterations", False),
            "show_sources": data.get("show_sources", False),
            "show_ips": data.get("show_ips", False),
            "include_unresolved": data.get("include_unresolved", False),
            "data_sources": data.get("data_sources"),
            "exclude_sources": data.get("exclude_sources"),
            "timeout_minutes": data.get("timeout_minutes"),
            "max_depth": data.get("max_depth", 0),
            "dns_qps": data.get("dns_qps"),
            "resolvers_qps": data.get("resolvers_qps"),
            "min_recursive": data.get("min_recursive", 0),
            "max_dns_queries": data.get("max_dns_queries"),
            "resolvers_file": data.get("resolvers_file"),
            "trusted_resolvers": data.get("trusted_resolvers"),
            "blacklist_file": data.get("blacklist_file"),
            "no_dns": data.get("no_dns", False),
            "config_file": data.get("config_file"),
            "output_file": data.get("output_file"),
            "log_file": data.get("log_file"),
            "verbose": data.get("verbose", False),
            "silent": data.get("silent", False),
            "debug": data.get("debug", False),
            "whois": data.get("whois", False),
            "asn": data.get("asn", False),
            "cidr": data.get("cidr", False),
            "org": data.get("org", False),
            "exclude_disabled": data.get("exclude_disabled", True),
            "scripts_only": data.get("scripts_only", False),
            "viz_input_file": data.get("viz_input_file"),
            "viz_output_file": data.get("viz_output_file"),
            "additional_args": data.get("additional_args"),
            "ct_sources": data.get("ct_sources", ["crt.sh", "google"]),
            "ct_timeout": data.get("ct_timeout"),
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SlanyCukr/bugbounty-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server