Terminal MCP Server
by weidwonder
Verified
import { Client } from 'ssh2';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
import { exec } from 'child_process';
import { log } from './index.js';
export class CommandExecutor {
private sessions: Map<string, {
client: Client | null;
connection: Promise<void> | null;
timeout: NodeJS.Timeout | null;
host?: string;
env?: Record<string, string>; // 添加环境变量存储
shell?: any; // 添加shell会话
shellReady?: boolean; // shell是否准备好
}> = new Map();
private sessionTimeout: number = 20 * 60 * 1000; // 20 minutes
constructor() {}
private getSessionKey(host: string | undefined, sessionName: string): string {
return `${host || 'local'}-${sessionName}`;
}
async connect(host: string, username: string, sessionName: string = 'default'): Promise<void> {
const sessionKey = this.getSessionKey(host, sessionName);
const session = this.sessions.get(sessionKey);
// 如果会话存在且连接有效,直接返回现有连接
if (session?.connection && session?.client) {
// 检查客户端是否仍然连接
if (session.client.listenerCount('ready') > 0 || session.client.listenerCount('data') > 0) {
log.info(`Reusing existing session: ${sessionKey}`);
return session.connection;
}
// 如果客户端已断开连接,清理旧会话
log.info(`Session ${sessionKey} disconnected, creating new session`);
this.sessions.delete(sessionKey);
}
try {
const privateKey = fs.readFileSync(path.join(os.homedir(), '.ssh', 'id_rsa'));
const client = new Client();
const connection = new Promise<void>((resolve, reject) => {
client
.on('ready', () => {
log.info(`Session ${sessionKey} connected`);
this.resetTimeout(sessionKey);
// 创建一个交互式shell
client.shell((err, stream) => {
if (err) {
log.error(`Failed to create interactive shell: ${err.message}`);
reject(err);
return;
}
log.info(`Creating interactive shell for session ${sessionKey}`);
// 获取会话对象
const sessionData = this.sessions.get(sessionKey);
if (sessionData) {
// 设置shell和shellReady标志
sessionData.shell = stream;
sessionData.shellReady = true;
// 更新会话
this.sessions.set(sessionKey, sessionData);
}
// 处理shell关闭事件
stream.on('close', () => {
log.info(`Interactive shell for session ${sessionKey} closed`);
const sessionData = this.sessions.get(sessionKey);
if (sessionData) {
sessionData.shellReady = false;
this.sessions.set(sessionKey, sessionData);
}
});
// 等待shell准备好
stream.write('echo "Shell ready"\n');
// 解析promise
resolve();
});
})
.on('error', (err) => {
log.error(`会话 ${sessionKey} 连接错误:`, err.message);
reject(err);
})
.connect({
host: host,
username: username,
privateKey: privateKey,
keepaliveInterval: 60000, // 每分钟发送一次keepalive包
});
});
log.info(`Creating new session: ${sessionKey}`);
this.sessions.set(sessionKey, {
client,
connection,
timeout: null,
host,
shell: null,
shellReady: false
});
return connection;
} catch (error) {
if (error instanceof Error && error.message.includes('ENOENT')) {
throw new Error('SSH key file does not exist, please ensure SSH key-based authentication is set up');
}
throw error;
}
}
private resetTimeout(sessionKey: string): void {
const session = this.sessions.get(sessionKey);
if (!session) return;
if (session.timeout) {
clearTimeout(session.timeout);
}
session.timeout = setTimeout(async () => {
log.info(`Session ${sessionKey} timeout, disconnecting`);
await this.disconnectSession(sessionKey);
}, this.sessionTimeout);
this.sessions.set(sessionKey, session);
}
async executeCommand(
command: string,
options: {
host?: string;
username?: string;
session?: string;
env?: Record<string, string>;
} = {}
): Promise<{stdout: string; stderr: string}> {
const { host, username, session = 'default', env = {} } = options;
const sessionKey = this.getSessionKey(host, session);
// 如果指定了host,则使用SSH执行命令
if (host) {
if (!username) {
throw new Error('Username is required when using SSH');
}
let sessionData = this.sessions.get(sessionKey);
// 检查会话是否存在且有效
let needNewConnection = false;
if (!sessionData || sessionData.host !== host) {
needNewConnection = true;
} else if (sessionData.client) {
// 检查客户端是否仍然连接
if (sessionData.client.listenerCount('ready') === 0 && sessionData.client.listenerCount('data') === 0) {
log.info(`Session ${sessionKey} disconnected, reconnecting`);
needNewConnection = true;
}
} else {
needNewConnection = true;
}
// 如果需要新连接,则创建
if (needNewConnection) {
log.info(`Creating new connection for command execution: ${sessionKey}`);
await this.connect(host, username, session);
sessionData = this.sessions.get(sessionKey);
} else {
log.info(`Reusing existing session for command execution: ${sessionKey}`);
}
if (!sessionData || !sessionData.client) {
throw new Error(`无法创建到 ${host} 的SSH会话`);
}
this.resetTimeout(sessionKey);
// 检查是否有交互式shell可用
if (sessionData.shellReady && sessionData.shell) {
log.info(`Executing command using interactive shell: ${command}`);
return new Promise((resolve, reject) => {
let stdout = "";
let stderr = "";
let commandFinished = false;
const uniqueMarker = `CMD_END_${Date.now()}_${Math.random().toString(36).substring(2, 15)}`;
// 构建环境变量设置命令
const envSetup = Object.entries(env)
.map(([key, value]) => `export ${key}="${String(value).replace(/"/g, '\\"')}"`)
.join(' && ');
// 如果有环境变量,先设置环境变量,再执行命令
const fullCommand = envSetup ? `${envSetup} && ${command}` : command;
// 添加数据处理器
const dataHandler = (data: Buffer) => {
const str = data.toString();
log.debug(`Shell数据: ${str}`);
if (str.includes(uniqueMarker)) {
// 命令执行完成
commandFinished = true;
// 提取命令输出(从命令开始到标记之前的内容)
const lines = stdout.split('\n');
let commandOutput = '';
let foundCommand = false;
for (const line of lines) {
if (foundCommand) {
if (line.includes(uniqueMarker)) {
break;
}
commandOutput += line + '\n';
} else if (line.includes(fullCommand)) {
foundCommand = true;
}
}
// 解析输出
resolve({ stdout: commandOutput.trim(), stderr });
// 移除处理器
sessionData.shell.removeListener('data', dataHandler);
clearTimeout(timeout);
} else if (!commandFinished) {
stdout += str;
}
};
// 添加错误处理器
const errorHandler = (err: Error) => {
stderr += err.message;
reject(err);
sessionData.shell.removeListener('data', dataHandler);
sessionData.shell.removeListener('error', errorHandler);
};
// 监听数据和错误
sessionData.shell.on('data', dataHandler);
sessionData.shell.on('error', errorHandler);
// 执行命令并添加唯一标记
// 使用一个更明确的方式来执行命令和捕获输出
sessionData.shell.write(`echo "Starting command execution: ${fullCommand}"\n`);
sessionData.shell.write(`${fullCommand}\n`);
sessionData.shell.write(`echo "${uniqueMarker}"\n`);
// 设置超时
const timeout = setTimeout(() => {
if (!commandFinished) {
stderr += "Command execution timed out";
resolve({ stdout, stderr });
sessionData.shell.removeListener('data', dataHandler);
sessionData.shell.removeListener('error', errorHandler);
}
}, 30000); // 30秒超时
});
} else {
log.info(`Executing command using exec: ${command}`);
return new Promise((resolve, reject) => {
// 构建环境变量设置命令
const envSetup = Object.entries(env)
.map(([key, value]) => `export ${key}="${String(value).replace(/"/g, '\\"')}"`)
.join(' && ');
// 如果有环境变量,先设置环境变量,再执行命令
const fullCommand = envSetup ? `${envSetup} && ${command}` : command;
sessionData?.client?.exec(`/bin/bash --login -c "${fullCommand.replace(/"/g, '\\"')}"`, (err, stream) => {
if (err) {
reject(err);
return;
}
let stdout = "";
let stderr = '';
stream
.on("data", (data: Buffer) => {
this.resetTimeout(sessionKey);
stdout += data.toString();
})
.stderr.on('data', (data: Buffer) => {
stderr += data.toString();
})
.on('close', () => {
resolve({ stdout, stderr });
})
.on('error', (err) => {
reject(err);
});
});
});
}
}
// 否则在本地执行命令
else {
// 在本地执行命令时,也使用会话机制来保持环境变量
log.info(`Executing command using local session: ${sessionKey}`);
// 检查是否已有本地会话
let sessionData = this.sessions.get(sessionKey);
let sessionEnv = {};
if (!sessionData) {
// 为本地会话创建一个空条目,以便跟踪超时
sessionData = {
client: null,
connection: null,
timeout: null,
host: undefined,
env: { ...env } // 保存初始环境变量
};
this.sessions.set(sessionKey, sessionData);
log.info(`Creating new local session: ${sessionKey}`);
sessionEnv = env;
} else {
log.info(`Reusing existing local session: ${sessionKey}`);
// 合并现有会话环境变量和新的环境变量
if (!sessionData.env) {
sessionData.env = {};
}
sessionData.env = { ...sessionData.env, ...env };
sessionEnv = sessionData.env;
// 更新会话
this.sessions.set(sessionKey, sessionData);
}
this.resetTimeout(sessionKey);
return new Promise((resolve, reject) => {
// 构建环境变量,优先级:系统环境变量 < 会话环境变量 < 当前命令环境变量
const envVars = { ...process.env, ...sessionEnv };
// 执行命令
log.info(`Executing local command: ${command}`);
exec(command, { env: envVars }, (error, stdout, stderr) => {
if (error && error.code !== 0) {
// 我们不直接拒绝,而是返回错误信息作为stderr
resolve({ stdout, stderr: stderr || error.message });
} else {
resolve({ stdout, stderr });
}
});
});
}
}
private async disconnectSession(sessionKey: string): Promise<void> {
const session = this.sessions.get(sessionKey);
if (session) {
if (session.shell) {
log.info(`Closing interactive shell for session ${sessionKey}`);
session.shell.end();
session.shellReady = false;
}
if (session.client) {
log.info(`Disconnecting SSH connection for session ${sessionKey}`);
session.client.end();
}
if (session.timeout) {
clearTimeout(session.timeout);
}
log.info(`Disconnecting session: ${sessionKey}`);
this.sessions.delete(sessionKey);
}
}
async disconnect(): Promise<void> {
const disconnectPromises = Array.from(this.sessions.keys()).map(
sessionKey => this.disconnectSession(sessionKey)
);
await Promise.all(disconnectPromises);
this.sessions.clear();
}
}