Skip to main content
Glama

MCP Toolkit

by zxfgds
index.ts7.5 kB
import { ToolDefinition, ToolResponse, createSuccessResponse, createErrorResponse } from '../types.js'; import { Config } from '../../config/config.js'; import * as fs from 'fs/promises'; import * as path from 'path'; // 文件系统管理器类 class FilesystemManager { constructor( private config: Config ) {} // 目录操作 async createDirectory(args: { path: string }): Promise<ToolResponse> { try { const fullPath = path.join(process.cwd(), args.path); await fs.mkdir(fullPath, { recursive: true }); return createSuccessResponse({ type: 'text', text: `目录创建成功: ${fullPath}` }); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } async deleteDirectory(args: { path: string; recursive?: boolean }): Promise<ToolResponse> { try { const fullPath = path.join(process.cwd(), args.path); await fs.rm(fullPath, { recursive: args.recursive || false, force: true }); return createSuccessResponse({ type: 'text', text: `目录删除成功: ${fullPath}` }); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } // 文件操作 async writeFile(args: { path: string; content: string }): Promise<ToolResponse> { try { const fullPath = path.join(process.cwd(), args.path); // 确保目录存在 await fs.mkdir(path.dirname(fullPath), { recursive: true }); await fs.writeFile(fullPath, args.content, 'utf8'); return createSuccessResponse({ type: 'text', text: `文件写入成功: ${fullPath}` }); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } async readFile(args: { path: string }): Promise<ToolResponse> { try { const fullPath = path.join(process.cwd(), args.path); const content = await fs.readFile(fullPath, 'utf8'); return createSuccessResponse([{ type: 'text', text: content }]); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } async replaceInFile(args: { path: string; diff: string }): Promise<ToolResponse> { try { const fullPath = path.join(process.cwd(), args.path); // 读取文件内容 const content = await fs.readFile(fullPath, 'utf8'); // 解析差异块 const diffBlocks = args.diff.split(/<<<<<<< SEARCH\n/g).slice(1); let newContent = content; for (const block of diffBlocks) { const [search, replace] = block.split(/=======\n/); const replaceContent = replace.split(/>>>>>>> REPLACE\n/)[0]; // 替换内容 newContent = newContent.replace(search, replaceContent); } // 写入文件 await fs.writeFile(fullPath, newContent, 'utf8'); return createSuccessResponse({ type: 'text', text: `文件内容已替换: ${fullPath}` }); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } async deleteFile(args: { path: string }): Promise<ToolResponse> { try { const fullPath = path.join(process.cwd(), args.path); await fs.unlink(fullPath); return createSuccessResponse({ type: 'text', text: `文件删除成功: ${fullPath}` }); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } // 路径操作 async setBasePath(args: { path: string }): Promise<ToolResponse> { try { const fullPath = path.resolve(args.path); // 确保目录存在 await fs.mkdir(fullPath, { recursive: true }); process.chdir(fullPath); return createSuccessResponse({ type: 'path_result', data: { path: fullPath, operation: 'set_base_path' } }); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } async getCurrentPath(): Promise<ToolResponse> { try { return createSuccessResponse({ type: 'path_result', data: { path: process.cwd(), operation: 'get_current_path' } }); } catch (error) { return createErrorResponse(error instanceof Error ? error.message : String(error)); } } } // 工具定义和处理器映射 export function createFilesystemTools( config: Config ): ToolDefinition[] { const manager = new FilesystemManager(config); return [ { name: 'create_directory', description: '创建目录', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '要创建的目录路径(相对于当前工作目录,或绝对路径)' } }, required: ['path'] }, handler: args => manager.createDirectory(args) }, { name: 'write_file', description: '写入文件', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '文件路径(相对于当前工作目录,或绝对路径)' }, content: { type: 'string', description: '文件内容' } }, required: ['path', 'content'] }, handler: args => manager.writeFile(args) }, { name: 'read_file', description: '读取文件', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '文件路径(相对于当前工作目录,或绝对路径)' } }, required: ['path'] }, handler: args => manager.readFile(args) }, { name: 'replace_in_file', description: '替换文件内容(支持多块替换)', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '文件路径(相对于当前工作目录,或绝对路径)' }, diff: { type: 'string', description: '替换内容,格式为多个 SEARCH/REPLACE 块' } }, required: ['path', 'diff'] }, handler: args => manager.replaceInFile(args) }, { name: 'delete_file', description: '删除文件', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '要删除的文件路径(相对于当前工作目录,或绝对路径)' } }, required: ['path'] }, handler: args => manager.deleteFile(args) }, { name: 'delete_directory', description: '删除目录', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '要删除的目录路径(相对于当前工作目录,或绝对路径)' }, recursive: { type: 'boolean', description: '是否递归删除' } }, required: ['path'] }, handler: args => manager.deleteDirectory(args) } ]; }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zxfgds/mcp-toolkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server