Skip to main content
Glama

Kali Linux MCP Server

by pellax
demo_server.py•7.36 kB
#!/usr/bin/env python3
"""
Demo Kali MCP Server - Simplified version without external dependencies
Shows the structure and simulates tool execution
"""

import asyncio
import json
import logging
import os
import re
import shutil
import subprocess
import sys
from typing import Dict, List, Optional, Any

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Security patterns (same as full server)
SAFE_IP_PATTERN = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/(?:3[0-2]|[12]?[0-9]))?$')
SAFE_DOMAIN_PATTERN = re.compile(r'^[a-zA-Z0-9.-]+$')

# Parameter names whose values must never be written to the log.
_SENSITIVE_PARAMS = frozenset({'api_token'})


class KaliMCPDemo:
    """Demo MCP server exposing a registry of Kali tools.

    ``nmap_scan`` and ``searchsploit_query`` run the real binary when it is
    found on PATH and fall back to canned output otherwise; every other
    registered tool only returns a simulated result.
    """

    def __init__(self):
        self.tools = {}
        self.register_tools()

    def register_tools(self):
        """Register available tools"""
        self.tools = {
            'nmap_scan': {
                'description': 'Network scan using nmap',
                'parameters': ['target', 'scan_type', 'ports'],
            },
            'gobuster_dir': {
                'description': 'Directory enumeration using gobuster',
                'parameters': ['target', 'wordlist', 'extensions'],
            },
            'wpscan_scan': {
                'description': 'WordPress security scan using WPScan',
                'parameters': ['target', 'enumerate', 'api_token'],
            },
            'sqlmap_test': {
                'description': 'SQL injection testing using sqlmap',
                'parameters': ['target', 'parameter', 'risk', 'level'],
            },
            'enum4linux_scan': {
                'description': 'SMB/NetBIOS enumeration using enum4linux',
                'parameters': ['target', 'scan_type'],
            },
            'searchsploit_query': {
                'description': 'Search for exploits using searchsploit',
                'parameters': ['query', 'exact', 'case_sensitive'],
            },
        }

    def sanitize_input(self, value: str, pattern: re.Pattern) -> str:
        """Return ``value`` unchanged if the whole string matches ``pattern``.

        Raises:
            ValueError: if ``value`` is not a string or does not match.
        """
        if not isinstance(value, str):
            raise ValueError("Input must be a string")
        # fullmatch, not match: with match() a '$' anchor still accepts a
        # trailing newline, so "example.com\n" would pass validation.
        if not pattern.fullmatch(value):
            raise ValueError(f"Invalid input format: {value}")
        return value

    async def _run_command(self, cmd: List[str], timeout: int) -> subprocess.CompletedProcess:
        """Run ``cmd`` (argv list, no shell) in a worker thread.

        subprocess.run blocks, so it is pushed to the default executor to
        keep the event loop responsive while the tool runs.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=timeout),
        )

    async def _nmap_scan(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the target and run (or simulate) a top-10-ports nmap scan."""
        target = params.get('target', '127.0.0.1')
        # Non-string targets are rejected here instead of raising TypeError
        # from the regex engine; fullmatch also rejects a trailing newline.
        if not isinstance(target, str) or not SAFE_IP_PATTERN.fullmatch(target):
            return {"error": f"Invalid IP format: {target}"}

        try:
            if shutil.which('nmap') is None:
                return {
                    "tool": tool_name,
                    "target": target,
                    "output": f"SIMULATED: Nmap scan of {target}\nHost is up (0.0010s latency)\nPORT STATE SERVICE\n22/tcp open ssh\n80/tcp open http\n443/tcp open https",
                    "note": "nmap not found - showing simulated output",
                }

            # Run actual nmap command
            result = await self._run_command(['nmap', '-T4', '--top-ports=10', target], timeout=30)
            return {
                "tool": tool_name,
                "target": target,
                "output": result.stdout,
                # On success this key carries the tool's stderr (or None).
                "error": result.stderr or None,
                "returncode": result.returncode,
            }
        except Exception as e:
            logger.exception("nmap execution failed")
            return {"error": f"Tool execution failed: {str(e)}"}

    async def _searchsploit_query(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Sanitize the query and run (or simulate) a searchsploit lookup."""
        query = params.get('query', '')
        if not isinstance(query, str):
            return {"error": f"Invalid query: {query}"}

        query = re.sub(r'[^a-zA-Z0-9\s\-_.]', '', query)
        # '-' survives sanitization, so a query such as "--update" would be
        # parsed by searchsploit as an option rather than a search term.
        if query.strip().startswith('-'):
            return {"error": f"Invalid query: {query}"}

        try:
            if shutil.which('searchsploit') is None:
                return {
                    "tool": tool_name,
                    "query": query,
                    "output": f"SIMULATED: SearchSploit results for '{query}'\n----------------------------------------\nApache 2.4.x - Local Privilege Escalation | exploits/linux/local/47009.c\nApache 2.4.x - Remote Code Execution | exploits/multiple/remote/47010.py",
                    "note": "searchsploit not found - showing simulated output",
                }

            result = await self._run_command(['searchsploit', query], timeout=15)
            return {
                "tool": tool_name,
                "query": query,
                "output": result.stdout,
                "error": result.stderr or None,
            }
        except Exception as e:
            logger.exception("searchsploit execution failed")
            return {"error": f"Tool execution failed: {str(e)}"}

    async def execute_tool(self, tool_name: str, **kwargs) -> Dict[str, Any]:
        """Execute (or simulate) a registered tool.

        Args:
            tool_name: Key of a tool in ``self.tools``.
            **kwargs: Tool parameters, e.g. ``target`` or ``query``.

        Returns:
            A result dict. Failures (unknown tool, invalid input, execution
            error) are reported as ``{"error": "<message>"}`` rather than
            raised.
        """
        if tool_name not in self.tools:
            return {"error": f"Unknown tool: {tool_name}"}

        # Mask secrets such as the WPScan API token before logging.
        loggable = {
            key: ('***' if key in _SENSITIVE_PARAMS else value)
            for key, value in kwargs.items()
        }
        logger.info("Executing %s with parameters: %s", tool_name, loggable)

        # Simulate different tool outputs
        if tool_name == 'nmap_scan':
            return await self._nmap_scan(tool_name, kwargs)
        if tool_name == 'searchsploit_query':
            return await self._searchsploit_query(tool_name, kwargs)

        return {
            "tool": tool_name,
            "output": f"SIMULATED: {tool_name} execution with parameters: {kwargs}",
            "note": "This is a demo - actual tool execution would happen here",
        }

    def list_tools(self):
        """List available tools"""
        return {
            "server": "Kali MCP Demo Server",
            "version": "1.0.0",
            "tools": self.tools,
            "total_tools": len(self.tools),
        }


async def interactive_demo():
    """Run interactive demo"""
    server = KaliMCPDemo()

    print("🔒 Kali Linux MCP Server - Demo Mode")
    print("=" * 50)
    print(f"Available tools: {len(server.tools)}")

    for tool_name, info in server.tools.items():
        print(f" • {tool_name}: {info['description']}")

    print("\n" + "=" * 50)
    print("Demo Executions:")
    print("-" * 20)

    # Demo nmap scan
    print("\n1. Testing nmap_scan:")
    result = await server.execute_tool('nmap_scan', target='127.0.0.1', scan_type='basic')
    print(json.dumps(result, indent=2))

    # Demo searchsploit
    print("\n2. Testing searchsploit_query:")
    result = await server.execute_tool('searchsploit_query', query='apache')
    print(json.dumps(result, indent=2))

    # Demo invalid input
    print("\n3. Testing input validation:")
    result = await server.execute_tool('nmap_scan', target='invalid; rm -rf /')
    print(json.dumps(result, indent=2))

    print("\n" + "=" * 50)
    print("✓ Demo complete!")
    print("\nTo run the full MCP server:")
    print("1. Install dependencies: pip install fastmcp pydantic")
    print("2. Run: python3 -m kali_mcp_server.server")


def main():
    """Main entry point"""
    try:
        asyncio.run(interactive_demo())
    except KeyboardInterrupt:
        print("\n\nDemo stopped by user")
        sys.exit(0)


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pellax/kaliMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server