Skip to main content
Glama

Kali MCP Server

by letchupkt
kali_server.py (40.1 kB)
#!/usr/bin/env python3
"""Flask API server that exposes Kali Linux command-line tools over HTTP.

Every ``/api/tools/<name>`` endpoint accepts a JSON object, builds a command
line for the corresponding tool and returns the dictionary produced by
``CommandExecutor.execute`` as JSON.

SECURITY NOTE: commands run through ``shell=True`` and ``/api/command`` runs
arbitrary commands by design. Values that are a single argument by nature
(URLs, piped payloads, literal credentials, paths confirmed to be files) are
shell-quoted; free-form fields such as ``additional_args`` are passed through
untouched. Only expose this server to trusted clients.
"""

import argparse
import functools
import json
import logging
import os
import shlex
import subprocess
import sys
import tempfile
import traceback
import threading
from typing import Dict, Any

from flask import Flask, request, jsonify

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# Configuration
API_PORT = int(os.environ.get("API_PORT", 5000))
DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y")
COMMAND_TIMEOUT = 180  # seconds (3 minutes) default timeout

# Seconds to wait for the output reader threads after a timed-out process
# has been terminated; children of the shell may still hold the pipes open.
_READER_JOIN_TIMEOUT = 1

app = Flask(__name__)


class CommandExecutor:
    """Run a shell command, collecting output and tolerating timeouts.

    stdout and stderr are drained by two daemon threads so that whatever the
    command printed before a timeout is still available as partial results.
    """

    def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
        self.command = command
        self.timeout = timeout
        self.process = None
        self.stdout_data = ""
        self.stderr_data = ""
        self.stdout_thread = None
        self.stderr_thread = None
        self.return_code = None
        self.timed_out = False

    def _read_stdout(self):
        """Thread function to continuously read stdout"""
        for line in iter(self.process.stdout.readline, ''):
            self.stdout_data += line

    def _read_stderr(self):
        """Thread function to continuously read stderr"""
        for line in iter(self.process.stderr.readline, ''):
            self.stderr_data += line

    def execute(self) -> Dict[str, Any]:
        """Execute the command and handle timeout gracefully.

        Returns:
            A dictionary with the keys ``stdout``, ``stderr``,
            ``return_code``, ``success``, ``timed_out`` and
            ``partial_results`` (the last three are booleans). A timed-out
            command gets ``return_code`` -1 and counts as a success when it
            produced any output. Any exception raised while running the
            command is reported in ``stderr`` with ``success`` False.
        """
        logger.info(f"Executing command: {self.command}")

        try:
            self.process = subprocess.Popen(
                self.command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1  # Line buffered
            )

            # Start threads to read output continuously
            self.stdout_thread = threading.Thread(target=self._read_stdout)
            self.stderr_thread = threading.Thread(target=self._read_stderr)
            self.stdout_thread.daemon = True
            self.stderr_thread.daemon = True
            self.stdout_thread.start()
            self.stderr_thread.start()

            # Wait for the process to complete or timeout
            try:
                self.return_code = self.process.wait(timeout=self.timeout)
                # Process completed, join the threads
                self.stdout_thread.join()
                self.stderr_thread.join()
            except subprocess.TimeoutExpired:
                # Process timed out but we might have partial results
                self.timed_out = True
                logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")

                # Try to terminate gracefully first
                self.process.terminate()
                try:
                    self.process.wait(timeout=5)  # Give it 5 seconds to terminate
                except subprocess.TimeoutExpired:
                    # Force kill if it doesn't terminate
                    logger.warning("Process not responding to termination. Killing.")
                    self.process.kill()

                # Give the readers a moment to drain what is already in the
                # pipes so the partial output is complete. The join is bounded
                # because processes spawned by the shell may keep the pipes open.
                for reader in (self.stdout_thread, self.stderr_thread):
                    reader.join(timeout=_READER_JOIN_TIMEOUT)

                # Update final output
                self.return_code = -1

            has_partial = self.timed_out and bool(self.stdout_data or self.stderr_data)
            # Always consider it a success if we have output, even with timeout
            success = has_partial or self.return_code == 0

            return {
                "stdout": self.stdout_data,
                "stderr": self.stderr_data,
                "return_code": self.return_code,
                "success": success,
                "timed_out": self.timed_out,
                "partial_results": has_partial
            }

        except Exception as e:
            logger.error(f"Error executing command: {str(e)}")
            logger.error(traceback.format_exc())
            return {
                "stdout": self.stdout_data,
                "stderr": f"Error executing command: {str(e)}\n{self.stderr_data}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": bool(self.stdout_data or self.stderr_data)
            }


