Web-QA

by GroundNG
zap_scanner.py (13.9 kB)
# zap_scanner.py
import logging
import subprocess
import os
import shlex
import time
import json
import requests
from datetime import datetime

from .utils import parse_json_file  # Relative import

ZAP_TIMEOUT_SECONDS = 1800  # 30 minutes default
ZAP_API_PORT = 8080  # Default ZAP API port


def run_zap_scan(target_url: str, output_dir="results", timeout=ZAP_TIMEOUT_SECONDS,
                 zap_path=None, api_key=None, scan_mode="baseline"):
    """
    Runs OWASP ZAP security scanner against a target URL.

    Args:
        target_url: The URL to scan
        output_dir: Directory to store scan results
        timeout: Maximum time in seconds for the scan
        zap_path: Path to ZAP installation (uses docker by default)
        api_key: ZAP API key if required
        scan_mode: Type of scan - 'baseline', 'full' or 'api'
    """
    if not target_url:
        logging.error("ZAP target URL is required")
        return []

    logging.info(f"Starting ZAP scan for target: {target_url}")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filename = f"zap_output_{timestamp}.json"
    output_filepath = os.path.join(output_dir, output_filename)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Determine if using Docker or local ZAP installation
    use_docker = zap_path is None

    # ZAP's packaged baseline script is "zap-baseline.py" (no "-scan" suffix);
    # the full and API scans are "zap-full-scan.py" and "zap-api-scan.py".
    script_name = "zap-baseline.py" if scan_mode == "baseline" else f"zap-{scan_mode}-scan.py"

    if use_docker:
        # Docker command to run ZAP in a container
        command = [
            "docker", "run", "--rm",
            "-v", f"{os.path.abspath(output_dir)}:/zap/wrk:rw",
            "-t", "owasp/zap2docker-stable",
            script_name,
            "-t", target_url,
            "-J", output_filename
        ]
        if api_key:
            command.extend(["-z", f"api.key={api_key}"])
    else:
        # Local ZAP installation
        command = [
            os.path.join(zap_path, script_name),
            "-t", target_url,
            "-J", output_filepath
        ]
        if api_key:
            command.extend(["-z", f"api.key={api_key}"])

    logging.debug(f"Executing ZAP command: {' '.join(shlex.quote(cmd) for cmd in command)}")

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, check=False)
        logging.info("ZAP process finished.")
        logging.debug(f"ZAP stdout:\n{result.stdout}")

        if result.returncode != 0:
            logging.warning(f"ZAP exited with non-zero status code: {result.returncode}")
            return [f"ZAP exited with non-zero status code: {result.returncode}"]

        # For Docker, the output will be in the mapped volume
        actual_output_path = output_filepath if not use_docker else os.path.join(output_dir, output_filename)

        # Parse the JSON output file
        report_data = parse_json_file(actual_output_path)

        if report_data and "site" in report_data:
            # Process ZAP findings from the report
            findings = []
            # Structure varies based on scan mode but generally has sites with alerts
            for site in report_data.get("site", []):
                site_url = site.get("@name", "")
                for alert in site.get("alerts", []):
                    finding = {
                        'tool': 'OWASP ZAP',
                        'severity': alert.get("riskdesc", "").split(" ", 1)[0],
                        'message': alert.get("name", ""),
                        'description': alert.get("desc", ""),
                        'url': site_url,
                        'solution': alert.get("solution", ""),
                        'references': alert.get("reference", ""),
                        'cweid': alert.get("cweid", ""),
                        'instances': len(alert.get("instances", [])),
                    }
                    findings.append(finding)

            logging.info(f"Successfully parsed {len(findings)} findings from ZAP output.")
            return findings
        else:
            logging.warning(f"Could not parse findings from ZAP output file: {actual_output_path}")
            return [f"Could not parse findings from ZAP output file: {actual_output_path}"]

    except subprocess.TimeoutExpired:
        logging.error(f"ZAP scan timed out after {timeout} seconds.")
        return [f"ZAP scan timed out after {timeout} seconds."]
    except FileNotFoundError:
        if use_docker:
            logging.error("Docker command not found. Is Docker installed and in PATH?")
            return ["Docker command not found. Is Docker installed and in PATH?"]
        else:
            logging.error(f"ZAP command not found at {zap_path}. Is ZAP installed?")
            return [f"ZAP command not found at {zap_path}. Is ZAP installed?"]
    except Exception as e:
        logging.error(f"An unexpected error occurred while running ZAP: {e}")
        return [f"An unexpected error occurred while running ZAP: {e}"]


def run_zap_api_scan(target_url: str, api_definition: str, output_dir="results",
                     timeout=ZAP_TIMEOUT_SECONDS, zap_path=None, api_key=None):
    """
    Runs ZAP API scan against a REST API with an OpenAPI/Swagger definition.

    Args:
        target_url: Base URL of the API
        api_definition: Path to OpenAPI/Swagger definition file
        output_dir: Directory to store scan results
        timeout: Maximum time in seconds for the scan
        zap_path: Path to ZAP installation (uses docker by default)
        api_key: ZAP API key if required
    """
    if not os.path.isfile(api_definition):
        logging.error(f"API definition file not found: {api_definition}")
        return []

    # Similar implementation as run_zap_scan but with API scanning options
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filename = f"zap_api_output_{timestamp}.json"
    output_filepath = os.path.join(output_dir, output_filename)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    use_docker = zap_path is None

    if use_docker:
        # Volume mount for API definition file
        api_def_dir = os.path.dirname(os.path.abspath(api_definition))
        api_def_file = os.path.basename(api_definition)
        command = [
            "docker", "run", "--rm",
            "-v", f"{os.path.abspath(output_dir)}:/zap/wrk:rw",
            "-v", f"{api_def_dir}:/zap/api:ro",
            "-t", "owasp/zap2docker-stable",
            "zap-api-scan.py",
            "-t", target_url,
            "-f", f"/zap/api/{api_def_file}",
            "-J", output_filename
        ]
    else:
        command = [
            os.path.join(zap_path, "zap-api-scan.py"),
            "-t", target_url,
            "-f", api_definition,
            "-J", output_filepath
        ]

    if api_key:
        command.extend(["-z", f"api.key={api_key}"])

    logging.debug(f"Executing ZAP API scan command: {' '.join(shlex.quote(cmd) for cmd in command)}")

    # The rest of the implementation follows a similar pattern to run_zap_scan
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, check=False)

        # Processing similar to run_zap_scan
        logging.info("ZAP API scan process finished.")
        logging.debug(f"ZAP stdout:\n{result.stdout}")

        if result.returncode != 0:
            logging.warning(f"ZAP API scan exited with non-zero status code: {result.returncode}")
            return [f"ZAP API scan exited with non-zero status code: {result.returncode}"]

        # For Docker, the output will be in the mapped volume
        actual_output_path = output_filepath if not use_docker else os.path.join(output_dir, output_filename)

        # Parse the JSON output file - same processing as run_zap_scan
        report_data = parse_json_file(actual_output_path)

        if report_data and "site" in report_data:
            findings = []
            for site in report_data.get("site", []):
                site_url = site.get("@name", "")
                for alert in site.get("alerts", []):
                    finding = {
                        'tool': 'OWASP ZAP API Scan',
                        'severity': alert.get("riskdesc", "").split(" ", 1)[0],
                        'message': alert.get("name", ""),
                        'description': alert.get("desc", ""),
                        'url': site_url,
                        'solution': alert.get("solution", ""),
                        'references': alert.get("reference", ""),
                        'cweid': alert.get("cweid", ""),
                        'instances': len(alert.get("instances", [])),
                    }
                    findings.append(finding)

            logging.info(f"Successfully parsed {len(findings)} findings from ZAP API scan output.")
            return findings
        else:
            logging.warning(f"Could not parse findings from ZAP API scan output file: {actual_output_path}")
            return [f"Could not parse findings from ZAP API scan output file: {actual_output_path}"]

    except Exception as e:
        logging.error(f"An unexpected error occurred while running ZAP API scan: {e}")
        return [f"An unexpected error occurred while running ZAP API scan: {e}"]


def discover_endpoints(target_url: str, output_dir="results", timeout=600, zap_path=None, api_key=None):
    """
    Uses ZAP's spider to discover endpoints in a web application.

    Args:
        target_url: The URL to scan
        output_dir: Directory to store results
        timeout: Maximum time in seconds for the spider
        zap_path: Path to ZAP installation (uses docker by default)
        api_key: ZAP API key if required

    Returns:
        List of discovered endpoints and their details
    """
    if not target_url:
        logging.error("Target URL is required for endpoint discovery")
        return []

    logging.info(f"Starting ZAP endpoint discovery for: {target_url}")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filename = f"zap_endpoints_{timestamp}.json"
    output_filepath = os.path.join(output_dir, output_filename)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    use_docker = zap_path is None

    if use_docker:
        command = [
            "docker", "run", "--rm",
            "-v", f"{os.path.abspath(output_dir)}:/zap/wrk:rw",
            "-t", "owasp/zap2docker-stable",
            "zap-full-scan.py",
            "-t", target_url,
            "-J", output_filename,
            "-z", "-config spider.maxDuration=1",  # Limit spider duration
            "--spider-first",  # Run spider before the scan
            "-n", "endpoints.context"  # Don't perform actual scan, just spider
        ]
    else:
        command = [
            os.path.join(zap_path, "zap-full-scan.py"),
            "-t", target_url,
            "-J", output_filepath,
            "-z", "-config spider.maxDuration=1",
            "--spider-first",
            "-n", "endpoints.context"
        ]

    if api_key:
        command.extend(["-z", f"api.key={api_key}"])

    logging.debug(f"Executing ZAP endpoint discovery: {' '.join(shlex.quote(cmd) for cmd in command)}")

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, check=False)
        logging.info("ZAP endpoint discovery finished.")
        logging.debug(f"ZAP stdout:\n{result.stdout}")

        actual_output_path = output_filepath if not use_docker else os.path.join(output_dir, output_filename)

        # Parse the JSON output file
        report_data = parse_json_file(actual_output_path)

        if report_data:
            endpoints = []

            # Extract endpoints from spider results
            if "site" in report_data:
                for site in report_data.get("site", []):
                    site_url = site.get("@name", "")

                    # Extract URLs from alerts and spider results
                    urls = set()

                    # Get URLs from alerts
                    for alert in site.get("alerts", []):
                        for instance in alert.get("instances", []):
                            url = instance.get("uri", "")
                            if url:
                                urls.add(url)

                    # Add discovered endpoints
                    for url in urls:
                        endpoint = {
                            'url': url,
                            'method': 'GET',  # Default to GET, ZAP spider mainly discovers GET endpoints
                            'source': 'ZAP Spider',
                            'parameters': [],  # Could be enhanced to parse URL parameters
                            'discovered_at': datetime.now().isoformat()
                        }
                        endpoints.append(endpoint)

            logging.info(f"Successfully discovered {len(endpoints)} endpoints.")

            # Save endpoints to a separate file
            endpoints_file = os.path.join(output_dir, f"discovered_endpoints_{timestamp}.json")
            with open(endpoints_file, 'w') as f:
                json.dump(endpoints, f, indent=2)
            logging.info(f"Saved discovered endpoints to: {endpoints_file}")

            return endpoints
        else:
            logging.warning("No endpoints discovered or parsing failed.")
            return []

    except subprocess.TimeoutExpired:
        logging.error(f"Endpoint discovery timed out after {timeout} seconds.")
        return []
    except Exception as e:
        logging.error(f"An error occurred during endpoint discovery: {e}")
        return []

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GroundNG/QA-MCP'
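
The same lookup can be done from Python. A small sketch using requests, assuming the endpoint is publicly readable and returns JSON (the response fields are not enumerated here):

import requests

# Fetch this server's metadata from the Glama MCP directory API
resp = requests.get("https://glama.ai/api/mcp/v1/servers/GroundNG/QA-MCP", timeout=30)
resp.raise_for_status()
print(resp.json())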

If you have feedback or need assistance with the MCP directory API, please join our Discord server.