Skip to main content
Glama
server.py39.3 kB
import asyncio import sys import threading import time import uuid from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Any, Dict, Mapping, Union # Add the parent directory to the path to fix imports sys.path.append(str(Path(__file__).parent)) from logging_setup import setup_logger from mcp.server import Server from mcp.server.models import InitializationOptions from mcp.server.stdio import stdio_server from mcp.types import TextContent, Tool from models import CompleteScanArgs from scans.active import run_ascan from scans.ajax import AjaxOptions, run_ajax from scans.passive import passive_scan_impl from scans.spider import run_spider from utils import _finalize, _prepare from zap_control import ZAP_SESSION_ID LOG = setup_logger("zap_mcp.server") # Global scan management class ScanManager: def __init__(self): self.active_scans: Dict[str, Dict[str, Any]] = {} self.executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="zap-scan") self.lock = threading.Lock() def start_scan( self, scan_id: str, scan_type: str, target: str, args: Dict[str, Any] ) -> str: """Start a scan in background and return scan ID""" with self.lock: self.active_scans[scan_id] = { "type": scan_type, "target": target, "args": args, "status": "running", "progress": 0, "start_time": time.time(), "last_update": time.time(), "result": None, "error": None, "current_phase": "initializing", "message": "Scan starting...", } # Start scan in background future = self.executor.submit(self._run_scan, scan_id, scan_type, target, args) self.active_scans[scan_id]["future"] = future LOG.info( f"Started {scan_type} scan", extra={"extra": {"scan_id": scan_id, "target": target}}, ) return scan_id def _run_scan( self, scan_id: str, scan_type: str, target: str, args: Dict[str, Any] ): """Run the actual scan in background thread""" try: with self.lock: self.active_scans[scan_id]["status"] = "running" # Create a new session for each scan import time import requests from config import APIKEY, SESSION_NAME, ZAP_BASE session_name = f"{SESSION_NAME}_{int(time.time())}" params = {"name": session_name} if APIKEY: params["apikey"] = APIKEY try: url = f"{ZAP_BASE}/JSON/core/action/newSession/" response = requests.get(url, params=params, timeout=10) if response.status_code == 200: # Update global session ID for this scan global ZAP_SESSION_ID ZAP_SESSION_ID = session_name LOG.info(f"Created new session for scan {scan_id}: {session_name}") # Clear all alerts for clean scan start try: clear_params = {} if APIKEY: clear_params["apikey"] = APIKEY clear_url = f"{ZAP_BASE}/JSON/core/action/deleteAllAlerts/" clear_response = requests.get( clear_url, params=clear_params, timeout=10 ) if clear_response.status_code == 200: LOG.info(f"Cleared all alerts for scan {scan_id}") else: LOG.warning( f"Failed to clear alerts for scan {scan_id}: HTTP {clear_response.status_code}" ) except Exception as e: LOG.warning( f"Exception clearing alerts for scan {scan_id}: {e}" ) else: LOG.warning( f"Failed to create new session for scan {scan_id}, using existing session" ) except Exception as e: LOG.warning( f"Exception creating new session for scan {scan_id}: {e}, using existing session" ) # Create progress callback def progress_callback( scan_id: str, scan_phase: str, progress: int, message: str ): with self.lock: if scan_id in self.active_scans: self.active_scans[scan_id]["progress"] = progress self.active_scans[scan_id]["last_update"] = time.time() self.active_scans[scan_id]["current_phase"] = scan_phase self.active_scans[scan_id]["message"] = message if scan_type == "active": result = self._run_active_scan(scan_id, target, args, progress_callback) elif scan_type == "complete": result = self._run_complete_scan( scan_id, target, args, progress_callback ) elif scan_type == "passive": result = self._run_passive_scan( scan_id, target, args, progress_callback ) elif scan_type == "ajax": result = self._run_ajax_scan(scan_id, target, args, progress_callback) else: raise ValueError(f"Unknown scan type: {scan_type}") with self.lock: self.active_scans[scan_id]["status"] = "completed" self.active_scans[scan_id]["result"] = result self.active_scans[scan_id]["progress"] = 100 self.active_scans[scan_id]["last_update"] = time.time() self.active_scans[scan_id]["message"] = "Scan completed successfully" LOG.info( f"Completed {scan_type} scan", extra={"extra": {"scan_id": scan_id, "target": target}}, ) except Exception as e: with self.lock: self.active_scans[scan_id]["status"] = "failed" self.active_scans[scan_id]["error"] = str(e) self.active_scans[scan_id]["last_update"] = time.time() self.active_scans[scan_id]["message"] = f"Scan failed: {str(e)}" LOG.error( f"Failed {scan_type} scan", extra={"extra": {"scan_id": scan_id, "error": str(e)}}, ) def _run_active_scan( self, scan_id: str, target: str, args: Dict[str, Any], progress_callback: callable, ): """Run active scan synchronously""" target = _prepare(scan_id, target) # Spider first (optional) try: spider_timeout = min(args.get("spider_max_wait_seconds", 1800), 600) run_spider( scan_id, target, args.get("recurse", True), args.get("inScopeOnly", False), args.get("poll_interval_seconds", 1.5), spider_timeout, session_id=ZAP_SESSION_ID, progress_callback=progress_callback, ) except Exception as e: LOG.warning( f"Spider failed, continuing with active scan", extra={"extra": {"scan_id": scan_id, "error": str(e)}}, ) # Active scan ascan_id, ascan_duration = run_ascan( scan_id, target, args.get("recurse", True), args.get("inScopeOnly", False), args.get("scanPolicyName"), args.get("poll_interval_seconds", 1.5), args.get("ascan_max_wait_seconds", 7200), session_id=ZAP_SESSION_ID, progress_callback=progress_callback, ) result = _finalize( scan_id, target, args.get("include_findings", True), args.get("include_evidence", False), {"pscan": 0.0, "ajax": 0.0, "spider": 0.0, "ascan": ascan_duration}, mode="active", ) result["activeScan"] = { "scanId": ascan_id, "duration_seconds": round(ascan_duration, 2), } return result def _run_complete_scan( self, scan_id: str, target: str, args: Dict[str, Any], progress_callback: callable, ): """Run complete scan synchronously""" target = _prepare(scan_id, target) # AJAX opts = AjaxOptions( inScope=args.get("ajax_inScope", True), subtreeOnly=args.get("ajax_subtreeOnly", False), maxCrawlDepth=args.get("ajax_maxCrawlDepth", 10), maxCrawlStates=args.get("ajax_maxCrawlStates", 0), maxDuration=args.get("ajax_maxDuration", 60), eventWait=args.get("ajax_eventWait", 1000), clickDefaultElems=args.get("ajax_clickDefaultElems", True), clickElemsOnce=args.get("ajax_clickElemsOnce", True), browserId=args.get("ajax_browserId"), wait_seconds=args.get("ajax_wait_seconds", 300), poll_interval_seconds=args.get("poll_interval_seconds", 1.5), ) ok, ajax_duration, ajax_results = run_ajax( scan_id, target, opts, session_id=ZAP_SESSION_ID ) # Spider spider_id, spider_duration = run_spider( scan_id, target, args.get("recurse", True), args.get("inScopeOnly", False), args.get("poll_interval_seconds", 1.5), args.get("spider_max_wait_seconds", 1800), session_id=ZAP_SESSION_ID, progress_callback=progress_callback, ) # Active ascan_id, ascan_duration = run_ascan( scan_id, target, args.get("recurse", True), args.get("inScopeOnly", False), args.get("scanPolicyName"), args.get("poll_interval_seconds", 1.5), args.get("ascan_max_wait_seconds", 7200), session_id=ZAP_SESSION_ID, progress_callback=progress_callback, ) # Passive pscan_duration = 0.0 if args.get("wait_for_passive", True): from scans.passive import pscan_wait pscan_duration = pscan_wait( scan_id, args.get("passive_poll_interval_seconds", 0.5), args.get("passive_timeout_seconds", 600), session_id=ZAP_SESSION_ID, ) result = _finalize( scan_id, target, args.get("include_findings", True), args.get("include_evidence", False), { "pscan": pscan_duration, "ajax": ajax_duration, "spider": spider_duration, "ascan": ascan_duration, }, mode="complete", ) result.update( { "ajaxSpider": { "started": ok, "duration_seconds": round(ajax_duration, 2), "numberOfResults": ajax_results, }, "spider": { "scanId": spider_id, "duration_seconds": round(spider_duration, 2), }, "activeScan": { "scanId": ascan_id, "duration_seconds": round(ascan_duration, 2), }, "passiveScan": { "waited": bool(args.get("wait_for_passive", True)), "duration_seconds": round(pscan_duration, 2), }, } ) return result def _run_passive_scan( self, scan_id: str, target: str, args: Dict[str, Any], progress_callback: callable, ): """Run passive scan synchronously""" target = _prepare(scan_id, target) # Enable passive scanning from http_session import get_json try: get_json( scan_id, "/JSON/pscan/action/setEnabled/", {"enabled": "true"}, session_id=ZAP_SESSION_ID, ) get_json( scan_id, "/JSON/pscan/action/enableAllScanners/", {}, session_id=ZAP_SESSION_ID, ) except Exception as e: return {"error": f"Failed to enable passive scanning: {str(e)}"} # Wait for passive scan to complete timeout_seconds = args.get("timeout_seconds", 600) poll_interval = args.get("poll_interval_seconds", 0.5) from scans.passive import pscan_wait pscan_duration = pscan_wait( scan_id, poll_interval, timeout_seconds, session_id=ZAP_SESSION_ID ) # Get results from scans.passive import passive_scan_impl result = passive_scan_impl( scan_id, target, args.get("include_findings", True), args.get("include_evidence", False), session_id=ZAP_SESSION_ID, ) result.update( { "scanId": scan_id, "target": target, "duration_seconds": round(pscan_duration, 2), "type": "passive", } ) return result def _run_ajax_scan( self, scan_id: str, target: str, args: Dict[str, Any], progress_callback: callable, ): """Run AJAX scan synchronously""" target = _prepare(scan_id, target) # First run traditional spider to discover URLs try: spider_timeout = min( args.get("spider_max_wait_seconds", 300), 300 ) # Max 5 minutes for spider spider_id, spider_duration = run_spider( scan_id, target, args.get("recurse", True), args.get("inScopeOnly", False), args.get("poll_interval_seconds", 1.0), spider_timeout, session_id=ZAP_SESSION_ID, progress_callback=progress_callback, ) except Exception as e: LOG.warning( f"Spider failed, continuing with AJAX scan", extra={"extra": {"scan_id": scan_id, "error": str(e)}}, ) spider_id, spider_duration = "0", 0.0 # Configure AJAX options opts = AjaxOptions( inScope=args.get("inScope", True), subtreeOnly=args.get("subtreeOnly", False), maxCrawlDepth=args.get("maxCrawlDepth", 10), maxCrawlStates=args.get("maxCrawlStates", 0), maxDuration=args.get("maxDuration", 60), eventWait=args.get("eventWait", 1000), clickDefaultElems=args.get("clickDefaultElems", True), clickElemsOnce=args.get("clickElemsOnce", True), browserId=args.get("browserId"), wait_seconds=args.get("wait_seconds", 300), poll_interval_seconds=args.get("poll_interval_seconds", 1.0), ) # Then run AJAX spider to discover additional URLs ok, ajax_duration, ajax_results = run_ajax( scan_id, target, opts, session_id=ZAP_SESSION_ID ) result = { "scanId": scan_id, "target": target, "spider": { "scanId": spider_id, "duration_seconds": round(spider_duration, 2), }, "ajaxSpider": { "started": ok, "duration_seconds": round(ajax_duration, 2), "numberOfResults": ajax_results, }, "type": "ajax", } return result def get_scan_status(self, scan_id: str) -> Dict[str, Any]: """Get current status of a scan""" with self.lock: if scan_id not in self.active_scans: return {"error": "Scan not found"} scan_info = self.active_scans[scan_id].copy() # Remove future object from response scan_info.pop("future", None) # Add runtime information current_time = time.time() scan_info["runtime_seconds"] = round( current_time - scan_info["start_time"], 2 ) scan_info["last_update_ago_seconds"] = round( current_time - scan_info["last_update"], 2 ) return scan_info def cancel_scan(self, scan_id: str) -> bool: """Cancel a running scan""" with self.lock: if scan_id not in self.active_scans: return False scan_info = self.active_scans[scan_id] if scan_info["status"] == "running": # Try to cancel the future future = scan_info.get("future") if future: future.cancel() scan_info["status"] = "cancelled" scan_info["last_update"] = time.time() return True return False def list_scans(self) -> Dict[str, Any]: """List all active scans""" with self.lock: scans = {} current_time = time.time() for scan_id, scan_info in self.active_scans.items(): scans[scan_id] = { "type": scan_info["type"], "target": scan_info["target"], "status": scan_info["status"], "progress": scan_info["progress"], "start_time": scan_info["start_time"], "last_update": scan_info["last_update"], "runtime_seconds": round(current_time - scan_info["start_time"], 2), "last_update_ago_seconds": round( current_time - scan_info["last_update"], 2 ), "current_phase": scan_info.get("current_phase", "unknown"), "message": scan_info.get("message", ""), } return scans # Global scan manager instance scan_manager = ScanManager() # Heartbeat mechanism to prevent MCP server timeouts class HeartbeatManager: def __init__(self): self.last_heartbeat = time.time() self.heartbeat_interval = 30 # 30 seconds self.is_running = False self.heartbeat_task = None async def start_heartbeat(self): """Start the heartbeat mechanism""" if self.is_running: return self.is_running = True self.heartbeat_task = asyncio.create_task(self._heartbeat_loop()) LOG.info("Heartbeat mechanism started") async def stop_heartbeat(self): """Stop the heartbeat mechanism""" self.is_running = False if self.heartbeat_task: self.heartbeat_task.cancel() try: await self.heartbeat_task except asyncio.CancelledError: pass LOG.info("Heartbeat mechanism stopped") async def _heartbeat_loop(self): """Heartbeat loop to keep MCP server alive""" while self.is_running: try: await asyncio.sleep(self.heartbeat_interval) self.last_heartbeat = time.time() # Log heartbeat if there are active scans active_scans = scan_manager.list_scans() if active_scans: LOG.debug( f"Heartbeat: {len(active_scans)} active scans", extra={"extra": {"active_scans": len(active_scans)}}, ) except asyncio.CancelledError: break except Exception as e: LOG.error(f"Heartbeat error: {e}") await asyncio.sleep(5) # Wait before retrying # Global heartbeat manager instance heartbeat_manager = HeartbeatManager() app = Server("zap-mcp", "1.0.0") @app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="start_active_scan", description="Start an active security scan on a target URL (asynchronous)", inputSchema={ "type": "object", "properties": { "url": { "type": "string", "format": "uri", "description": "Target URL to scan", }, "recurse": { "type": "boolean", "default": True, "description": "Whether to recurse into subdirectories", }, "inScopeOnly": { "type": "boolean", "default": False, "description": "Only scan URLs in scope", }, "scanPolicyName": { "type": "string", "description": "Name of the scan policy to use", }, "poll_interval_seconds": { "type": "number", "default": 1.5, "description": "Polling interval in seconds", }, "ascan_max_wait_seconds": { "type": "integer", "default": 7200, "description": "Maximum wait time for active scan (default: 2 hours)", }, "spider_max_wait_seconds": { "type": "integer", "default": 1800, "description": "Maximum wait time for spider (default: 30 minutes)", }, "include_findings": { "type": "boolean", "default": True, "description": "Include findings in results", }, "include_evidence": { "type": "boolean", "default": False, "description": "Include evidence in results", }, }, "required": ["url"], }, ), Tool( name="start_complete_scan", description="Start a complete security scan including AJAX, spider, and active scanning (asynchronous)", inputSchema={ "type": "object", "properties": { "url": { "type": "string", "format": "uri", "description": "Target URL to scan", }, "recurse": { "type": "boolean", "default": True, "description": "Whether to recurse into subdirectories", }, "inScopeOnly": { "type": "boolean", "default": False, "description": "Only scan URLs in scope", }, "scanPolicyName": { "type": "string", "description": "Name of the scan policy to use", }, "poll_interval_seconds": { "type": "number", "default": 1.5, "description": "Polling interval in seconds", }, "ascan_max_wait_seconds": { "type": "integer", "default": 7200, "description": "Maximum wait time for active scan (default: 2 hours)", }, "spider_max_wait_seconds": { "type": "integer", "default": 1800, "description": "Maximum wait time for spider (default: 30 minutes)", }, "include_findings": { "type": "boolean", "default": True, "description": "Include findings in results", }, "include_evidence": { "type": "boolean", "default": False, "description": "Include evidence in results", }, "ajax_inScope": { "type": "boolean", "default": True, "description": "AJAX spider in scope only", }, "ajax_subtreeOnly": { "type": "boolean", "default": False, "description": "AJAX spider subtree only", }, "ajax_maxCrawlDepth": { "type": "integer", "default": 10, "description": "Maximum crawl depth for AJAX spider", }, "ajax_maxCrawlStates": { "type": "integer", "default": 0, "description": "Maximum crawl states for AJAX spider", }, "ajax_maxDuration": { "type": "integer", "default": 60, "description": "Maximum duration for AJAX spider", }, "ajax_eventWait": { "type": "integer", "default": 1000, "description": "Event wait time for AJAX spider", }, "ajax_clickDefaultElems": { "type": "boolean", "default": True, "description": "Click default elements in AJAX spider", }, "ajax_clickElemsOnce": { "type": "boolean", "default": True, "description": "Click elements once in AJAX spider", }, "ajax_browserId": { "type": "string", "description": "Browser ID for AJAX spider", }, "ajax_wait_seconds": { "type": "integer", "default": 300, "description": "Wait time for AJAX spider", }, "wait_for_passive": { "type": "boolean", "default": True, "description": "Wait for passive scan to complete", }, "passive_poll_interval_seconds": { "type": "number", "default": 0.5, "description": "Passive scan polling interval", }, "passive_timeout_seconds": { "type": "integer", "default": 600, "description": "Passive scan timeout", }, "disable_websocket": { "type": "boolean", "default": False, "description": "Disable websocket scanning", }, }, "required": ["url"], }, ), Tool( name="start_passive_scan", description="Start a passive security scan on a target URL (asynchronous)", inputSchema={ "type": "object", "properties": { "url": { "type": "string", "format": "uri", "description": "Target URL to scan", }, "include_findings": { "type": "boolean", "default": True, "description": "Include findings in results", }, "include_evidence": { "type": "boolean", "default": False, "description": "Include evidence in results", }, "poll_interval_seconds": { "type": "number", "default": 0.5, "description": "Polling interval in seconds", }, "timeout_seconds": { "type": "integer", "default": 600, "description": "Maximum wait time for passive scan", }, }, "required": ["url"], }, ), Tool( name="get_scan_status", description="Get the status of a running scan", inputSchema={ "type": "object", "properties": { "scan_id": { "type": "string", "description": "Scan ID returned by start_*_scan", } }, "required": ["scan_id"], }, ), Tool( name="cancel_scan", description="Cancel a running scan", inputSchema={ "type": "object", "properties": { "scan_id": { "type": "string", "description": "Scan ID returned by start_*_scan", } }, "required": ["scan_id"], }, ), Tool( name="list_scans", description="List all active scans", inputSchema={"type": "object", "properties": {}}, ), Tool( name="start_heartbeat", description="Start heartbeat mechanism to prevent MCP server timeouts", inputSchema={"type": "object", "properties": {}}, ), Tool( name="create_new_session", description="Manually create a new ZAP session with unique timestamp", inputSchema={ "type": "object", "properties": { "session_name": { "type": "string", "description": "Optional custom session name (default: auto-generated with timestamp)", } }, "description": "Creates a new ZAP session and returns the session ID", }, ), Tool( name="start_ajax_scan", description="Start an AJAX spider scan on a target URL (asynchronous)", inputSchema={ "type": "object", "properties": { "url": { "type": "string", "format": "uri", "description": "Target URL to scan", }, "recurse": { "type": "boolean", "default": True, "description": "Whether to recurse into subdirectories", }, "inScopeOnly": { "type": "boolean", "default": False, "description": "Only scan URLs in scope", }, "spider_max_wait_seconds": { "type": "integer", "default": 300, "description": "Maximum wait time for spider (default: 5 minutes)", }, "inScope": { "type": "boolean", "default": True, "description": "AJAX spider in scope only", }, "subtreeOnly": { "type": "boolean", "default": False, "description": "AJAX spider subtree only", }, "maxCrawlDepth": { "type": "integer", "default": 10, "description": "Maximum crawl depth for AJAX spider", }, "maxCrawlStates": { "type": "integer", "default": 0, "description": "Maximum crawl states for AJAX spider", }, "maxDuration": { "type": "integer", "default": 60, "description": "Maximum duration for AJAX spider", }, "eventWait": { "type": "integer", "default": 1000, "description": "Event wait time for AJAX spider", }, "clickDefaultElems": { "type": "boolean", "default": True, "description": "Click default elements in AJAX spider", }, "clickElemsOnce": { "type": "boolean", "default": True, "description": "Click elements once in AJAX spider", }, "browserId": { "type": "string", "description": "Browser ID for AJAX spider", }, "wait_seconds": { "type": "integer", "default": 300, "description": "Wait time for AJAX spider", }, "poll_interval_seconds": { "type": "number", "default": 1.0, "description": "Polling interval in seconds", }, }, "required": ["url"], }, ), ] @app.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> list[TextContent]: if name == "start_active_scan": scan_id = scan_manager.start_scan( uuid.uuid4().hex[:8], "active", arguments["url"], arguments ) result = { "scan_id": scan_id, "status": "started", "message": "Active scan started in background", } return [TextContent(type="text", text=str(result))] elif name == "start_complete_scan": scan_id = scan_manager.start_scan( uuid.uuid4().hex[:8], "complete", arguments["url"], arguments ) result = { "scan_id": scan_id, "status": "started", "message": "Complete scan started in background", } return [TextContent(type="text", text=str(result))] elif name == "start_ajax_scan": scan_id = scan_manager.start_scan( uuid.uuid4().hex[:8], "ajax", arguments["url"], arguments ) result = { "scan_id": scan_id, "status": "started", "message": "AJAX scan started in background", } return [TextContent(type="text", text=str(result))] elif name == "start_passive_scan": scan_id = scan_manager.start_scan( uuid.uuid4().hex[:8], "passive", arguments["url"], arguments ) result = { "scan_id": scan_id, "status": "started", "message": "Passive scan started in background", } return [TextContent(type="text", text=str(result))] elif name == "get_scan_status": result = scan_manager.get_scan_status(arguments["scan_id"]) return [TextContent(type="text", text=str(result))] elif name == "cancel_scan": success = scan_manager.cancel_scan(arguments["scan_id"]) result = { "success": success, "message": "Scan cancelled" if success else "Scan not found or not running", } return [TextContent(type="text", text=str(result))] elif name == "list_scans": result = scan_manager.list_scans() return [TextContent(type="text", text=str(result))] elif name == "start_heartbeat": await heartbeat_manager.start_heartbeat() result = {"success": True, "message": "Heartbeat mechanism started"} return [TextContent(type="text", text=str(result))] elif name == "create_new_session": import time import urllib.parse import requests from config import APIKEY, SESSION_NAME, ZAP_BASE from zap_control import ZAP_SESSION_ID # Generate session name custom_name = arguments.get("session_name") if custom_name: session_name = custom_name else: session_name = f"{SESSION_NAME}_{int(time.time())}" # Create new session via ZAP API params = {"name": session_name} if APIKEY: params["apikey"] = APIKEY try: url = f"{ZAP_BASE}/JSON/core/action/newSession/" response = requests.get(url, params=params, timeout=10) if response.status_code == 200: # Update global session ID global ZAP_SESSION_ID ZAP_SESSION_ID = session_name result = { "success": True, "session_id": session_name, "message": f"New session '{session_name}' created successfully", "timestamp": int(time.time()), "zap_response": response.json() if response.text else "OK", } else: result = { "success": False, "error": f"Failed to create session. HTTP {response.status_code}", "response": response.text, "session_name": session_name, } except Exception as e: result = { "success": False, "error": f"Exception creating session: {str(e)}", "session_name": session_name, } return [TextContent(type="text", text=str(result))] else: raise ValueError(f"Unknown tool: {name}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LisBerndt/zap-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server