zap_scanner.py•13.9 kB
# zap_scanner.py
import logging
import subprocess
import os
import shlex
import time
import json
import requests
from datetime import datetime
from .utils import parse_json_file # Relative import
ZAP_TIMEOUT_SECONDS = 1800 # 30 minutes default
ZAP_API_PORT = 8080 # Default ZAP API port
def run_zap_scan(target_url: str, output_dir="results", timeout=ZAP_TIMEOUT_SECONDS,
zap_path=None, api_key=None, scan_mode="baseline"):
"""
Runs OWASP ZAP security scanner against a target URL.
Args:
target_url: The URL to scan
output_dir: Directory to store scan results
timeout: Maximum time in seconds for the scan
zap_path: Path to ZAP installation (uses docker by default)
api_key: ZAP API key if required
scan_mode: Type of scan - 'baseline', 'full' or 'api'
"""
if not target_url:
logging.error("ZAP target URL is required")
return []
logging.info(f"Starting ZAP scan for target: {target_url}")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"zap_output_{timestamp}.json"
output_filepath = os.path.join(output_dir, output_filename)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Determine if using Docker or local ZAP installation
use_docker = zap_path is None
if use_docker:
# Docker command to run ZAP in a container
command = [
"docker", "run", "--rm", "-v", f"{os.path.abspath(output_dir)}:/zap/wrk:rw",
"-t", "owasp/zap2docker-stable", "zap-" + scan_mode + "-scan.py",
"-t", target_url,
"-J", output_filename
]
if api_key:
command.extend(["-z", f"api.key={api_key}"])
else:
# Local ZAP installation
script_name = f"zap-{scan_mode}-scan.py"
command = [
os.path.join(zap_path, script_name),
"-t", target_url,
"-J", output_filepath
]
if api_key:
command.extend(["-z", f"api.key={api_key}"])
logging.debug(f"Executing ZAP command: {' '.join(shlex.quote(cmd) for cmd in command)}")
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, check=False)
logging.info("ZAP process finished.")
logging.debug(f"ZAP stdout:\n{result.stdout}")
if result.returncode != 0:
logging.warning(f"ZAP exited with non-zero status code: {result.returncode}")
return [f"ZAP exited with non-zero status code: {result.returncode}"]
# For Docker, the output will be in the mapped volume
actual_output_path = output_filepath if not use_docker else os.path.join(output_dir, output_filename)
# Parse the JSON output file
report_data = parse_json_file(actual_output_path)
if report_data and "site" in report_data:
# Process ZAP findings from the report
findings = []
# Structure varies based on scan mode but generally has sites with alerts
for site in report_data.get("site", []):
site_url = site.get("@name", "")
for alert in site.get("alerts", []):
finding = {
'tool': 'OWASP ZAP',
'severity': alert.get("riskdesc", "").split(" ", 1)[0],
'message': alert.get("name", ""),
'description': alert.get("desc", ""),
'url': site_url,
'solution': alert.get("solution", ""),
'references': alert.get("reference", ""),
'cweid': alert.get("cweid", ""),
'instances': len(alert.get("instances", [])),
}
findings.append(finding)
logging.info(f"Successfully parsed {len(findings)} findings from ZAP output.")
return findings
else:
logging.warning(f"Could not parse findings from ZAP output file: {actual_output_path}")
return [f"Could not parse findings from ZAP output file: {actual_output_path}"]
except subprocess.TimeoutExpired:
logging.error(f"ZAP scan timed out after {timeout} seconds.")
return [f"ZAP scan timed out after {timeout} seconds."]
except FileNotFoundError as e:
if use_docker:
logging.error("Docker command not found. Is Docker installed and in PATH?")
return ["Docker command not found. Is Docker installed and in PATH?"]
else:
logging.error(f"ZAP command not found at {zap_path}. Is ZAP installed?")
return [f"ZAP command not found at {zap_path}. Is ZAP installed?"]
except Exception as e:
logging.error(f"An unexpected error occurred while running ZAP: {e}")
return [f"An unexpected error occurred while running ZAP: {e}"]
def run_zap_api_scan(target_url: str, api_definition: str, output_dir="results",
timeout=ZAP_TIMEOUT_SECONDS, zap_path=None, api_key=None):
"""
Runs ZAP API scan against a REST API with OpenAPI/Swagger definition.
Args:
target_url: Base URL of the API
api_definition: Path to OpenAPI/Swagger definition file
output_dir: Directory to store scan results
timeout: Maximum time in seconds for the scan
zap_path: Path to ZAP installation (uses docker by default)
api_key: ZAP API key if required
"""
if not os.path.isfile(api_definition):
logging.error(f"API definition file not found: {api_definition}")
return []
# Similar implementation as run_zap_scan but with API scanning options
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"zap_api_output_{timestamp}.json"
output_filepath = os.path.join(output_dir, output_filename)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
use_docker = zap_path is None
if use_docker:
# Volume mount for API definition file
api_def_dir = os.path.dirname(os.path.abspath(api_definition))
api_def_file = os.path.basename(api_definition)
command = [
"docker", "run", "--rm",
"-v", f"{os.path.abspath(output_dir)}:/zap/wrk:rw",
"-v", f"{api_def_dir}:/zap/api:ro",
"-t", "owasp/zap2docker-stable", "zap-api-scan.py",
"-t", target_url,
"-f", f"/zap/api/{api_def_file}",
"-J", output_filename
]
else:
command = [
os.path.join(zap_path, "zap-api-scan.py"),
"-t", target_url,
"-f", api_definition,
"-J", output_filepath
]
if api_key:
command.extend(["-z", f"api.key={api_key}"])
logging.debug(f"Executing ZAP API scan command: {' '.join(shlex.quote(cmd) for cmd in command)}")
# The rest of the implementation follows similar pattern to run_zap_scan
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, check=False)
# Processing similar to run_zap_scan
logging.info("ZAP API scan process finished.")
logging.debug(f"ZAP stdout:\n{result.stdout}")
if result.returncode != 0:
logging.warning(f"ZAP API scan exited with non-zero status code: {result.returncode}")
return [f"ZAP API scan exited with non-zero status code: {result.returncode}"]
# For Docker, the output will be in the mapped volume
actual_output_path = output_filepath if not use_docker else os.path.join(output_dir, output_filename)
# Parse the JSON output file - same processing as run_zap_scan
report_data = parse_json_file(actual_output_path)
if report_data and "site" in report_data:
findings = []
for site in report_data.get("site", []):
site_url = site.get("@name", "")
for alert in site.get("alerts", []):
finding = {
'tool': 'OWASP ZAP API Scan',
'severity': alert.get("riskdesc", "").split(" ", 1)[0],
'message': alert.get("name", ""),
'description': alert.get("desc", ""),
'url': site_url,
'solution': alert.get("solution", ""),
'references': alert.get("reference", ""),
'cweid': alert.get("cweid", ""),
'instances': len(alert.get("instances", [])),
}
findings.append(finding)
logging.info(f"Successfully parsed {len(findings)} findings from ZAP API scan output.")
return findings
else:
logging.warning(f"Could not parse findings from ZAP API scan output file: {actual_output_path}")
return [f"Could not parse findings from ZAP API scan output file: {actual_output_path}"]
except Exception as e:
logging.error(f"An unexpected error occurred while running ZAP API scan: {e}")
return [f"An unexpected error occurred while running ZAP API scan: {e}"]
def discover_endpoints(target_url: str, output_dir="results", timeout=600, zap_path=None, api_key=None):
"""
Uses ZAP's spider to discover endpoints in a web application.
Args:
target_url: The URL to scan
output_dir: Directory to store results
timeout: Maximum time in seconds for the spider
zap_path: Path to ZAP installation (uses docker by default)
api_key: ZAP API key if required
Returns:
List of discovered endpoints and their details
"""
if not target_url:
logging.error("Target URL is required for endpoint discovery")
return []
logging.info(f"Starting ZAP endpoint discovery for: {target_url}")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"zap_endpoints_{timestamp}.json"
output_filepath = os.path.join(output_dir, output_filename)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
use_docker = zap_path is None
if use_docker:
command = [
"docker", "run", "--rm",
"-v", f"{os.path.abspath(output_dir)}:/zap/wrk:rw",
"-t", "owasp/zap2docker-stable",
"zap-full-scan.py",
"-t", target_url,
"-J", output_filename,
"-z", "-config spider.maxDuration=1", # Limit spider duration
"--spider-first", # Run spider before the scan
"-n", "endpoints.context" # Don't perform actual scan, just spider
]
else:
command = [
os.path.join(zap_path, "zap-full-scan.py"),
"-t", target_url,
"-J", output_filepath,
"-z", "-config spider.maxDuration=1",
"--spider-first",
"-n", "endpoints.context"
]
if api_key:
command.extend(["-z", f"api.key={api_key}"])
logging.debug(f"Executing ZAP endpoint discovery: {' '.join(shlex.quote(cmd) for cmd in command)}")
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, check=False)
logging.info("ZAP endpoint discovery finished.")
logging.debug(f"ZAP stdout:\n{result.stdout}")
actual_output_path = output_filepath if not use_docker else os.path.join(output_dir, output_filename)
# Parse the JSON output file
report_data = parse_json_file(actual_output_path)
if report_data:
endpoints = []
# Extract endpoints from spider results
if "site" in report_data:
for site in report_data.get("site", []):
site_url = site.get("@name", "")
# Extract URLs from alerts and spider results
urls = set()
# Get URLs from alerts
for alert in site.get("alerts", []):
for instance in alert.get("instances", []):
url = instance.get("uri", "")
if url:
urls.add(url)
# Add discovered endpoints
for url in urls:
endpoint = {
'url': url,
'method': 'GET', # Default to GET, ZAP spider mainly discovers GET endpoints
'source': 'ZAP Spider',
'parameters': [], # Could be enhanced to parse URL parameters
'discovered_at': datetime.now().isoformat()
}
endpoints.append(endpoint)
logging.info(f"Successfully discovered {len(endpoints)} endpoints.")
# Save endpoints to a separate file
endpoints_file = os.path.join(output_dir, f"discovered_endpoints_{timestamp}.json")
with open(endpoints_file, 'w') as f:
json.dump(endpoints, f, indent=2)
logging.info(f"Saved discovered endpoints to: {endpoints_file}")
return endpoints
else:
logging.warning("No endpoints discovered or parsing failed.")
return []
except subprocess.TimeoutExpired:
logging.error(f"Endpoint discovery timed out after {timeout} seconds.")
return []
except Exception as e:
logging.error(f"An error occurred during endpoint discovery: {e}")
return []