def execute_command(command: str) -> Dict[str, Any]:
    """
    Execute a shell command and return the result

    Args:
        command: The command to execute

    Returns:
        A dictionary containing the stdout, stderr, and return code
    """
    executor = CommandExecutor(command)
    return executor.execute()


def _q(value: Any) -> str:
    """Return *value* quoted so the shell treats it as one literal argument."""
    return shlex.quote(str(value))


def _bad_request(log_message: str, error_message: str):
    """Log a warning and build a 400 JSON error response."""
    logger.warning(log_message)
    return jsonify({
        "error": error_message
    }), 400


def _run(command: str, additional_args: str = ""):
    """Append optional free-form arguments, run the command, return JSON."""
    if additional_args:
        command += f" {additional_args}"
    return jsonify(execute_command(command))


def _tool_endpoint(label: str):
    """Decorate a view so it receives the parsed JSON body as ``params``.

    A missing, malformed or non-object JSON body yields a 400 response. Any
    exception raised by the view is logged with *label* and reported as a 500
    JSON error.
    """
    def decorator(view):
        @functools.wraps(view)
        def wrapper(*args, **kwargs):
            try:
                params = request.get_json(silent=True)
                if not isinstance(params, dict):
                    return _bad_request(
                        f"{label} endpoint called without a JSON object body",
                        "Request body must be a JSON object"
                    )
                return view(params, *args, **kwargs)
            except Exception as e:
                logger.error(f"Error in {label} endpoint: {str(e)}")
                logger.error(traceback.format_exc())
                return jsonify({
                    "error": f"Server error: {str(e)}"
                }), 500
        return wrapper
    return decorator


@app.route("/api/command", methods=["POST"])
@_tool_endpoint("command")
def generic_command(params):
    """Execute any command provided in the request."""
    command = params.get("command", "")
    if not command:
        return _bad_request("Command endpoint called without command parameter",
                            "Command parameter is required")
    # Arbitrary command execution is the purpose of this endpoint.
    return _run(command)


@app.route("/api/tools/nmap", methods=["POST"])
@_tool_endpoint("nmap")
def nmap(params):
    """Execute nmap scan with the provided parameters."""
    target = params.get("target", "")
    scan_type = params.get("scan_type", "-sCV")
    ports = params.get("ports", "")
    additional_args = params.get("additional_args", "-T4 -Pn")

    if not target:
        return _bad_request("Nmap called without target parameter",
                            "Target parameter is required")

    command = f"nmap {scan_type}"
    if ports:
        command += f" -p {ports}"
    if additional_args:
        # Basic validation for additional args - more sophisticated validation would be better
        command += f" {additional_args}"
    command += f" {target}"
    return _run(command)


@app.route("/api/tools/gobuster", methods=["POST"])
@_tool_endpoint("gobuster")
def gobuster(params):
    """Execute gobuster with the provided parameters."""
    url = params.get("url", "")
    mode = params.get("mode", "dir")
    wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Gobuster called without URL parameter",
                            "URL parameter is required")

    # Validate mode
    if mode not in ["dir", "dns", "fuzz", "vhost"]:
        return _bad_request(f"Invalid gobuster mode: {mode}",
                            f"Invalid mode: {mode}. Must be one of: dir, dns, fuzz, vhost")

    return _run(f"gobuster {mode} -u {_q(url)} -w {wordlist}", additional_args)


@app.route("/api/tools/dirb", methods=["POST"])
@_tool_endpoint("dirb")
def dirb(params):
    """Execute dirb with the provided parameters."""
    url = params.get("url", "")
    wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Dirb called without URL parameter",
                            "URL parameter is required")

    return _run(f"dirb {_q(url)} {wordlist}", additional_args)


