
SQL Injection MCP Server

by vivashu27

scan_urls_from_file

Scans a file of URLs for SQL injection vulnerabilities, returning results in batches to avoid timeouts. Supports multiple injection types and database systems.

Instructions

Scan multiple URLs from a file for SQL injection vulnerabilities. Returns results in chunks to avoid timeouts. Use continue_batch to get more results.

Args:
  file_path: Absolute path to file containing URLs (one URL per line)
  method: HTTP method - GET or POST
  injection_types: Comma-separated injection types to test
  database_types: Comma-separated database types to test
  headers: Custom headers as key:value pairs separated by |
  cookies: Cookies as key=value pairs separated by ;
  bearer_token: Bearer token for Authorization header
  proxy_url: Proxy URL for Burp Suite or other proxies
  verify_ssl: Verify SSL certificates
  waf_bypass: WAF bypass technique
  concurrency: Number of concurrent scans (1-10, default 3)
  timeout: Request timeout in seconds per URL (default 5)
  quick_mode: Use quick scan with fewer payloads (default True)
  max_urls_per_batch: Max URLs to scan in one call (default 10)

Returns: Batch scan results. If more URLs remain, use continue_batch with the batch_id.
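
The batch-and-continue flow described above could be driven by a small loop like the one below. This is only a sketch: `scan_first` and `continue_scan` are hypothetical async callables standing in for however your MCP client invokes scan_urls_from_file and continue_batch, and `drain_batches` is an illustrative name. The `has_more`, `batch_id`, `results`, and `error` keys are taken from the implementation reference further down this page. In that listing every call returns a fresh batch_id, so the loop always continues with the most recent one.

```python
import asyncio
from typing import Awaitable, Callable


async def drain_batches(
    scan_first: Callable[[], Awaitable[dict]],
    continue_scan: Callable[[str], Awaitable[dict]],
    max_rounds: int = 100,
) -> list[dict]:
    """Collect per-URL results from the first batch and all follow-up batches."""
    result = await scan_first()
    collected: list[dict] = []
    # max_rounds is a safety cap so a misbehaving server cannot loop forever.
    for _ in range(max_rounds):
        if "error" in result:
            # Top-level errors (e.g. file not found) abort the whole run.
            raise RuntimeError(result["error"])
        collected.extend(result.get("results", []))
        if not result.get("has_more"):
            break
        # Continue with the batch_id of the result we just received.
        result = await continue_scan(result["batch_id"])
    return collected
```

Per-URL failures do not raise here; they stay in the collected list with a `status` of "error" or "timeout".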

Input Schema

Name | Required | Description | Default
file_path | Yes | |
method | No | | GET
injection_types | No | |
database_types | No | |
headers | No | |
cookies | No | |
bearer_token | No | |
proxy_url | No | |
verify_ssl | No | |
waf_bypass | No | | none
concurrency | No | |
timeout | No | |
quick_mode | No | |
max_urls_per_batch | No | |
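
Because headers and cookies are passed as delimited strings rather than objects, it may help to build them from dictionaries. The helpers below are a minimal sketch, assuming the key:value|key:value and key=value;key=value formats given in the Args list. `join_headers`, `join_cookies`, and the example file path are hypothetical names introduced for illustration, not part of the server.

```python
def join_headers(headers: dict[str, str]) -> str:
    """Encode headers as 'Key:Value|Key:Value'."""
    for key, value in headers.items():
        # '|' separates pairs and the first ':' separates key from value,
        # so neither can appear in a key, and '|' cannot appear in a value.
        if "|" in key or "|" in value or ":" in key:
            raise ValueError(f"header {key!r} cannot be encoded in this format")
    return "|".join(f"{key}:{value}" for key, value in headers.items())


def join_cookies(cookies: dict[str, str]) -> str:
    """Encode cookies as 'name=value;name=value'."""
    for key, value in cookies.items():
        # ';' separates pairs and the first '=' separates name from value.
        if ";" in key or ";" in value or "=" in key:
            raise ValueError(f"cookie {key!r} cannot be encoded in this format")
    return ";".join(f"{key}={value}" for key, value in cookies.items())


# Example tool arguments; the path and credential values are placeholders.
arguments = {
    "file_path": "/tmp/urls.txt",
    "method": "GET",
    "headers": join_headers({"X-Api-Key": "abc", "Referer": "https://example.com/"}),
    "cookies": join_cookies({"session": "s1", "theme": "dark"}),
    "concurrency": 3,
    "max_urls_per_batch": 10,
}
```

Header values may contain colons (as in the Referer URL above) because only the first colon in each pair is treated as the separator.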

Implementation Reference

  • The scan_urls_from_file tool handler. Reads a file of URLs, counts them, then delegates to scan_urls_batch for the actual scanning logic.
    @mcp.tool()
    async def scan_urls_from_file(
        file_path: str,
        method: str = "GET",
        injection_types: Optional[str] = None,
        database_types: Optional[str] = None,
        headers: Optional[str] = None,
        cookies: Optional[str] = None,
        bearer_token: Optional[str] = None,
        proxy_url: Optional[str] = None,
        verify_ssl: bool = True,
        waf_bypass: str = "none",
        concurrency: int = 3,
        timeout: float = 5.0,
        quick_mode: bool = True,
        max_urls_per_batch: int = 10
    ) -> dict:
        """
        Scan multiple URLs from a file for SQL injection vulnerabilities.
        Returns results in chunks to avoid timeouts. Use continue_batch to get more results.
        
        Args:
            file_path: Absolute path to file containing URLs (one URL per line)
            method: HTTP method - GET or POST
            injection_types: Comma-separated injection types to test
            database_types: Comma-separated database types to test
            headers: Custom headers as key:value pairs separated by |
            cookies: Cookies as key=value pairs separated by ;
            bearer_token: Bearer token for Authorization header
            proxy_url: Proxy URL for Burp Suite or other proxies
            verify_ssl: Verify SSL certificates
            waf_bypass: WAF bypass technique
            concurrency: Number of concurrent scans (1-10, default 3)
            timeout: Request timeout in seconds per URL (default 5)
            quick_mode: Use quick scan with fewer payloads (default True)
            max_urls_per_batch: Max URLs to scan in one call (default 10)
        
        Returns:
            Batch scan results. If more URLs remain, use continue_batch with the batch_id.
        """
        from pathlib import Path
        
        path = Path(file_path)
        if not path.exists():
            return {"error": f"File not found: {file_path}"}
        
        try:
            with open(path, "r", encoding="utf-8") as f:
                urls = f.read()
        except Exception as e:
            return {"error": f"Failed to read file: {str(e)}"}
        
        # Count URLs for info
        url_count = len([u for u in urls.strip().split('\n') if u.strip() and u.strip().startswith('http')])
        
        # Delegate to batch scanner
        result = await scan_urls_batch(
            urls=urls,
            method=method,
            injection_types=injection_types,
            database_types=database_types,
            headers=headers,
            cookies=cookies,
            bearer_token=bearer_token,
            proxy_url=proxy_url,
            verify_ssl=verify_ssl,
            waf_bypass=waf_bypass,
            concurrency=concurrency,
            timeout=timeout,
            quick_mode=quick_mode,
            max_urls_per_batch=max_urls_per_batch
        )
        
        result["source_file"] = file_path
        result["total_urls_in_file"] = url_count
        return result
  • Tool registration via @mcp.tool() decorator on the scan_urls_from_file function.
    @mcp.tool()
  • scan_urls_batch is the core batch scanning logic called by scan_urls_from_file. Handles URL parsing, concurrency control, Scanner invocation, and batch result assembly.
    @mcp.tool()
    async def scan_urls_batch(
        urls: str,
        method: str = "GET",
        injection_types: Optional[str] = None,
        database_types: Optional[str] = None,
        headers: Optional[str] = None,
        cookies: Optional[str] = None,
        bearer_token: Optional[str] = None,
        proxy_url: Optional[str] = None,
        verify_ssl: bool = True,
        waf_bypass: str = "none",
        concurrency: int = 3,
        timeout: float = 5.0,
        quick_mode: bool = True,
        max_urls_per_batch: int = 10
    ) -> dict:
        """
        Scan multiple URLs for SQL injection vulnerabilities in batch.
        Use quick_mode=True (default) for faster scans that won't timeout.
        
        Args:
            urls: Newline-separated list of URLs to scan
            method: HTTP method - GET or POST
            injection_types: Comma-separated injection types (default: error_based only in quick_mode)
            database_types: Comma-separated database types (default: generic,mysql in quick_mode)
            headers: Custom headers as key:value pairs separated by |
            cookies: Cookies as key=value pairs separated by ;
            bearer_token: Bearer token for Authorization header
            proxy_url: Proxy URL for Burp Suite or other proxies
            verify_ssl: Verify SSL certificates
            waf_bypass: WAF bypass technique
            concurrency: Number of concurrent scans (1-10, default 3)
            timeout: Request timeout in seconds per URL (default 5)
            quick_mode: Use quick scan with fewer payloads (default True, recommended for many URLs)
            max_urls_per_batch: Max URLs to scan in one call (default 10, use continue_batch for more)
        
        Returns:
            Batch scan results. If more URLs remain, use continue_batch with the batch_id.
        """
        import uuid
        import time
        
        # Parse URLs
        url_list = [u.strip() for u in urls.strip().split('\n') if u.strip() and u.strip().startswith('http')]
        
        if not url_list:
            return {"error": "No valid URLs provided. URLs must start with http:// or https://"}
        
        # Limit concurrency and batch size for stability
        concurrency = max(1, min(10, concurrency))
        max_urls_per_batch = max(1, min(25, max_urls_per_batch))
        
        batch_id = str(uuid.uuid4())[:8]
        start_time = time.time()
        
        # In quick_mode, use minimal payloads for speed
        if quick_mode:
            inj_types = [InjectionType.ERROR_BASED]
            db_types = [DatabaseType.GENERIC, DatabaseType.MYSQL]
        else:
            inj_types = list(InjectionType)
            db_types = list(DatabaseType)
        
        # Override with user-specified types if provided
        if injection_types:
            inj_types = [InjectionType(t.strip()) for t in injection_types.split(",")]
        if database_types:
            db_types = [DatabaseType(t.strip()) for t in database_types.split(",")]
        
        header_dict = {}
        if headers:
            for h in headers.split("|"):
                if ":" in h:
                    k, v = h.split(":", 1)
                    header_dict[k.strip()] = v.strip()
        
        cookie_dict = {}
        if cookies:
            for c in cookies.split(";"):
                if "=" in c:
                    k, v = c.split("=", 1)
                    cookie_dict[k.strip()] = v.strip()
        
        auth = AuthConfig(headers=header_dict, cookies=cookie_dict, bearer_token=bearer_token)
        proxy = ProxyConfig(http_proxy=proxy_url, https_proxy=proxy_url, verify_ssl=verify_ssl) if proxy_url else None
        
        # Split into current batch and remaining
        current_batch = url_list[:max_urls_per_batch]
        remaining_urls = url_list[max_urls_per_batch:]
        
        # Store remaining for continuation
        if remaining_urls:
            pending_scans[batch_id] = {
                "remaining_urls": remaining_urls,
                "method": method,
                "injection_types": injection_types,
                "database_types": database_types,
                "headers": headers,
                "cookies": cookies,
                "bearer_token": bearer_token,
                "proxy_url": proxy_url,
                "verify_ssl": verify_ssl,
                "waf_bypass": waf_bypass,
                "concurrency": concurrency,
                "timeout": timeout,
                "quick_mode": quick_mode,
                "max_urls_per_batch": max_urls_per_batch,
                "all_results": []
            }
        
        # Semaphore for concurrency control
        semaphore = asyncio.Semaphore(concurrency)
        
        async def scan_single_url(url: str) -> dict:
            async with semaphore:
                try:
                    config = ScanConfig(
                        target_url=url,
                        method=HttpMethod(method.upper()),
                        injection_types=inj_types,
                        database_types=db_types,
                        auth=auth,
                        proxy=proxy,
                        waf_bypass=WAFBypassTechnique(waf_bypass),
                        timeout=timeout
                    )
                    scanner = Scanner(config)
                    result = await scanner.scan()
                    scan_results[result.scan_id] = result
                    
                    return {
                        "url": url,
                        "scan_id": result.scan_id,
                        "status": "completed",
                        "vulnerabilities_found": len(result.vulnerabilities),
                        "vulnerable": len(result.vulnerabilities) > 0
                    }
                except asyncio.TimeoutError:
                    return {"url": url, "status": "timeout", "vulnerable": False}
                except Exception as e:
                    return {"url": url, "status": "error", "error": str(e)[:100], "vulnerable": False}
        
        # Run scans concurrently
        tasks = [scan_single_url(url) for url in current_batch]
        results = await asyncio.gather(*tasks)
        
        duration = time.time() - start_time
        
        # Summary
        total_in_batch = len(results)
        completed = sum(1 for r in results if r["status"] == "completed")
        errors = sum(1 for r in results if r["status"] in ["error", "timeout"])
        vulnerable_count = sum(1 for r in results if r.get("vulnerable", False))
        
        batch_result = {
            "batch_id": batch_id,
            "urls_in_this_batch": total_in_batch,
            "total_urls_submitted": len(url_list),
            "remaining_urls": len(remaining_urls),
            "completed": completed,
            "errors": errors,
            "vulnerable_urls": vulnerable_count,
            "duration_seconds": round(duration, 2),
            "quick_mode": quick_mode,
            "results": results,
            "vulnerable_urls_list": [r["url"] for r in results if r.get("vulnerable", False)],
            "has_more": len(remaining_urls) > 0,
            "continue_hint": f"Use continue_batch(batch_id='{batch_id}') to scan remaining {len(remaining_urls)} URLs" if remaining_urls else None
        }
        
        batch_results[batch_id] = batch_result
        return batch_result
  • continue_batch provides the chunked-scanning continuation. It is a separate tool that the caller invokes with a batch_id when a previous batch reports has_more=True; scan_urls_from_file does not call it itself.
    @mcp.tool()
    async def continue_batch(batch_id: str) -> dict:
        """
        Continue scanning remaining URLs from a previous batch.
        Use this when scan_urls_batch returns has_more=True.
        
        Args:
            batch_id: Batch ID from a previous scan_urls_batch call
        
        Returns:
            Next batch of scan results
        """
        if batch_id not in pending_scans:
            return {"error": f"No pending scans for batch {batch_id}. Batch may be complete or expired."}
        
        pending = pending_scans[batch_id]
        remaining = pending["remaining_urls"]
        
        if not remaining:
            del pending_scans[batch_id]
            return {"message": "All URLs in this batch have been scanned", "batch_id": batch_id}
        
        # Build URL string for the next batch
        urls_str = "\n".join(remaining)
        
        # Scan next batch
        result = await scan_urls_batch(
            urls=urls_str,
            method=pending["method"],
            injection_types=pending["injection_types"],
            database_types=pending["database_types"],
            headers=pending["headers"],
            cookies=pending["cookies"],
            bearer_token=pending["bearer_token"],
            proxy_url=pending["proxy_url"],
            verify_ssl=pending["verify_ssl"],
            waf_bypass=pending["waf_bypass"],
            concurrency=pending["concurrency"],
            timeout=pending["timeout"],
            quick_mode=pending["quick_mode"],
            max_urls_per_batch=pending["max_urls_per_batch"]
        )
        
        # Update the original batch_id reference
        result["original_batch_id"] = batch_id
        
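        # Clean up the consumed entry. The nested scan_urls_batch call stores any
        # leftover URLs under the new batch_id it returns, so keeping this entry
        # would let a repeated call with the old ID rescan the same URLs.
        pending_scans.pop(batch_id, None)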
        
        return result
  • Module imports showing the data models (AuthConfig, ProxyConfig, ScanConfig, InjectionType, etc.) used by the tool's schema parameters.
    """SQL Injection MCP Server - Main server implementation."""
    
    import asyncio
    from typing import Optional
    from mcp.server.fastmcp import FastMCP
    
    from sqli_mcp.models import (
        DatabaseType, InjectionType, HttpMethod, WAFBypassTechnique,
        AuthConfig, ProxyConfig, ScanConfig, ScanResult, VulnerabilityFinding
    )
    from sqli_mcp.scanner import Scanner, quick_scan
    from sqli_mcp.payloads import (
        get_all_payloads, get_payloads_by_type, get_payloads_by_database,
        load_custom_payloads, apply_waf_bypass, get_waf_bypass_variants,
        PAYLOAD_CATEGORIES
    )
    from sqli_mcp.http_client import parse_url_params
    
    
    # Create MCP server
    mcp = FastMCP("SQLi-MCP")
    
    # Store scan results for later retrieval
    scan_results: dict[str, ScanResult] = {}
    custom_payloads_cache: dict[str, list] = {}
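
The URL filtering and batch splitting inside scan_urls_batch can be isolated into a small standalone function to see how input lines are treated. This is a sketch that mirrors the listing above rather than code from the server; `split_batch` is a hypothetical name. It shows two details that are easy to miss: lines that do not start with "http" are dropped silently, and max_urls_per_batch is clamped to the range 1 to 25 whatever value is passed.

```python
def split_batch(urls_text: str, max_urls_per_batch: int = 10) -> tuple[list[str], list[str]]:
    """Return (current_batch, remaining_urls) using the same rules as the listing."""
    # Keep only non-empty lines whose stripped text starts with "http".
    url_list = [
        line.strip()
        for line in urls_text.strip().split("\n")
        if line.strip().startswith("http")
    ]
    # Clamp the batch size to 1..25, as scan_urls_batch does.
    size = max(1, min(25, max_urls_per_batch))
    return url_list[:size], url_list[size:]


sample = "# targets\nhttp://a.example/?id=1\n\nftp://b.example/\n  https://c.example/?q=2  \n"
current, remaining = split_batch(sample, max_urls_per_batch=1)
print(current)    # the first http(s) URL only
print(remaining)  # the rest; the comment, blank, and ftp lines are gone
```

Since the filter is a plain prefix check, it does not validate that a line is a well-formed URL.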
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations, the description carries the full burden. It explains chunked results and continuation, but fails to disclose whether the scan is passive/active, if it can trigger WAF alerts, or any destructive potential. The safety profile (e.g., rate limiting, impact on target systems) is not addressed.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well-structured with a brief intro, a key usage hint (batching/continue_batch), and a clear list of arguments. Every sentence adds value and is front-loaded. No redundancy.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given 14 parameters, no annotations, and no output schema, the description provides necessary parameter details and continuation guidance. However, it fails to describe the output structure (e.g., batch_id, result format), which is critical for an agent to process results. Behavioral gaps further reduce completeness.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.
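
Although the description does not document the output, the result shape can be read off the scan_urls_batch, scan_urls_from_file, and continue_batch listings on this page. The sketch below records it as typed dictionaries. The field names are copied from those listings; the types are inferred, and `UrlResult`, `BatchResult`, and `summarize` are hypothetical names introduced here, not a schema published by the server.

```python
from typing import Optional, TypedDict


class UrlResult(TypedDict, total=False):
    url: str
    status: str                 # "completed", "error", or "timeout"
    vulnerable: bool
    scan_id: str                # present only when status == "completed"
    vulnerabilities_found: int  # present only when status == "completed"
    error: str                  # present only when status == "error" (first 100 chars)


class BatchResult(TypedDict, total=False):
    batch_id: str
    urls_in_this_batch: int
    total_urls_submitted: int
    remaining_urls: int
    completed: int
    errors: int                 # counts both "error" and "timeout" results
    vulnerable_urls: int
    duration_seconds: float
    quick_mode: bool
    results: list[UrlResult]
    vulnerable_urls_list: list[str]
    has_more: bool
    continue_hint: Optional[str]
    source_file: str            # added by scan_urls_from_file
    total_urls_in_file: int     # added by scan_urls_from_file
    original_batch_id: str      # added by continue_batch


def summarize(batch: BatchResult) -> str:
    """One-line progress summary for a batch result."""
    return (
        f"{batch['completed']}/{batch['urls_in_this_batch']} completed, "
        f"{batch['vulnerable_urls']} vulnerable, "
        f"{batch['remaining_urls']} remaining"
    )
```

A failed call returns a dictionary with only an `error` key instead, so callers should check for that before reading the fields above.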

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The schema has 0% description coverage, so the description's Args section adds essential meaning: file_path requires absolute path, method specifies GET/POST, etc. It also includes defaults. However, some parameters (e.g., injection_types, waf_bypass) lack valid values or examples, which would improve clarity.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states it scans multiple URLs from a file for SQL injection vulnerabilities, distinguishing it from single-URL or manual input tools. However, it does not explicitly differentiate from sibling tools like scan_urls_batch, which may also handle multiple URLs but without a file.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description mentions batching and the continue_batch tool for retrieving more results, providing some guidance on usage flow. However, it lacks explicit when-to-use or when-not-to-use guidance compared to alternative scanning tools (e.g., scan_url, scan_urls_batch), and does not mention prerequisites like file permissions or access.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
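
On the prerequisites point, a caller could run a quick local check before invoking the tool. The sketch below is an assumption about what such a check might look like, not part of the server; `preflight` is a hypothetical helper. It enforces the absolute-path requirement stated in the Args list and counts lines using the same "starts with http" rule the handler applies.

```python
from pathlib import Path


def preflight(file_path: str) -> int:
    """Validate the URL file locally and return how many lines look like URLs."""
    path = Path(file_path)
    if not path.is_absolute():
        raise ValueError("file_path must be an absolute path")
    if not path.is_file():
        raise FileNotFoundError(file_path)
    # read_text raises if the file is unreadable or is not valid UTF-8.
    text = path.read_text(encoding="utf-8")
    return sum(1 for line in text.splitlines() if line.strip().startswith("http"))
```

If the URLs are already in memory rather than in a file, scan_urls_batch accepts them directly as a newline-separated string, which avoids the file requirement altogether.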

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vivashu27/SQLinjector_MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.