Skip to main content
Glama
index.ts.broken26.3 kB
import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, GetPromptRequestSchema, ListPromptsRequestSchema, ListResourcesRequestSchema, ListToolsRequestSchema, ReadResourceRequestSchema, } from "@modelcontextprotocol/sdk/types.js"; import type { Snapshot } from "@snapback/contracts"; import { DependencyAnalyzer, Guardian, MCPClientManager } from "@snapback/core"; import { SnapBackEvent, SnapBackEventBus } from "@snapback/events"; import { z } from "zod"; import { SnapBackAPIClient } from "./client/snapback-api.js"; import { AnalyzeSuggestionSchema, CheckIterationSafetySchema, CreateSnapshotSchema, validateFilePath, } from "./utils/security.js"; // Performance tracker for monitoring operation times class PerformanceTracker { static async track<T>(operation: string, fn: () => Promise<T>): Promise<T> { const start = Date.now(); try { return await fn(); } finally { const duration = Date.now() - start; console.error(`[PERF] ${operation}: ${duration}ms`); // Alert if exceeds budget (example budgets) const PERFORMANCE_BUDGETS: Record<string, number> = { analyze_suggestion: 200, check_iteration_safety: 100, create_snapshot: 500, }; if (duration > (PERFORMANCE_BUDGETS[operation] || 1000)) { console.warn( `[PERF] ⚠️ ${operation} exceeded budget: ${duration}ms > ${PERFORMANCE_BUDGETS[operation] || 1000}ms`, ); } } } } // Export storage factory for testing export function createStorage(testStorage?: any) { // If testStorage is provided, return it instead of creating a new one if (testStorage) { return testStorage; } return { create: async (options?: { description?: string; protected?: boolean }) => { return { id: `snap-${Date.now()}`, timestamp: Date.now(), meta: options || {}, } as Snapshot; }, retrieve: async () => { return null; }, list: async () => { return []; }, restore: async (_id: string, _targetPath: string, _options?: any) => { return { success: true, restoredFiles: [], errors: [], }; }, }; } // Create API client factory for testing export function createAPIClient(): SnapBackAPIClient { // Validate required environment variables const apiUrl = process.env.SNAPBACK_API_URL; const apiKey = process.env.SNAPBACK_API_KEY; // Use defaults in development/test mode const isDevelopment = process.env.NODE_ENV === "development" || process.env.NODE_ENV === "test"; if (!apiUrl) { if (isDevelopment) { console.warn("[SnapBack MCP] Warning: SNAPBACK_API_URL not set, using default: http://localhost:3000"); // Fall through to use default } else { throw new Error("Required configuration missing: SNAPBACK_API_URL must be set"); } } if (!apiKey) { if (isDevelopment) { console.warn("[SnapBack MCP] Warning: SNAPBACK_API_KEY not set, using empty string"); // Fall through to use empty string } else { throw new Error("Required configuration missing: SNAPBACK_API_KEY must be set"); } } // Validate API key format (skip in development) if (!isDevelopment && apiKey) { const API_KEY_MIN_LENGTH = 32; const API_KEY_PATTERN = /^[A-Za-z0-9_-]{32,}$/; if (!API_KEY_PATTERN.test(apiKey) || apiKey.length < API_KEY_MIN_LENGTH) { throw new Error("Invalid API key format"); } } return new SnapBackAPIClient({ baseUrl: apiUrl || "http://localhost:3000", apiKey: apiKey || "", }); } // Error handler for sanitizing error messages class ErrorHandler { static sanitize( error: unknown, context: string, ): { message: string; code: string; logId: string; } { // In test/development mode, provide more detailed errors const isDevelopment = process.env.NODE_ENV === "development" || process.env.NODE_ENV === "test"; const logId = `ERR-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; // Log full details internally console.error(`[Error ${logId}] ${context}:`, error); // Return more detailed message in development, generic in production if (isDevelopment) { const errorMessage = error instanceof Error ? error.message : String(error); return { message: errorMessage, code: "INTERNAL_ERROR", logId, }; } // Return generic message to client return { message: "An internal error occurred. Contact support with log ID.", code: "INTERNAL_ERROR", logId, }; } } // Export a function to create and start the server export async function startServer(apiClient?: SnapBackAPIClient, testStorage?: any) { // In a real implementation, this would be injected or created through a factory // Mock implementation - replace with actual storage in production const storage = createStorage(testStorage); const guardian = new Guardian(); const dep = new DependencyAnalyzer(); const mcpManager = new MCPClientManager(); // Connect to external MCP servers from config (non-blocking) try { await mcpManager.connectFromConfig(); } catch (err) { console.error("[SnapBack MCP] External MCP connection failed:", err); } // Create SnapBack API client if not provided const client = apiClient || createAPIClient(); // Initialize event bus const eventBus = new SnapBackEventBus(); try { await eventBus.connect(); console.error("[SnapBack MCP] Connected to event bus"); } catch (err) { console.error("[SnapBack MCP] Failed to connect to event bus:", err); } const server = new Server( { name: "snapback-mcp-server", version: "1.0.0" }, { capabilities: { tools: {}, resources: {}, prompts: {}, }, }, ); // Register tools listing server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [ { name: "analyze_suggestion", description: `**Purpose:** Analyze AI-generated code for security, performance, and quality risks before applying changes. **When to Use:** - BEFORE accepting code suggestions (proactive safety) - When user asks "is this safe to apply?" - For files with BLOCK protection level (required analysis) - When multiple consecutive AI edits detected (iteration risk) **When NOT to Use:** - After user manually created snapshot (already protected) - For non-code files (images, docs, configs without logic) - When user explicitly says "skip analysis" or "I trust this" **Contextual Integration:** - Checks current file protection level from Extension state - Considers recent edit velocity (5+ consecutive edits = high risk) - Aware of git context (unstaged changes, branch name) **Output:** - ✅ SAFE (green): Apply confidently, auto-snapshot if protected - ⚠️ WARN (yellow): Review carefully, show issues to user - 🚨 BLOCK (red): Do not apply, create manual snapshot first if proceeding **Example Workflow:** 1. User types: "refactor this function to use async/await" 2. LLM generates code suggestion 3. **Call analyze_suggestion** with generated code + file_path 4. Parse response: - If SAFE → Apply + create snapshot (if file protected) - If WARN → Show issues, ask user confirmation - If BLOCK → Stop, require manual snapshot creation **Performance:** < 200ms average analysis time`, inputSchema: { type: "object", properties: { code: { type: "string", description: "The AI-generated code to analyze (max 1MB)", }, file_path: { type: "string", description: "Relative path from workspace root (e.g., 'src/utils/helper.ts')", }, context: { type: "object", description: "Additional analysis context (optional but recommended)", properties: { surrounding_code: { type: "string", description: "Code around the change (helps detect breaking changes)", }, project_type: { type: "string", enum: ["node", "browser", "deno"], description: "Runtime environment for accurate risk assessment", }, language: { type: "string", enum: ["javascript", "typescript", "python"], description: "Programming language for syntax-specific checks", }, }, }, }, required: ["code", "file_path"], }, }, { name: "check_iteration_safety", description: `**Purpose:** Check if continuing with AI suggestions is safe based on iteration count. **When to Use:** - Periodically during AI-assisted coding sessions - When user has made 3+ consecutive AI edits - Before accepting major refactor suggestions - When velocity exceeds safe thresholds **Contextual Integration:** - Tracks consecutive AI edits per file - Monitors edit velocity (edits/minute) - Considers file protection levels - Integrates with Extension's save handler **Output:** - Current iteration count - Risk level (low/medium/high) - Velocity metrics - Actionable recommendation **Performance:** < 100ms average`, inputSchema: { type: "object", properties: { file_path: { type: "string", description: "Path to the file being edited", }, }, required: ["file_path"], }, }, { name: "create_snapshot", description: `**Purpose:** Manually create a code snapshot before making risky changes. **When to Use:** - Before accepting significant AI suggestions - When about to refactor critical files - Before implementing breaking changes - When user explicitly requests snapshot **Contextual Integration:** - Creates snapshot with MCP source tagging - Updates Extension UI in real-time - Integrates with protection levels - Available immediately in restore commands **Output:** - Snapshot ID for later restoration - Confirmation message with timestamp - File list included in snapshot **Performance:** < 500ms average (larger files may take longer)`, inputSchema: { type: "object", properties: { file_path: { type: "string", description: "Path to the file to snapshot", }, reason: { type: "string", description: "Reason for creating snapshot (e.g., 'before major refactor')", }, }, required: ["file_path"], }, }, // Existing SnapBack tools { name: "snapback.analyze_risk", description: "Analyze code changes for potential risks", inputSchema: { type: "object", properties: { changes: { type: "array", items: { type: "object", properties: { added: { type: "boolean" }, removed: { type: "boolean" }, value: { type: "string" }, count: { type: "number" }, }, required: ["value"], }, }, }, required: ["changes"], }, }, { name: "snapback.check_dependencies", description: "Check for dependency-related risks", inputSchema: { type: "object", properties: { before: { type: "object", additionalProperties: true, }, after: { type: "object", additionalProperties: true, }, }, required: ["before", "after"], }, }, { name: "snapback.create_snapshot", description: "Create a code snapshot", inputSchema: { type: "object", properties: { trigger: { type: "string" }, content: { type: "string" }, }, }, }, { name: "snapback.list_snapshots", description: "List available snapshots", inputSchema: { type: "object", properties: {}, }, }, { name: "catalog.list_tools", description: "List available tools from connected MCP servers", inputSchema: { type: "object", properties: {}, }, }, ], })); // Handle tool calls server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; try { // Handle new SnapBack MCP tools if (name === "analyze_suggestion") { // Validate input using Zod schema const validated = AnalyzeSuggestionSchema.parse(args); const { code, file_path, context } = validated; // Additional security validation for file path validateFilePath(file_path); try { // Call SnapBack backend for fast analysis const analysis = await PerformanceTracker.track("analyze_suggestion", () => client.analyzeFast({ code, filePath: file_path, // Handle different context naming conventions context: context ? { surroundingCode: context.surrounding_code || context.surroundingCode, projectType: context.project_type || context.projectType || "node", language: context.language || "javascript", } : undefined, }), ); // Publish event when analysis is completed eventBus.publish(SnapBackEvent.ANALYSIS_COMPLETED, { filePath: file_path, timestamp: Date.now(), riskLevel: analysis.riskLevel, }); // Format response for AI tool using array.join for O(n) complexity const parts = [ `${analysis.riskLevel === "safe" ? "✅" : analysis.riskLevel === "low" ? "🟢" : analysis.riskLevel === "medium" ? "🟡" : analysis.riskLevel === "high" ? "🔴" : "🚨"} Risk Analysis Complete`, "", `Risk Level: ${analysis.riskLevel.toUpperCase()}`, `Recommendation: ${ analysis.riskLevel === "high" || analysis.riskLevel === "critical" ? "BLOCK: Do not apply this suggestion" : analysis.riskLevel === "medium" ? "WARN: Review carefully before applying" : "ALLOW: Safe to apply" }`, `Analysis Time: ${analysis.analysisTimeMs}ms`, "", ]; if (analysis.issues.length > 0) { parts.push("Issues Detected:"); // Limit issues to prevent resource exhaustion const maxIssues = 100; const issuesToShow = analysis.issues.slice(0, maxIssues); for (const [idx, issue] of issuesToShow.entries()) { parts.push(`${idx + 1}. [${issue.severity}] ${issue.message}`); if (issue.line) { const col = issue.column ? `, Column ${issue.column}` : ""; parts.push(` Line ${issue.line}${col}`); } } if (analysis.issues.length > maxIssues) { parts.push(`... and ${analysis.issues.length - maxIssues} more issues`); } } else { parts.push("No issues detected. This change appears safe."); } // Add actionable guidance if (analysis.riskLevel === "high" || analysis.riskLevel === "critical") { parts.push(""); parts.push( "⚠️ RECOMMENDED ACTION: Create a snapshot before proceeding or reject this suggestion.", ); } const responseText = parts.join("\n"); return { content: [ { type: "text", text: responseText, }, ], isError: false, }; } catch (error: any) { const sanitized = ErrorHandler.sanitize(error, "analyze_suggestion"); return { content: [ { type: "text", text: `${sanitized.message} (Log ID: ${sanitized.logId})`, }, ], isError: true, }; } } if (name === "check_iteration_safety") { // Validate input using Zod schema const validated = CheckIterationSafetySchema.parse(args); const { file_path } = validated; // Additional security validation for file path validateFilePath(file_path); try { // Get real iteration stats from the backend const stats = await PerformanceTracker.track("check_iteration_safety", () => client.getIterationStats(file_path), ); // Publish event when iteration safety is checked eventBus.publish(SnapBackEvent.ANALYSIS_COMPLETED, { filePath: file_path, timestamp: Date.now(), iterationCount: stats.consecutiveAIEdits, riskLevel: stats.riskLevel, }); const emoji = stats.riskLevel === "high" ? "🚨" : stats.riskLevel === "medium" ? "⚠️" : "✅"; const parts = [ `${emoji} Iteration Safety Check`, "", `Current AI Iterations: ${stats.consecutiveAIEdits}`, `Risk Level: ${stats.riskLevel.toUpperCase()}`, `Change Velocity: ${stats.velocity} edits/min`, "", stats.recommendation, ]; const responseText = parts.join("\n"); return { content: [ { type: "text", text: responseText, }, ], isError: false, }; } catch (error: any) { const sanitized = ErrorHandler.sanitize(error, "check_iteration_safety"); return { content: [ { type: "text", text: `${sanitized.message} (Log ID: ${sanitized.logId})`, }, ], isError: true, }; } } if (name === "create_snapshot") { // Validate input using Zod schema const validated = CreateSnapshotSchema.parse(args); const { file_path, reason } = validated; // Additional security validation for file path validateFilePath(file_path); try { const snapshot = await PerformanceTracker.track("create_snapshot", () => client.createSnapshot({ filePath: file_path, reason: reason || "Manual snapshot via MCP", source: "mcp", }), ); // Publish event when snapshot is created eventBus.publish(SnapBackEvent.SNAPSHOT_CREATED, { id: snapshot.id, filePath: file_path, source: "mcp", timestamp: snapshot.timestamp, }); const parts = [ "✅ Snapshot created successfully", "", `Snapshot ID: ${snapshot.id}`, `File: ${file_path}`, `Timestamp: ${new Date(snapshot.timestamp).toLocaleString()}`, `Reason: ${reason || "Manual snapshot"}`, "", "You can now safely proceed with changes. If anything goes wrong, you can restore to this snapshot.", ]; const responseText = parts.join("\n"); return { content: [ { type: "text", text: responseText, }, ], isError: false, }; } catch (error: any) { const sanitized = ErrorHandler.sanitize(error, "create_snapshot"); return { content: [ { type: "text", text: `${sanitized.message} (Log ID: ${sanitized.logId})`, }, ], isError: true, }; } } // Handle existing SnapBack tools if (name === "snapback.analyze_risk") { const parsed = z .object({ changes: z.array( z.object({ added: z.boolean().optional().default(false), removed: z.boolean().optional().default(false), value: z.string(), count: z.number().optional(), }), ), }) .parse(args); const risk = await guardian.analyze(parsed.changes); return { content: [{ type: "json", json: risk }] }; } if (name === "snapback.check_dependencies") { const parsed = z .object({ before: z.record(z.string(), z.any()), after: z.record(z.string(), z.any()), }) .parse(args); const result = dep.quickAnalyze(parsed.before, parsed.after); return { content: [{ type: "json", json: result }] }; } if (name === "snapback.create_snapshot") { const input = z .object({ trigger: z.string().default("mcp"), content: z.string().optional(), }) .parse(args); const snap = await storage.create({ description: input.content, protected: false, }); return { content: [{ type: "json", json: snap }] }; } if (name === "snapback.list_snapshots") { const snaps = await storage.list(); return { content: [{ type: "json", json: snaps }] }; } // Handle catalog tool if (name === "catalog.list_tools") { const catalog = mcpManager.getToolCatalog(); return { content: [{ type: "json", json: catalog }] }; } // Handle proxied tools from external MCP servers if (name.startsWith("ctx7.") || name.startsWith("gh.") || name.startsWith("registry.")) { const result = await mcpManager.callToolByName(name, args); return result; } throw new Error("Unknown tool " + name); } catch (error: any) { // Log error to stderr to avoid corrupting stdout JSON-RPC messages const sanitized = ErrorHandler.sanitize(error, `tool_call_${name}`); console.error(`[SnapBack MCP] Error handling tool ${name}:`, error); return { content: [ { type: "text", text: `${sanitized.message} (Log ID: ${sanitized.logId})`, }, ], isError: true, error: { message: sanitized.message, code: sanitized.code, }, }; } }); // Register resources listing server.setRequestHandler(ListResourcesRequestSchema, async () => ({ resources: [ { uri: "snapback://session/current", name: "Current Session State", description: "Real-time information about the current coding session", mimeType: "application/json", }, { uri: "snapback://guidelines/safety", name: "Safety Guidelines", description: "Project-specific safety rules and patterns to avoid", mimeType: "text/plain", }, ], })); // Handle resource reading server.setRequestHandler(ReadResourceRequestSchema, async (request) => { const uri = request.params.uri; try { if (uri === "snapback://session/current") { try { // Get real session data from the backend using parallel execution const session = await PerformanceTracker.track("get_current_session", () => client.getCurrentSession(), ); return { contents: [ { uri, mimeType: "application/json", text: JSON.stringify(session, null, 2), }, ], }; } catch (error: any) { const sanitized = ErrorHandler.sanitize(error, "get_current_session"); throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`); } } if (uri === "snapback://guidelines/safety") { try { // Get real safety guidelines from the backend const guidelines = await PerformanceTracker.track("get_safety_guidelines", () => client.getSafetyGuidelines(), ); return { contents: [ { uri, mimeType: "text/plain", text: guidelines, }, ], }; } catch (error: any) { const sanitized = ErrorHandler.sanitize(error, "get_safety_guidelines"); throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`); } } throw new Error("Unknown resource: " + uri); } catch (error: any) { // Log error to stderr to avoid corrupting stdout JSON-RPC messages const sanitized = ErrorHandler.sanitize(error, `read_resource_${uri}`); console.error(`[SnapBack MCP] Error reading resource ${uri}:`, error); throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`); } }); // Register prompts listing server.setRequestHandler(ListPromptsRequestSchema, async () => ({ prompts: [ { name: "safety_context", description: "Safety context and guidelines for AI-assisted development", arguments: [], }, { name: "risk_warning", description: "Context-aware risk warnings for specific scenarios", arguments: [ { name: "risk_type", description: "Type of risk to warn about", required: true, }, ], }, ], })); // Handle prompt retrieval server.setRequestHandler(GetPromptRequestSchema, async (request) => { try { if (request.params.name === "safety_context") { try { // Execute both API calls in parallel for better performance const [session, guidelines] = await PerformanceTracker.track("get_safety_context", () => Promise.all([client.getCurrentSession(), client.getSafetyGuidelines()]), ); return { messages: [ { role: "user", content: { type: "text", text: `# SnapBack Safety Context ## Current Session - File: ${session.filePath} - AI Iterations: ${session.consecutiveAIEdits} - Risk Level: ${session.riskLevel} ## Safety Guidelines ${guidelines} ## Recommendations - Always review AI suggestions before applying - Create snapshots before major changes - Monitor iteration count (${session.consecutiveAIEdits}/5 threshold) - Block high-risk changes automatically`, }, }, ], }; } catch (error: any) { const sanitized = ErrorHandler.sanitize(error, "get_safety_context"); throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`); } } if (request.params.name === "risk_warning") { const riskType = (request.params.arguments as any)?.risk_type; const warnings: Record<string, string> = { "high-iteration": "⚠️ You have made 5+ consecutive AI edits. Research shows quality degrades significantly. Consider testing before continuing.", security: "🚨 Security vulnerability detected in AI suggestion. Review carefully before accepting.", "breaking-change": "⚠️ This change may break existing functionality. Consider creating a snapshot first.", complexity: "⚠️ Complexity increase detected. This may make code harder to maintain.", config: "🚨 Configuration file change detected. These often have cascading effects.", }; const warningText = warnings[riskType] || "⚠️ Potential risk detected. Review carefully."; return { messages: [ { role: "user", content: { type: "text", text: warningText, }, }, ], }; } throw new Error("Unknown prompt: " + request.params.name); } catch (error: any) { // Log error to stderr to avoid corrupting stdout JSON-RPC messages const sanitized = ErrorHandler.sanitize(error, `get_prompt_${request.params.name}`); console.error(`[SnapBack MCP] Error getting prompt ${request.params.name}:`, error); throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`); } }); const transport = new StdioServerTransport(); await server.connect(transport); // Log to stderr to avoid corrupting stdout JSON-RPC messages console.error("SnapBack MCP Server started"); return { server, transport }; } // Only start the server if this file is run directly if (import.meta.url === new URL(process.argv[1], "file:").href) { startServer().catch(console.error); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/snapback-dev/mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server