@app.route("/api/tools/nikto", methods=["POST"])
@_tool_endpoint("nikto")
def nikto(params):
    """Execute nikto with the provided parameters."""
    target = params.get("target", "")
    additional_args = params.get("additional_args", "")

    if not target:
        return _bad_request("Nikto called without target parameter",
                            "Target parameter is required")

    return _run(f"nikto -h {target}", additional_args)


@app.route("/api/tools/sqlmap", methods=["POST"])
@_tool_endpoint("sqlmap")
def sqlmap(params):
    """Execute sqlmap with the provided parameters."""
    url = params.get("url", "")
    data = params.get("data", "")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("SQLMap called without URL parameter",
                            "URL parameter is required")

    command = f"sqlmap -u {_q(url)} --batch"
    if data:
        # Quoted as a whole so '"', '$' and '&' in the POST body survive the shell.
        command += f" --data={_q(data)}"
    return _run(command, additional_args)


@app.route("/api/tools/metasploit", methods=["POST"])
@_tool_endpoint("metasploit")
def metasploit(params):
    """Execute metasploit module with the provided parameters."""
    module = params.get("module", "")
    options = params.get("options") or {}

    if not module:
        return _bad_request("Metasploit called without module parameter",
                            "Module parameter is required")

    # Create an MSF resource script
    script_lines = [f"use {module}\n"]
    script_lines.extend(f"set {key} {value}\n" for key, value in options.items())
    script_lines.append("exploit\n")

    # A unique temporary file keeps concurrent requests from overwriting each
    # other's script and avoids a predictable path in /tmp.
    with tempfile.NamedTemporaryFile(
        mode="w", prefix="mcp_msf_", suffix=".rc", delete=False
    ) as f:
        f.write("".join(script_lines))
        resource_file = f.name

    try:
        result = execute_command(f"msfconsole -q -r {_q(resource_file)}")
    finally:
        # Clean up the temporary file
        try:
            os.remove(resource_file)
        except Exception as e:
            logger.warning(f"Error removing temporary resource file: {str(e)}")

    return jsonify(result)


@app.route("/api/tools/hydra", methods=["POST"])
@_tool_endpoint("hydra")
def hydra(params):
    """Execute hydra with the provided parameters."""
    target = params.get("target", "")
    service = params.get("service", "")
    username = params.get("username", "")
    username_file = params.get("username_file", "")
    password = params.get("password", "")
    password_file = params.get("password_file", "")
    additional_args = params.get("additional_args", "")

    if not target or not service:
        return _bad_request("Hydra called without target or service parameter",
                            "Target and service parameters are required")

    if not (username or username_file) or not (password or password_file):
        return _bad_request("Hydra called without username/password parameters",
                            "Username/username_file and password/password_file are required")

    command = "hydra -t 4"

    # Literal credentials are quoted: passwords routinely contain shell
    # metacharacters that would otherwise be mangled.
    if username:
        command += f" -l {_q(username)}"
    elif username_file:
        command += f" -L {username_file}"

    if password:
        command += f" -p {_q(password)}"
    elif password_file:
        command += f" -P {password_file}"

    if additional_args:
        command += f" {additional_args}"

    command += f" {target} {service}"
    return _run(command)


@app.route("/api/tools/john", methods=["POST"])
@_tool_endpoint("john")
def john(params):
    """Execute john with the provided parameters."""
    hash_file = params.get("hash_file", "")
    wordlist = params.get("wordlist", "/usr/share/wordlists/rockyou.txt")
    format_type = params.get("format", "")
    additional_args = params.get("additional_args", "")

    if not hash_file:
        return _bad_request("John called without hash_file parameter",
                            "Hash file parameter is required")

    command = "john"
    if format_type:
        command += f" --format={format_type}"
    if wordlist:
        command += f" --wordlist={wordlist}"
    if additional_args:
        command += f" {additional_args}"
    command += f" {hash_file}"
    return _run(command)


