import logging
import urllib.parse
from typing import Optional

import aiohttp

from modules.payloads import manager
# Lower-case substrings whose presence in a response body is treated as a
# leaked database error message.
_SQL_ERROR_SIGNATURES = ("sql syntax", "mysql_fetch", "ora-", "syntax error")

# Total time budget for a single request, in seconds.
_REQUEST_TIMEOUT_SECONDS = 5


def _build_fuzzed_url(parsed, base_pairs, param, payload):
    """Return the target URL with *param* set to *payload*.

    Every other query pair from the original URL is kept so the request still
    looks like a normal one to the target; only the tested parameter changes.
    """
    pairs = [(key, value) for key, value in base_pairs if key != param]
    pairs.append((param, payload))
    # quote (not quote_plus) with safe="/" keeps the payload encoded the same
    # way as a plain urllib.parse.quote(payload) call.
    query = urllib.parse.urlencode(pairs, safe="/", quote_via=urllib.parse.quote)
    return f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{query}"


def _has_sql_error(body):
    """Return True if *body* contains any known database error signature."""
    lowered = body.lower()
    return any(signature in lowered for signature in _SQL_ERROR_SIGNATURES)


async def scan_sqli(url: str, parameters: Optional[list] = None, dbms: str = "auto", level: int = 1, risk: int = 1) -> dict:
    """
    Scan the GET parameters of *url* for error-based SQL injection.

    Each payload returned by ``manager.get_payloads('sqli', ...)`` is sent
    once per tested parameter. A finding is recorded when the response body
    contains one of the substrings in ``_SQL_ERROR_SIGNATURES``. This is a
    basic heuristic only: no time-based or boolean-based detection is done.

    Args:
        url: Target URL. Its query string supplies the default parameters to
            test and the values of the parameters that are left untouched.
        parameters: Names of the parameters to fuzz. When empty or ``None``,
            every parameter found in the URL's query string is fuzzed. When
            given, they are fuzzed even if the URL has no query string.
        dbms: Currently unused; kept for interface compatibility.
        level: Currently unused; kept for interface compatibility.
        risk: Currently unused; kept for interface compatibility.

    Returns:
        A dict with the keys ``url``, ``scanned_payloads`` (number of
        payloads fetched) and ``vulnerabilities`` (list of finding dicts
        with ``type``, ``parameter``, ``payload`` and ``evidence``).
    """
    findings = []
    # Fetch payloads - maybe differentiate by DBMS in a smarter version
    payloads = manager.get_payloads('sqli', limit=50, random_shuffle=True)

    # The URL is the same for every payload, so parse it once up front.
    parsed = urllib.parse.urlparse(url)
    # keep_blank_values=True so that "?id=" still counts as a testable parameter.
    base_pairs = urllib.parse.parse_qsl(parsed.query, keep_blank_values=True)
    if parameters:
        params_to_test = list(parameters)
    else:
        # dict.fromkeys de-duplicates repeated names while keeping their order.
        params_to_test = list(dict.fromkeys(key for key, _ in base_pairs))

    if params_to_test:
        timeout = aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            for payload in payloads:
                for param in params_to_test:
                    # Simple GET fuzzing
                    target_url = _build_fuzzed_url(parsed, base_pairs, param, payload)
                    try:
                        async with session.get(target_url) as response:
                            text = await response.text()
                    except Exception as exc:
                        # Best-effort scan: one failed request (timeout,
                        # connection error, undecodable body, ...) must not
                        # abort the remaining probes, but leave a trace.
                        logging.getLogger(__name__).debug(
                            "SQLi probe to %s failed: %s", target_url, exc
                        )
                        continue
                    # Basic error-based detection
                    # A real scanner needs much more robust heuristics (time-based, boolean, etc.)
                    if _has_sql_error(text):
                        findings.append({
                            "type": "SQL Injection (Error-based)",
                            "parameter": param,
                            "payload": payload,
                            "evidence": "Database error detected"
                        })

    return {
        "url": url,
        "scanned_payloads": len(payloads),
        "vulnerabilities": findings
    }