import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import requests
# Add the parent directory to the path to fix imports
sys.path.append(str(Path(__file__).parent))
from config import (
SESSION_NAME,
SESSION_STRATEGY,
ZAP_AUTOSTART,
ZAP_BASE,
ZAP_STARTUP_POLL,
ZAP_STARTUP_TIMEOUT,
)
from http_session import get_json
from logging_setup import setup_logger
LOG = setup_logger("zap_mcp.zap_control")
ZAP_SESSION_ID: Optional[str] = None
def zap_is_running() -> bool:
try:
url = f"{ZAP_BASE}/JSON/core/view/version/"
r = requests.get(url, timeout=5)
if r.status_code == 200:
LOG.debug(
"zap.api.accessible",
extra={"extra": {"url": url, "status": r.status_code}},
)
return True
else:
LOG.debug(
"zap.api.inaccessible",
extra={"extra": {"url": url, "status": r.status_code}},
)
return False
except Exception as e:
LOG.debug("zap.api.error", extra={"extra": {"url": url, "error": repr(e)}})
return False
def _find_zap_directory() -> Optional[str]:
"""Find the ZAP installation directory by locating zap.bat"""
try:
result = subprocess.run(
["where", "zap.bat"], capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
zap_bat_path = result.stdout.strip().split("\n")[0]
# Get the directory containing zap.bat
zap_dir = os.path.dirname(zap_bat_path)
return zap_dir
except Exception:
pass
return None
def _check_zap_ports() -> list:
"""Check common ZAP ports to see if ZAP is running"""
common_ports = [8080, 8081, 8082, 8083, 8084, 8085]
running_ports = []
for port in common_ports:
try:
url = f"http://127.0.0.1:{port}/JSON/core/view/version/"
r = requests.get(url, timeout=2)
if r.status_code == 200:
running_ports.append(port)
LOG.info("zap.found.on.port", extra={"extra": {"port": port}})
except Exception:
continue
return running_ports
def _build_zap_cmd(zap_dir: str) -> list[str]:
p = urlparse(ZAP_BASE)
host = p.hostname or "127.0.0.1"
port = p.port or (443 if p.scheme == "https" else 8080)
# Return command as list for subprocess.Popen with shell=False
return ["cmd", "/c", f"cd /d \"{zap_dir}\" && zap.bat -daemon -port {port} -host {host} -config api.disablekey=true"]
def ensure_zap_running() -> None:
p = urlparse(ZAP_BASE)
host = (p.hostname or "").lower()
is_local = host in ("127.0.0.1", "localhost")
if not is_local:
LOG.info("zap.autostart.skip.remote", extra={"extra": {"host": host}})
return
if zap_is_running():
LOG.info("zap.detected.running")
return
# Check if ZAP is running on other ports
running_ports = _check_zap_ports()
if running_ports:
LOG.info(
"zap.detected.on.alternate.port", extra={"extra": {"ports": running_ports}}
)
return
if not ZAP_AUTOSTART:
LOG.warning("zap.autostart.disabled")
return
if os.name != "nt":
LOG.warning("zap.autostart.skip.non_windows")
return
zap_dir = _find_zap_directory()
if not zap_dir:
LOG.error(
"zap.autostart.not_in_path",
extra={"extra": {"message": "zap.bat not found in PATH"}},
)
return
cmd = _build_zap_cmd(zap_dir)
LOG.info("zap.autostart.exec", extra={"extra": {"cmd": cmd}})
try:
subprocess.Popen(cmd, shell=False)
except Exception as e:
LOG.error("zap.autostart.spawn_failed", extra={"extra": {"err": repr(e)}})
return
t0 = time.monotonic()
while time.monotonic() - t0 < ZAP_STARTUP_TIMEOUT:
if zap_is_running():
LOG.info("zap.autostart.ready")
return
time.sleep(max(0.25, ZAP_STARTUP_POLL))
LOG.error(
"zap.autostart.timeout", extra={"extra": {"timeout_s": ZAP_STARTUP_TIMEOUT}}
)
def ensure_session() -> Optional[str]:
global ZAP_SESSION_ID
if ZAP_SESSION_ID:
return ZAP_SESSION_ID
name = SESSION_NAME
if SESSION_STRATEGY == "unique":
name = f"{SESSION_NAME}_{int(time.time())}"
def _params(extra=None):
p = {"name": name}
if extra:
p.update(extra)
return p
try:
if SESSION_STRATEGY in ("reuse", "unique"):
r = requests.get(
f"{ZAP_BASE}/JSON/core/action/loadSession/", params=_params(), timeout=30
)
if r.status_code == 200:
ZAP_SESSION_ID = name
return ZAP_SESSION_ID
except Exception:
pass
for params in (_params(), _params({"overwrite": "true"})):
try:
r = requests.get(f"{ZAP_BASE}/JSON/core/action/newSession/", params=params, timeout=30)
if r.status_code == 200:
ZAP_SESSION_ID = name
return ZAP_SESSION_ID
except Exception:
continue
return None
def access_url(scan_id: str, url: str, follow_redirects: bool = True):
try:
get_json(
scan_id,
"/JSON/core/action/accessUrl/",
{"url": url, "followRedirects": str(follow_redirects).lower()},
session_id=ZAP_SESSION_ID,
)
except Exception as e:
LOG.warning(
"accessUrl.warn", extra={"extra": {"scan_id": scan_id, "err": repr(e)}}
)