@app.route("/api/tools/wpscan", methods=["POST"])
@_tool_endpoint("wpscan")
def wpscan(params):
    """Execute wpscan with the provided parameters."""
    url = params.get("url", "")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("WPScan called without URL parameter",
                            "URL parameter is required")

    return _run(f"wpscan --url {_q(url)}", additional_args)


@app.route("/api/tools/enum4linux", methods=["POST"])
@_tool_endpoint("enum4linux")
def enum4linux(params):
    """Execute enum4linux with the provided parameters."""
    target = params.get("target", "")
    additional_args = params.get("additional_args", "-a")

    if not target:
        return _bad_request("Enum4linux called without target parameter",
                            "Target parameter is required")

    return _run(f"enum4linux {additional_args} {target}")


@app.route("/api/tools/subfinder", methods=["POST"])
@_tool_endpoint("subfinder")
def subfinder(params):
    """Execute subfinder with the provided parameters."""
    domain = params.get("domain", "")
    additional_args = params.get("additional_args", "")

    if not domain:
        return _bad_request("Subfinder called without domain parameter",
                            "Domain parameter is required")

    return _run(f"subfinder -d {domain}", additional_args)


@app.route("/api/tools/sublister", methods=["POST"])
@_tool_endpoint("sublister")
def sublister(params):
    """Execute sublister with the provided parameters."""
    domain = params.get("domain", "")
    additional_args = params.get("additional_args", "")

    if not domain:
        return _bad_request("Sublister called without domain parameter",
                            "Domain parameter is required")

    return _run(f"sublist3r -d {domain}", additional_args)


@app.route("/api/tools/subzy", methods=["POST"])
@_tool_endpoint("subzy")
def subzy(params):
    """Execute subzy with the provided parameters."""
    target = params.get("target", "")
    additional_args = params.get("additional_args", "")

    if not target:
        return _bad_request("Subzy called without target parameter",
                            "Target parameter is required")

    # Check if target is a file or domain
    if os.path.isfile(target):
        command = f"subzy run --targets {_q(target)}"
    else:
        command = f"subzy run --target {target}"
    return _run(command, additional_args)


@app.route("/api/tools/subjack", methods=["POST"])
@_tool_endpoint("subjack")
def subjack(params):
    """Execute subjack with the provided parameters."""
    target = params.get("target", "")
    additional_args = params.get("additional_args", "")

    if not target:
        return _bad_request("Subjack called without target parameter",
                            "Target parameter is required")

    # Check if target is a file or domain
    if os.path.isfile(target):
        command = f"subjack -w {_q(target)}"
    else:
        command = f"subjack -d {target}"
    return _run(command, additional_args)


@app.route("/api/tools/httpx", methods=["POST"])
@_tool_endpoint("httpx")
def httpx(params):
    """Execute httpx with the provided parameters."""
    target = params.get("target", "")
    additional_args = params.get("additional_args", "")

    if not target:
        return _bad_request("Httpx called without target parameter",
                            "Target parameter is required")

    # Check if target is a file or single target
    if os.path.isfile(target):
        command = f"httpx -l {_q(target)}"
    else:
        command = f"echo {_q(target)} | httpx"
    return _run(command, additional_args)


@app.route("/api/tools/nuclei", methods=["POST"])
@_tool_endpoint("nuclei")
def nuclei(params):
    """Execute nuclei with the provided parameters."""
    target = params.get("target", "")
    templates = params.get("templates", "")
    additional_args = params.get("additional_args", "")

    if not target:
        return _bad_request("Nuclei called without target parameter",
                            "Target parameter is required")

    # Check if target is a file or single target
    if os.path.isfile(target):
        command = f"nuclei -l {_q(target)}"
    else:
        # A single target is typically a URL, so it is quoted like the other URLs.
        command = f"nuclei -u {_q(target)}"

    if templates:
        command += f" -t {templates}"
    return _run(command, additional_args)


@app.route("/api/tools/amass", methods=["POST"])
@_tool_endpoint("amass")
def amass(params):
    """Execute amass with the provided parameters."""
    domain = params.get("domain", "")
    mode = params.get("mode", "enum")
    additional_args = params.get("additional_args", "")

    if not domain:
        return _bad_request("Amass called without domain parameter",
                            "Domain parameter is required")

    return _run(f"amass {mode} -d {domain}", additional_args)


