Skip to main content
Glama

MCP Kali Pentest

by Root1856
ADDING_TOOLS.mdβ€’15 kB
# How to Add More Tools - Quick Reference ## Quick Start: Adding a New Tool in 5 Steps ### Step 1: Add Tool Function to `tools_extended.py` ```python async def my_awesome_tool( target: str, option: str = "default", aggressive: bool = False ) -> Dict[str, Any]: """ My awesome penetration testing tool Args: target: Target IP, hostname, or URL option: Tool-specific option aggressive: Enable aggressive mode Returns: Dict with tool results """ # Build command cmd = ["my-tool", target] if aggressive: cmd.append("--aggressive") cmd.extend(["--option", option]) cmd.extend(["--output", "/tmp/my_tool_output.json"]) # Execute result = await run_command(cmd, timeout=300) # Parse output findings = [] try: with open("/tmp/my_tool_output.json", "r") as f: findings = json.load(f) except Exception as e: logger.error(f"Error parsing output: {e}") return { "tool": "my_awesome_tool", "target": target, "success": result["success"], "findings": findings, "raw_output": result.get("stdout", "") } ``` ### Step 2: Register Tool in `tool_registry.py` Add to the `get_all_tool_definitions()` function: ```python Tool( name="my_awesome_tool", description="Detailed description of what the tool does and when to use it", inputSchema={ "type": "object", "properties": { "target": { "type": "string", "description": "Target IP address, hostname, or URL" }, "option": { "type": "string", "enum": ["fast", "thorough", "stealth"], "default": "fast", "description": "Scanning mode" }, "aggressive": { "type": "boolean", "default": False, "description": "Enable aggressive scanning (may be detected)" } }, "required": ["target"] } ), ``` ### Step 3: Add to Category (in `tool_registry.py`) ```python "web_scanning": ToolCategory( name="Web Application Scanning", description="Web vulnerability scanners and directory enumeration", tools=[ "nikto_scan", "nuclei_scan", # ... existing tools ... "my_awesome_tool" # Add here ] ), ``` ### Step 4: Import in `server.py` ```python # Add to imports at top from tools_extended import ( # ... existing imports ... my_awesome_tool ) # Add to handle_call_tool() function @server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool execution""" try: logger.info(f"Executing tool: {name} with arguments: {arguments}") # Add your tool here if name == "my_awesome_tool": result = await my_awesome_tool(**arguments) # ... existing tools ... return [TextContent(type="text", text=json.dumps(result, indent=2))] except Exception as e: logger.error(f"Error executing tool {name}: {str(e)}", exc_info=True) return [TextContent(type="text", text=f"Error: {str(e)}")] ``` ### Step 5: Update Documentation Add to `TOOLS_GUIDE.md`: ```markdown ### my_awesome_tool **Purpose**: Brief description **Usage**: \```python await my_awesome_tool( target="example.com", option="thorough", aggressive=False ) \``` **Output**: - List of vulnerabilities - Risk ratings - Remediation steps ``` --- ## Common Tool Integration Patterns ### Pattern 1: Simple Command-Line Tool ```python async def simple_tool(target: str) -> Dict[str, Any]: """Simple tool with basic output""" cmd = ["tool-name", target] result = await run_command(cmd, timeout=60) return { "tool": "simple_tool", "target": target, "success": result["success"], "output": result.get("stdout", "") } ``` ### Pattern 2: Tool with JSON Output ```python async def json_output_tool(target: str) -> Dict[str, Any]: """Tool that produces JSON output""" output_file = "/tmp/tool_output.json" cmd = ["tool-name", target, "-o", output_file] result = await run_command(cmd, timeout=300) findings = [] try: with open(output_file, "r") as f: findings = json.load(f) except Exception as e: logger.error(f"Error parsing JSON: {e}") return { "tool": "json_output_tool", "target": target, "success": result["success"], "findings": findings } ``` ### Pattern 3: Tool with XML Output (like Nmap) ```python async def xml_output_tool(target: str) -> Dict[str, Any]: """Tool that produces XML output""" output_file = "/tmp/tool_output.xml" cmd = ["tool-name", target, "-oX", output_file] result = await run_command(cmd, timeout=300) findings = parse_xml_output(output_file) return { "tool": "xml_output_tool", "target": target, "success": result["success"], "findings": findings } def parse_xml_output(xml_file: str) -> Dict[str, Any]: """Parse XML output""" try: tree = ET.parse(xml_file) root = tree.getroot() # Extract relevant data findings = {} for element in root.findall(".//important-element"): # Parse elements pass return findings except Exception as e: logger.error(f"Error parsing XML: {e}") return {"error": str(e)} ``` ### Pattern 4: HTTP API Integration ```python async def api_tool(target: str, api_key: str) -> Dict[str, Any]: """Tool with HTTP API""" import aiohttp async with aiohttp.ClientSession() as session: async with session.post( f"http://tool-api.local/scan", json={"target": target}, headers={"Authorization": f"Bearer {api_key}"} ) as resp: if resp.status == 200: data = await resp.json() return { "tool": "api_tool", "target": target, "success": True, "findings": data } return { "tool": "api_tool", "target": target, "success": False, "error": "API request failed" } ``` ### Pattern 5: Interactive Tool (needs wrapper) ```python async def interactive_tool(target: str, commands: List[str]) -> Dict[str, Any]: """Tool that requires interactive input""" # Create command script script_file = "/tmp/tool_commands.txt" with open(script_file, "w") as f: for cmd in commands: f.write(f"{cmd}\n") # Run tool with input from file cmd = ["tool-name", target] result = await run_command_with_input(cmd, script_file, timeout=300) return { "tool": "interactive_tool", "target": target, "success": result["success"], "output": result.get("stdout", "") } ``` ### Pattern 6: Tool Requiring Root/Sudo ```python async def privileged_tool(target: str) -> Dict[str, Any]: """Tool that requires root privileges""" # Check if running as root import os if os.geteuid() != 0: return { "tool": "privileged_tool", "target": target, "success": False, "error": "Tool requires root privileges" } cmd = ["tool-name", target] result = await run_command(cmd, timeout=300) return { "tool": "privileged_tool", "target": target, "success": result["success"], "output": result.get("stdout", "") } ``` --- ## Tool Categories Reference Choose the appropriate category for your tool: | Category | When to Use | |----------|-------------| | **reconnaissance** | Information gathering, OSINT | | **web_scanning** | Web vulnerability scanners | | **web_exploitation** | Active web attacks (SQLi, XSS) | | **wireless** | Wi-Fi security testing | | **network** | Network scanning and testing | | **brute_force** | Password attacks | | **password_cracking** | Hash cracking | | **exploitation** | Exploit frameworks | | **post_exploitation** | After gaining access | | **social_engineering** | Phishing, social attacks | | **mobile** | Android/iOS testing | | **api_testing** | REST, GraphQL, API security | | **forensics** | Digital forensics | | **reverse_engineering** | Binary analysis | | **cloud** | AWS, Azure, GCP testing | | **container** | Docker, Kubernetes | | **reporting** | Report generation | | **autonomous** | AI-powered testing | --- ## Testing Your New Tool ### 1. Unit Test ```python # test_new_tool.py import asyncio from tools_extended import my_awesome_tool async def test(): result = await my_awesome_tool( target="scanme.nmap.org", option="fast" ) print("Success:", result["success"]) print("Findings:", result["findings"]) asyncio.run(test()) ``` ### 2. Integration Test ```bash # Test via MCP server python3 example_usage.py ``` ### 3. Manual Test ```bash # Test the actual tool first my-tool scanme.nmap.org --option fast ``` --- ## Popular Tools to Add ### Web Security - [ ] Arachni - Web application scanner - [ ] Skipfish - Active web application security scanner - [ ] W3af - Web application attack framework - [ ] Vega - Web application scanner GUI - [ ] ZAP API scan - [ ] Acunetix (commercial) - [ ] Nessus (commercial) ### Network - [ ] OpenVAS - Vulnerability scanner - [ ] Nessus - Vulnerability scanner - [ ] Nexpose - Vulnerability management - [ ] NetSparker - Web application scanner - [ ] Legion - Network penetration testing framework - [ ] Responder - LLMNR/NBT-NS/MDNS poisoner ### Wireless - [ ] Fern WiFi Cracker - GUI wireless security auditor - [ ] Ghost Phisher - Wireless/Ethernet security auditor - [ ] PixieWPS - Offline WPS bruteforce tool - [ ] Wifiphisher - Wi-Fi phishing tool ### Password - [ ] RainbowCrack - Rainbow table generator - [ ] Cain & Abel (Windows) - [ ] Ophcrack - Windows password cracker - [ ] THC-Hydra modules - [ ] CeWL - Custom wordlist generator - [ ] Crunch - Wordlist generator ### Exploitation - [ ] Armitage - Metasploit GUI - [ ] BeEF - Browser Exploitation Framework - [ ] RouterSploit - Exploitation framework for routers - [ ] Veil - Payload generator - [ ] TheFatRat - Backdoor generator ### Web CMS - [ ] CMSmap - CMS scanner (multiple CMS) - [ ] Plecost - WordPress plugin scanner - [ ] Wappalyzer integration - [ ] BuiltWith API integration ### Mobile - [ ] APKTool - Android APK reverse engineering - [ ] QARK - Quick Android Review Kit - [ ] AndroBugs - Android vulnerability scanner - [ ] idb - iOS pentesting tool ### Cloud - [ ] Prowler - AWS security assessment - [ ] CloudSploit - Cloud security scanner - [ ] ScoutSuite - Multi-cloud auditing - [ ] Pacu - AWS exploitation - [ ] CloudBrute - Cloud infrastructure scanner ### Database - [ ] NoSQLMap - NoSQL database scanner - [ ] BBQSQL - Blind SQL injection framework - [ ] Mongoaudit - MongoDB auditing tool ### OSINT - [ ] Maltego - OSINT and forensics - [ ] SpiderFoot - OSINT automation - [ ] FOCA - Metadata analysis - [ ] Metagoofil - Metadata extraction - [ ] OSRFramework - OSINT framework --- ## Configuration Tips ### Add Tool Configuration to `config.json` ```json { "tools": { "my_awesome_tool": { "enabled": true, "default_option": "fast", "timeout": 300, "retry_on_failure": true, "max_retries": 3 } } } ``` ### Load Configuration in Tool ```python import json def load_config(): with open("/home/user/mcpkali/config.json") as f: return json.load(f) async def my_awesome_tool(target: str, option: str = None) -> Dict[str, Any]: config = load_config() tool_config = config["tools"].get("my_awesome_tool", {}) if not tool_config.get("enabled", True): return { "tool": "my_awesome_tool", "success": False, "error": "Tool is disabled in configuration" } if option is None: option = tool_config.get("default_option", "fast") timeout = tool_config.get("timeout", 300) # ... rest of implementation ``` --- ## Common Issues and Solutions ### Issue 1: Tool Not Found ```python # Check if tool is installed async def check_tool_available(tool_name: str) -> bool: cmd = ["which", tool_name] result = await run_command(cmd, timeout=5) return result["success"] # Use in tool function async def my_tool(target: str) -> Dict[str, Any]: if not await check_tool_available("my-tool"): return { "success": False, "error": "Tool 'my-tool' not found. Install with: apt install my-tool" } # ... rest of implementation ``` ### Issue 2: Timeout Issues ```python # Use appropriate timeouts async def long_running_tool(target: str) -> Dict[str, Any]: # For long-running tools, increase timeout result = await run_command(cmd, timeout=3600) # 1 hour # Or implement progress tracking # ... implementation ``` ### Issue 3: Output Parsing Failures ```python # Robust parsing with fallbacks async def tool_with_parsing(target: str) -> Dict[str, Any]: result = await run_command(cmd, timeout=300) # Try JSON parsing first try: findings = json.loads(result["stdout"]) except json.JSONDecodeError: # Fallback to regex parsing findings = parse_with_regex(result["stdout"]) except Exception as e: # Ultimate fallback: return raw output findings = {"raw": result["stdout"], "error": str(e)} return { "tool": "tool_with_parsing", "success": result["success"], "findings": findings } ``` --- ## Best Practices Checklist - [ ] Tool function is async - [ ] Proper error handling with try/except - [ ] Logging for debugging - [ ] Timeout configured appropriately - [ ] Output parsing is robust - [ ] Tool availability check - [ ] Configuration support - [ ] Clear documentation - [ ] Input validation - [ ] Return consistent format - [ ] Cleanup temporary files - [ ] Security considerations --- ## Quick Reference: File Locations ``` mcpkali/ β”œβ”€β”€ tools_extended.py # Add tool functions here β”œβ”€β”€ tool_registry.py # Register tools here β”œβ”€β”€ server.py # Add import and routing here β”œβ”€β”€ config.json # Add configuration here β”œβ”€β”€ TOOLS_GUIDE.md # Add documentation here β”œβ”€β”€ API.md # Add API docs here └── example_usage.py # Add examples here ``` --- ## Need Help? 1. Check existing tools in `tools_extended.py` for examples 2. Review `tool_registry.py` for schema patterns 3. Test with `example_usage.py` 4. Check logs in `/var/log/mcpkali/server.log` 5. Open an issue on GitHub --- **Happy tool integration!** πŸ”§

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Root1856/mcpkali'

If you have feedback or need assistance with the MCP directory API, please join our Discord server