Skip to main content
Glama

Bug Bounty MCP Server

by SlanyCukr
nuclei.py9.99 kB
"""nuclei tool implementation.""" import json import logging from datetime import datetime from flask import request from src.rest_api_server.utils.commands import execute_command from src.rest_api_server.utils.registry import tool logger = logging.getLogger(__name__) # Aggressive preset for nuclei AGGRESSIVE_PRESET = { "severity": "info,low,medium,high,critical", "tags": "", # Use all templates "concurrency": 100, "timeout": 30, "additional_args": "-es info -etags intrusive", # Include intrusive templates } def apply_aggressive_preset(user_params: dict, aggressive: bool = False) -> dict: """Apply aggressive preset to user parameters if aggressive=True.""" if not aggressive: return user_params # Start with user params and apply aggressive preset merged_params = user_params.copy() # Apply aggressive preset for parameters not explicitly set by user for key, aggressive_value in AGGRESSIVE_PRESET.items(): if key not in user_params: merged_params[key] = aggressive_value else: # For certain key parameters, use aggressive values if user set defaults if key in ["concurrency", "severity"] and user_params.get(key) in [ 25, None, "", ]: merged_params[key] = aggressive_value return merged_params def extract_nuclei_params(data): """Extract nuclei parameters from request data.""" # Check for aggressive mode aggressive = data.get("aggressive", False) params = { "target": data["target"], "severity": data.get("severity"), "tags": data.get("tags"), "template": data.get("template"), "template_id": data.get("template_id"), "exclude_id": data.get("exclude_id"), "exclude_tags": data.get("exclude_tags"), "concurrency": data.get("concurrency", 25), "timeout": data.get("timeout"), "additional_args": data.get("additional_args"), # Add support for custom template directories "template_dir": data.get("template_dir"), "custom_templates": data.get("custom_templates"), # Add support for multiple targets "targets_file": data.get("targets_file"), "target_list": data.get("target_list"), } # Apply aggressive preset if requested return apply_aggressive_preset(params, aggressive) def build_nuclei_command(params): """Build nuclei command from parameters.""" cmd_parts = ["nuclei"] # Handle target(s) if params.get("targets_file"): cmd_parts.extend(["-l", params["targets_file"]]) elif params.get("target_list") and isinstance(params["target_list"], list): # Create a temporary file for multiple targets import tempfile with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: for target in params["target_list"]: f.write(f"{target}\n") cmd_parts.extend(["-l", f.name]) else: cmd_parts.extend(["-u", params["target"]]) # Template configuration if params.get("template_dir"): cmd_parts.extend(["-t", params["template_dir"]]) elif params.get("custom_templates"): if isinstance(params["custom_templates"], list): for template in params["custom_templates"]: cmd_parts.extend(["-t", template]) else: cmd_parts.extend(["-t", params["custom_templates"]]) elif params.get("template"): cmd_parts.extend(["-t", params["template"]]) # Other parameters if params["severity"]: cmd_parts.extend(["-severity", params["severity"]]) if params["tags"]: cmd_parts.extend(["-tags", params["tags"]]) if params["template_id"]: cmd_parts.extend(["-template-id", params["template_id"]]) if params["exclude_id"]: cmd_parts.extend(["-exclude-id", params["exclude_id"]]) if params["exclude_tags"]: cmd_parts.extend(["-exclude-tags", params["exclude_tags"]]) if params["concurrency"] != 25: cmd_parts.extend(["-c", str(params["concurrency"])]) if params["timeout"]: cmd_parts.extend(["-timeout", str(params["timeout"])]) if params["additional_args"]: cmd_parts.extend(params["additional_args"].split()) cmd_parts.extend(["-json"]) return " ".join(cmd_parts) def parse_nuclei_output(stdout: str) -> tuple[list[dict], dict]: """Parse nuclei JSON output and extract findings with statistics.""" findings = [] stats = {"findings": 0, "dupes": 0, "payload_bytes": len(stdout.encode("utf-8"))} seen_findings = set() # For deduplication if not stdout.strip(): return findings, stats try: # Parse JSON line-by-line for findings lines = stdout.strip().split("\n") for line in lines: line = line.strip() if not line: continue try: result = json.loads(line) if not isinstance(result, dict): continue # Extract finding information template_id = result.get("template-id", "") template_name = result.get("info", {}).get("name", "") severity = result.get("info", {}).get("severity", "info") matched_at = result.get("matched-at", "") # Skip if no meaningful data if not template_id and not matched_at: continue # Create deduplication key dedup_key = f"{template_id}:{matched_at}" if dedup_key in seen_findings: stats["dupes"] += 1 continue seen_findings.add(dedup_key) # Build evidence from nuclei result evidence = { "template_id": template_id, "template_name": template_name, "matched_at": matched_at, } # Add extracted data if available if "extracted-results" in result: evidence["extracted_results"] = result["extracted-results"] if "matcher-name" in result: evidence["matcher_name"] = result["matcher-name"] if "curl-command" in result: evidence["curl_command"] = result["curl-command"] # Add request/response info if available if "request" in result: evidence["request"] = result["request"] if "response" in result: evidence["response"] = result["response"] # Determine target from matched URL target = matched_at if matched_at else result.get("host", "") # Build tags from template info tags = [] template_info = result.get("info", {}) if "tags" in template_info: if isinstance(template_info["tags"], list): tags = template_info["tags"] else: tags = [template_info["tags"]] # Add template ID as tag if not already included if template_id and template_id not in tags: tags.append(template_id) finding = { "type": "vuln", "target": target, "evidence": evidence, "severity": severity, # Direct mapping, no conversion "confidence": "high", # High confidence for template matches "tags": tags, "raw_ref": f"nuclei_{len(findings)}", } findings.append(finding) stats["findings"] += 1 except json.JSONDecodeError as e: logger.debug( f"Failed to parse nuclei JSON line: {line[:100]}... Error: {e}" ) continue except Exception as e: logger.warning(f"Error processing nuclei result line: {e}") continue except Exception as e: logger.error(f"Failed to parse nuclei output: {e}") return findings, stats def parse_nuclei_result(execution_result, params, command, started_at, ended_at): """Parse nuclei execution result and format unified response.""" duration_ms = int((ended_at - started_at).total_seconds() * 1000) # Base response structure response = { "tool": "nuclei", "params": params, "started_at": started_at.isoformat(), "ended_at": ended_at.isoformat(), "duration_ms": duration_ms, "findings": [], "stats": {"findings": 0, "dupes": 0, "payload_bytes": 0}, "success": execution_result["success"], } # Parse findings if execution was successful if execution_result["success"] and execution_result.get("stdout"): findings, stats = parse_nuclei_output(execution_result["stdout"]) response["findings"] = findings response["stats"] = stats # Include error information if execution failed if not execution_result["success"]: response["error"] = { "message": execution_result.get("stderr", "Command execution failed"), "return_code": execution_result.get("return_code", -1), } return response @tool() def execute_nuclei(): """Execute Nuclei vulnerability scanner.""" data = request.get_json() params = extract_nuclei_params(data) logger.info(f"Executing Nuclei scan on {params['target']}") started_at = datetime.now() command = build_nuclei_command(params) execution_result = execute_command(command, timeout=600) ended_at = datetime.now() return parse_nuclei_result(execution_result, params, command, started_at, ended_at)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SlanyCukr/bugbounty-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server