import os
import requests
import logging
# Logger shared by everything in this module.
logger = logging.getLogger("PayloadEngine")

# Payload category -> URL of the raw wordlist for that category in the
# PayloadsAllTheThings repository on GitHub.
PAYLOAD_URLS = {
    "sqli": "https://raw.githubusercontent.com/Swisskyrepo/PayloadsAllTheThings/master/SQL%20Injection/Intruder/detect/Generic_SQLi.txt",
    "xss": "https://raw.githubusercontent.com/Swisskyrepo/PayloadsAllTheThings/master/XSS%20Injection/Intruder/xss-payloads.txt",
    "rce": "https://raw.githubusercontent.com/Swisskyrepo/PayloadsAllTheThings/master/Command%20Injection/Intruder/command-injection-payloads.txt",
    "lfi": "https://raw.githubusercontent.com/Swisskyrepo/PayloadsAllTheThings/master/File%20Inclusion/Intruder/LFI-Payloads.txt",
    "ssti": "https://raw.githubusercontent.com/Swisskyrepo/PayloadsAllTheThings/master/Server%20Side%20Template%20Injection/Intruder/ssti-payloads.txt",
    "xxe": "https://raw.githubusercontent.com/Swisskyrepo/PayloadsAllTheThings/master/XXE%20Injection/Intruder/xxe-payloads.txt",
}

# Directory named "data" located alongside this source file.
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
def update_payloads() -> dict:
    """Download every payload list in ``PAYLOAD_URLS`` into ``DATA_DIR``.

    Each category is fetched over HTTP with a 10-second timeout and saved as
    ``<category>.txt`` (UTF-8) inside ``DATA_DIR``, overwriting any previous
    copy. A failure in one category is logged and recorded in the result,
    and the remaining categories are still processed.

    Returns:
        A dict mapping each category name to a status string: either
        ``"Updated (<n> payloads)"``, where ``n`` is the number of non-blank
        lines downloaded, or ``"Failed: <error>"``.

    Raises:
        OSError: If ``DATA_DIR`` cannot be created (for example because the
            path already exists and is not a directory).
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(DATA_DIR, exist_ok=True)

    results = {}
    for category, url in PAYLOAD_URLS.items():
        try:
            logger.info("Downloading %s payloads from %s...", category, url)
            response = requests.get(url, timeout=10)
            # Turn HTTP 4xx/5xx into an exception so the file is not
            # overwritten with an error page.
            response.raise_for_status()
            text = response.text

            file_path = os.path.join(DATA_DIR, f"{category}.txt")
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(text)

            # Blank lines are not payloads, so leave them out of the count.
            count = sum(1 for line in text.splitlines() if line.strip())
            results[category] = f"Updated ({count} payloads)"
            logger.info("Successfully updated %s payloads.", category)
        except Exception as e:
            # Deliberately broad: the update is best-effort per category, so
            # any download or write error is reported and the loop continues.
            logger.error("Failed to update %s payloads: %s", category, e)
            results[category] = f"Failed: {e}"
    return results