demo_server.py • 7.36 kB
#!/usr/bin/env python3
"""
Demo Kali MCP Server - Simplified version without external dependencies
Shows the structure and simulates tool execution
"""
import asyncio
import json
import logging
import os
import re
import shutil
import subprocess
import sys
from typing import Dict, List, Optional, Any
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Security patterns (noted by the original author as matching the full server).
# Both end in \Z rather than $: in Python "$" also matches just before a
# trailing newline, so a value such as "127.0.0.1\n" would otherwise validate.

# Dotted-quad IPv4 address (each octet 0-255) with an optional /0-/32 CIDR suffix.
SAFE_IP_PATTERN = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/(?:3[0-2]|[12]?[0-9]))?\Z')

# Letters, digits, dots and hyphens only.
# NOTE(review): a leading "-" is still accepted; callers that place the value
# on a command line should guard against it being parsed as an option.
SAFE_DOMAIN_PATTERN = re.compile(r'^[a-zA-Z0-9.-]+\Z')
class KaliMCPDemo:
    """Demo MCP server: registers tool descriptors and simulates execution.

    ``nmap_scan`` and ``searchsploit_query`` run the real binary when it is
    found on PATH; every other registered tool returns a canned "SIMULATED"
    payload.
    """

    # Parameter names whose values must not be written to the log.
    _SECRET_PARAMS = frozenset({'api_token'})

    def __init__(self):
        self.tools = {}
        self.register_tools()

    def register_tools(self):
        """Populate ``self.tools`` with a description and parameter list per tool."""
        self.tools = {
            'nmap_scan': {
                'description': 'Network scan using nmap',
                'parameters': ['target', 'scan_type', 'ports'],
            },
            'gobuster_dir': {
                'description': 'Directory enumeration using gobuster',
                'parameters': ['target', 'wordlist', 'extensions'],
            },
            'wpscan_scan': {
                'description': 'WordPress security scan using WPScan',
                'parameters': ['target', 'enumerate', 'api_token'],
            },
            'sqlmap_test': {
                'description': 'SQL injection testing using sqlmap',
                'parameters': ['target', 'parameter', 'risk', 'level'],
            },
            'enum4linux_scan': {
                'description': 'SMB/NetBIOS enumeration using enum4linux',
                'parameters': ['target', 'scan_type'],
            },
            'searchsploit_query': {
                'description': 'Search for exploits using searchsploit',
                'parameters': ['query', 'exact', 'case_sensitive'],
            },
        }

    def sanitize_input(self, value: str, pattern: re.Pattern) -> str:
        """Return ``value`` unchanged if the whole string matches ``pattern``.

        Raises:
            ValueError: if ``value`` is not a ``str`` or does not match.
        """
        if not isinstance(value, str):
            raise ValueError("Input must be a string")
        # fullmatch instead of match: a pattern ending in "$" would otherwise
        # accept a value with a trailing newline.
        if not pattern.fullmatch(value):
            raise ValueError(f"Invalid input format: {value}")
        return value

    async def execute_tool(self, tool_name: str, **kwargs) -> Dict[str, Any]:
        """Run (or simulate) a registered tool and return a result payload.

        Args:
            tool_name: key of a tool in ``self.tools``.
            **kwargs: tool parameters; ``target`` is read for ``nmap_scan``
                and ``query`` for ``searchsploit_query``.

        Returns:
            A dict. Failures (unknown tool, invalid input, execution error)
            are reported as ``{"error": ...}`` rather than raised.
        """
        if tool_name not in self.tools:
            return {"error": f"Unknown tool: {tool_name}"}

        # Mask secrets (e.g. the WPScan API token) before they reach the log.
        loggable = {
            key: ('***' if key in self._SECRET_PARAMS else val)
            for key, val in kwargs.items()
        }
        logger.info("Executing %s with parameters: %s", tool_name, loggable)

        if tool_name == 'nmap_scan':
            return await self._run_nmap(tool_name, kwargs.get('target', '127.0.0.1'))
        if tool_name == 'searchsploit_query':
            return await self._run_searchsploit(tool_name, kwargs.get('query', ''))

        return {
            "tool": tool_name,
            "output": f"SIMULATED: {tool_name} execution with parameters: {kwargs}",
            "note": "This is a demo - actual tool execution would happen here"
        }

    async def _run_nmap(self, tool_name: str, target: Any) -> Dict[str, Any]:
        """Validate ``target`` and run nmap, or return simulated output if absent."""
        try:
            # Also rejects non-string targets, which previously raised TypeError.
            self.sanitize_input(target, SAFE_IP_PATTERN)
        except ValueError:
            return {"error": f"Invalid IP format: {target}"}

        if shutil.which('nmap') is None:
            return {
                "tool": tool_name,
                "target": target,
                "output": f"SIMULATED: Nmap scan of {target}\nHost is up (0.0010s latency)\nPORT STATE SERVICE\n22/tcp open ssh\n80/tcp open http\n443/tcp open https",
                "note": "nmap not found - showing simulated output"
            }

        try:
            result = await self._run_command(['nmap', '-T4', '--top-ports=10', target], timeout=30)
        except Exception as e:
            # Tool boundary: any failure (timeout, OS error, undecodable
            # output) is reported to the caller as an error payload.
            return {"error": f"Tool execution failed: {str(e)}"}
        return {
            "tool": tool_name,
            "target": target,
            "output": result.stdout,
            "error": result.stderr or None,
            "returncode": result.returncode
        }

    async def _run_searchsploit(self, tool_name: str, query: Any) -> Dict[str, Any]:
        """Sanitize ``query`` and run searchsploit, or return simulated output if absent."""
        if not isinstance(query, str):
            return {"error": "Query must be a string"}
        query = re.sub(r'[^a-zA-Z0-9\s\-_.]', '', query)
        # "-" survives the character filter, so a query like "-u" would be
        # parsed by searchsploit as an option rather than a search term.
        if query.lstrip().startswith('-'):
            return {"error": f"Invalid query: {query}"}

        if shutil.which('searchsploit') is None:
            return {
                "tool": tool_name,
                "query": query,
                "output": f"SIMULATED: SearchSploit results for '{query}'\n----------------------------------------\nApache 2.4.x - Local Privilege Escalation | exploits/linux/local/47009.c\nApache 2.4.x - Remote Code Execution | exploits/multiple/remote/47010.py",
                "note": "searchsploit not found - showing simulated output"
            }

        try:
            result = await self._run_command(['searchsploit', query], timeout=15)
        except Exception as e:
            # Tool boundary: report any failure as an error payload.
            return {"error": f"Tool execution failed: {str(e)}"}
        return {
            "tool": tool_name,
            "query": query,
            "output": result.stdout,
            "error": result.stderr or None
        }

    @staticmethod
    async def _run_command(cmd: List[str], timeout: int) -> subprocess.CompletedProcess:
        """Run ``cmd`` (argv list, no shell) in a worker thread and capture its text output."""
        # subprocess.run blocks; pushing it to the default executor keeps the
        # event loop responsive while the tool runs.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=timeout),
        )

    def list_tools(self):
        """Return server name, version, the tool registry and its size."""
        return {
            "server": "Kali MCP Demo Server",
            "version": "1.0.0",
            "tools": self.tools,
            "total_tools": len(self.tools)
        }
