import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
GetPromptRequestSchema,
ListPromptsRequestSchema,
ListResourcesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import type { Snapshot } from "@snapback/contracts";
import { DependencyAnalyzer, Guardian, MCPClientManager } from "@snapback/core";
import { SnapBackEvent, SnapBackEventBus } from "@snapback/events";
import { z } from "zod";
import { SnapBackAPIClient } from "./client/snapback-api.js";
import {
AnalyzeSuggestionSchema,
CheckIterationSafetySchema,
CreateSnapshotSchema,
validateFilePath,
} from "./utils/security.js";
// Performance tracker for monitoring operation times
class PerformanceTracker {
static async track<T>(operation: string, fn: () => Promise<T>): Promise<T> {
const start = Date.now();
try {
return await fn();
} finally {
const duration = Date.now() - start;
console.error(`[PERF] ${operation}: ${duration}ms`);
// Alert if exceeds budget (example budgets)
const PERFORMANCE_BUDGETS: Record<string, number> = {
analyze_suggestion: 200,
check_iteration_safety: 100,
create_snapshot: 500,
};
if (duration > (PERFORMANCE_BUDGETS[operation] || 1000)) {
console.warn(
`[PERF] ⚠️ ${operation} exceeded budget: ${duration}ms > ${PERFORMANCE_BUDGETS[operation] || 1000}ms`,
);
}
}
}
}
// Export storage factory for testing
export function createStorage(testStorage?: any) {
// If testStorage is provided, return it instead of creating a new one
if (testStorage) {
return testStorage;
}
return {
create: async (options?: { description?: string; protected?: boolean }) => {
return {
id: `snap-${Date.now()}`,
timestamp: Date.now(),
meta: options || {},
} as Snapshot;
},
retrieve: async () => {
return null;
},
list: async () => {
return [];
},
restore: async (_id: string, _targetPath: string, _options?: any) => {
return {
success: true,
restoredFiles: [],
errors: [],
};
},
};
}
// Create API client factory for testing
export function createAPIClient(): SnapBackAPIClient {
// Validate required environment variables
const apiUrl = process.env.SNAPBACK_API_URL;
const apiKey = process.env.SNAPBACK_API_KEY;
// Use defaults in development/test mode
const isDevelopment = process.env.NODE_ENV === "development" || process.env.NODE_ENV === "test";
if (!apiUrl) {
if (isDevelopment) {
console.warn("[SnapBack MCP] Warning: SNAPBACK_API_URL not set, using default: http://localhost:3000");
// Fall through to use default
} else {
throw new Error("Required configuration missing: SNAPBACK_API_URL must be set");
}
}
if (!apiKey) {
if (isDevelopment) {
console.warn("[SnapBack MCP] Warning: SNAPBACK_API_KEY not set, using empty string");
// Fall through to use empty string
} else {
throw new Error("Required configuration missing: SNAPBACK_API_KEY must be set");
}
}
// Validate API key format (skip in development)
if (!isDevelopment && apiKey) {
const API_KEY_MIN_LENGTH = 32;
const API_KEY_PATTERN = /^[A-Za-z0-9_-]{32,}$/;
if (!API_KEY_PATTERN.test(apiKey) || apiKey.length < API_KEY_MIN_LENGTH) {
throw new Error("Invalid API key format");
}
}
return new SnapBackAPIClient({
baseUrl: apiUrl || "http://localhost:3000",
apiKey: apiKey || "",
});
}
// Error handler for sanitizing error messages
class ErrorHandler {
static sanitize(
error: unknown,
context: string,
): {
message: string;
code: string;
logId: string;
} {
// In test/development mode, provide more detailed errors
const isDevelopment = process.env.NODE_ENV === "development" || process.env.NODE_ENV === "test";
const logId = `ERR-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
// Log full details internally
console.error(`[Error ${logId}] ${context}:`, error);
// Return more detailed message in development, generic in production
if (isDevelopment) {
const errorMessage = error instanceof Error ? error.message : String(error);
return {
message: errorMessage,
code: "INTERNAL_ERROR",
logId,
};
}
// Return generic message to client
return {
message: "An internal error occurred. Contact support with log ID.",
code: "INTERNAL_ERROR",
logId,
};
}
}
// Export a function to create and start the server
export async function startServer(apiClient?: SnapBackAPIClient, testStorage?: any) {
// In a real implementation, this would be injected or created through a factory
// Mock implementation - replace with actual storage in production
const storage = createStorage(testStorage);
const guardian = new Guardian();
const dep = new DependencyAnalyzer();
const mcpManager = new MCPClientManager();
// Connect to external MCP servers from config (non-blocking)
try {
await mcpManager.connectFromConfig();
} catch (err) {
console.error("[SnapBack MCP] External MCP connection failed:", err);
}
// Create SnapBack API client if not provided
const client = apiClient || createAPIClient();
// Initialize event bus
const eventBus = new SnapBackEventBus();
try {
await eventBus.connect();
console.error("[SnapBack MCP] Connected to event bus");
} catch (err) {
console.error("[SnapBack MCP] Failed to connect to event bus:", err);
}
const server = new Server(
{ name: "snapback-mcp-server", version: "1.0.0" },
{
capabilities: {
tools: {},
resources: {},
prompts: {},
},
},
);
// Register tools listing
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "analyze_suggestion",
description: `**Purpose:** Analyze AI-generated code for security, performance, and quality risks before applying changes.
**When to Use:**
- BEFORE accepting code suggestions (proactive safety)
- When user asks "is this safe to apply?"
- For files with BLOCK protection level (required analysis)
- When multiple consecutive AI edits detected (iteration risk)
**When NOT to Use:**
- After user manually created snapshot (already protected)
- For non-code files (images, docs, configs without logic)
- When user explicitly says "skip analysis" or "I trust this"
**Contextual Integration:**
- Checks current file protection level from Extension state
- Considers recent edit velocity (5+ consecutive edits = high risk)
- Aware of git context (unstaged changes, branch name)
**Output:**
- ✅ SAFE (green): Apply confidently, auto-snapshot if protected
- ⚠️ WARN (yellow): Review carefully, show issues to user
- 🚨 BLOCK (red): Do not apply, create manual snapshot first if proceeding
**Example Workflow:**
1. User types: "refactor this function to use async/await"
2. LLM generates code suggestion
3. **Call analyze_suggestion** with generated code + file_path
4. Parse response:
- If SAFE → Apply + create snapshot (if file protected)
- If WARN → Show issues, ask user confirmation
- If BLOCK → Stop, require manual snapshot creation
**Performance:** < 200ms average analysis time`,
inputSchema: {
type: "object",
properties: {
code: {
type: "string",
description: "The AI-generated code to analyze (max 1MB)",
},
file_path: {
type: "string",
description: "Relative path from workspace root (e.g., 'src/utils/helper.ts')",
},
context: {
type: "object",
description: "Additional analysis context (optional but recommended)",
properties: {
surrounding_code: {
type: "string",
description: "Code around the change (helps detect breaking changes)",
},
project_type: {
type: "string",
enum: ["node", "browser", "deno"],
description: "Runtime environment for accurate risk assessment",
},
language: {
type: "string",
enum: ["javascript", "typescript", "python"],
description: "Programming language for syntax-specific checks",
},
},
},
},
required: ["code", "file_path"],
},
},
{
name: "check_iteration_safety",
description: `**Purpose:** Check if continuing with AI suggestions is safe based on iteration count.
**When to Use:**
- Periodically during AI-assisted coding sessions
- When user has made 3+ consecutive AI edits
- Before accepting major refactor suggestions
- When velocity exceeds safe thresholds
**Contextual Integration:**
- Tracks consecutive AI edits per file
- Monitors edit velocity (edits/minute)
- Considers file protection levels
- Integrates with Extension's save handler
**Output:**
- Current iteration count
- Risk level (low/medium/high)
- Velocity metrics
- Actionable recommendation
**Performance:** < 100ms average`,
inputSchema: {
type: "object",
properties: {
file_path: {
type: "string",
description: "Path to the file being edited",
},
},
required: ["file_path"],
},
},
{
name: "create_snapshot",
description: `**Purpose:** Manually create a code snapshot before making risky changes.
**When to Use:**
- Before accepting significant AI suggestions
- When about to refactor critical files
- Before implementing breaking changes
- When user explicitly requests snapshot
**Contextual Integration:**
- Creates snapshot with MCP source tagging
- Updates Extension UI in real-time
- Integrates with protection levels
- Available immediately in restore commands
**Output:**
- Snapshot ID for later restoration
- Confirmation message with timestamp
- File list included in snapshot
**Performance:** < 500ms average (larger files may take longer)`,
inputSchema: {
type: "object",
properties: {
file_path: {
type: "string",
description: "Path to the file to snapshot",
},
reason: {
type: "string",
description: "Reason for creating snapshot (e.g., 'before major refactor')",
},
},
required: ["file_path"],
},
},
// Existing SnapBack tools
{
name: "snapback.analyze_risk",
description: "Analyze code changes for potential risks",
inputSchema: {
type: "object",
properties: {
changes: {
type: "array",
items: {
type: "object",
properties: {
added: { type: "boolean" },
removed: { type: "boolean" },
value: { type: "string" },
count: { type: "number" },
},
required: ["value"],
},
},
},
required: ["changes"],
},
},
{
name: "snapback.check_dependencies",
description: "Check for dependency-related risks",
inputSchema: {
type: "object",
properties: {
before: {
type: "object",
additionalProperties: true,
},
after: {
type: "object",
additionalProperties: true,
},
},
required: ["before", "after"],
},
},
{
name: "snapback.create_snapshot",
description: "Create a code snapshot",
inputSchema: {
type: "object",
properties: {
trigger: { type: "string" },
content: { type: "string" },
},
},
},
{
name: "snapback.list_snapshots",
description: "List available snapshots",
inputSchema: {
type: "object",
properties: {},
},
},
{
name: "catalog.list_tools",
description: "List available tools from connected MCP servers",
inputSchema: {
type: "object",
properties: {},
},
},
],
}));
// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
// Handle new SnapBack MCP tools
if (name === "analyze_suggestion") {
// Validate input using Zod schema
const validated = AnalyzeSuggestionSchema.parse(args);
const { code, file_path, context } = validated;
// Additional security validation for file path
validateFilePath(file_path);
try {
// Call SnapBack backend for fast analysis
const analysis = await PerformanceTracker.track("analyze_suggestion", () =>
client.analyzeFast({
code,
filePath: file_path,
// Handle different context naming conventions
context: context
? {
surroundingCode: context.surrounding_code || context.surroundingCode,
projectType: context.project_type || context.projectType || "node",
language: context.language || "javascript",
}
: undefined,
}),
);
// Publish event when analysis is completed
eventBus.publish(SnapBackEvent.ANALYSIS_COMPLETED, {
filePath: file_path,
timestamp: Date.now(),
riskLevel: analysis.riskLevel,
});
// Format response for AI tool using array.join for O(n) complexity
const parts = [
`${analysis.riskLevel === "safe" ? "✅" : analysis.riskLevel === "low" ? "🟢" : analysis.riskLevel === "medium" ? "🟡" : analysis.riskLevel === "high" ? "🔴" : "🚨"} Risk Analysis Complete`,
"",
`Risk Level: ${analysis.riskLevel.toUpperCase()}`,
`Recommendation: ${
analysis.riskLevel === "high" || analysis.riskLevel === "critical"
? "BLOCK: Do not apply this suggestion"
: analysis.riskLevel === "medium"
? "WARN: Review carefully before applying"
: "ALLOW: Safe to apply"
}`,
`Analysis Time: ${analysis.analysisTimeMs}ms`,
"",
];
if (analysis.issues.length > 0) {
parts.push("Issues Detected:");
// Limit issues to prevent resource exhaustion
const maxIssues = 100;
const issuesToShow = analysis.issues.slice(0, maxIssues);
for (const [idx, issue] of issuesToShow.entries()) {
parts.push(`${idx + 1}. [${issue.severity}] ${issue.message}`);
if (issue.line) {
const col = issue.column ? `, Column ${issue.column}` : "";
parts.push(` Line ${issue.line}${col}`);
}
}
if (analysis.issues.length > maxIssues) {
parts.push(`... and ${analysis.issues.length - maxIssues} more issues`);
}
} else {
parts.push("No issues detected. This change appears safe.");
}
// Add actionable guidance
if (analysis.riskLevel === "high" || analysis.riskLevel === "critical") {
parts.push("");
parts.push(
"⚠️ RECOMMENDED ACTION: Create a snapshot before proceeding or reject this suggestion.",
);
}
const responseText = parts.join("\n");
return {
content: [
{
type: "text",
text: responseText,
},
],
isError: false,
};
} catch (error: any) {
const sanitized = ErrorHandler.sanitize(error, "analyze_suggestion");
return {
content: [
{
type: "text",
text: `${sanitized.message} (Log ID: ${sanitized.logId})`,
},
],
isError: true,
};
}
}
if (name === "check_iteration_safety") {
// Validate input using Zod schema
const validated = CheckIterationSafetySchema.parse(args);
const { file_path } = validated;
// Additional security validation for file path
validateFilePath(file_path);
try {
// Get real iteration stats from the backend
const stats = await PerformanceTracker.track("check_iteration_safety", () =>
client.getIterationStats(file_path),
);
// Publish event when iteration safety is checked
eventBus.publish(SnapBackEvent.ANALYSIS_COMPLETED, {
filePath: file_path,
timestamp: Date.now(),
iterationCount: stats.consecutiveAIEdits,
riskLevel: stats.riskLevel,
});
const emoji = stats.riskLevel === "high" ? "🚨" : stats.riskLevel === "medium" ? "⚠️" : "✅";
const parts = [
`${emoji} Iteration Safety Check`,
"",
`Current AI Iterations: ${stats.consecutiveAIEdits}`,
`Risk Level: ${stats.riskLevel.toUpperCase()}`,
`Change Velocity: ${stats.velocity} edits/min`,
"",
stats.recommendation,
];
const responseText = parts.join("\n");
return {
content: [
{
type: "text",
text: responseText,
},
],
isError: false,
};
} catch (error: any) {
const sanitized = ErrorHandler.sanitize(error, "check_iteration_safety");
return {
content: [
{
type: "text",
text: `${sanitized.message} (Log ID: ${sanitized.logId})`,
},
],
isError: true,
};
}
}
if (name === "create_snapshot") {
// Validate input using Zod schema
const validated = CreateSnapshotSchema.parse(args);
const { file_path, reason } = validated;
// Additional security validation for file path
validateFilePath(file_path);
try {
const snapshot = await PerformanceTracker.track("create_snapshot", () =>
client.createSnapshot({
filePath: file_path,
reason: reason || "Manual snapshot via MCP",
source: "mcp",
}),
);
// Publish event when snapshot is created
eventBus.publish(SnapBackEvent.SNAPSHOT_CREATED, {
id: snapshot.id,
filePath: file_path,
source: "mcp",
timestamp: snapshot.timestamp,
});
const parts = [
"✅ Snapshot created successfully",
"",
`Snapshot ID: ${snapshot.id}`,
`File: ${file_path}`,
`Timestamp: ${new Date(snapshot.timestamp).toLocaleString()}`,
`Reason: ${reason || "Manual snapshot"}`,
"",
"You can now safely proceed with changes. If anything goes wrong, you can restore to this snapshot.",
];
const responseText = parts.join("\n");
return {
content: [
{
type: "text",
text: responseText,
},
],
isError: false,
};
} catch (error: any) {
const sanitized = ErrorHandler.sanitize(error, "create_snapshot");
return {
content: [
{
type: "text",
text: `${sanitized.message} (Log ID: ${sanitized.logId})`,
},
],
isError: true,
};
}
}
// Handle existing SnapBack tools
if (name === "snapback.analyze_risk") {
const parsed = z
.object({
changes: z.array(
z.object({
added: z.boolean().optional().default(false),
removed: z.boolean().optional().default(false),
value: z.string(),
count: z.number().optional(),
}),
),
})
.parse(args);
const risk = await guardian.analyze(parsed.changes);
return { content: [{ type: "json", json: risk }] };
}
if (name === "snapback.check_dependencies") {
const parsed = z
.object({
before: z.record(z.string(), z.any()),
after: z.record(z.string(), z.any()),
})
.parse(args);
const result = dep.quickAnalyze(parsed.before, parsed.after);
return { content: [{ type: "json", json: result }] };
}
if (name === "snapback.create_snapshot") {
const input = z
.object({
trigger: z.string().default("mcp"),
content: z.string().optional(),
})
.parse(args);
const snap = await storage.create({
description: input.content,
protected: false,
});
return { content: [{ type: "json", json: snap }] };
}
if (name === "snapback.list_snapshots") {
const snaps = await storage.list();
return { content: [{ type: "json", json: snaps }] };
}
// Handle catalog tool
if (name === "catalog.list_tools") {
const catalog = mcpManager.getToolCatalog();
return { content: [{ type: "json", json: catalog }] };
}
// Handle proxied tools from external MCP servers
if (name.startsWith("ctx7.") || name.startsWith("gh.") || name.startsWith("registry.")) {
const result = await mcpManager.callToolByName(name, args);
return result;
}
throw new Error("Unknown tool " + name);
} catch (error: any) {
// Log error to stderr to avoid corrupting stdout JSON-RPC messages
const sanitized = ErrorHandler.sanitize(error, `tool_call_${name}`);
console.error(`[SnapBack MCP] Error handling tool ${name}:`, error);
return {
content: [
{
type: "text",
text: `${sanitized.message} (Log ID: ${sanitized.logId})`,
},
],
isError: true,
error: {
message: sanitized.message,
code: sanitized.code,
},
};
}
});
// Register resources listing
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
resources: [
{
uri: "snapback://session/current",
name: "Current Session State",
description: "Real-time information about the current coding session",
mimeType: "application/json",
},
{
uri: "snapback://guidelines/safety",
name: "Safety Guidelines",
description: "Project-specific safety rules and patterns to avoid",
mimeType: "text/plain",
},
],
}));
// Handle resource reading
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const uri = request.params.uri;
try {
if (uri === "snapback://session/current") {
try {
// Get real session data from the backend using parallel execution
const session = await PerformanceTracker.track("get_current_session", () =>
client.getCurrentSession(),
);
return {
contents: [
{
uri,
mimeType: "application/json",
text: JSON.stringify(session, null, 2),
},
],
};
} catch (error: any) {
const sanitized = ErrorHandler.sanitize(error, "get_current_session");
throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`);
}
}
if (uri === "snapback://guidelines/safety") {
try {
// Get real safety guidelines from the backend
const guidelines = await PerformanceTracker.track("get_safety_guidelines", () =>
client.getSafetyGuidelines(),
);
return {
contents: [
{
uri,
mimeType: "text/plain",
text: guidelines,
},
],
};
} catch (error: any) {
const sanitized = ErrorHandler.sanitize(error, "get_safety_guidelines");
throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`);
}
}
throw new Error("Unknown resource: " + uri);
} catch (error: any) {
// Log error to stderr to avoid corrupting stdout JSON-RPC messages
const sanitized = ErrorHandler.sanitize(error, `read_resource_${uri}`);
console.error(`[SnapBack MCP] Error reading resource ${uri}:`, error);
throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`);
}
});
// Register prompts listing
server.setRequestHandler(ListPromptsRequestSchema, async () => ({
prompts: [
{
name: "safety_context",
description: "Safety context and guidelines for AI-assisted development",
arguments: [],
},
{
name: "risk_warning",
description: "Context-aware risk warnings for specific scenarios",
arguments: [
{
name: "risk_type",
description: "Type of risk to warn about",
required: true,
},
],
},
],
}));
// Handle prompt retrieval
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
try {
if (request.params.name === "safety_context") {
try {
// Execute both API calls in parallel for better performance
const [session, guidelines] = await PerformanceTracker.track("get_safety_context", () =>
Promise.all([client.getCurrentSession(), client.getSafetyGuidelines()]),
);
return {
messages: [
{
role: "user",
content: {
type: "text",
text: `# SnapBack Safety Context
## Current Session
- File: ${session.filePath}
- AI Iterations: ${session.consecutiveAIEdits}
- Risk Level: ${session.riskLevel}
## Safety Guidelines
${guidelines}
## Recommendations
- Always review AI suggestions before applying
- Create snapshots before major changes
- Monitor iteration count (${session.consecutiveAIEdits}/5 threshold)
- Block high-risk changes automatically`,
},
},
],
};
} catch (error: any) {
const sanitized = ErrorHandler.sanitize(error, "get_safety_context");
throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`);
}
}
if (request.params.name === "risk_warning") {
const riskType = (request.params.arguments as any)?.risk_type;
const warnings: Record<string, string> = {
"high-iteration":
"⚠️ You have made 5+ consecutive AI edits. Research shows quality degrades significantly. Consider testing before continuing.",
security: "🚨 Security vulnerability detected in AI suggestion. Review carefully before accepting.",
"breaking-change":
"⚠️ This change may break existing functionality. Consider creating a snapshot first.",
complexity: "⚠️ Complexity increase detected. This may make code harder to maintain.",
config: "🚨 Configuration file change detected. These often have cascading effects.",
};
const warningText = warnings[riskType] || "⚠️ Potential risk detected. Review carefully.";
return {
messages: [
{
role: "user",
content: {
type: "text",
text: warningText,
},
},
],
};
}
throw new Error("Unknown prompt: " + request.params.name);
} catch (error: any) {
// Log error to stderr to avoid corrupting stdout JSON-RPC messages
const sanitized = ErrorHandler.sanitize(error, `get_prompt_${request.params.name}`);
console.error(`[SnapBack MCP] Error getting prompt ${request.params.name}:`, error);
throw new Error(`${sanitized.message} (Log ID: ${sanitized.logId})`);
}
});
const transport = new StdioServerTransport();
await server.connect(transport);
// Log to stderr to avoid corrupting stdout JSON-RPC messages
console.error("SnapBack MCP Server started");
return { server, transport };
}
// Only start the server if this file is run directly
if (import.meta.url === new URL(process.argv[1], "file:").href) {
startServer().catch(console.error);
}