Skip to main content
Glama
index.ts20.2 kB
#!/usr/bin/env node import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, ListToolsRequestSchema, ErrorCode, McpError, CallToolRequest } from "@modelcontextprotocol/sdk/types.js"; import { CommandExecutor, InteractiveSession, stripAnsiCodes } from "./executor.js"; import { EventEmitter } from 'events'; import { ClientChannel, ConnectConfig, ExecOptions } from 'ssh2'; // 启用调试模式 process.env.DEBUG = 'true'; // 全局日志函数,确保所有日志都通过stderr输出 export const log = { debug: (message: string, ...args: any[]) => { if (process.env.DEBUG === 'true') { console.error(`[DEBUG] ${message}`, ...args); } }, info: (message: string, ...args: any[]) => { console.error(`[INFO] ${message}`, ...args); }, warn: (message: string, ...args: any[]) => { console.error(`[WARN] ${message}`, ...args); }, error: (message: string, ...args: any[]) => { console.error(`[ERROR] ${message}`, ...args); } }; // Kali Linux 渗透测试环境配置 const KALI_CONFIG = { host: "localhost", // 本地Kali主机 port: 2222, // SSH端口,改为2222端口 username: "root", // Kali默认用户名 privateKeyPath: "C:\\Users\\hack004\\.ssh\\kali000" // 私钥文件路径 }; // 实时推送配置 - 默认全局启用 const realtimePusherConfig = { viewerUrl: process.env.REALTIME_VIEWER_URL || 'http://localhost:3000', enabled: true // 默认启用实时推送 }; const commandExecutor = new CommandExecutor(realtimePusherConfig); // 存储活跃的交互式会话 const activeSessions: Map<string, InteractiveSession> = new Map(); // 添加全局变量存储当前命令 (global as any).currentInteractiveCommand = ''; // 创建服务器 function createServer() { const server = new Server( { name: "kali-pentest-mcp-server", version: "0.1.0", }, { capabilities: { tools: {}, }, } ); server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ { name: "execute_command", description: "(无需交互式比如ping 127.0.0.1)在Kali Linux渗透测试环境中执行命令。支持所有Kali Linux内置的安全测试工具和常规Linux命令。", inputSchema: { type: "object", properties: { command: { type: "string", description: "要在Kali Linux环境中执行的命令。可以是任何安全测试、漏洞扫描、密码破解等渗透测试命令。" } }, required: ["command"] } }, { name: "start_interactive_command", description: "(需要交互式比如mysql -u root -p)在Kali Linux环境中启动一个交互式命令,并返回会话ID。交互式命令可以接收用户输入,可以在不close_interactive_command的情况下同时执行execute_command。", inputSchema: { type: "object", properties: { command: { type: "string", description: "要在Kali Linux环境中执行的交互式命令。" } }, required: ["command"] } }, { name: "send_input_to_command", description: "(自行判断是AI输入还是用户手动输入)向正在运行的交互式命令发送用户输入。", inputSchema: { type: "object", properties: { session_id: { type: "string", description: "交互式会话ID。" }, input: { type: "string", description: "发送给命令的输入文本。" }, end_line: { type: "boolean", description: "是否在输入后添加换行符。默认为true。" } }, required: ["session_id", "input"] } }, { name: "get_command_output", description: "获取交互式命令的最新输出。", inputSchema: { type: "object", properties: { session_id: { type: "string", description: "交互式会话ID。" } }, required: ["session_id"] } }, { name: "close_interactive_command", description: "关闭交互式命令会话。", inputSchema: { type: "object", properties: { session_id: { type: "string", description: "交互式会话ID。" } }, required: ["session_id"] } } ] }; }); server.setRequestHandler(CallToolRequestSchema, async (request: CallToolRequest) => { try { const toolName = request.params.name; // 确保已连接 if (!commandExecutor.isConnected) { await commandExecutor.connect({ host: KALI_CONFIG.host, port: KALI_CONFIG.port, username: KALI_CONFIG.username, privateKeyPath: KALI_CONFIG.privateKeyPath }); } // 根据工具名称处理不同的请求 switch (toolName) { // 执行非交互式命令 case "execute_command": { const command = String(request.params.arguments?.command); if (!command) { throw new McpError(ErrorCode.InvalidParams, "命令是必需的"); } const env = {}; const timeout = 30000000; try { log.info(`准备执行命令: ${command}`); // 执行命令,启用实时推送 const result = await commandExecutor.executeCommand(command, { timeout: timeout, env: env as Record<string, string>, enableRealtime: true }); log.info("命令执行成功"); return { content: [{ type: "text", text: `命令输出:\nstdout: ${result.stdout}\nstderr: ${result.stderr}` }] }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); log.error(`命令执行失败: ${errorMessage}`); throw new McpError( ErrorCode.InternalError, `无法执行Kali Linux命令: ${errorMessage}` ); } } // 启动交互式命令 case "start_interactive_command": { let command = String(request.params.arguments?.command); if (!command) { throw new McpError(ErrorCode.InvalidParams, "命令是必需的"); } // 存储当前命令到全局变量 (global as any).currentInteractiveCommand = command; // 如果是 msfconsole,添加 -q 参数 if (command.trim() === 'msfconsole') { command = 'msfconsole -q'; log.info("检测到 msfconsole,自动添加 -q 参数启动。"); } try { log.info(`准备启动交互式命令: ${command}`); // 显式设置pty选项,特别是对msfconsole这类特殊终端程序 const ptyOptions = { waitForPrompt: true, // 等待提示符后再返回 maxWaitTime: command.includes('msfconsole') ? 3000000 : 30000, // msfconsole等待更长时间(2分钟),其他命令30秒 forcePty: command.includes('msfconsole'), // 为msfconsole强制分配PTY term: "xterm-256color", // 设置终端类型 cols: 100, // 设置列数 rows: 40 // 设置行数 }; // 对于msfconsole特别提示用户可能需要等待较长时间 if (command.includes('msfconsole')) { log.info(`正在启动msfconsole,这可能需要1-2分钟时间,请耐心等待...`); } // 创建交互式会话 const session = await commandExecutor.createInteractiveSession(command, ptyOptions); activeSessions.set(session.sessionId, session); // 显示实时输出信息 log.info(`交互式会话已创建并等待提示符,ID: ${session.sessionId}`); // 添加对msfconsole会话的特别提示 let responseMessage: any = { status: "success", session_id: session.sessionId, initial_output: session.stdout, waiting_for_input: session.isWaitingForInput }; log.info(`输入状态: ${session.isWaitingForInput ? '等待输入' : '不等待输入'}`); return { content: [{ type: "text", text: JSON.stringify(responseMessage) }] }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); log.error(`创建交互式会话失败: ${errorMessage}`); throw new McpError( ErrorCode.InternalError, `无法创建交互式会话: ${errorMessage}` ); } } // 向命令发送输入 case "send_input_to_command": { const sessionId = String(request.params.arguments?.session_id); const input = String(request.params.arguments?.input); const endLine = request.params.arguments?.end_line !== false; // 默认为true if (!sessionId || input === undefined) { throw new McpError(ErrorCode.InvalidParams, "会话ID和输入是必需的"); } const session = activeSessions.get(sessionId); if (!session) { throw new McpError(ErrorCode.InvalidParams, `找不到会话ID: ${sessionId}`); } try { log.info(`向会话 ${sessionId} 发送输入: ${input}`); // 检查当前命令是否包含msf且输入为exit if ((global as any).currentInteractiveCommand.includes('msf') && input.trim() === 'exit') { log.info(`检测到用户退出msfconsole命令`); (global as any).currentInteractiveCommand = 'exit'; } // 记录输入前的输出长度 const beforeLength = session.stdout.length; // 发送输入,根据需要添加换行符 session.write(endLine ? `${input}\n` : input); // 等待命令执行完成并出现输入提示 const maxWaitTime = 3000000; // 较长等待时间(50分钟) // 使用Promise等待输入状态变为true await new Promise<void>((resolve, reject) => { // 如果已经是等待输入状态,立即解析 if (session.isWaitingForInput) { resolve(); return; } // 等待"waiting-for-input"事件 const waitHandler = () => { clearTimeout(timeoutId); resolve(); }; // 设置超时 const timeoutId = setTimeout(() => { session.removeListener('waiting-for-input', waitHandler); log.info(`等待输入提示超时,已等待${maxWaitTime}毫秒`); // 即使超时,也返回当前状态 resolve(); }, maxWaitTime); // 添加事件监听器 session.once('waiting-for-input', waitHandler); // 添加错误处理 session.once('error', (err) => { clearTimeout(timeoutId); session.removeListener('waiting-for-input', waitHandler); reject(err); }); // 添加关闭处理 session.once('close', () => { clearTimeout(timeoutId); session.removeListener('waiting-for-input', waitHandler); resolve(); // 会话已关闭,直接返回 }); }); // 获取新输出,从上次输出的位置开始 if (!(session as any).lastOutputPosition) { (session as any).lastOutputPosition = beforeLength; } const newOutput = session.stdout.substring((session as any).lastOutputPosition); // 更新上次输出位置 (session as any).lastOutputPosition = session.stdout.length; log.info(`命令执行完成,等待输入状态: ${session.isWaitingForInput}, 新输出长度: ${newOutput.length}`); return { content: [{ type: "text", text: JSON.stringify({ status: "success", new_output: stripAnsiCodes(newOutput), waiting_for_input: session.isWaitingForInput }) }] }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); log.error(`向会话发送输入失败: ${errorMessage}`); throw new McpError( ErrorCode.InternalError, `无法发送输入到会话: ${errorMessage}` ); } } // 获取命令输出 case "get_command_output": { const sessionId = String(request.params.arguments?.session_id); if (!sessionId) { throw new McpError(ErrorCode.InvalidParams, "会话ID是必需的"); } const session = activeSessions.get(sessionId); if (!session) { throw new McpError(ErrorCode.InvalidParams, `找不到会话ID: ${sessionId}`); } // 增加上次获取输出的时间记录 if (!(session as any).lastOutputFetch) { (session as any).lastOutputFetch = 0; } // 初始化lastOutputPosition(如果还没有的话) if (!(session as any).lastOutputPosition) { (session as any).lastOutputPosition = 0; } // 获取新输出 const newOutput = session.stdout.substring((session as any).lastOutputPosition); const hasNewOutput = newOutput.length > 0; // 更新获取时间和位置 const now = Date.now(); const timeSinceLastFetch = now - (session as any).lastOutputFetch; (session as any).lastOutputFetch = now; (session as any).lastOutputPosition = session.stdout.length; return { content: [{ type: "text", text: JSON.stringify({ status: "success", stdout: stripAnsiCodes(newOutput), // 只返回新的输出 stderr: session.stderr, has_new_output: hasNewOutput, time_since_last_fetch_ms: timeSinceLastFetch, waiting_for_input: session.isWaitingForInput }) }] }; } // 关闭交互式命令 case "close_interactive_command": { const sessionId = String(request.params.arguments?.session_id); if (!sessionId) { throw new McpError(ErrorCode.InvalidParams, "会话ID是必需的"); } const session = activeSessions.get(sessionId); if (!session) { throw new McpError(ErrorCode.InvalidParams, `找不到会话ID: ${sessionId}`); } try { log.info(`关闭会话 ${sessionId}`); session.close(); activeSessions.delete(sessionId); return { content: [{ type: "text", text: JSON.stringify({ status: "success", message: "会话已关闭", final_stdout: session.stdout, final_stderr: session.stderr }) }] }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); log.error(`关闭会话失败: ${errorMessage}`); throw new McpError( ErrorCode.InternalError, `无法关闭会话: ${errorMessage}` ); } } default: throw new McpError(ErrorCode.MethodNotFound, "未知工具"); } } catch (error) { if (error instanceof McpError) { throw error; } throw new McpError( ErrorCode.InternalError, error instanceof Error ? error.message : String(error) ); } }); return server; } async function main() { try { // 使用标准输入输出 const server = createServer(); // 设置MCP错误处理程序 server.onerror = (error: any) => { log.error(`MCP错误: ${error.message}`); }; // 测试SSH连接 try { log.info("=== 测试与Kali Linux的SSH连接 ==="); log.info(`配置: ${KALI_CONFIG.username}@${KALI_CONFIG.host}:${KALI_CONFIG.port}`); // 连接并测试一个简单命令 await commandExecutor.connect({ host: KALI_CONFIG.host, port: KALI_CONFIG.port, username: KALI_CONFIG.username, privateKeyPath: KALI_CONFIG.privateKeyPath }); const testResult = await commandExecutor.executeCommand("echo '连接测试成功'", { timeout: 10000 }); log.info(`SSH连接测试成功! 输出: ${testResult.stdout}`); // 显示实时推送状态 const realtimeStatus = commandExecutor.getRealtimePusherStatus(); log.info("=== 实时输出查看器状态 ==="); log.info(`实时推送: ${realtimeStatus.enabled ? '已启用' : '已禁用'}`); log.info(`查看器URL: ${realtimeStatus.config.viewerUrl}`); log.info("提示: 启动实时查看器后,所有命令输出将实时显示在浏览器中"); log.info(`=== SSH连接测试完成 ===`); } catch (error) { log.error(`=== SSH连接测试失败 ===`); log.error(`错误: ${error instanceof Error ? error.message : String(error)}`); // 提供排查建议 log.error("排查建议:"); log.error("1. 确认SSH服务已启动 - 在Kali终端中运行: sudo service ssh start"); log.error("2. 检查连接信息 - 主机、端口、用户名和私钥路径"); log.error("3. 尝试手动SSH连接 - 运行: ssh root@localhost -p 2222"); log.error("4. 检查Kali Linux中的SSH配置 - /etc/ssh/sshd_config"); log.error(`=== 继续启动服务器,但命令可能无法执行 ===`); } const transport = new StdioServerTransport(); await server.connect(transport); log.info("Kali Linux渗透测试MCP服务器运行在stdio上"); // 处理进程退出 process.on('SIGINT', async () => { log.info("关闭服务器..."); // 关闭所有活跃会话 for (const [sessionId, session] of activeSessions.entries()) { log.info(`关闭会话 ${sessionId}`); session.close(); } await commandExecutor.disconnect(); process.exit(0); }); } catch (error) { log.error("服务器错误:", error); process.exit(1); } } main().catch((error) => { log.error("服务器错误:", error); process.exit(1); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sfz009900/kalilinuxmcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server