@app.route("/api/tools/ffuf", methods=["POST"])
@_tool_endpoint("ffuf")
def ffuf(params):
    """Execute ffuf with the provided parameters.

    A ``mode`` field in the request is accepted but has no effect on the
    command line.
    """
    url = params.get("url", "")
    wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Ffuf called without URL parameter",
                            "URL parameter is required")

    return _run(f"ffuf -u {_q(url)} -w {wordlist}", additional_args)


@app.route("/api/tools/waybackurls", methods=["POST"])
@_tool_endpoint("waybackurls")
def waybackurls(params):
    """Execute waybackurls with the provided parameters."""
    domain = params.get("domain", "")
    additional_args = params.get("additional_args", "")

    if not domain:
        return _bad_request("Waybackurls called without domain parameter",
                            "Domain parameter is required")

    return _run(f"echo {_q(domain)} | waybackurls", additional_args)


@app.route("/api/tools/gau", methods=["POST"])
@_tool_endpoint("gau")
def gau(params):
    """Execute gau with the provided parameters."""
    domain = params.get("domain", "")
    additional_args = params.get("additional_args", "")

    if not domain:
        return _bad_request("Gau called without domain parameter",
                            "Domain parameter is required")

    return _run(f"echo {_q(domain)} | gau", additional_args)


@app.route("/api/tools/assetfinder", methods=["POST"])
@_tool_endpoint("assetfinder")
def assetfinder(params):
    """Execute assetfinder with the provided parameters."""
    domain = params.get("domain", "")
    additional_args = params.get("additional_args", "")

    if not domain:
        return _bad_request("Assetfinder called without domain parameter",
                            "Domain parameter is required")

    return _run(f"assetfinder {domain}", additional_args)


@app.route("/api/tools/masscan", methods=["POST"])
@_tool_endpoint("masscan")
def masscan(params):
    """Execute masscan with the provided parameters."""
    target = params.get("target", "")
    ports = params.get("ports", "1-65535")
    additional_args = params.get("additional_args", "")

    if not target:
        return _bad_request("Masscan called without target parameter",
                            "Target parameter is required")

    return _run(f"masscan {target} -p{ports} --rate=1000", additional_args)


@app.route("/api/tools/rustscan", methods=["POST"])
@_tool_endpoint("rustscan")
def rustscan(params):
    """Execute rustscan with the provided parameters."""
    target = params.get("target", "")
    ports = params.get("ports", "")
    additional_args = params.get("additional_args", "")

    if not target:
        return _bad_request("Rustscan called without target parameter",
                            "Target parameter is required")

    command = f"rustscan -a {target}"
    if ports:
        command += f" -p {ports}"
    return _run(command, additional_args)


@app.route("/api/tools/feroxbuster", methods=["POST"])
@_tool_endpoint("feroxbuster")
def feroxbuster(params):
    """Execute feroxbuster with the provided parameters."""
    url = params.get("url", "")
    wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Feroxbuster called without URL parameter",
                            "URL parameter is required")

    return _run(f"feroxbuster -u {_q(url)} -w {wordlist}", additional_args)


@app.route("/api/tools/dirsearch", methods=["POST"])
@_tool_endpoint("dirsearch")
def dirsearch(params):
    """Execute dirsearch with the provided parameters."""
    url = params.get("url", "")
    extensions = params.get("extensions", "php,html,js,txt")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Dirsearch called without URL parameter",
                            "URL parameter is required")

    return _run(f"dirsearch -u {_q(url)} -e {extensions}", additional_args)


@app.route("/api/tools/katana", methods=["POST"])
@_tool_endpoint("katana")
def katana(params):
    """Execute katana with the provided parameters."""
    url = params.get("url", "")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Katana called without URL parameter",
                            "URL parameter is required")

    return _run(f"katana -u {_q(url)}", additional_args)


@app.route("/api/tools/gospider", methods=["POST"])
@_tool_endpoint("gospider")
def gospider(params):
    """Execute gospider with the provided parameters."""
    url = params.get("url", "")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Gospider called without URL parameter",
                            "URL parameter is required")

    return _run(f"gospider -s {_q(url)}", additional_args)


