scan_urls_from_file
Scans a file of URLs for SQL injection vulnerabilities, returning results in batches to avoid timeouts. Supports multiple injection types and database systems.
Instructions
Scan multiple URLs from a file for SQL injection vulnerabilities. Returns results in chunks to avoid timeouts. Use continue_batch to get more results.
Args: file_path: Absolute path to file containing URLs (one URL per line) method: HTTP method - GET or POST injection_types: Comma-separated injection types to test database_types: Comma-separated database types to test headers: Custom headers as key:value pairs separated by | cookies: Cookies as key=value pairs separated by ; bearer_token: Bearer token for Authorization header proxy_url: Proxy URL for Burp Suite or other proxies verify_ssl: Verify SSL certificates waf_bypass: WAF bypass technique concurrency: Number of concurrent scans (1-10, default 3) timeout: Request timeout in seconds per URL (default 5) quick_mode: Use quick scan with fewer payloads (default True) max_urls_per_batch: Max URLs to scan in one call (default 10)
Returns: Batch scan results. If more URLs remain, use continue_batch with the batch_id.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| file_path | Yes | ||
| method | No | GET | |
| injection_types | No | ||
| database_types | No | ||
| headers | No | ||
| cookies | No | ||
| bearer_token | No | ||
| proxy_url | No | ||
| verify_ssl | No | ||
| waf_bypass | No | none | |
| concurrency | No | ||
| timeout | No | ||
| quick_mode | No | ||
| max_urls_per_batch | No |
Implementation Reference
- src/sqli_mcp/server.py:716-791 (handler)The scan_urls_from_file tool handler. Reads a file of URLs, counts them, then delegates to scan_urls_batch for the actual scanning logic.
@mcp.tool() async def scan_urls_from_file( file_path: str, method: str = "GET", injection_types: Optional[str] = None, database_types: Optional[str] = None, headers: Optional[str] = None, cookies: Optional[str] = None, bearer_token: Optional[str] = None, proxy_url: Optional[str] = None, verify_ssl: bool = True, waf_bypass: str = "none", concurrency: int = 3, timeout: float = 5.0, quick_mode: bool = True, max_urls_per_batch: int = 10 ) -> dict: """ Scan multiple URLs from a file for SQL injection vulnerabilities. Returns results in chunks to avoid timeouts. Use continue_batch to get more results. Args: file_path: Absolute path to file containing URLs (one URL per line) method: HTTP method - GET or POST injection_types: Comma-separated injection types to test database_types: Comma-separated database types to test headers: Custom headers as key:value pairs separated by | cookies: Cookies as key=value pairs separated by ; bearer_token: Bearer token for Authorization header proxy_url: Proxy URL for Burp Suite or other proxies verify_ssl: Verify SSL certificates waf_bypass: WAF bypass technique concurrency: Number of concurrent scans (1-10, default 3) timeout: Request timeout in seconds per URL (default 5) quick_mode: Use quick scan with fewer payloads (default True) max_urls_per_batch: Max URLs to scan in one call (default 10) Returns: Batch scan results. If more URLs remain, use continue_batch with the batch_id. """ from pathlib import Path path = Path(file_path) if not path.exists(): return {"error": f"File not found: {file_path}"} try: with open(path, "r", encoding="utf-8") as f: urls = f.read() except Exception as e: return {"error": f"Failed to read file: {str(e)}"} # Count URLs for info url_count = len([u for u in urls.strip().split('\n') if u.strip() and u.strip().startswith('http')]) # Delegate to batch scanner result = await scan_urls_batch( urls=urls, method=method, injection_types=injection_types, database_types=database_types, headers=headers, cookies=cookies, bearer_token=bearer_token, proxy_url=proxy_url, verify_ssl=verify_ssl, waf_bypass=waf_bypass, concurrency=concurrency, timeout=timeout, quick_mode=quick_mode, max_urls_per_batch=max_urls_per_batch ) result["source_file"] = file_path result["total_urls_in_file"] = url_count return result - src/sqli_mcp/server.py:716-716 (registration)Tool registration via @mcp.tool() decorator on the scan_urls_from_file function.
@mcp.tool() - src/sqli_mcp/server.py:488-660 (helper)scan_urls_batch is the core batch scanning logic called by scan_urls_from_file. Handles URL parsing, concurrency control, Scanner invocation, and batch result assembly.
@mcp.tool() async def scan_urls_batch( urls: str, method: str = "GET", injection_types: Optional[str] = None, database_types: Optional[str] = None, headers: Optional[str] = None, cookies: Optional[str] = None, bearer_token: Optional[str] = None, proxy_url: Optional[str] = None, verify_ssl: bool = True, waf_bypass: str = "none", concurrency: int = 3, timeout: float = 5.0, quick_mode: bool = True, max_urls_per_batch: int = 10 ) -> dict: """ Scan multiple URLs for SQL injection vulnerabilities in batch. Use quick_mode=True (default) for faster scans that won't timeout. Args: urls: Newline-separated list of URLs to scan method: HTTP method - GET or POST injection_types: Comma-separated injection types (default: error_based only in quick_mode) database_types: Comma-separated database types (default: generic,mysql in quick_mode) headers: Custom headers as key:value pairs separated by | cookies: Cookies as key=value pairs separated by ; bearer_token: Bearer token for Authorization header proxy_url: Proxy URL for Burp Suite or other proxies verify_ssl: Verify SSL certificates waf_bypass: WAF bypass technique concurrency: Number of concurrent scans (1-10, default 3) timeout: Request timeout in seconds per URL (default 5) quick_mode: Use quick scan with fewer payloads (default True, recommended for many URLs) max_urls_per_batch: Max URLs to scan in one call (default 10, use continue_batch for more) Returns: Batch scan results. If more URLs remain, use continue_batch with the batch_id. """ import uuid import time # Parse URLs url_list = [u.strip() for u in urls.strip().split('\n') if u.strip() and u.strip().startswith('http')] if not url_list: return {"error": "No valid URLs provided. URLs must start with http:// or https://"} # Limit concurrency and batch size for stability concurrency = max(1, min(10, concurrency)) max_urls_per_batch = max(1, min(25, max_urls_per_batch)) batch_id = str(uuid.uuid4())[:8] start_time = time.time() # In quick_mode, use minimal payloads for speed if quick_mode: inj_types = [InjectionType.ERROR_BASED] db_types = [DatabaseType.GENERIC, DatabaseType.MYSQL] else: inj_types = list(InjectionType) db_types = list(DatabaseType) # Override with user-specified types if provided if injection_types: inj_types = [InjectionType(t.strip()) for t in injection_types.split(",")] if database_types: db_types = [DatabaseType(t.strip()) for t in database_types.split(",")] header_dict = {} if headers: for h in headers.split("|"): if ":" in h: k, v = h.split(":", 1) header_dict[k.strip()] = v.strip() cookie_dict = {} if cookies: for c in cookies.split(";"): if "=" in c: k, v = c.split("=", 1) cookie_dict[k.strip()] = v.strip() auth = AuthConfig(headers=header_dict, cookies=cookie_dict, bearer_token=bearer_token) proxy = ProxyConfig(http_proxy=proxy_url, https_proxy=proxy_url, verify_ssl=verify_ssl) if proxy_url else None # Split into current batch and remaining current_batch = url_list[:max_urls_per_batch] remaining_urls = url_list[max_urls_per_batch:] # Store remaining for continuation if remaining_urls: pending_scans[batch_id] = { "remaining_urls": remaining_urls, "method": method, "injection_types": injection_types, "database_types": database_types, "headers": headers, "cookies": cookies, "bearer_token": bearer_token, "proxy_url": proxy_url, "verify_ssl": verify_ssl, "waf_bypass": waf_bypass, "concurrency": concurrency, "timeout": timeout, "quick_mode": quick_mode, "max_urls_per_batch": max_urls_per_batch, "all_results": [] } # Semaphore for concurrency control semaphore = asyncio.Semaphore(concurrency) async def scan_single_url(url: str) -> dict: async with semaphore: try: config = ScanConfig( target_url=url, method=HttpMethod(method.upper()), injection_types=inj_types, database_types=db_types, auth=auth, proxy=proxy, waf_bypass=WAFBypassTechnique(waf_bypass), timeout=timeout ) scanner = Scanner(config) result = await scanner.scan() scan_results[result.scan_id] = result return { "url": url, "scan_id": result.scan_id, "status": "completed", "vulnerabilities_found": len(result.vulnerabilities), "vulnerable": len(result.vulnerabilities) > 0 } except asyncio.TimeoutError: return {"url": url, "status": "timeout", "vulnerable": False} except Exception as e: return {"url": url, "status": "error", "error": str(e)[:100], "vulnerable": False} # Run scans concurrently tasks = [scan_single_url(url) for url in current_batch] results = await asyncio.gather(*tasks) duration = time.time() - start_time # Summary total_in_batch = len(results) completed = sum(1 for r in results if r["status"] == "completed") errors = sum(1 for r in results if r["status"] in ["error", "timeout"]) vulnerable_count = sum(1 for r in results if r.get("vulnerable", False)) batch_result = { "batch_id": batch_id, "urls_in_this_batch": total_in_batch, "total_urls_submitted": len(url_list), "remaining_urls": len(remaining_urls), "completed": completed, "errors": errors, "vulnerable_urls": vulnerable_count, "duration_seconds": round(duration, 2), "quick_mode": quick_mode, "results": results, "vulnerable_urls_list": [r["url"] for r in results if r.get("vulnerable", False)], "has_more": len(remaining_urls) > 0, "continue_hint": f"Use continue_batch(batch_id='{batch_id}') to scan remaining {len(remaining_urls)} URLs" if remaining_urls else None } batch_results[batch_id] = batch_result return batch_result - src/sqli_mcp/server.py:663-713 (helper)continue_batch is used for chunked scanning support; called indirectly when scan_urls_from_file's batch has more URLs remaining.
@mcp.tool() async def continue_batch(batch_id: str) -> dict: """ Continue scanning remaining URLs from a previous batch. Use this when scan_urls_batch returns has_more=True. Args: batch_id: Batch ID from a previous scan_urls_batch call Returns: Next batch of scan results """ if batch_id not in pending_scans: return {"error": f"No pending scans for batch {batch_id}. Batch may be complete or expired."} pending = pending_scans[batch_id] remaining = pending["remaining_urls"] if not remaining: del pending_scans[batch_id] return {"message": "All URLs in this batch have been scanned", "batch_id": batch_id} # Build URL string for the next batch urls_str = "\n".join(remaining) # Scan next batch result = await scan_urls_batch( urls=urls_str, method=pending["method"], injection_types=pending["injection_types"], database_types=pending["database_types"], headers=pending["headers"], cookies=pending["cookies"], bearer_token=pending["bearer_token"], proxy_url=pending["proxy_url"], verify_ssl=pending["verify_ssl"], waf_bypass=pending["waf_bypass"], concurrency=pending["concurrency"], timeout=pending["timeout"], quick_mode=pending["quick_mode"], max_urls_per_batch=pending["max_urls_per_batch"] ) # Update the original batch_id reference result["original_batch_id"] = batch_id # Clean up if complete if not result.get("has_more", False) and batch_id in pending_scans: del pending_scans[batch_id] return result - src/sqli_mcp/server.py:1-27 (schema)Module imports showing the data models (AuthConfig, ProxyConfig, ScanConfig, InjectionType, etc.) used by the tool's schema parameters.
"""SQL Injection MCP Server - Main server implementation.""" import asyncio from typing import Optional from mcp.server.fastmcp import FastMCP from sqli_mcp.models import ( DatabaseType, InjectionType, HttpMethod, WAFBypassTechnique, AuthConfig, ProxyConfig, ScanConfig, ScanResult, VulnerabilityFinding ) from sqli_mcp.scanner import Scanner, quick_scan from sqli_mcp.payloads import ( get_all_payloads, get_payloads_by_type, get_payloads_by_database, load_custom_payloads, apply_waf_bypass, get_waf_bypass_variants, PAYLOAD_CATEGORIES ) from sqli_mcp.http_client import parse_url_params # Create MCP server mcp = FastMCP("SQLi-MCP") # Store scan results for later retrieval scan_results: dict[str, ScanResult] = {} custom_payloads_cache: dict[str, list] = {}