Skip to main content
Glama
SlanyCukr

Bug Bounty MCP Server

by SlanyCukr

httpx_probe

Probe HTTP services to identify active endpoints, analyze responses, and filter results by status codes or content length for security assessments.

Instructions

Execute HTTPx for HTTP probing with enhanced logging.

Args: targets: Target URLs or IPs target_file: File containing targets ports: Ports to probe methods: HTTP methods to use status_code: Filter by status code content_length: Show content length output_file: Output file path additional_args: Additional HTTPx arguments

Returns: HTTP probing results

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
additional_argsNo
content_lengthNo
methodsNoGET
output_fileNo
portsNo
status_codeNo
target_fileNo
targetsNo

Implementation Reference

  • MCP tool 'httpx_probe' handler: FastMCP-decorated function that proxies HTTP probing requests to the REST API endpoint '/api/httpx'.
    @mcp.tool()
    def httpx_probe(
        targets: str = "",
        target_file: str = "",
        ports: str = "",
        methods: str = "GET",
        status_code: str = "",
        content_length: bool = False,
        output_file: str = "",
        additional_args: str = "",
    ) -> dict[str, Any]:
        """Execute HTTPx for HTTP probing with enhanced logging.
    
        Args:
            targets: Target URLs or IPs
            target_file: File containing targets
            ports: Ports to probe
            methods: HTTP methods to use
            status_code: Filter by status code
            content_length: Show content length
            output_file: Output file path
            additional_args: Additional HTTPx arguments
    
        Returns:
            HTTP probing results
        """
        data = {
            "targets": targets,
            "target_file": target_file,
            "ports": ports,
            "methods": methods,
            "status_code": status_code,
            "content_length": content_length,
            "output_file": output_file,
            "additional_args": additional_args,
        }
    
        logger.info("๐ŸŒ Starting HTTPx probing")
        result = api_client.safe_post("api/httpx", data)
    
        if result.get("success"):
            logger.info("โœ… HTTPx probing completed")
        else:
            logger.error("โŒ HTTPx probing failed")
    
        return result
  • REST API handler for httpx tool: Builds and executes httpx command, parses JSON output into structured findings.
    @tool(required_fields=[])
    def execute_httpx():
        """Execute HTTPx for HTTP probing."""
        data = request.get_json()
        params = extract_httpx_params(data)
    
        logger.info("Executing HTTPx on targets")
    
        started_at = datetime.now()
        command, temp_file = prepare_httpx_targets(params)
    
        try:
            execution_result = execute_command(command, timeout=600)
            ended_at = datetime.now()
            return parse_httpx_result(
                execution_result, params, command, started_at, ended_at
            )
        finally:
            # Clean up temporary file if created
            if temp_file and os.path.exists(temp_file):
                try:
                    os.unlink(temp_file)
                except Exception:
                    pass
  • Helper function to extract and normalize httpx parameters from incoming request data.
    def extract_httpx_params(data):
        """Extract httpx parameters from request data."""
        return {
            "targets": data.get("targets", ""),
            "target_file": data.get("target_file", ""),
            "status_code": data.get("status_code", False) or data.get("sc", False),
            "content_length": data.get("content_length", False) or data.get("cl", False),
            "title": data.get("title", True),
            "tech_detect": data.get("tech_detect", False) or data.get("tech", False),
            "web_server": data.get("web_server", False) or data.get("server", False),
            "location": data.get("location", False),
            "response_time": data.get("response_time", False) or data.get("rt", False),
            "method": data.get("method", ""),
            "methods": data.get("methods", ""),
            "match_code": data.get("match_code", "") or data.get("mc", ""),
            "filter_code": data.get("filter_code", "") or data.get("fc", ""),
            "threads": data.get("threads", 50),
            "timeout": data.get("timeout", 10),
            "follow_redirects": data.get("follow_redirects", False),
            "follow_host_redirects": data.get("follow_host_redirects", False),
            "json": data.get("json", False),
            "ports": data.get("ports", ""),
            "silent": data.get("silent", True),
            "additional_args": data.get("additional_args", ""),
        }
  • Helper function to construct the full httpx CLI command based on parameters.
    def build_httpx_command(params):
        """Build httpx command from parameters."""
        cmd_parts = ["httpx"]
    
        # Handle targets - either from direct input or file
        if params["targets"]:
            cmd_parts.extend(["-l", params["targets"]])
        elif params["target_file"]:
            cmd_parts.extend(["-l", params["target_file"]])
    
        # Add common httpx parameters
        if params["status_code"]:
            cmd_parts.append("-sc")
        if params["content_length"]:
            cmd_parts.append("-cl")
        if params["title"]:
            cmd_parts.append("-title")
        if params["tech_detect"]:
            cmd_parts.append("-tech-detect")
        if params["web_server"]:
            cmd_parts.append("-server")
        if params["location"]:
            cmd_parts.append("-location")
        if params["response_time"]:
            cmd_parts.append("-rt")
        if params["method"] and params["method"] != "GET":
            cmd_parts.extend(["-X", params["method"]])
        if params["methods"] and params["methods"] != "GET":
            cmd_parts.extend(["-X", params["methods"]])
    
        # Match codes (mc) and filter codes (fc)
        if params["match_code"]:
            cmd_parts.extend(["-mc", str(params["match_code"])])
        if params["filter_code"]:
            cmd_parts.extend(["-fc", str(params["filter_code"])])
    
        # Threads
        if params["threads"] != 50:
            cmd_parts.extend(["-threads", str(params["threads"])])
    
        # Timeout
        if params["timeout"] != 10:
            cmd_parts.extend(["-timeout", str(params["timeout"])])
    
        # Follow redirects
        if params["follow_redirects"]:
            cmd_parts.append("-follow-redirects")
        if params["follow_host_redirects"]:
            cmd_parts.append("-follow-host-redirects")
    
        # Always use JSON output for structured parsing
        cmd_parts.append("-json")
    
        # Ports
        if params["ports"]:
            cmd_parts.extend(["-ports", str(params["ports"])])
    
        # Silent mode
        if params["silent"]:
            cmd_parts.append("-silent")
    
        # Additional arguments
        if params["additional_args"]:
            cmd_parts.extend(params["additional_args"].split())
    
        return cmd_parts
  • Helper function to parse httpx JSONL output into standardized finding structures.
    def parse_httpx_output(stdout: str) -> list[dict[str, Any]]:
        """Parse httpx JSON output into structured findings."""
        findings = []
    
        if not stdout.strip():
            return findings
    
        for line in stdout.strip().split("\n"):
            line = line.strip()
            if not line:
                continue
    
            try:
                # Each line should be a JSON object
                result = json.loads(line)
    
                # Extract endpoint information
                url = result.get("url", "")
                status_code = result.get("status_code", 0)
                content_length = result.get("content_length", 0)
                title = result.get("title", "")
                tech_detect = result.get("tech", [])
                web_server = result.get("webserver", "")
                response_time = result.get("response_time", "")
                location = result.get("location", "")
    
                if url:
                    parsed_url = urlparse(url)
                    host = parsed_url.netloc
    
                    # Simple technology processing - use directly
                    techs = tech_detect if isinstance(tech_detect, list) else []
                    if web_server and web_server not in techs:
                        techs.append(web_server)
    
                    # Simple tags
                    tags = ["endpoint", "http", "httpx"]
                    if status_code:
                        tags.append(f"status-{status_code}")
                    if parsed_url.scheme == "https":
                        tags.append("https")
                    else:
                        tags.append("http")
    
                    finding = create_finding(
                        finding_type="endpoint",
                        target=host,
                        evidence={
                            "url": url,
                            "status_code": status_code,
                            "content_length": content_length,
                            "title": title,
                            "technologies": techs,
                            "web_server": web_server,
                            "response_time": response_time,
                            "location": location,
                            "scheme": parsed_url.scheme,
                            "port": parsed_url.port,
                            "path": parsed_url.path,
                            "discovered_by": "httpx",
                        },
                        severity="info",
                        confidence="medium",
                        tags=tags,
                        raw_ref=line,
                    )
                    findings.append(finding)
    
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse httpx JSON line: {line} - {e}")
                continue
    
        return findings

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SlanyCukr/bugbounty-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server