Skip to main content
Glama
sast_server.py68.9 kB
#!/usr/bin/env python3 """ ================================================================================ MCP-SAST-Server - Security Analysis Server for Claude Code ================================================================================ A comprehensive SAST (Static Application Security Testing) server that provides security code analysis tools through HTTP API endpoints. Designed to work with the MCP (Model Context Protocol) client for Claude Code integration. FEATURES: - 15+ security scanning tools integration - Cross-platform path resolution (Windows ↔ Linux) - Timeout handling for long-running scans - JSON output for easy parsing - Health check endpoint for monitoring SUPPORTED TOOLS: Code Analysis: - Semgrep: Multi-language static analysis (30+ languages) - Bandit: Python security scanner - ESLint Security: JavaScript/TypeScript security - Gosec: Go security checker - Brakeman: Ruby on Rails security scanner - Graudit: Grep-based code auditing - Bearer: Security and privacy risk scanner Secret Detection: - TruffleHog: Secrets scanner for repos and filesystems - Gitleaks: Git secrets detector Dependency Scanning: - Safety: Python dependency checker - npm audit: Node.js dependency checker - OWASP Dependency-Check: Multi-language scanner Infrastructure as Code: - Checkov: Terraform, CloudFormation, Kubernetes scanner - tfsec: Terraform security scanner - Trivy: Container and IaC vulnerability scanner CONFIGURATION: Set via environment variables or .env file: - API_PORT: Server port (default: 6000) - DEBUG_MODE: Enable debug logging (default: 0) - COMMAND_TIMEOUT: Scan timeout in seconds (default: 3600) - MOUNT_POINT: Linux mount path (default: /mnt/work) - WINDOWS_BASE: Windows base path (default: F:/work) USAGE: python3 sast_server.py --port 6000 python3 sast_server.py --port 6000 --debug AUTHOR: MCP-SAST-Server Contributors LICENSE: MIT ================================================================================ """ import argparse import json import logging import os import subprocess import sys import traceback import threading import re from typing import Dict, Any, List, Optional from flask import Flask, request, jsonify from datetime import datetime import tempfile import shutil # ============================================================================ # ENVIRONMENT & CONFIGURATION # ============================================================================ # Load environment variables from .env file if it exists try: from dotenv import load_dotenv load_dotenv() except ImportError: # python-dotenv not installed, will use system environment variables pass # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stdout)] ) logger = logging.getLogger(__name__) # Server Configuration API_PORT = int(os.environ.get("API_PORT", 6000)) DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y") COMMAND_TIMEOUT = int(os.environ.get("COMMAND_TIMEOUT", 3600)) # 1 hour default MAX_TIMEOUT = int(os.environ.get("MAX_TIMEOUT", 86400)) # 24 hours default (was 2 hours) # Tool-specific timeouts (in seconds) - configurable via environment NIKTO_TIMEOUT = int(os.environ.get("NIKTO_TIMEOUT", 3600)) # 1 hour NMAP_TIMEOUT = int(os.environ.get("NMAP_TIMEOUT", 7200)) # 2 hours SQLMAP_TIMEOUT = int(os.environ.get("SQLMAP_TIMEOUT", 7200)) # 2 hours WPSCAN_TIMEOUT = int(os.environ.get("WPSCAN_TIMEOUT", 3600)) # 1 hour DIRB_TIMEOUT = int(os.environ.get("DIRB_TIMEOUT", 7200)) # 2 hours LYNIS_TIMEOUT = int(os.environ.get("LYNIS_TIMEOUT", 1800)) # 30 minutes SNYK_TIMEOUT = int(os.environ.get("SNYK_TIMEOUT", 3600)) # 1 hour CLAMAV_TIMEOUT = int(os.environ.get("CLAMAV_TIMEOUT", 14400)) # 4 hours SEMGREP_TIMEOUT = int(os.environ.get("SEMGREP_TIMEOUT", 7200)) # 2 hours BANDIT_TIMEOUT = int(os.environ.get("BANDIT_TIMEOUT", 1800)) # 30 minutes TRUFFLEHOG_TIMEOUT = int(os.environ.get("TRUFFLEHOG_TIMEOUT", 3600)) # 1 hour # Path Resolution Configuration # These settings enable cross-platform operation (Windows client -> Linux server) MOUNT_POINT = os.environ.get("MOUNT_POINT", "/mnt/work") # Linux mount point WINDOWS_BASE = os.environ.get("WINDOWS_BASE", "F:/work") # Windows base path # Initialize Flask application app = Flask(__name__) # ============================================================================ # PATH RESOLUTION # ============================================================================ def resolve_windows_path(windows_path: str) -> str: """ Convert Windows path to Linux mount path Mount mapping: F:/ <-> /mnt/work Examples: F:/MyProject/file.txt -> /mnt/work/MyProject/file.txt F:\\work\\project\\scan.json -> /mnt/work/work/project/scan.json F:/scan-results.txt -> /mnt/work/scan-results.txt """ # Normalize path separators normalized_path = windows_path.replace('\\', '/') logger.info(f"Resolving path: {windows_path} -> normalized: {normalized_path}") # Try different Windows path patterns # F:/ -> /mnt/work patterns = [ (r'^F:/', '/mnt/work/'), # F:/... -> /mnt/work/... (r'^F:$', '/mnt/work'), # F: -> /mnt/work (r'^/f/', '/mnt/work/'), # Git bash: /f/... -> /mnt/work/... (r'^/f$', '/mnt/work'), # Git bash: /f -> /mnt/work (r'^f:/', '/mnt/work/'), # Lowercase: f:/... -> /mnt/work/... (r'^f:$', '/mnt/work'), # Lowercase: f: -> /mnt/work ] for pattern, replacement in patterns: if re.match(pattern, normalized_path, re.IGNORECASE): # Replace the Windows base with Linux mount point linux_path = re.sub(pattern, replacement, normalized_path, flags=re.IGNORECASE) logger.info(f"✓ Pattern matched: {pattern}") logger.info(f"✓ Path resolved: {windows_path} -> {linux_path}") # Verify path exists if os.path.exists(linux_path): logger.info(f"✓ Path exists: {linux_path}") return linux_path else: logger.warning(f"⚠ Resolved path does not exist: {linux_path}") # Return it anyway, let the tool fail with proper error return linux_path # If path is already a valid Linux path starting with /mnt/work, return as-is if normalized_path.startswith('/mnt/work'): logger.info(f"✓ Path already valid Linux path: {normalized_path}") return normalized_path # If path starts with / and exists, it's already a Linux path if normalized_path.startswith('/') and os.path.exists(normalized_path): logger.info(f"✓ Path is valid Linux path: {normalized_path}") return normalized_path # If no pattern matched, return original logger.warning(f"⚠ Could not resolve path: {windows_path}") logger.warning(f"⚠ Returning original path as-is") return windows_path def verify_mount() -> Dict[str, Any]: """ Verify that the Windows share is mounted and accessible Returns dict with status information """ issues = [] # Check if mount point exists if not os.path.exists(MOUNT_POINT): issues.append(f"Mount point does not exist: {MOUNT_POINT}") # Check if mount point is actually mounted elif not os.path.ismount(MOUNT_POINT): # Try to check if it's a directory with files (might not show as mount on all systems) try: files = os.listdir(MOUNT_POINT) if not files: issues.append(f"Mount point exists but appears empty: {MOUNT_POINT}") except PermissionError: issues.append(f"No read permission on mount point: {MOUNT_POINT}") except Exception as e: issues.append(f"Error accessing mount point: {str(e)}") # Try to test read access else: try: os.listdir(MOUNT_POINT) except PermissionError: issues.append(f"No read permission on mount point: {MOUNT_POINT}") except Exception as e: issues.append(f"Error reading mount point: {str(e)}") is_healthy = len(issues) == 0 return { "is_mounted": is_healthy, "mount_point": MOUNT_POINT, "windows_base": WINDOWS_BASE, "issues": issues } def save_scan_output_to_file(output_file: str, stdout_data: str, format_type: str = "json") -> Dict[str, Any]: """ Save scan output to file and return summary info Args: output_file: Windows path where to save the file (e.g., F:/work/results.json) stdout_data: The scan output data to save format_type: Output format (json, text, xml, etc.) Returns: Dict with file info and summary stats """ try: # Resolve Windows path to Linux mount path resolved_output_path = resolve_windows_path(output_file) # Ensure directory exists output_dir = os.path.dirname(resolved_output_path) if output_dir: os.makedirs(output_dir, exist_ok=True) # Write output to file with open(resolved_output_path, 'w', encoding='utf-8') as f: f.write(stdout_data) file_size = os.path.getsize(resolved_output_path) # Generate summary based on format summary = {"total_lines": len(stdout_data.splitlines())} if format_type == "json" and stdout_data: try: parsed = json.loads(stdout_data) if isinstance(parsed, dict): # Semgrep format if "results" in parsed: summary["total_findings"] = len(parsed["results"]) # Count by severity severity_counts = {} for result in parsed["results"]: sev = result.get("extra", {}).get("severity", "UNKNOWN") severity_counts[sev] = severity_counts.get(sev, 0) + 1 summary["by_severity"] = severity_counts if "errors" in parsed: summary["total_errors"] = len(parsed["errors"]) # npm audit format if "vulnerabilities" in parsed: summary["vulnerabilities"] = parsed["vulnerabilities"] except json.JSONDecodeError: logger.warning("Could not parse JSON output for summary") logger.info(f"✓ Scan output saved to {resolved_output_path} ({file_size} bytes)") return { "file_saved": True, "linux_path": resolved_output_path, "windows_path": output_file, "file_size_bytes": file_size, "summary": summary } except Exception as e: logger.error(f"Error saving output to file: {str(e)}") logger.error(traceback.format_exc()) return { "file_saved": False, "error": str(e) } class CommandExecutor: """ Enhanced command executor with proper timeout and output handling. This class handles running shell commands with: - Configurable timeouts (prevents hanging on long scans) - Real-time output capture (stdout and stderr) - Graceful termination (SIGTERM then SIGKILL if needed) - Partial result support (returns output even if timed out) Attributes: command: Shell command to execute timeout: Maximum execution time in seconds cwd: Working directory for command execution stdout_data: Captured standard output stderr_data: Captured standard error return_code: Command exit code timed_out: Whether the command exceeded timeout """ def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT, cwd: Optional[str] = None): """ Initialize the command executor. Args: command: Shell command to execute timeout: Maximum execution time (capped at MAX_TIMEOUT) cwd: Working directory for execution (optional) """ self.command = command self.timeout = min(timeout, MAX_TIMEOUT) # Enforce maximum timeout self.cwd = cwd self.process = None self.stdout_data = "" self.stderr_data = "" self.stdout_thread = None self.stderr_thread = None self.return_code = None self.timed_out = False def _read_stdout(self): """Thread function to continuously read stdout""" try: for line in iter(self.process.stdout.readline, ''): self.stdout_data += line except Exception as e: logger.error(f"Error reading stdout: {e}") def _read_stderr(self): """Thread function to continuously read stderr""" try: for line in iter(self.process.stderr.readline, ''): self.stderr_data += line except Exception as e: logger.error(f"Error reading stderr: {e}") def execute(self) -> Dict[str, Any]: """Execute the command and handle timeout gracefully""" logger.info(f"Executing command: {self.command[:200]}...") if self.cwd: logger.info(f"Working directory: {self.cwd}") try: self.process = subprocess.Popen( self.command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, cwd=self.cwd ) # Start threads to read output continuously self.stdout_thread = threading.Thread(target=self._read_stdout) self.stderr_thread = threading.Thread(target=self._read_stderr) self.stdout_thread.daemon = True self.stderr_thread.daemon = True self.stdout_thread.start() self.stderr_thread.start() # Wait for the process to complete or timeout try: self.return_code = self.process.wait(timeout=self.timeout) self.stdout_thread.join(timeout=5) self.stderr_thread.join(timeout=5) except subprocess.TimeoutExpired: self.timed_out = True logger.warning(f"Command timed out after {self.timeout} seconds") self.process.terminate() try: self.process.wait(timeout=5) except subprocess.TimeoutExpired: logger.warning("Killing unresponsive process") self.process.kill() self.return_code = -1 # Consider success if we have output even with timeout success = ( (self.timed_out and (self.stdout_data or self.stderr_data)) or (self.return_code == 0) ) return { "stdout": self.stdout_data, "stderr": self.stderr_data, "return_code": self.return_code, "success": success, "timed_out": self.timed_out, "partial_results": self.timed_out and bool(self.stdout_data or self.stderr_data), "command": self.command[:200] # First 200 chars for logging } except Exception as e: logger.error(f"Error executing command: {str(e)}") logger.error(traceback.format_exc()) return { "stdout": self.stdout_data, "stderr": f"Error executing command: {str(e)}\n{self.stderr_data}", "return_code": -1, "success": False, "timed_out": False, "partial_results": bool(self.stdout_data or self.stderr_data), "error": str(e) } def execute_command(command: str, cwd: Optional[str] = None, timeout: int = COMMAND_TIMEOUT) -> Dict[str, Any]: """Execute a shell command and return the result""" executor = CommandExecutor(command, timeout=timeout, cwd=cwd) return executor.execute() # ============================================================================ # SAST TOOL ENDPOINTS # ============================================================================ @app.route("/api/sast/semgrep", methods=["POST"]) def semgrep(): """ Execute Semgrep static analysis Parameters: - target: Path to code directory or file - config: Semgrep config (auto, p/security-audit, p/owasp-top-ten, etc.) - lang: Language filter (python, javascript, go, java, etc.) - severity: Filter by severity (ERROR, WARNING, INFO) - output_format: json, sarif, text, gitlab-sast - output_file: Path to save results (Windows format: F:/path/file.json) - additional_args: Additional Semgrep arguments """ try: params = request.json target = params.get("target", ".") config = params.get("config", "auto") lang = params.get("lang", "") severity = params.get("severity", "") output_format = params.get("output_format", "json") output_file = params.get("output_file", "") additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"semgrep --config={config}" if lang: command += f" --lang={lang}" if severity: command += f" --severity={severity}" command += f" --{output_format}" if additional_args: command += f" {additional_args}" command += f" {resolved_target}" result = execute_command(command, timeout=SEMGREP_TIMEOUT) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target # Try to parse JSON output if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass # Handle output_file if provided if output_file and result.get("stdout"): file_info = save_scan_output_to_file(output_file, result["stdout"], output_format) result["output_file_info"] = file_info # If file saved successfully, truncate stdout to save tokens if file_info.get("file_saved"): result["stdout_truncated"] = True result["stdout"] = f"[Output saved to {file_info['windows_path']}]\n\nSummary: {json.dumps(file_info['summary'], indent=2)}" # Remove parsed_output to save more tokens if "parsed_output" in result: del result["parsed_output"] return jsonify(result) except Exception as e: logger.error(f"Error in semgrep endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/sast/bearer", methods=["POST"]) def bearer(): """ Execute Bearer security scanner Parameters: - target: Path to code directory - scanner: Type of scan (sast, secrets) - format: Output format (json, yaml, sarif, html) - only_policy: Only check specific policy - severity: Filter by severity (critical, high, medium, low, warning) - additional_args: Additional Bearer arguments """ try: params = request.json target = params.get("target", ".") scanner = params.get("scanner", "") output_format = params.get("format", "json") only_policy = params.get("only_policy", "") severity = params.get("severity", "") additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"bearer scan {resolved_target}" if scanner: command += f" --scanner={scanner}" # Suppress verbose output - results will be in the output file command += " --quiet" if output_format: command += f" --format={output_format}" if only_policy: command += f" --only-policy={only_policy}" if severity: command += f" --severity={severity}" if additional_args: command += f" {additional_args}" # Redirect all output to suppress verbose logging command += " 2>&1" result = execute_command(command, timeout=3600) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in bearer endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/sast/graudit", methods=["POST"]) def graudit(): """ Execute Graudit source code auditing Parameters: - target: Path to code directory or file - database: Signature database to use (default, all, or specific like asp, c, perl, php, python, etc.) - additional_args: Additional graudit arguments """ try: params = request.json target = params.get("target", ".") database = params.get("database", "all") additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"graudit -d {database}" if additional_args: command += f" {additional_args}" command += f" {resolved_target}" result = execute_command(command, timeout=300) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target return jsonify(result) except Exception as e: logger.error(f"Error in graudit endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/sast/bandit", methods=["POST"]) def bandit(): """ Execute Bandit Python security scanner Parameters: - target: Path to Python code directory or file - severity_level: Report only issues of a given severity (low, medium, high) - confidence_level: Report only issues of given confidence (low, medium, high) - format: Output format (json, csv, txt, html, xml) - additional_args: Additional Bandit arguments """ try: params = request.json target = params.get("target", ".") severity_level = params.get("severity_level", "") confidence_level = params.get("confidence_level", "") output_format = params.get("format", "json") additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"bandit -r {resolved_target} -f {output_format}" if severity_level: command += f" -ll -l {severity_level.upper()}" if confidence_level: command += f" -ii -i {confidence_level.upper()}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=BANDIT_TIMEOUT) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in bandit endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/sast/gosec", methods=["POST"]) def gosec(): """ Execute Gosec Go security checker Parameters: - target: Path to Go code directory - format: Output format (json, yaml, csv, junit-xml, html, sonarqube, golint, sarif, text) - severity: Filter by severity (low, medium, high) - confidence: Filter by confidence (low, medium, high) - additional_args: Additional gosec arguments """ try: params = request.json target = params.get("target", "./...") output_format = params.get("format", "json") severity = params.get("severity", "") confidence = params.get("confidence", "") additional_args = params.get("additional_args", "") command = f"gosec -fmt={output_format}" if severity: command += f" -severity={severity}" if confidence: command += f" -confidence={confidence}" if additional_args: command += f" {additional_args}" command += f" {target}" result = execute_command(command, timeout=300) if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in gosec endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/sast/brakeman", methods=["POST"]) def brakeman(): """ Execute Brakeman Rails security scanner Parameters: - target: Path to Rails application directory - format: Output format (json, html, csv, tabs, text) - confidence_level: Minimum confidence level (1-3, 1 is highest) - additional_args: Additional Brakeman arguments """ try: params = request.json target = params.get("target", ".") output_format = params.get("format", "json") confidence_level = params.get("confidence_level", "") additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"brakeman -p {resolved_target} -f {output_format}" if confidence_level: command += f" -w {confidence_level}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=300) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in brakeman endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/sast/nodejsscan", methods=["POST"]) def nodejsscan(): """ Execute NodeJSScan Node.js security scanner Parameters: - target: Path to Node.js code directory - output_file: Output file path (optional) """ try: params = request.json target = params.get("target", ".") output_file = params.get("output_file", "") command = f"nodejsscan -d {target}" if output_file: command += f" -o {output_file}" result = execute_command(command, timeout=3600) return jsonify(result) except Exception as e: logger.error(f"Error in nodejsscan endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/sast/eslint-security", methods=["POST"]) def eslint_security(): """ Execute ESLint with security plugins Parameters: - target: Path to JavaScript/TypeScript code - config: ESLint config file path - format: Output format (stylish, json, html, etc.) - fix: Automatically fix problems (boolean) - additional_args: Additional ESLint arguments """ try: params = request.json target = params.get("target", ".") config = params.get("config", "") output_format = params.get("format", "json") fix = params.get("fix", False) additional_args = params.get("additional_args", "") command = f"eslint {target} -f {output_format}" if config: command += f" -c {config}" if fix: command += " --fix" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=3600) if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in eslint-security endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 # ============================================================================ # SECRET SCANNING ENDPOINTS # ============================================================================ @app.route("/api/secrets/trufflehog", methods=["POST"]) def trufflehog(): """ Execute TruffleHog secrets scanner Parameters: - target: Git repository URL or filesystem path - scan_type: Type of scan (git, filesystem, github, gitlab, s3, etc.) - json_output: Return JSON format (boolean) - only_verified: Only show verified secrets (boolean) - additional_args: Additional TruffleHog arguments """ try: params = request.json target = params.get("target", ".") scan_type = params.get("scan_type", "filesystem") json_output = params.get("json_output", True) only_verified = params.get("only_verified", False) additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"trufflehog {scan_type} {resolved_target}" if json_output: command += " --json" if only_verified: command += " --only-verified" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=TRUFFLEHOG_TIMEOUT) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target # Parse JSON lines output if json_output and result["stdout"]: try: secrets = [] for line in result["stdout"].strip().split('\n'): if line.strip(): secrets.append(json.loads(line)) result["parsed_secrets"] = secrets except: pass return jsonify(result) except Exception as e: logger.error(f"Error in trufflehog endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/secrets/gitleaks", methods=["POST"]) def gitleaks(): """ Execute Gitleaks secret scanner Parameters: - target: Path to git repository or directory - config: Path to gitleaks config file - report_format: Output format (json, csv, sarif) - report_path: Path to save report - verbose: Enable verbose output (boolean) - additional_args: Additional gitleaks arguments """ try: params = request.json target = params.get("target", ".") config = params.get("config", "") report_format = params.get("report_format", "json") report_path = params.get("report_path", "") verbose = params.get("verbose", False) additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"gitleaks detect --source={resolved_target} --report-format={report_format}" if config: command += f" --config={config}" if report_path: command += f" --report-path={report_path}" if verbose: command += " -v" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=300) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target # Read report file if specified if report_path and os.path.exists(report_path): try: with open(report_path, 'r') as f: if report_format == "json": result["parsed_report"] = json.load(f) else: result["report_content"] = f.read() except Exception as e: logger.warning(f"Error reading report file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in gitleaks endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 # ============================================================================ # DEPENDENCY SCANNING ENDPOINTS # ============================================================================ @app.route("/api/dependencies/safety", methods=["POST"]) def safety(): """ Execute Safety Python dependency checker Parameters: - requirements_file: Path to requirements.txt - json_output: Return JSON format (boolean) - full_report: Include full report (boolean) - additional_args: Additional Safety arguments """ try: params = request.json requirements_file = params.get("requirements_file", "requirements.txt") json_output = params.get("json_output", True) full_report = params.get("full_report", False) additional_args = params.get("additional_args", "") command = f"safety check -r {requirements_file}" if json_output: command += " --json" if full_report: command += " --full-report" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=1800) if json_output and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in safety endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/dependencies/npm-audit", methods=["POST"]) def npm_audit(): """ Execute npm audit for Node.js dependencies Parameters: - target: Path to Node.js project directory - json_output: Return JSON format (boolean) - audit_level: Minimum level to report (info, low, moderate, high, critical) - production: Only audit production dependencies (boolean) - additional_args: Additional npm audit arguments """ try: params = request.json target = params.get("target", ".") json_output = params.get("json_output", True) audit_level = params.get("audit_level", "") production = params.get("production", False) additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = "npm audit" if json_output: command += " --json" if audit_level: command += f" --audit-level={audit_level}" if production: command += " --production" if additional_args: command += f" {additional_args}" result = execute_command(command, cwd=resolved_target, timeout=180) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target if json_output and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in npm-audit endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/dependencies/dependency-check", methods=["POST"]) def dependency_check(): """ Execute OWASP Dependency-Check Parameters: - target: Path to project directory - project_name: Name of the project - format: Output format (HTML, XML, CSV, JSON, JUNIT, SARIF, ALL) - scan: Comma-separated list of paths to scan - additional_args: Additional dependency-check arguments """ try: params = request.json target = params.get("target", ".") project_name = params.get("project_name", "project") output_format = params.get("format", "JSON") scan = params.get("scan", target) additional_args = params.get("additional_args", "") # Create temporary output directory output_dir = tempfile.mkdtemp() command = f"dependency-check --project {project_name} --scan {scan} --format {output_format} --out {output_dir}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=900) # 15 minutes for large projects # Read generated report try: report_files = os.listdir(output_dir) for report_file in report_files: report_path = os.path.join(output_dir, report_file) with open(report_path, 'r') as f: if report_file.endswith('.json'): result["parsed_report"] = json.load(f) else: result["report_content"] = f.read() except Exception as e: logger.warning(f"Error reading report: {e}") finally: # Cleanup try: shutil.rmtree(output_dir) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in dependency-check endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 # ============================================================================ # INFRASTRUCTURE AS CODE SCANNING # ============================================================================ @app.route("/api/iac/checkov", methods=["POST"]) def checkov(): """ Execute Checkov IaC security scanner Parameters: - target: Path to IaC directory - framework: Framework to scan (terraform, cloudformation, kubernetes, helm, etc.) - output_format: Output format (cli, json, junitxml, sarif, github_failed_only) - compact: Compact output (boolean) - quiet: Quiet mode (boolean) - additional_args: Additional Checkov arguments """ try: params = request.json target = params.get("target", ".") framework = params.get("framework", "") output_format = params.get("output_format", "json") compact = params.get("compact", False) quiet = params.get("quiet", False) additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"checkov -d {resolved_target} -o {output_format}" if framework: command += f" --framework {framework}" if compact: command += " --compact" if quiet: command += " --quiet" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=3600) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in checkov endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/iac/tfsec", methods=["POST"]) def tfsec(): """ Execute tfsec Terraform security scanner Parameters: - target: Path to Terraform directory - format: Output format (default, json, csv, checkstyle, junit, sarif) - minimum_severity: Minimum severity to report (LOW, MEDIUM, HIGH, CRITICAL) - additional_args: Additional tfsec arguments """ try: params = request.json target = params.get("target", ".") output_format = params.get("format", "json") minimum_severity = params.get("minimum_severity", "") additional_args = params.get("additional_args", "") command = f"tfsec {target} --format {output_format}" if minimum_severity: command += f" --minimum-severity {minimum_severity}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=3600) if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in tfsec endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 # ============================================================================ # CONTAINER SECURITY # ============================================================================ @app.route("/api/container/trivy", methods=["POST"]) def trivy(): """ Execute Trivy container/IaC security scanner Parameters: - target: Image name, directory, or repository - scan_type: Type of scan (image, fs, repo, config) - format: Output format (table, json, sarif, template) - severity: Severities to include (UNKNOWN,LOW,MEDIUM,HIGH,CRITICAL) - additional_args: Additional Trivy arguments """ try: params = request.json target = params.get("target", "") scan_type = params.get("scan_type", "fs") output_format = params.get("format", "json") severity = params.get("severity", "") additional_args = params.get("additional_args", "") if not target: return jsonify({"error": "Target parameter is required"}), 400 # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) command = f"trivy {scan_type} --format {output_format}" if severity: command += f" --severity {severity}" if additional_args: command += f" {additional_args}" command += f" {resolved_target}" result = execute_command(command, timeout=3600) # Add path resolution info to result result["original_path"] = target result["resolved_path"] = resolved_target if output_format == "json" and result["stdout"]: try: result["parsed_output"] = json.loads(result["stdout"]) except: pass return jsonify(result) except Exception as e: logger.error(f"Error in trivy endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 # ============================================================================ # ADDITIONAL KALI LINUX SECURITY TOOLS # ============================================================================ @app.route("/api/web/nikto", methods=["POST"]) def nikto(): """ Execute Nikto web server scanner Parameters: - target: Target host (IP or domain) - port: Port to scan (default: 80) - ssl: Use SSL/HTTPS (boolean) - output_format: Output format (txt, html, csv, xml) - output_file: Path to save output file - additional_args: Additional Nikto arguments """ try: params = request.json target = params.get("target", "") port = params.get("port", "80") ssl = params.get("ssl", False) output_format = params.get("output_format", "txt") output_file = params.get("output_file", "") additional_args = params.get("additional_args", "") if not target: return jsonify({"error": "Target parameter is required"}), 400 # Resolve output file path if specified resolved_output_file = "" if output_file: resolved_output_file = resolve_windows_path(output_file) command = f"nikto -h {target} -p {port}" if ssl: command += " -ssl" if resolved_output_file: command += f" -Format {output_format} -output {resolved_output_file}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=NIKTO_TIMEOUT) # Add path info if output_file: result["output_file_original"] = output_file result["output_file_resolved"] = resolved_output_file # Read output file if specified if resolved_output_file and os.path.exists(resolved_output_file): try: with open(resolved_output_file, 'r') as f: result["file_content"] = f.read() except Exception as e: logger.warning(f"Error reading output file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in nikto endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/network/nmap", methods=["POST"]) def nmap(): """ Execute Nmap network/port scanner Parameters: - target: Target host(s) to scan (IP, domain, or CIDR) - scan_type: Scan type (default: -sV for version detection) Options: -sS (SYN), -sT (TCP Connect), -sU (UDP), -sV (Version), -sC (Script scan), -A (Aggressive), -sn (Ping scan) - ports: Port specification (e.g., "80,443" or "1-1000") - output_format: Output format (normal, xml, grepable) - output_file: Path to save output - additional_args: Additional Nmap arguments """ try: params = request.json target = params.get("target", "") scan_type = params.get("scan_type", "-sV") ports = params.get("ports", "") output_format = params.get("output_format", "normal") output_file = params.get("output_file", "") additional_args = params.get("additional_args", "") if not target: return jsonify({"error": "Target parameter is required"}), 400 # Resolve output file path if specified resolved_output_file = "" if output_file: resolved_output_file = resolve_windows_path(output_file) command = f"nmap {scan_type}" if ports: command += f" -p {ports}" if resolved_output_file: if output_format == "xml": command += f" -oX {resolved_output_file}" elif output_format == "grepable": command += f" -oG {resolved_output_file}" else: command += f" -oN {resolved_output_file}" if additional_args: command += f" {additional_args}" command += f" {target}" result = execute_command(command, timeout=NMAP_TIMEOUT) # Add path info if output_file: result["output_file_original"] = output_file result["output_file_resolved"] = resolved_output_file # Read output file if specified if resolved_output_file and os.path.exists(resolved_output_file): try: with open(resolved_output_file, 'r') as f: result["file_content"] = f.read() except Exception as e: logger.warning(f"Error reading output file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in nmap endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/web/sqlmap", methods=["POST"]) def sqlmap(): """ Execute SQLMap for SQL injection testing Parameters: - target: Target URL to test - data: POST data string - cookie: HTTP Cookie header value - level: Level of tests (1-5, default: 1) - risk: Risk of tests (1-3, default: 1) - batch: Never ask for user input, use default behavior (boolean) - output_dir: Directory to save output files - additional_args: Additional SQLMap arguments """ try: params = request.json target = params.get("target", "") data = params.get("data", "") cookie = params.get("cookie", "") level = params.get("level", "1") risk = params.get("risk", "1") batch = params.get("batch", True) output_dir = params.get("output_dir", "") additional_args = params.get("additional_args", "") if not target: return jsonify({"error": "Target parameter is required"}), 400 # Resolve output directory path if specified resolved_output_dir = "" if output_dir: resolved_output_dir = resolve_windows_path(output_dir) command = f"sqlmap -u '{target}' --level={level} --risk={risk}" if batch: command += " --batch" if data: command += f" --data='{data}'" if cookie: command += f" --cookie='{cookie}'" if resolved_output_dir: command += f" --output-dir={resolved_output_dir}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=SQLMAP_TIMEOUT) # Add path info if output_dir: result["output_dir_original"] = output_dir result["output_dir_resolved"] = resolved_output_dir return jsonify(result) except Exception as e: logger.error(f"Error in sqlmap endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/web/wpscan", methods=["POST"]) def wpscan(): """ Execute WPScan WordPress security scanner Parameters: - target: Target WordPress URL - enumerate: What to enumerate (u: users, p: plugins, t: themes, vp: vulnerable plugins) - api_token: WPScan API token for vulnerability data - output_file: Path to save output (JSON format) - additional_args: Additional WPScan arguments """ try: params = request.json target = params.get("target", "") enumerate = params.get("enumerate", "vp") api_token = params.get("api_token", "") output_file = params.get("output_file", "") additional_args = params.get("additional_args", "") if not target: return jsonify({"error": "Target parameter is required"}), 400 # Resolve output file path if specified resolved_output_file = "" if output_file: resolved_output_file = resolve_windows_path(output_file) command = f"wpscan --url {target}" if enumerate: command += f" --enumerate {enumerate}" if api_token: command += f" --api-token {api_token}" if resolved_output_file: command += f" --output {resolved_output_file} --format json" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=WPSCAN_TIMEOUT) # Add path info if output_file: result["output_file_original"] = output_file result["output_file_resolved"] = resolved_output_file # Read output file if specified if resolved_output_file and os.path.exists(resolved_output_file): try: with open(resolved_output_file, 'r') as f: result["parsed_output"] = json.load(f) except Exception as e: logger.warning(f"Error reading output file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in wpscan endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/web/dirb", methods=["POST"]) def dirb(): """ Execute DIRB web content scanner Parameters: - target: Target URL to scan - wordlist: Path to wordlist file (default: /usr/share/dirb/wordlists/common.txt) - extensions: File extensions to check (e.g., "php,html,js") - output_file: Path to save output - additional_args: Additional DIRB arguments """ try: params = request.json target = params.get("target", "") wordlist = params.get("wordlist", "/usr/share/dirb/wordlists/common.txt") extensions = params.get("extensions", "") output_file = params.get("output_file", "") additional_args = params.get("additional_args", "") if not target: return jsonify({"error": "Target parameter is required"}), 400 # Resolve output file path if specified resolved_output_file = "" if output_file: resolved_output_file = resolve_windows_path(output_file) command = f"dirb {target} {wordlist}" if extensions: command += f" -X {extensions}" if resolved_output_file: command += f" -o {resolved_output_file}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=DIRB_TIMEOUT) # Add path info if output_file: result["output_file_original"] = output_file result["output_file_resolved"] = resolved_output_file # Read output file if specified if resolved_output_file and os.path.exists(resolved_output_file): try: with open(resolved_output_file, 'r') as f: result["file_content"] = f.read() except Exception as e: logger.warning(f"Error reading output file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in dirb endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/system/lynis", methods=["POST"]) def lynis(): """ Execute Lynis security auditing tool for Unix/Linux systems Parameters: - target: Target directory or system to audit (default: system audit) - audit_mode: Audit mode (system, dockerfile) - quick: Quick scan mode (boolean) - log_file: Path to save log file - report_file: Path to save report file - additional_args: Additional Lynis arguments """ try: params = request.json target = params.get("target", "") audit_mode = params.get("audit_mode", "system") quick = params.get("quick", False) log_file = params.get("log_file", "") report_file = params.get("report_file", "") additional_args = params.get("additional_args", "") # Resolve path if provided resolved_target = "" if target: resolved_target = resolve_windows_path(target) # Resolve log/report file paths if provided resolved_log_file = "" resolved_report_file = "" if log_file: resolved_log_file = resolve_windows_path(log_file) if report_file: resolved_report_file = resolve_windows_path(report_file) command = f"lynis audit {audit_mode}" if audit_mode == "dockerfile" and resolved_target: command += f" {resolved_target}" if quick: command += " --quick" if resolved_log_file: command += f" --logfile {resolved_log_file}" if resolved_report_file: command += f" --report-file {resolved_report_file}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=LYNIS_TIMEOUT) # Add path info if target: result["target_original"] = target result["target_resolved"] = resolved_target if log_file: result["log_file_original"] = log_file result["log_file_resolved"] = resolved_log_file if report_file: result["report_file_original"] = report_file result["report_file_resolved"] = resolved_report_file # Read log/report files if specified if resolved_log_file and os.path.exists(resolved_log_file): try: with open(resolved_log_file, 'r') as f: result["log_content"] = f.read() except Exception as e: logger.warning(f"Error reading log file: {e}") if resolved_report_file and os.path.exists(resolved_report_file): try: with open(resolved_report_file, 'r') as f: result["report_content"] = f.read() except Exception as e: logger.warning(f"Error reading report file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in lynis endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/dependencies/snyk", methods=["POST"]) def snyk(): """ Execute Snyk security scanner for dependencies and containers Parameters: - target: Path to project directory (default: current directory) - test_type: Type of test (test, container, iac, code) - severity_threshold: Minimum severity to report (low, medium, high, critical) - json_output: Output in JSON format (boolean) - output_file: Path to save output - additional_args: Additional Snyk arguments """ try: params = request.json target = params.get("target", ".") test_type = params.get("test_type", "test") severity_threshold = params.get("severity_threshold", "") json_output = params.get("json_output", True) output_file = params.get("output_file", "") additional_args = params.get("additional_args", "") # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) # Resolve output file path if specified resolved_output_file = "" if output_file: resolved_output_file = resolve_windows_path(output_file) command = f"snyk {test_type} {resolved_target}" if json_output: command += " --json" if severity_threshold: command += f" --severity-threshold={severity_threshold}" if resolved_output_file: command += f" > {resolved_output_file}" if additional_args: command += f" {additional_args}" result = execute_command(command, timeout=SNYK_TIMEOUT) # Add path resolution info result["original_path"] = target result["resolved_path"] = resolved_target if output_file: result["output_file_original"] = output_file result["output_file_resolved"] = resolved_output_file # Parse JSON output if json_output and result.get("stdout"): try: result["parsed_output"] = json.loads(result["stdout"]) except: pass # Read output file if specified if resolved_output_file and os.path.exists(resolved_output_file): try: with open(resolved_output_file, 'r') as f: content = f.read() result["file_content"] = content if json_output: try: result["parsed_output"] = json.loads(content) except: pass except Exception as e: logger.warning(f"Error reading output file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in snyk endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/malware/clamav", methods=["POST"]) def clamav(): """ Execute ClamAV antivirus scanner Parameters: - target: Path to file or directory to scan - recursive: Scan directories recursively (boolean) - infected_only: Only show infected files (boolean) - output_file: Path to save scan log - additional_args: Additional ClamAV arguments """ try: params = request.json target = params.get("target", "") recursive = params.get("recursive", True) infected_only = params.get("infected_only", False) output_file = params.get("output_file", "") additional_args = params.get("additional_args", "") if not target: return jsonify({"error": "Target parameter is required"}), 400 # Resolve Windows path to Linux mount path resolved_target = resolve_windows_path(target) # Resolve output file path if specified resolved_output_file = "" if output_file: resolved_output_file = resolve_windows_path(output_file) command = "clamscan" if recursive: command += " -r" if infected_only: command += " -i" if resolved_output_file: command += f" -l {resolved_output_file}" if additional_args: command += f" {additional_args}" command += f" {resolved_target}" result = execute_command(command, timeout=CLAMAV_TIMEOUT) # Add path resolution info result["original_path"] = target result["resolved_path"] = resolved_target if output_file: result["output_file_original"] = output_file result["output_file_resolved"] = resolved_output_file # Read output file if specified if resolved_output_file and os.path.exists(resolved_output_file): try: with open(resolved_output_file, 'r') as f: result["file_content"] = f.read() except Exception as e: logger.warning(f"Error reading output file: {e}") return jsonify(result) except Exception as e: logger.error(f"Error in clamav endpoint: {str(e)}") logger.error(traceback.format_exc()) return jsonify({"error": f"Server error: {str(e)}"}), 500 # ============================================================================ # UTILITY ENDPOINTS # ============================================================================ @app.route("/api/command", methods=["POST"]) def generic_command(): """Execute any command provided in the request""" try: params = request.json command = params.get("command", "") cwd = params.get("cwd", None) timeout = params.get("timeout", COMMAND_TIMEOUT) if not command: return jsonify({"error": "Command parameter is required"}), 400 result = execute_command(command, cwd=cwd, timeout=timeout) return jsonify(result) except Exception as e: logger.error(f"Error in command endpoint: {str(e)}") return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/health", methods=["GET"]) def health_check(): """Health check endpoint with tool availability""" # Essential SAST tools to check essential_tools = { "semgrep": "semgrep --version", "bandit": "bandit --version", "eslint": "eslint --version", "npm": "npm --version", "safety": "safety --version", "trufflehog": "trufflehog --version", "gitleaks": "gitleaks version" } # Additional SAST tools additional_tools = { "bearer": "bearer version", "graudit": "which graudit", "gosec": "gosec -version", "brakeman": "brakeman --version", "checkov": "checkov --version", "tfsec": "tfsec --version", "trivy": "trivy --version", "dependency-check": "dependency-check.sh --version" } # Kali Linux security tools kali_tools = { "nikto": "nikto -Version", "nmap": "nmap --version", "sqlmap": "sqlmap --version", "wpscan": "wpscan --version", "dirb": "which dirb", "lynis": "lynis --version", "snyk": "snyk --version", "clamscan": "clamscan --version" } tools_status = {} # Check essential tools for tool, check_cmd in essential_tools.items(): try: result = execute_command(check_cmd, timeout=10) tools_status[tool] = result["success"] except: tools_status[tool] = False # Check additional tools for tool, check_cmd in additional_tools.items(): try: result = execute_command(check_cmd, timeout=10) tools_status[tool] = result["success"] except: tools_status[tool] = False # Check Kali tools for tool, check_cmd in kali_tools.items(): try: result = execute_command(check_cmd, timeout=10) tools_status[tool] = result["success"] except: tools_status[tool] = False all_essential_available = all([tools_status.get(tool, False) for tool in essential_tools.keys()]) available_count = sum(1 for available in tools_status.values() if available) total_count = len(tools_status) kali_tools_count = sum(1 for tool in kali_tools.keys() if tools_status.get(tool, False)) return jsonify({ "status": "healthy", "message": "SAST Tools API Server is running", "tools_status": tools_status, "all_essential_tools_available": all_essential_available, "total_tools_available": available_count, "total_tools_count": total_count, "kali_tools_available": kali_tools_count, "kali_tools_total": len(kali_tools), "version": "2.0.0" }) def parse_args(): """Parse command line arguments""" parser = argparse.ArgumentParser(description="Run the SAST Tools API Server") parser.add_argument("--debug", action="store_true", help="Enable debug mode") parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})") return parser.parse_args() if __name__ == "__main__": args = parse_args() if args.debug: DEBUG_MODE = True os.environ["DEBUG_MODE"] = "1" logger.setLevel(logging.DEBUG) if args.port != API_PORT: API_PORT = args.port logger.info(f"Starting SAST Tools API Server on port {API_PORT}") logger.info("Supported SAST tools: Semgrep, Bearer, Graudit, Bandit, Gosec, Brakeman, ESLint, TruffleHog, Gitleaks, Safety, npm audit, Checkov, tfsec, Trivy, OWASP Dependency-Check") logger.info("Supported Kali tools: Nikto, Nmap, SQLMap, WPScan, DIRB, Lynis, Snyk, ClamAV") app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Sengtocxoen/sast-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server