kali_server.py•40.1 kB
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import subprocess
import sys
import traceback
import threading
from typing import Dict, Any
from flask import Flask, request, jsonify
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Configuration
API_PORT = int(os.environ.get("API_PORT", 5000))
DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y")
COMMAND_TIMEOUT = 180 # 5 minutes default timeout
app = Flask(__name__)
class CommandExecutor:
"""Class to handle command execution with better timeout management"""
def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
self.command = command
self.timeout = timeout
self.process = None
self.stdout_data = ""
self.stderr_data = ""
self.stdout_thread = None
self.stderr_thread = None
self.return_code = None
self.timed_out = False
def _read_stdout(self):
"""Thread function to continuously read stdout"""
for line in iter(self.process.stdout.readline, ''):
self.stdout_data += line
def _read_stderr(self):
"""Thread function to continuously read stderr"""
for line in iter(self.process.stderr.readline, ''):
self.stderr_data += line
def execute(self) -> Dict[str, Any]:
"""Execute the command and handle timeout gracefully"""
logger.info(f"Executing command: {self.command}")
try:
self.process = subprocess.Popen(
self.command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1 # Line buffered
)
# Start threads to read output continuously
self.stdout_thread = threading.Thread(target=self._read_stdout)
self.stderr_thread = threading.Thread(target=self._read_stderr)
self.stdout_thread.daemon = True
self.stderr_thread.daemon = True
self.stdout_thread.start()
self.stderr_thread.start()
# Wait for the process to complete or timeout
try:
self.return_code = self.process.wait(timeout=self.timeout)
# Process completed, join the threads
self.stdout_thread.join()
self.stderr_thread.join()
except subprocess.TimeoutExpired:
# Process timed out but we might have partial results
self.timed_out = True
logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")
# Try to terminate gracefully first
self.process.terminate()
try:
self.process.wait(timeout=5) # Give it 5 seconds to terminate
except subprocess.TimeoutExpired:
# Force kill if it doesn't terminate
logger.warning("Process not responding to termination. Killing.")
self.process.kill()
# Update final output
self.return_code = -1
# Always consider it a success if we have output, even with timeout
success = True if self.timed_out and (self.stdout_data or self.stderr_data) else (self.return_code == 0)
return {
"stdout": self.stdout_data,
"stderr": self.stderr_data,
"return_code": self.return_code,
"success": success,
"timed_out": self.timed_out,
"partial_results": self.timed_out and (self.stdout_data or self.stderr_data)
}
except Exception as e:
logger.error(f"Error executing command: {str(e)}")
logger.error(traceback.format_exc())
return {
"stdout": self.stdout_data,
"stderr": f"Error executing command: {str(e)}\n{self.stderr_data}",
"return_code": -1,
"success": False,
"timed_out": False,
"partial_results": bool(self.stdout_data or self.stderr_data)
}
def execute_command(command: str) -> Dict[str, Any]:
"""
Execute a shell command and return the result
Args:
command: The command to execute
Returns:
A dictionary containing the stdout, stderr, and return code
"""
executor = CommandExecutor(command)
return executor.execute()
@app.route("/api/command", methods=["POST"])
def generic_command():
"""Execute any command provided in the request."""
try:
params = request.json
command = params.get("command", "")
if not command:
logger.warning("Command endpoint called without command parameter")
return jsonify({
"error": "Command parameter is required"
}), 400
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in command endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/nmap", methods=["POST"])
def nmap():
"""Execute nmap scan with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
scan_type = params.get("scan_type", "-sCV")
ports = params.get("ports", "")
additional_args = params.get("additional_args", "-T4 -Pn")
if not target:
logger.warning("Nmap called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"nmap {scan_type}"
if ports:
command += f" -p {ports}"
if additional_args:
# Basic validation for additional args - more sophisticated validation would be better
command += f" {additional_args}"
command += f" {target}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in nmap endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/gobuster", methods=["POST"])
def gobuster():
"""Execute gobuster with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
mode = params.get("mode", "dir")
wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Gobuster called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
# Validate mode
if mode not in ["dir", "dns", "fuzz", "vhost"]:
logger.warning(f"Invalid gobuster mode: {mode}")
return jsonify({
"error": f"Invalid mode: {mode}. Must be one of: dir, dns, fuzz, vhost"
}), 400
command = f"gobuster {mode} -u {url} -w {wordlist}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in gobuster endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/dirb", methods=["POST"])
def dirb():
"""Execute dirb with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Dirb called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"dirb {url} {wordlist}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in dirb endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/nikto", methods=["POST"])
def nikto():
"""Execute nikto with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Nikto called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"nikto -h {target}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in nikto endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/sqlmap", methods=["POST"])
def sqlmap():
"""Execute sqlmap with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
data = params.get("data", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("SQLMap called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"sqlmap -u {url} --batch"
if data:
command += f" --data=\"{data}\""
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in sqlmap endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/metasploit", methods=["POST"])
def metasploit():
"""Execute metasploit module with the provided parameters."""
try:
params = request.json
module = params.get("module", "")
options = params.get("options", {})
if not module:
logger.warning("Metasploit called without module parameter")
return jsonify({
"error": "Module parameter is required"
}), 400
# Format options for Metasploit
options_str = ""
for key, value in options.items():
options_str += f" {key}={value}"
# Create an MSF resource script
resource_content = f"use {module}\n"
for key, value in options.items():
resource_content += f"set {key} {value}\n"
resource_content += "exploit\n"
# Save resource script to a temporary file
resource_file = "/tmp/mcp_msf_resource.rc"
with open(resource_file, "w") as f:
f.write(resource_content)
command = f"msfconsole -q -r {resource_file}"
result = execute_command(command)
# Clean up the temporary file
try:
os.remove(resource_file)
except Exception as e:
logger.warning(f"Error removing temporary resource file: {str(e)}")
return jsonify(result)
except Exception as e:
logger.error(f"Error in metasploit endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/hydra", methods=["POST"])
def hydra():
"""Execute hydra with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
service = params.get("service", "")
username = params.get("username", "")
username_file = params.get("username_file", "")
password = params.get("password", "")
password_file = params.get("password_file", "")
additional_args = params.get("additional_args", "")
if not target or not service:
logger.warning("Hydra called without target or service parameter")
return jsonify({
"error": "Target and service parameters are required"
}), 400
if not (username or username_file) or not (password or password_file):
logger.warning("Hydra called without username/password parameters")
return jsonify({
"error": "Username/username_file and password/password_file are required"
}), 400
command = f"hydra -t 4"
if username:
command += f" -l {username}"
elif username_file:
command += f" -L {username_file}"
if password:
command += f" -p {password}"
elif password_file:
command += f" -P {password_file}"
if additional_args:
command += f" {additional_args}"
command += f" {target} {service}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in hydra endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/john", methods=["POST"])
def john():
"""Execute john with the provided parameters."""
try:
params = request.json
hash_file = params.get("hash_file", "")
wordlist = params.get("wordlist", "/usr/share/wordlists/rockyou.txt")
format_type = params.get("format", "")
additional_args = params.get("additional_args", "")
if not hash_file:
logger.warning("John called without hash_file parameter")
return jsonify({
"error": "Hash file parameter is required"
}), 400
command = f"john"
if format_type:
command += f" --format={format_type}"
if wordlist:
command += f" --wordlist={wordlist}"
if additional_args:
command += f" {additional_args}"
command += f" {hash_file}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in john endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/wpscan", methods=["POST"])
def wpscan():
"""Execute wpscan with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("WPScan called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"wpscan --url {url}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in wpscan endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/enum4linux", methods=["POST"])
def enum4linux():
"""Execute enum4linux with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
additional_args = params.get("additional_args", "-a")
if not target:
logger.warning("Enum4linux called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"enum4linux {additional_args} {target}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in enum4linux endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/subfinder", methods=["POST"])
def subfinder():
"""Execute subfinder with the provided parameters."""
try:
params = request.json
domain = params.get("domain", "")
additional_args = params.get("additional_args", "")
if not domain:
logger.warning("Subfinder called without domain parameter")
return jsonify({
"error": "Domain parameter is required"
}), 400
command = f"subfinder -d {domain}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in subfinder endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/sublister", methods=["POST"])
def sublister():
"""Execute sublister with the provided parameters."""
try:
params = request.json
domain = params.get("domain", "")
additional_args = params.get("additional_args", "")
if not domain:
logger.warning("Sublister called without domain parameter")
return jsonify({
"error": "Domain parameter is required"
}), 400
command = f"sublist3r -d {domain}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in sublister endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/subzy", methods=["POST"])
def subzy():
"""Execute subzy with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Subzy called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
# Check if target is a file or domain
if os.path.isfile(target):
command = f"subzy run --targets {target}"
else:
command = f"subzy run --target {target}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in subzy endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/subjack", methods=["POST"])
def subjack():
"""Execute subjack with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Subjack called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
# Check if target is a file or domain
if os.path.isfile(target):
command = f"subjack -w {target}"
else:
command = f"subjack -d {target}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in subjack endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/httpx", methods=["POST"])
def httpx():
"""Execute httpx with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Httpx called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
# Check if target is a file or single target
if os.path.isfile(target):
command = f"httpx -l {target}"
else:
command = f"echo '{target}' | httpx"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in httpx endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/nuclei", methods=["POST"])
def nuclei():
"""Execute nuclei with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
templates = params.get("templates", "")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Nuclei called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
# Check if target is a file or single target
if os.path.isfile(target):
command = f"nuclei -l {target}"
else:
command = f"nuclei -u {target}"
if templates:
command += f" -t {templates}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in nuclei endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/amass", methods=["POST"])
def amass():
"""Execute amass with the provided parameters."""
try:
params = request.json
domain = params.get("domain", "")
mode = params.get("mode", "enum")
additional_args = params.get("additional_args", "")
if not domain:
logger.warning("Amass called without domain parameter")
return jsonify({
"error": "Domain parameter is required"
}), 400
command = f"amass {mode} -d {domain}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in amass endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/ffuf", methods=["POST"])
def ffuf():
"""Execute ffuf with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
mode = params.get("mode", "dir")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Ffuf called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"ffuf -u {url} -w {wordlist}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in ffuf endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/waybackurls", methods=["POST"])
def waybackurls():
"""Execute waybackurls with the provided parameters."""
try:
params = request.json
domain = params.get("domain", "")
additional_args = params.get("additional_args", "")
if not domain:
logger.warning("Waybackurls called without domain parameter")
return jsonify({
"error": "Domain parameter is required"
}), 400
command = f"echo '{domain}' | waybackurls"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in waybackurls endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/gau", methods=["POST"])
def gau():
"""Execute gau with the provided parameters."""
try:
params = request.json
domain = params.get("domain", "")
additional_args = params.get("additional_args", "")
if not domain:
logger.warning("Gau called without domain parameter")
return jsonify({
"error": "Domain parameter is required"
}), 400
command = f"echo '{domain}' | gau"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in gau endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/assetfinder", methods=["POST"])
def assetfinder():
"""Execute assetfinder with the provided parameters."""
try:
params = request.json
domain = params.get("domain", "")
additional_args = params.get("additional_args", "")
if not domain:
logger.warning("Assetfinder called without domain parameter")
return jsonify({
"error": "Domain parameter is required"
}), 400
command = f"assetfinder {domain}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in assetfinder endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/masscan", methods=["POST"])
def masscan():
"""Execute masscan with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
ports = params.get("ports", "1-65535")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Masscan called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"masscan {target} -p{ports} --rate=1000"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in masscan endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/rustscan", methods=["POST"])
def rustscan():
"""Execute rustscan with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
ports = params.get("ports", "")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Rustscan called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"rustscan -a {target}"
if ports:
command += f" -p {ports}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in rustscan endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/feroxbuster", methods=["POST"])
def feroxbuster():
"""Execute feroxbuster with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Feroxbuster called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"feroxbuster -u {url} -w {wordlist}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in feroxbuster endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/dirsearch", methods=["POST"])
def dirsearch():
"""Execute dirsearch with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
extensions = params.get("extensions", "php,html,js,txt")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Dirsearch called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"dirsearch -u {url} -e {extensions}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in dirsearch endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/katana", methods=["POST"])
def katana():
"""Execute katana with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Katana called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"katana -u {url}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in katana endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/gospider", methods=["POST"])
def gospider():
"""Execute gospider with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Gospider called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"gospider -s {url}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in gospider endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/paramspider", methods=["POST"])
def paramspider():
"""Execute paramspider with the provided parameters."""
try:
params = request.json
domain = params.get("domain", "")
additional_args = params.get("additional_args", "")
if not domain:
logger.warning("Paramspider called without domain parameter")
return jsonify({
"error": "Domain parameter is required"
}), 400
command = f"paramspider -d {domain}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in paramspider endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/arjun", methods=["POST"])
def arjun():
"""Execute arjun with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Arjun called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"arjun -u {url}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in arjun endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/dalfox", methods=["POST"])
def dalfox():
"""Execute dalfox with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Dalfox called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"dalfox url {url}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in dalfox endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/gf", methods=["POST"])
def gf():
"""Execute gf with the provided parameters."""
try:
params = request.json
input_data = params.get("input_data", "")
pattern = params.get("pattern", "")
additional_args = params.get("additional_args", "")
if not input_data or not pattern:
logger.warning("Gf called without input_data or pattern parameter")
return jsonify({
"error": "Input_data and pattern parameters are required"
}), 400
# Check if input_data is a file or raw data
if os.path.isfile(input_data):
command = f"cat {input_data} | gf {pattern}"
else:
command = f"echo '{input_data}' | gf {pattern}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in gf endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
# Health check endpoint
@app.route("/health", methods=["GET"])
def health_check():
"""Health check endpoint."""
# Check if essential tools are installed
essential_tools = [
"nmap", "gobuster", "dirb", "nikto", "sqlmap", "hydra", "john",
"wpscan", "enum4linux", "subfinder", "sublist3r", "subzy", "subjack",
"httpx", "nuclei", "amass", "ffuf", "waybackurls", "gau", "assetfinder",
"masscan", "rustscan", "feroxbuster", "dirsearch", "katana", "gospider",
"paramspider", "arjun", "dalfox", "gf"
]
tools_status = {}
for tool in essential_tools:
try:
result = execute_command(f"which {tool}")
tools_status[tool] = result["success"]
except:
tools_status[tool] = False
all_essential_tools_available = all(tools_status.values())
return jsonify({
"status": "healthy",
"message": "Kali Linux Tools API Server is running",
"tools_status": tools_status,
"all_essential_tools_available": all_essential_tools_available
})
@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
# Return tool capabilities similar to our existing MCP server
pass
@app.route("/mcp/tools/kali_tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
# Direct tool execution without going through the API server
pass
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Run the Kali Linux API Server")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
# Set configuration from command line arguments
if args.debug:
DEBUG_MODE = True
os.environ["DEBUG_MODE"] = "1"
logger.setLevel(logging.DEBUG)
if args.port != API_PORT:
API_PORT = args.port
logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}")
app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)