async def interactive_demo():
    """Print the registered tools, then run three sample tool executions.

    The samples are an nmap scan of 127.0.0.1, a searchsploit query for
    "apache", and an nmap call with a shell-injection style target to show
    the input check. Each result dict is printed as indented JSON.
    """
    server = KaliMCPDemo()
    rule = "=" * 50

    # The banner, bullet and completion lines previously contained
    # mis-encoded (mojibake) glyphs; plain ASCII renders on any console.
    print("Kali Linux MCP Server - Demo Mode")
    print(rule)
    print(f"Available tools: {len(server.tools)}")
    for tool_name, info in server.tools.items():
        print(f"  - {tool_name}: {info['description']}")

    print("\n" + rule)
    print("Demo Executions:")
    print("-" * 20)

    # (heading, tool name, parameters) for each sample run, in display order.
    samples = (
        ("Testing nmap_scan", 'nmap_scan', {'target': '127.0.0.1', 'scan_type': 'basic'}),
        ("Testing searchsploit_query", 'searchsploit_query', {'query': 'apache'}),
        ("Testing input validation", 'nmap_scan', {'target': 'invalid; rm -rf /'}),
    )
    for number, (heading, tool, params) in enumerate(samples, start=1):
        print(f"\n{number}. {heading}:")
        result = await server.execute_tool(tool, **params)
        print(json.dumps(result, indent=2))

    print("\n" + rule)
    print("Demo complete!")
    print("\nTo run the full MCP server:")
    print("1. Install dependencies: pip install fastmcp pydantic")
    print("2. Run: python3 -m kali_mcp_server.server")
def main():
    """Script entry point: run the interactive demo until done or interrupted.

    A Ctrl-C (KeyboardInterrupt) prints a short notice and exits with
    status 0 instead of showing a traceback.
    """
    try:
        asyncio.run(interactive_demo())
    except KeyboardInterrupt:
        print("\n\nDemo stopped by user")
        raise SystemExit(0)


if __name__ == "__main__":
    main()