import argparse
import getpass
import json
import os
import sys
import time
from datetime import datetime

import requests
class addToScope:
    """Client for the ``core/action/accessUrl`` endpoint of a ZAP JSON API."""

    # Seconds to wait for the API reply. Without a timeout ``requests`` blocks
    # indefinitely when the API never answers. The value is generous because
    # the request asks the API to follow redirects before it replies.
    REQUEST_TIMEOUT = 120

    def accessURL(self, ZAP_BASE_URL, target_url):
        """Ask the ZAP API to access ``target_url`` and report the outcome.

        Args:
            ZAP_BASE_URL: Base URL of the ZAP API, e.g. ``http://127.0.0.1:8888``.
            target_url: URL passed to the endpoint as its ``url`` parameter.

        Returns:
            A human-readable status string. Failures (non-200 replies, network
            errors, timeouts) are reported in the returned string rather than
            raised.
        """
        endpoint = f"{ZAP_BASE_URL}/JSON/core/action/accessUrl/"
        params = {
            "url": target_url,
            "followRedirects": "true",
        }
        # Only the network call can raise RequestException, so keep the try
        # body limited to it.
        try:
            response = requests.get(
                endpoint, params=params, timeout=self.REQUEST_TIMEOUT
            )
        except requests.exceptions.RequestException as e:
            return f"An error occurred while accessing the URL: {e}"

        if response.status_code == 200:
            return "Successfully accessed the URL via ZAP API!"
        return f"Failed to access URL: HTTP {response.status_code} - {response.text}"
