Skip to main content
Glama

Kali Linux MCP Server

by pellax
simple_http_server.py9.83 kB
#!/usr/bin/env python3 """ Simple HTTP MCP Server - Runs without external dependencies Demonstrates the Kali MCP server concept via HTTP endpoints """ import json import subprocess import re from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs # Security patterns SAFE_IP_PATTERN = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/(?:3[0-2]|[12]?[0-9]))?$') class KaliMCPHandler(BaseHTTPRequestHandler): def do_GET(self): """Handle GET requests""" parsed_path = urlparse(self.path) path = parsed_path.path if path == '/': self.send_homepage() elif path == '/tools': self.send_tools_list() elif path == '/health': self.send_health_check() elif path.startswith('/tool/'): tool_name = path.split('/')[-1] params = parse_qs(parsed_path.query) self.execute_tool(tool_name, params) else: self.send_error(404, "Not Found") def send_response_json(self, data, status=200): """Send JSON response""" self.send_response(status) self.send_header('Content-type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(data, indent=2).encode()) def send_homepage(self): """Send homepage""" html = """ <!DOCTYPE html> <html> <head> <title>Kali MCP Server</title> <style> body { font-family: Arial, sans-serif; margin: 40px; background: #f5f5f5; } .container { max-width: 800px; margin: 0 auto; background: white; padding: 30px; border-radius: 8px; } .tool { background: #f8f9fa; padding: 15px; margin: 10px 0; border-radius: 5px; } .endpoint { color: #007bff; font-family: monospace; } h1 { color: #dc3545; } </style> </head> <body> <div class="container"> <h1>🔒 Kali Linux MCP Server</h1> <p>Model Context Protocol server for Kali Linux security tools</p> <h2>Available Endpoints</h2> <div class="tool"> <strong>GET /tools</strong> - List all available tools </div> <div class="tool"> <strong>GET /health</strong> - Server health check </div> <div class="tool"> <strong>GET /tool/nmap_scan?target=127.0.0.1</strong> - Network scan </div> <div class="tool"> <strong>GET /tool/searchsploit_query?query=apache</strong> - Exploit search </div> <h2>Available Tools</h2> <div class="tool">• nmap_scan - Network discovery and port scanning</div> <div class="tool">• gobuster_dir - Directory enumeration</div> <div class="tool">• wpscan_scan - WordPress security scanning</div> <div class="tool">• sqlmap_test - SQL injection testing</div> <div class="tool">• enum4linux_scan - SMB/NetBIOS enumeration</div> <div class="tool">• searchsploit_query - Exploit database search</div> <h2>Security Features</h2> <ul> <li>Input sanitization and validation</li> <li>Command execution timeouts</li> <li>Network target restrictions</li> <li>Non-root execution</li> </ul> <p><small>Server running on port 8000 | For authorized security testing only</small></p> </div> </body> </html> """ self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(html.encode()) def send_tools_list(self): """Send list of available tools""" tools = { "server": "Kali MCP Server", "version": "1.0.0", "tools": { "nmap_scan": { "description": "Network scan using nmap", "parameters": ["target", "scan_type", "ports"], "example": "/tool/nmap_scan?target=127.0.0.1" }, "gobuster_dir": { "description": "Directory enumeration using gobuster", "parameters": ["target", "wordlist", "extensions"], "example": "/tool/gobuster_dir?target=http://example.com" }, "searchsploit_query": { "description": "Search for exploits using searchsploit", "parameters": ["query", "exact"], "example": "/tool/searchsploit_query?query=apache" } }, "total_tools": 6 } self.send_response_json(tools) def send_health_check(self): """Send health check""" health = { "status": "healthy", "server": "Kali MCP Server", "tools_available": 6, "security": "input_sanitization_enabled" } self.send_response_json(health) def execute_tool(self, tool_name, params): """Execute a tool""" try: if tool_name == 'nmap_scan': target = params.get('target', ['127.0.0.1'])[0] # Validate input if not SAFE_IP_PATTERN.match(target): self.send_response_json({"error": f"Invalid IP format: {target}"}, 400) return # Check if nmap exists and try to run it try: result = subprocess.run(['which', 'nmap'], capture_output=True, timeout=5) if result.returncode == 0: # Run actual nmap cmd = ['nmap', '-T4', '--top-ports=10', target] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) response = { "tool": tool_name, "target": target, "output": result.stdout, "stderr": result.stderr, "returncode": result.returncode, "real_execution": True } else: # Simulate output response = { "tool": tool_name, "target": target, "output": f"Starting Nmap 7.80 scan of {target}\nHost is up (0.0012s latency).\nPORT STATE SERVICE\n22/tcp open ssh\n80/tcp open http\n443/tcp open https\n\nNmap done: 1 IP address scanned in 0.08 seconds", "simulated": True, "note": "nmap not available - showing simulated output" } except Exception as e: response = {"error": f"Tool execution failed: {str(e)}"} elif tool_name == 'searchsploit_query': query = params.get('query', [''])[0] query = re.sub(r'[^a-zA-Z0-9\s\-_.]', '', query) if not query: self.send_response_json({"error": "Query parameter required"}, 400) return response = { "tool": tool_name, "query": query, "output": f"Exploit Title | Path\n--------------------------------------------------------- | ----\nApache 2.4.x - Local Privilege Escalation | linux/local/47009.c\nApache 2.4.x - Remote Code Execution | multiple/remote/47010.py\nApache HTTP Server 2.4.52 - Remote Code Execution | multiple/remote/50406.py", "simulated": True, "note": "searchsploit not available - showing simulated output" } else: response = { "error": f"Unknown tool: {tool_name}", "available_tools": ["nmap_scan", "searchsploit_query", "gobuster_dir", "wpscan_scan", "sqlmap_test", "enum4linux_scan"] } self.send_response_json(response) except Exception as e: self.send_response_json({"error": f"Server error: {str(e)}"}, 500) def log_message(self, format, *args): """Custom log message""" print(f"[{self.date_time_string()}] {format % args}") def main(): """Start the HTTP server""" host = '127.0.0.1' port = 8000 server = HTTPServer((host, port), KaliMCPHandler) print(f"🔒 Kali MCP Server starting...") print(f"📍 Server: http://{host}:{port}") print(f"🛠️ Tools: 6 security tools available") print(f"📋 Endpoints:") print(f" • http://{host}:{port}/ - Homepage") print(f" • http://{host}:{port}/tools - List tools") print(f" • http://{host}:{port}/health - Health check") print(f" • http://{host}:{port}/tool/nmap_scan?target=127.0.0.1") print(f" • http://{host}:{port}/tool/searchsploit_query?query=apache") print(f"\n🔐 Security: Input validation enabled") print(f"⏹️ Press Ctrl+C to stop\n") try: server.serve_forever() except KeyboardInterrupt: print(f"\n🛑 Server stopped") server.server_close() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pellax/kaliMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server