@app.route("/api/tools/paramspider", methods=["POST"])
@_tool_endpoint("paramspider")
def paramspider(params):
    """Execute paramspider with the provided parameters."""
    domain = params.get("domain", "")
    additional_args = params.get("additional_args", "")

    if not domain:
        return _bad_request("Paramspider called without domain parameter",
                            "Domain parameter is required")

    return _run(f"paramspider -d {domain}", additional_args)


@app.route("/api/tools/arjun", methods=["POST"])
@_tool_endpoint("arjun")
def arjun(params):
    """Execute arjun with the provided parameters."""
    url = params.get("url", "")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Arjun called without URL parameter",
                            "URL parameter is required")

    return _run(f"arjun -u {_q(url)}", additional_args)


@app.route("/api/tools/dalfox", methods=["POST"])
@_tool_endpoint("dalfox")
def dalfox(params):
    """Execute dalfox with the provided parameters."""
    url = params.get("url", "")
    additional_args = params.get("additional_args", "")

    if not url:
        return _bad_request("Dalfox called without URL parameter",
                            "URL parameter is required")

    return _run(f"dalfox url {_q(url)}", additional_args)


@app.route("/api/tools/gf", methods=["POST"])
@_tool_endpoint("gf")
def gf(params):
    """Execute gf with the provided parameters."""
    input_data = params.get("input_data", "")
    pattern = params.get("pattern", "")
    additional_args = params.get("additional_args", "")

    if not input_data or not pattern:
        return _bad_request("Gf called without input_data or pattern parameter",
                            "Input_data and pattern parameters are required")

    # Check if input_data is a file or raw data
    if os.path.isfile(input_data):
        command = f"cat {_q(input_data)} | gf {pattern}"
    else:
        command = f"echo {_q(input_data)} | gf {pattern}"
    return _run(command, additional_args)


# Health check endpoint
@app.route("/health", methods=["GET"])
def health_check():
    """Health check endpoint.

    Reports, for every tool this server wraps, whether ``which <tool>``
    succeeds on the host.
    """
    # Check if essential tools are installed
    essential_tools = [
        "nmap", "gobuster", "dirb", "nikto", "sqlmap", "hydra", "john", "wpscan", "enum4linux",
        "subfinder", "sublist3r", "subzy", "subjack", "httpx", "nuclei", "amass", "ffuf",
        "waybackurls", "gau", "assetfinder", "masscan", "rustscan", "feroxbuster", "dirsearch",
        "katana", "gospider", "paramspider", "arjun", "dalfox", "gf"
    ]
    tools_status = {}

    for tool in essential_tools:
        try:
            result = execute_command(f"which {tool}")
            tools_status[tool] = result["success"]
        except Exception:
            # Best effort: a failed probe counts as "tool not available".
            tools_status[tool] = False

    all_essential_tools_available = all(tools_status.values())

    return jsonify({
        "status": "healthy",
        "message": "Kali Linux Tools API Server is running",
        "tools_status": tools_status,
        "all_essential_tools_available": all_essential_tools_available
    })


@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
    """Placeholder for advertising tool capabilities; not implemented yet."""
    # A Flask view must return a response; returning None raises a TypeError.
    return jsonify({
        "error": "Not implemented"
    }), 501


@app.route("/mcp/tools/kali_tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
    """Placeholder for direct tool execution; not implemented yet."""
    # A Flask view must return a response; returning None raises a TypeError.
    return jsonify({
        "error": f"Direct execution of {tool_name} is not implemented"
    }), 501


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Run the Kali Linux API Server")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})")
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()

    # Set configuration from command line arguments
    if args.debug:
        DEBUG_MODE = True
        os.environ["DEBUG_MODE"] = "1"
        logger.setLevel(logging.DEBUG)

    if args.port != API_PORT:
        API_PORT = args.port

    logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}")
    # NOTE(review): binding to 0.0.0.0 exposes command execution (and, with
    # debug=True, the Werkzeug debugger) on every interface - restrict access
    # at the network level.
    app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/letchupkt/kali-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server