class spider:
    """Client for the spider endpoints of a ZAP JSON API.

    Every method takes the base URL of the API (for example
    ``http://127.0.0.1:8888``) and talks to it with plain HTTP GET requests.
    """

    # Seconds to wait for each API reply; without a timeout ``requests``
    # blocks indefinitely when the API never answers.
    REQUEST_TIMEOUT = 30
    # Seconds to pause between two status polls in ``checkStatus``.
    POLL_INTERVAL = 2

    def startScan(self, ZAP_BASE_URL, target_url, recurse, max_children):
        """Start a spider scan and return its id.

        Args:
            ZAP_BASE_URL: Base URL of the ZAP API.
            target_url: URL sent as the ``url`` parameter.
            recurse: Sent as the ``recurse`` parameter, lower-cased
                (``True`` becomes ``"true"``).
            max_children: Sent unchanged as the ``maxChildren`` parameter.

        Returns:
            The value of the ``"scan"`` key of the JSON reply, or ``None``
            when the reply has no such key.
        """
        endpoint = f"{ZAP_BASE_URL}/JSON/spider/action/scan/"
        params = {
            "url": target_url,
            "maxChildren": max_children,
            "recurse": str(recurse).lower(),
        }
        response = requests.get(
            endpoint, params=params, timeout=self.REQUEST_TIMEOUT
        )
        return response.json().get("scan")

    def checkStatus(self, ZAP_BASE_URL, scanId):
        """Poll the scan status until it reaches 100 and return it.

        Args:
            ZAP_BASE_URL: Base URL of the ZAP API.
            scanId: Scan id as returned by ``startScan``.

        Returns:
            ``100`` once the API reports that value.

        Raises:
            RuntimeError: If a reply carries no ``"status"`` key. Polling
                again could never succeed, so this is reported instead of
                looping forever.
        """
        endpoint = f"{ZAP_BASE_URL}/JSON/spider/view/status/"
        params = {
            "scanId": f"{scanId}",
        }
        while True:
            response = requests.get(
                endpoint, params=params, timeout=self.REQUEST_TIMEOUT
            )
            response_data = response.json()
            if "status" not in response_data:
                raise RuntimeError(
                    f"No spider status reported for scan {scanId}: {response_data}"
                )
            status = int(response_data["status"])
            if status == 100:
                # Return immediately; there is nothing left to wait for.
                return status
            time.sleep(self.POLL_INTERVAL)

    def scanResults(self, ZAP_BASE_URL, scanId, status):
        """Fetch the results of a finished scan.

        Args:
            ZAP_BASE_URL: Base URL of the ZAP API.
            scanId: Scan id as returned by ``startScan``.
            status: Last known status; results are only requested when it
                equals ``100``.

        Returns:
            The decoded JSON reply, or ``None`` when ``status`` is not ``100``.
        """
        if status != 100:
            return None
        endpoint = f"{ZAP_BASE_URL}/JSON/spider/view/results/"
        params = {
            "scanId": f"{scanId}",
        }
        response = requests.get(
            endpoint, params=params, timeout=self.REQUEST_TIMEOUT
        )
        return response.json()

    def save_results(self, results, tool_name):
        """Save scan results to a JSON file with a specific naming format.

        The file is written to ``output/<tool_name>_<user>_<timestamp>.json``
        relative to the current working directory.
        Example: AjaxSpider_ram_20250122_030130.json

        Args:
            results: JSON-serialisable object to write.
            tool_name: Prefix of the generated file name.

        Returns:
            ``None`` on success, or an error message string when the file
            could not be written.
        """
        # getpass.getuser() avoids spawning a shell for ``whoami`` (whose pipe
        # was never closed) and yields a bare login name on every platform.
        try:
            user_id = getpass.getuser()
        except (OSError, KeyError, ImportError):
            user_id = "unknown"
        # Keep path separators out of the file name.
        user_id = user_id.replace("\\", "_").replace("/", "_")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{tool_name}_{user_id}_{timestamp}.json"
        try:
            output_dir = "output/"
            os.makedirs(output_dir, exist_ok=True)
            filepath = os.path.join(output_dir, filename)
            with open(filepath, "w", encoding="utf-8") as file:
                json.dump(results, file, indent=4)
        except Exception as e:
            # Best effort by design: callers get a message, not an exception.
            return f"Error saving results: {e}"
        return None

    def DESTROY(self, ZAP_BASE_URL, scanId):
        """Request that a running scan be stopped.

        Args:
            ZAP_BASE_URL: Base URL of the ZAP API.
            scanId: Scan id as returned by ``startScan``.

        Returns:
            ``"STOPPED the Current Scan!"`` when the reply's ``"Result"`` is
            ``"OK"``, otherwise ``"ERROR! Scan Not Stopped"``.
        """
        endpoint = f"{ZAP_BASE_URL}/JSON/spider/action/stop/"
        params = {
            "scanId": f"{scanId}",
        }
        response = requests.get(
            endpoint, params=params, timeout=self.REQUEST_TIMEOUT
        )
        if response.json().get("Result") == "OK":
            return "STOPPED the Current Scan!"
        return "ERROR! Scan Not Stopped"
def run_zap_scan(url, recurse=False, max_children=0, port=8888, user_id=None, output=None):
    """
    Drive a complete spider scan against ``url`` through a local ZAP API.

    The API is expected at ``http://127.0.0.1:<port>``. The URL is first
    accessed through the API, then a spider scan is started, polled until its
    status is 100, and its results are fetched.

    ``user_id`` and ``output`` are accepted but not used by this function.

    Returns the spider results on success. Any failure is reported as a
    descriptive string instead of being raised.
    """
    base_url = f"http://127.0.0.1:{port}"
    crawler = spider()
    scope = addToScope()
    try:
        scope.accessURL(base_url, url)

        scan_id = crawler.startScan(base_url, url, recurse, max_children)
        if scan_id is None:
            raise RuntimeError("Failed to start Spider scan.")

        # Poll again after a short pause for as long as the scan is unfinished.
        while crawler.checkStatus(base_url, scan_id) != 100:
            time.sleep(1)

        return crawler.scanResults(base_url, scan_id, 100)
    except requests.exceptions.RequestException as req_err:
        return f"Network error occurred: {req_err}"
    except RuntimeError as run_err:
        return f"Runtime error: {run_err}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"