ADDING_TOOLS.mdβ’15 kB
# How to Add More Tools - Quick Reference
## Quick Start: Adding a New Tool in 5 Steps
### Step 1: Add Tool Function to `tools_extended.py`
```python
async def my_awesome_tool(
target: str,
option: str = "default",
aggressive: bool = False
) -> Dict[str, Any]:
"""
My awesome penetration testing tool
Args:
target: Target IP, hostname, or URL
option: Tool-specific option
aggressive: Enable aggressive mode
Returns:
Dict with tool results
"""
# Build command
cmd = ["my-tool", target]
if aggressive:
cmd.append("--aggressive")
cmd.extend(["--option", option])
cmd.extend(["--output", "/tmp/my_tool_output.json"])
# Execute
result = await run_command(cmd, timeout=300)
# Parse output
findings = []
try:
with open("/tmp/my_tool_output.json", "r") as f:
findings = json.load(f)
except Exception as e:
logger.error(f"Error parsing output: {e}")
return {
"tool": "my_awesome_tool",
"target": target,
"success": result["success"],
"findings": findings,
"raw_output": result.get("stdout", "")
}
```
### Step 2: Register Tool in `tool_registry.py`
Add to the `get_all_tool_definitions()` function:
```python
Tool(
name="my_awesome_tool",
description="Detailed description of what the tool does and when to use it",
inputSchema={
"type": "object",
"properties": {
"target": {
"type": "string",
"description": "Target IP address, hostname, or URL"
},
"option": {
"type": "string",
"enum": ["fast", "thorough", "stealth"],
"default": "fast",
"description": "Scanning mode"
},
"aggressive": {
"type": "boolean",
"default": False,
"description": "Enable aggressive scanning (may be detected)"
}
},
"required": ["target"]
}
),
```
### Step 3: Add to Category (in `tool_registry.py`)
```python
"web_scanning": ToolCategory(
name="Web Application Scanning",
description="Web vulnerability scanners and directory enumeration",
tools=[
"nikto_scan",
"nuclei_scan",
# ... existing tools ...
"my_awesome_tool" # Add here
]
),
```
### Step 4: Import in `server.py`
```python
# Add to imports at top
from tools_extended import (
# ... existing imports ...
my_awesome_tool
)
# Add to handle_call_tool() function
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool execution"""
try:
logger.info(f"Executing tool: {name} with arguments: {arguments}")
# Add your tool here
if name == "my_awesome_tool":
result = await my_awesome_tool(**arguments)
# ... existing tools ...
return [TextContent(type="text", text=json.dumps(result, indent=2))]
except Exception as e:
logger.error(f"Error executing tool {name}: {str(e)}", exc_info=True)
return [TextContent(type="text", text=f"Error: {str(e)}")]
```
### Step 5: Update Documentation
Add to `TOOLS_GUIDE.md`:
```markdown
### my_awesome_tool
**Purpose**: Brief description
**Usage**:
\```python
await my_awesome_tool(
target="example.com",
option="thorough",
aggressive=False
)
\```
**Output**:
- List of vulnerabilities
- Risk ratings
- Remediation steps
```
---
## Common Tool Integration Patterns
### Pattern 1: Simple Command-Line Tool
```python
async def simple_tool(target: str) -> Dict[str, Any]:
"""Simple tool with basic output"""
cmd = ["tool-name", target]
result = await run_command(cmd, timeout=60)
return {
"tool": "simple_tool",
"target": target,
"success": result["success"],
"output": result.get("stdout", "")
}
```
### Pattern 2: Tool with JSON Output
```python
async def json_output_tool(target: str) -> Dict[str, Any]:
"""Tool that produces JSON output"""
output_file = "/tmp/tool_output.json"
cmd = ["tool-name", target, "-o", output_file]
result = await run_command(cmd, timeout=300)
findings = []
try:
with open(output_file, "r") as f:
findings = json.load(f)
except Exception as e:
logger.error(f"Error parsing JSON: {e}")
return {
"tool": "json_output_tool",
"target": target,
"success": result["success"],
"findings": findings
}
```
### Pattern 3: Tool with XML Output (like Nmap)
```python
async def xml_output_tool(target: str) -> Dict[str, Any]:
"""Tool that produces XML output"""
output_file = "/tmp/tool_output.xml"
cmd = ["tool-name", target, "-oX", output_file]
result = await run_command(cmd, timeout=300)
findings = parse_xml_output(output_file)
return {
"tool": "xml_output_tool",
"target": target,
"success": result["success"],
"findings": findings
}
def parse_xml_output(xml_file: str) -> Dict[str, Any]:
"""Parse XML output"""
try:
tree = ET.parse(xml_file)
root = tree.getroot()
# Extract relevant data
findings = {}
for element in root.findall(".//important-element"):
# Parse elements
pass
return findings
except Exception as e:
logger.error(f"Error parsing XML: {e}")
return {"error": str(e)}
```
### Pattern 4: HTTP API Integration
```python
async def api_tool(target: str, api_key: str) -> Dict[str, Any]:
"""Tool with HTTP API"""
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://tool-api.local/scan",
json={"target": target},
headers={"Authorization": f"Bearer {api_key}"}
) as resp:
if resp.status == 200:
data = await resp.json()
return {
"tool": "api_tool",
"target": target,
"success": True,
"findings": data
}
return {
"tool": "api_tool",
"target": target,
"success": False,
"error": "API request failed"
}
```
### Pattern 5: Interactive Tool (needs wrapper)
```python
async def interactive_tool(target: str, commands: List[str]) -> Dict[str, Any]:
"""Tool that requires interactive input"""
# Create command script
script_file = "/tmp/tool_commands.txt"
with open(script_file, "w") as f:
for cmd in commands:
f.write(f"{cmd}\n")
# Run tool with input from file
cmd = ["tool-name", target]
result = await run_command_with_input(cmd, script_file, timeout=300)
return {
"tool": "interactive_tool",
"target": target,
"success": result["success"],
"output": result.get("stdout", "")
}
```
### Pattern 6: Tool Requiring Root/Sudo
```python
async def privileged_tool(target: str) -> Dict[str, Any]:
"""Tool that requires root privileges"""
# Check if running as root
import os
if os.geteuid() != 0:
return {
"tool": "privileged_tool",
"target": target,
"success": False,
"error": "Tool requires root privileges"
}
cmd = ["tool-name", target]
result = await run_command(cmd, timeout=300)
return {
"tool": "privileged_tool",
"target": target,
"success": result["success"],
"output": result.get("stdout", "")
}
```
---
## Tool Categories Reference
Choose the appropriate category for your tool:
| Category | When to Use |
|----------|-------------|
| **reconnaissance** | Information gathering, OSINT |
| **web_scanning** | Web vulnerability scanners |
| **web_exploitation** | Active web attacks (SQLi, XSS) |
| **wireless** | Wi-Fi security testing |
| **network** | Network scanning and testing |
| **brute_force** | Password attacks |
| **password_cracking** | Hash cracking |
| **exploitation** | Exploit frameworks |
| **post_exploitation** | After gaining access |
| **social_engineering** | Phishing, social attacks |
| **mobile** | Android/iOS testing |
| **api_testing** | REST, GraphQL, API security |
| **forensics** | Digital forensics |
| **reverse_engineering** | Binary analysis |
| **cloud** | AWS, Azure, GCP testing |
| **container** | Docker, Kubernetes |
| **reporting** | Report generation |
| **autonomous** | AI-powered testing |
---
## Testing Your New Tool
### 1. Unit Test
```python
# test_new_tool.py
import asyncio
from tools_extended import my_awesome_tool
async def test():
result = await my_awesome_tool(
target="scanme.nmap.org",
option="fast"
)
print("Success:", result["success"])
print("Findings:", result["findings"])
asyncio.run(test())
```
### 2. Integration Test
```bash
# Test via MCP server
python3 example_usage.py
```
### 3. Manual Test
```bash
# Test the actual tool first
my-tool scanme.nmap.org --option fast
```
---
## Popular Tools to Add
### Web Security
- [ ] Arachni - Web application scanner
- [ ] Skipfish - Active web application security scanner
- [ ] W3af - Web application attack framework
- [ ] Vega - Web application scanner GUI
- [ ] ZAP API scan
- [ ] Acunetix (commercial)
- [ ] Nessus (commercial)
### Network
- [ ] OpenVAS - Vulnerability scanner
- [ ] Nessus - Vulnerability scanner
- [ ] Nexpose - Vulnerability management
- [ ] NetSparker - Web application scanner
- [ ] Legion - Network penetration testing framework
- [ ] Responder - LLMNR/NBT-NS/MDNS poisoner
### Wireless
- [ ] Fern WiFi Cracker - GUI wireless security auditor
- [ ] Ghost Phisher - Wireless/Ethernet security auditor
- [ ] PixieWPS - Offline WPS bruteforce tool
- [ ] Wifiphisher - Wi-Fi phishing tool
### Password
- [ ] RainbowCrack - Rainbow table generator
- [ ] Cain & Abel (Windows)
- [ ] Ophcrack - Windows password cracker
- [ ] THC-Hydra modules
- [ ] CeWL - Custom wordlist generator
- [ ] Crunch - Wordlist generator
### Exploitation
- [ ] Armitage - Metasploit GUI
- [ ] BeEF - Browser Exploitation Framework
- [ ] RouterSploit - Exploitation framework for routers
- [ ] Veil - Payload generator
- [ ] TheFatRat - Backdoor generator
### Web CMS
- [ ] CMSmap - CMS scanner (multiple CMS)
- [ ] Plecost - WordPress plugin scanner
- [ ] Wappalyzer integration
- [ ] BuiltWith API integration
### Mobile
- [ ] APKTool - Android APK reverse engineering
- [ ] QARK - Quick Android Review Kit
- [ ] AndroBugs - Android vulnerability scanner
- [ ] idb - iOS pentesting tool
### Cloud
- [ ] Prowler - AWS security assessment
- [ ] CloudSploit - Cloud security scanner
- [ ] ScoutSuite - Multi-cloud auditing
- [ ] Pacu - AWS exploitation
- [ ] CloudBrute - Cloud infrastructure scanner
### Database
- [ ] NoSQLMap - NoSQL database scanner
- [ ] BBQSQL - Blind SQL injection framework
- [ ] Mongoaudit - MongoDB auditing tool
### OSINT
- [ ] Maltego - OSINT and forensics
- [ ] SpiderFoot - OSINT automation
- [ ] FOCA - Metadata analysis
- [ ] Metagoofil - Metadata extraction
- [ ] OSRFramework - OSINT framework
---
## Configuration Tips
### Add Tool Configuration to `config.json`
```json
{
"tools": {
"my_awesome_tool": {
"enabled": true,
"default_option": "fast",
"timeout": 300,
"retry_on_failure": true,
"max_retries": 3
}
}
}
```
### Load Configuration in Tool
```python
import json
def load_config():
with open("/home/user/mcpkali/config.json") as f:
return json.load(f)
async def my_awesome_tool(target: str, option: str = None) -> Dict[str, Any]:
config = load_config()
tool_config = config["tools"].get("my_awesome_tool", {})
if not tool_config.get("enabled", True):
return {
"tool": "my_awesome_tool",
"success": False,
"error": "Tool is disabled in configuration"
}
if option is None:
option = tool_config.get("default_option", "fast")
timeout = tool_config.get("timeout", 300)
# ... rest of implementation
```
---
## Common Issues and Solutions
### Issue 1: Tool Not Found
```python
# Check if tool is installed
async def check_tool_available(tool_name: str) -> bool:
cmd = ["which", tool_name]
result = await run_command(cmd, timeout=5)
return result["success"]
# Use in tool function
async def my_tool(target: str) -> Dict[str, Any]:
if not await check_tool_available("my-tool"):
return {
"success": False,
"error": "Tool 'my-tool' not found. Install with: apt install my-tool"
}
# ... rest of implementation
```
### Issue 2: Timeout Issues
```python
# Use appropriate timeouts
async def long_running_tool(target: str) -> Dict[str, Any]:
# For long-running tools, increase timeout
result = await run_command(cmd, timeout=3600) # 1 hour
# Or implement progress tracking
# ... implementation
```
### Issue 3: Output Parsing Failures
```python
# Robust parsing with fallbacks
async def tool_with_parsing(target: str) -> Dict[str, Any]:
result = await run_command(cmd, timeout=300)
# Try JSON parsing first
try:
findings = json.loads(result["stdout"])
except json.JSONDecodeError:
# Fallback to regex parsing
findings = parse_with_regex(result["stdout"])
except Exception as e:
# Ultimate fallback: return raw output
findings = {"raw": result["stdout"], "error": str(e)}
return {
"tool": "tool_with_parsing",
"success": result["success"],
"findings": findings
}
```
---
## Best Practices Checklist
- [ ] Tool function is async
- [ ] Proper error handling with try/except
- [ ] Logging for debugging
- [ ] Timeout configured appropriately
- [ ] Output parsing is robust
- [ ] Tool availability check
- [ ] Configuration support
- [ ] Clear documentation
- [ ] Input validation
- [ ] Return consistent format
- [ ] Cleanup temporary files
- [ ] Security considerations
---
## Quick Reference: File Locations
```
mcpkali/
βββ tools_extended.py # Add tool functions here
βββ tool_registry.py # Register tools here
βββ server.py # Add import and routing here
βββ config.json # Add configuration here
βββ TOOLS_GUIDE.md # Add documentation here
βββ API.md # Add API docs here
βββ example_usage.py # Add examples here
```
---
## Need Help?
1. Check existing tools in `tools_extended.py` for examples
2. Review `tool_registry.py` for schema patterns
3. Test with `example_usage.py`
4. Check logs in `/var/log/mcpkali/server.log`
5. Open an issue on GitHub
---
**Happy tool integration!** π§