---
description: Advanced MCP (Model Context Protocol) patterns
globs:
alwaysApply: false
---
> You are an expert in advanced MCP (Model Context Protocol) patterns, TypeScript, streaming protocols, and distributed systems. You focus on implementing sophisticated MCP features like sampling, roots management, resumable streaming, and complex message handling patterns.
## MCP Advanced Architecture Flow
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Sampling │ │ Roots Mgmt │ │ Streaming │
│ - LLM Request │───▶│ - File Systems │───▶│ - SSE/HTTP │
│ - Context │ │ - Permissions │ │ - Resumability│
│ - Model Prefs │ │ - Validation │ │ - Event Store │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Message │ │ Transport │ │ Session │
│ - Concurrency │ │ - Multiple │ │ - State Mgmt │
│ - Batching │ │ - Protocols │ │ - Persistence │
│ - Validation │ │ - Fallbacks │ │ - Recovery │
└─────────────────┘ └──────────────────┘ └─────────────────┘
```
## Project Structure
```
mcp-advanced/
├── src/
│ ├── sampling/ # LLM sampling implementations
│ │ ├── types.ts # Sampling types and schemas
│ │ ├── handler.ts # Core sampling logic
│ │ └── preferences.ts # Model preferences
│ ├── roots/ # File system roots management
│ │ ├── manager.ts # Root registration and validation
│ │ ├── resolver.ts # URI resolution
│ │ └── watcher.ts # Change notifications
│ ├── streaming/ # Advanced streaming patterns
│ │ ├── eventStore.ts # Event persistence
│ │ ├── resumable.ts # Resumable connections
│ │ └── multiplexer.ts # Stream multiplexing
│ ├── transport/ # Multi-protocol transport
│ │ ├── streamableHttp.ts # Streamable HTTP implementation
│ │ ├── sse.ts # Server-Sent Events
│ │ └── compatibility.ts # Backward compatibility
│ └── session/ # Session management
│ ├── manager.ts # Session lifecycle
│ ├── persistence.ts # State persistence
│ └── recovery.ts # Recovery mechanisms
└── examples/
├── samplingServer.ts # Sampling implementation example
├── rootsServer.ts # Roots management example
└── streamingServer.ts # Advanced streaming example
```
## Core Implementation Patterns
### Sampling Request Handler
```typescript
// ✅ DO: Implement comprehensive sampling with proper validation
import { z } from 'zod';
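import {
CreateMessageRequest,
CreateMessageResult,
ErrorCode,
McpError,
SamplingMessage,
ModelPreferencesSchema
} from '@modelcontextprotocol/sdk/types.js';
// LLMProvider and ContextManager are application-defined types, not SDK exports.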
interface SamplingContext {
systemPrompt?: string;
includeContext?: "none" | "thisServer" | "allServers";
temperature?: number;
maxTokens: number;
stopSequences?: string[];
metadata?: Record<string, unknown>;
modelPreferences?: {
hints?: Array<{ name?: string }>;
costPriority?: number;
speedPriority?: number;
intelligencePriority?: number;
};
}
class AdvancedSamplingHandler {
constructor(
private llmProvider: LLMProvider,
private contextManager: ContextManager
) {}
async createMessage(
messages: SamplingMessage[],
context: SamplingContext
): Promise<CreateMessageResult> {
// Validate sampling request
this.validateSamplingRequest(messages, context);
// Include context if requested
const enhancedMessages = await this.enhanceWithContext(
messages,
context.includeContext
);
// Select optimal model based on preferences
const selectedModel = await this.selectModel(context.modelPreferences);
try {
const response = await this.llmProvider.generate({
model: selectedModel,
messages: enhancedMessages,
systemPrompt: context.systemPrompt,
temperature: context.temperature,
maxTokens: context.maxTokens,
stopSequences: context.stopSequences,
metadata: context.metadata,
});
return {
model: response.model,
stopReason: response.stopReason as "endTurn" | "stopSequence" | "maxTokens",
role: "assistant",
content: {
type: "text",
text: response.content,
},
};
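} catch (error) {
// `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
const reason = error instanceof Error ? error.message : String(error);
throw new McpError(
ErrorCode.InternalError,
`Sampling failed: ${reason}`,
{ originalError: error }
);
}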
}
private validateSamplingRequest(
messages: SamplingMessage[],
context: SamplingContext
): void {
if (!messages?.length) {
throw new McpError(
ErrorCode.InvalidParams,
"Messages array cannot be empty"
);
}
if (context.maxTokens <= 0) {
throw new McpError(
ErrorCode.InvalidParams,
"maxTokens must be greater than 0"
);
}
// Validate model preferences if provided
if (context.modelPreferences) {
const result = ModelPreferencesSchema.safeParse(context.modelPreferences);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
"Invalid model preferences",
result.error.issues
);
}
}
}
private async enhanceWithContext(
messages: SamplingMessage[],
includeContext?: "none" | "thisServer" | "allServers"
): Promise<SamplingMessage[]> {
if (!includeContext || includeContext === "none") {
return messages;
}
const contextMessages = await this.contextManager.getContext(includeContext);
return [...contextMessages, ...messages];
}
private async selectModel(
preferences?: SamplingContext["modelPreferences"]
): Promise<string> {
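// Fall back to the default only when no preferences were sent at all;
// the priorities are still meaningful when no hints are given
if (!preferences) {
return this.llmProvider.getDefaultModel();
}
return this.llmProvider.selectOptimalModel({
hints: preferences.hints ?? [],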
costPriority: preferences.costPriority ?? 0.5,
speedPriority: preferences.speedPriority ?? 0.5,
intelligencePriority: preferences.intelligencePriority ?? 0.5,
});
}
}
// ❌ DON'T: Use minimal sampling without proper validation
class BadSamplingHandler {
async createMessage(messages: any, config: any) {
return await this.llm.generate(messages); // No validation or error handling
}
}
```
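The handler above delegates to `llmProvider.selectOptimalModel`, which is not shown. The following is a minimal sketch of how such a selector might combine hints and priorities. `ModelProfile`, its 0 to 1 ratings, and `selectModelByPreferences` are hypothetical names for illustration, not SDK APIs, and the weighted sum is only one plausible scoring rule.

```typescript
// Hypothetical catalogue entry; the 0-1 ratings are assumptions for illustration.
interface ModelProfile {
  name: string;
  cost: number;         // 1 = cheapest
  speed: number;        // 1 = fastest
  intelligence: number; // 1 = most capable
}

interface ModelPreferences {
  hints?: Array<{ name?: string }>;
  costPriority?: number;
  speedPriority?: number;
  intelligencePriority?: number;
}

function selectModelByPreferences(
  catalogue: ModelProfile[],
  prefs: ModelPreferences = {}
): string {
  if (catalogue.length === 0) {
    throw new Error("Model catalogue is empty");
  }

  // Hints are tried in order; the first hint that matches any model name
  // (as a substring) narrows the candidate pool. Unmatched hints are ignored.
  let candidates = catalogue;
  for (const hint of prefs.hints ?? []) {
    const hintName = hint.name;
    if (!hintName) continue;
    const matches = catalogue.filter(m => m.name.includes(hintName));
    if (matches.length > 0) {
      candidates = matches;
      break;
    }
  }

  // Missing priorities default to 0.5, mirroring the handler above.
  const costWeight = prefs.costPriority ?? 0.5;
  const speedWeight = prefs.speedPriority ?? 0.5;
  const intelligenceWeight = prefs.intelligencePriority ?? 0.5;
  const score = (m: ModelProfile): number =>
    costWeight * m.cost + speedWeight * m.speed + intelligenceWeight * m.intelligence;

  // Highest weighted score wins; ties keep the earlier catalogue entry.
  return candidates.reduce((best, m) => (score(m) > score(best) ? m : best)).name;
}
```

Treating hints as advisory keeps the final choice with the side that owns the models: a hint that matches nothing falls through to priority scoring instead of failing the request.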
### Context Management
```typescript
// ✅ DO: Implement sophisticated context management
class ContextManager {
private cache = new Map<string, SamplingMessage[]>();
private readonly contextTtl = 5 * 60 * 1000; // 5 minutes
constructor(
private resourceManager: ResourceManager,
private toolManager: ToolManager
) {}
async getContext(type: "thisServer" | "allServers"): Promise<SamplingMessage[]> {
const cacheKey = `context:${type}`;
// Check cache first
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey)!;
}
const context = await this.buildContext(type);
// Cache with TTL
this.cache.set(cacheKey, context);
setTimeout(() => this.cache.delete(cacheKey), this.contextTtl);
return context;
}
private async buildContext(type: "thisServer" | "allServers"): Promise<SamplingMessage[]> {
const contextMessages: SamplingMessage[] = [];
if (type === "thisServer") {
// Add current server's resources and tools
contextMessages.push(...await this.getServerContext());
} else {
// Add context from all available servers
// (getAllServersContext is application-specific and not shown here)
contextMessages.push(...await this.getAllServersContext());
}
return contextMessages;
}
private async getServerContext(): Promise<SamplingMessage[]> {
const context: SamplingMessage[] = [];
// Include available resources
try {
const resources = await this.resourceManager.listResources();
context.push({
role: "user",
content: {
type: "text",
text: `Available resources: ${resources.map(r => r.name).join(", ")}`,
},
});
} catch (error) {
console.warn("Failed to fetch resources for context:", error);
}
// Include available tools
try {
const tools = await this.toolManager.listTools();
context.push({
role: "user",
content: {
type: "text",
text: `Available tools: ${tools.map(t => t.name).join(", ")}`,
},
});
} catch (error) {
console.warn("Failed to fetch tools for context:", error);
}
return context;
}
clearContext(): void {
this.cache.clear();
}
}
```
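`getContext` above expires entries by scheduling one `setTimeout` per cached key. An alternative sketch stores an expiry timestamp with each entry and checks it on read, which avoids pending timers that can keep a process alive. `TtlCache` and its injectable `now` clock are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper: a cache whose entries expire lazily on read.
class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();

  constructor(
    private ttlMs: number,
    // Injectable clock so tests can control time; defaults to the wall clock.
    private now: () => number = () => Date.now()
  ) {}

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (this.now() >= entry.expiresAt) {
      // Expired: drop the entry so the caller rebuilds it.
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }

  clear(): void {
    this.entries.clear();
  }
}
```

Entries that are never read again stay in the map until `clear()` is called, so a long-lived process with many distinct keys would still want a periodic sweep.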
## Advanced Roots Management
### Root Registration and Validation
```typescript
// ✅ DO: Implement comprehensive roots management
import { Root, ListRootsResult } from '@modelcontextprotocol/sdk/types.js';
import { watch, FSWatcher } from 'fs';
import { stat, access } from 'fs/promises';
import { dirname, resolve } from 'path';
interface RootConfig extends Root {
watchForChanges?: boolean;
permissions?: {
read: boolean;
write: boolean;
execute: boolean;
};
metadata?: Record<string, unknown>;
}
class RootsManager {
private roots = new Map<string, RootConfig>();
private watchers = new Map<string, FSWatcher>();
private changeListeners: Array<(roots: Root[]) => void> = [];
constructor(private server: McpServer) {}
async addRoot(rootConfig: RootConfig): Promise<void> {
// Validate root URI
if (!this.isValidRootUri(rootConfig.uri)) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid root URI: ${rootConfig.uri}. Must start with file://`
);
}
// Verify file system access
await this.validateFileSystemAccess(rootConfig);
// Store root configuration
this.roots.set(rootConfig.uri, rootConfig);
// Set up file watching if requested
if (rootConfig.watchForChanges) {
await this.setupFileWatcher(rootConfig);
}
// Notify listeners of root changes
await this.notifyRootListChanged();
}
async removeRoot(uri: string): Promise<void> {
if (!this.roots.has(uri)) {
throw new McpError(
ErrorCode.InvalidParams,
`Root not found: ${uri}`
);
}
// Clean up file watcher
const watcher = this.watchers.get(uri);
if (watcher) {
watcher.close();
this.watchers.delete(uri);
}
// Remove root
this.roots.delete(uri);
// Notify listeners
await this.notifyRootListChanged();
}
getRoots(): Root[] {
return Array.from(this.roots.values()).map(({ watchForChanges, permissions, metadata, ...root }) => root);
}
private isValidRootUri(uri: string): boolean {
try {
const url = new URL(uri);
return url.protocol === "file:";
} catch {
return false;
}
}
private async validateFileSystemAccess(rootConfig: RootConfig): Promise<void> {
try {
const url = new URL(rootConfig.uri);
const path = url.pathname;
// Check if path exists
const stats = await stat(path);
// Verify permissions if specified
if (rootConfig.permissions) {
const { read, write, execute } = rootConfig.permissions;
// `require` is undefined in ES modules; a dynamic import works in both ESM and CommonJS
const { constants } = await import('fs');
if (read) {
await access(path, constants.R_OK);
}
if (write) {
await access(path, constants.W_OK);
}
if (execute) {
await access(path, constants.X_OK);
}
}
} catch (error) {
throw new McpError(
ErrorCode.InvalidParams,
`Cannot access root path: ${error.message}`,
{ uri: rootConfig.uri, error }
);
}
}
private async setupFileWatcher(rootConfig: RootConfig): Promise<void> {
try {
const url = new URL(rootConfig.uri);
const path = url.pathname;
const watcher = watch(path, { recursive: true }, (eventType, filename) => {
this.handleFileSystemChange(rootConfig.uri, eventType, filename);
});
watcher.on('error', (error) => {
console.error(`File watcher error for ${rootConfig.uri}:`, error);
});
this.watchers.set(rootConfig.uri, watcher);
} catch (error) {
console.error(`Failed to setup file watcher for ${rootConfig.uri}:`, error);
}
}
private handleFileSystemChange(
rootUri: string,
eventType: string,
filename: string | null
): void {
// Emit resource change notifications
if (filename) {
const resourceUri = `${rootUri}/${filename}`;
this.server.sendResourceUpdated({ uri: resourceUri });
}
}
private async notifyRootListChanged(): Promise<void> {
const roots = this.getRoots();
// Notify internal listeners
this.changeListeners.forEach(listener => {
try {
listener(roots);
} catch (error) {
console.error("Error in root change listener:", error);
}
});
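// Send MCP notification. Notifications carry no id and expect no response,
// so send them with notification() rather than request().
// Note: in MCP the roots list belongs to the client, so this message travels
// client -> server; a client-side manager can call the SDK's Client.sendRootsListChanged().
try {
await this.server.notification({
method: "notifications/roots/list_changed"
});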
} catch (error) {
console.warn("Failed to send roots list changed notification:", error);
}
}
onRootListChanged(listener: (roots: Root[]) => void): void {
this.changeListeners.push(listener);
}
async cleanup(): Promise<void> {
// Close all watchers
for (const watcher of this.watchers.values()) {
watcher.close();
}
this.watchers.clear();
this.changeListeners.length = 0;
}
}
// ❌ DON'T: Use minimal roots management without validation
class BadRootsManager {
private roots: string[] = [];
addRoot(uri: string) {
this.roots.push(uri); // No validation or error handling
}
}
```
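The manager above reads `new URL(uri).pathname` to obtain a local path. That string keeps percent-escapes (a space stays `%20`) and is not a valid path on Windows. Below is a sketch of the conversion using Node's `fileURLToPath` and `pathToFileURL`; the wrapper names `rootUriToPath` and `pathToRootUri` are hypothetical, and the example values assume a POSIX system.

```typescript
import { fileURLToPath, pathToFileURL } from "node:url";

// Hypothetical helper: convert a file:// root URI into a local filesystem path.
function rootUriToPath(uri: string): string {
  const url = new URL(uri); // throws a TypeError on malformed input
  if (url.protocol !== "file:") {
    throw new Error(`Unsupported root URI scheme: ${url.protocol}`);
  }
  // Decodes percent-escapes and applies platform-specific path rules.
  return fileURLToPath(url);
}

// Hypothetical helper: build a root URI from a local path, escaping as needed.
function pathToRootUri(localPath: string): string {
  return pathToFileURL(localPath).href;
}

// On POSIX: "/tmp/my project"
const examplePath = rootUriToPath("file:///tmp/my%20project");
```

Building URIs with `pathToFileURL` instead of string concatenation such as `` `file://${dir}` `` also keeps directories with spaces or `#` characters round-trippable.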
### Root-Based Resource Resolution
```typescript
// ✅ DO: Implement sophisticated resource resolution
class RootBasedResourceResolver {
constructor(
private rootsManager: RootsManager,
private permissionChecker: PermissionChecker
) {}
async resolveResource(uri: string): Promise<{
localPath: string;
root: Root;
permissions: ResourcePermissions;
}> {
const roots = this.rootsManager.getRoots();
// Find matching root with longest prefix
const matchingRoot = this.findBestMatchingRoot(roots, uri);
if (!matchingRoot) {
throw new McpError(
ErrorCode.InvalidParams,
`No accessible root found for URI: ${uri}`
);
}
// Resolve to local file system path
const localPath = this.resolveToLocalPath(matchingRoot.uri, uri);
// Check security constraints
await this.validateSecurityConstraints(localPath, matchingRoot);
// Determine effective permissions
const permissions = await this.permissionChecker.getResourcePermissions(
localPath,
matchingRoot
);
return {
localPath,
root: matchingRoot,
permissions,
};
}
private findBestMatchingRoot(roots: Root[], uri: string): Root | null {
let bestMatch: Root | null = null;
let longestMatch = 0;
for (const root of roots) {
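// Match on a path-segment boundary so file:///home/user does not match file:///home/username
const rootPrefix = root.uri.endsWith('/') ? root.uri : `${root.uri}/`;
if (uri === root.uri || uri.startsWith(rootPrefix)) {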
const matchLength = root.uri.length;
if (matchLength > longestMatch) {
longestMatch = matchLength;
bestMatch = root;
}
}
}
return bestMatch;
}
private resolveToLocalPath(rootUri: string, resourceUri: string): string {
const rootUrl = new URL(rootUri);
const resourceUrl = new URL(resourceUri);
// Calculate relative path
const relativePath = resourceUrl.pathname.substring(rootUrl.pathname.length);
// Resolve to absolute path
return resolve(rootUrl.pathname, relativePath.replace(/^\//, ''));
}
private async validateSecurityConstraints(
localPath: string,
root: Root
): Promise<void> {
const rootUrl = new URL(root.uri);
const rootPath = rootUrl.pathname;
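// Ensure path doesn't escape root directory
const resolvedPath = resolve(localPath);
const resolvedRoot = resolve(rootPath);
// Compare on a directory boundary: a bare startsWith() would accept "/data/root-evil"
// for the root "/data/root". The "/" separator assumes POSIX-style paths.
const rootWithSep = resolvedRoot.endsWith('/') ? resolvedRoot : `${resolvedRoot}/`;
if (resolvedPath !== resolvedRoot && !resolvedPath.startsWith(rootWithSep)) {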
throw new McpError(
ErrorCode.InvalidParams,
"Resource path escapes root directory",
{ path: localPath, root: root.uri }
);
}
// Additional security checks can be added here
// e.g., blacklist certain file types, check file sizes, etc.
}
}
```
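The containment check in `validateSecurityConstraints` is the part most worth getting right. One portable way to express it, sketched below, is to ask `path.relative` how to get from the root to the candidate and reject anything that climbs out. `isInsideRoot` is a hypothetical helper name, and the sketch does not resolve symlinks (that would additionally need `fs.realpath`).

```typescript
import { isAbsolute, relative, resolve, sep } from "node:path";

// Hypothetical helper: true when candidatePath is rootPath itself or lies beneath it.
function isInsideRoot(rootPath: string, candidatePath: string): boolean {
  const rel = relative(resolve(rootPath), resolve(candidatePath));
  if (rel === "") return true; // the root itself
  // ".." or "../something" means the candidate climbs out of the root.
  if (rel === ".." || rel.startsWith(`..${sep}`)) return false;
  // On Windows, a path on another drive comes back as an absolute path.
  return !isAbsolute(rel);
}
```

Unlike a string prefix test, this treats `/data/root-evil` as outside `/data/root`, while a file literally named `..hidden` inside the root is still accepted.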
## Advanced Streaming Patterns
### Resumable Streaming with Event Store
```typescript
// ✅ DO: Implement production-ready event store
interface EventStore {
storeEvent(streamId: string, message: JSONRPCMessage): Promise<string>;
replayEventsAfter(
lastEventId: string,
handler: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
): Promise<string>;
cleanup?(olderThan: Date): Promise<void>;
}
class RedisEventStore implements EventStore {
constructor(
private redis: Redis,
private ttl: number = 24 * 60 * 60 // 24 hours
) {}
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
// Take one timestamp and reuse it for the ID, the payload, and the index score;
// separate Date.now() calls can differ and break the replay comparison
const timestamp = Date.now();
const eventId = `${streamId}:${timestamp}:${crypto.randomUUID()}`;
const eventData = JSON.stringify({
streamId,
message,
timestamp,
});
// Store event with expiration
await this.redis.setex(`event:${eventId}`, this.ttl, eventData);
// Add to stream index for efficient retrieval
await this.redis.zadd(`stream:${streamId}`, timestamp, eventId);
return eventId;
}
async replayEventsAfter(
lastEventId: string,
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
): Promise<string> {
if (!lastEventId) {
return '';
}
// Parse stream ID from event ID
const [streamId] = lastEventId.split(':');
if (!streamId) {
throw new McpError(ErrorCode.InvalidParams, "Invalid event ID format");
}
// Get timestamp of last event
const lastEventData = await this.redis.get(`event:${lastEventId}`);
if (!lastEventData) {
throw new McpError(ErrorCode.InvalidParams, "Last event not found");
}
const { timestamp: lastTimestamp } = JSON.parse(lastEventData);
// Get events after the last timestamp
const eventIds = await this.redis.zrangebyscore(
`stream:${streamId}`,
`(${lastTimestamp}`, // Exclusive of last timestamp
'+inf'
);
// Replay events in order
for (const eventId of eventIds) {
const eventData = await this.redis.get(`event:${eventId}`);
if (eventData) {
const { message } = JSON.parse(eventData);
await send(eventId, message);
}
}
return streamId;
}
async cleanup(olderThan: Date): Promise<void> {
const timestamp = olderThan.getTime();
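// Get all stream keys (KEYS blocks Redis while it scans; prefer SCAN on large keyspaces)
const streamKeys = await this.redis.keys('stream:*');
for (const streamKey of streamKeys) {
// ZREMRANGEBYSCORE returns only a count, so read the expired IDs before removing them
const expiredEventIds = await this.redis.zrangebyscore(
streamKey,
'-inf',
timestamp
);
if (expiredEventIds.length > 0) {
// Remove old events from stream index
await this.redis.zremrangebyscore(streamKey, '-inf', timestamp);
// Remove actual event data
const eventKeys = expiredEventIds.map(id => `event:${id}`);
await this.redis.del(...eventKeys);
}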
}
}
}
// ❌ DON'T: Use memory-only storage for production
class BadEventStore implements EventStore {
private events = new Map(); // Will lose data on restart
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
const id = Math.random().toString(); // Not guaranteed unique
this.events.set(id, message);
return id;
}
}
```
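The Redis store above orders events by millisecond timestamp, so two events written in the same millisecond are hard to tell apart during replay. A per-stream sequence number avoids that ambiguity. The sketch below shows only the sequencing and replay logic, in memory and synchronously; `SequencedEventLog` and its methods are hypothetical names, and a real store would persist the data and wrap these operations in the async `EventStore` interface.

```typescript
interface StoredEvent<M> {
  eventId: string;
  message: M;
}

// Hypothetical in-memory log illustrating sequence-based event IDs.
class SequencedEventLog<M> {
  private streams = new Map<string, M[]>();

  // Appends a message and returns an ID of the form "<streamId>:<sequence>".
  append(streamId: string, message: M): string {
    if (streamId.includes(":")) {
      throw new Error("streamId must not contain ':'");
    }
    const events = this.streams.get(streamId) ?? [];
    events.push(message);
    this.streams.set(streamId, events);
    return `${streamId}:${events.length}`; // sequence numbers start at 1
  }

  // Returns every event on the same stream stored after lastEventId, in order.
  eventsAfter(lastEventId: string): StoredEvent<M>[] {
    const separatorIndex = lastEventId.lastIndexOf(":");
    const streamId = lastEventId.slice(0, separatorIndex);
    const sequence = Number(lastEventId.slice(separatorIndex + 1));
    const events = this.streams.get(streamId);
    if (
      separatorIndex <= 0 ||
      !events ||
      !Number.isInteger(sequence) ||
      sequence < 1 ||
      sequence > events.length
    ) {
      throw new Error(`Unknown event ID: ${lastEventId}`);
    }
    // events[i] has sequence i + 1, so slicing at `sequence` skips everything already seen.
    return events.slice(sequence).map((message, offset) => ({
      eventId: `${streamId}:${sequence + offset + 1}`,
      message,
    }));
  }
}
```

With Redis, the same idea could be approximated by using an `INCR` counter per stream as the sorted-set score; that variation is a suggestion, not something the original store does.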
### Stream Multiplexing
```typescript
// ✅ DO: Implement sophisticated stream multiplexing
class StreamMultiplexer {
private streams = new Map<string, ServerResponse>();
private requestToStreamMapping = new Map<RequestId, string>();
private pendingResponses = new Map<RequestId, JSONRPCMessage>();
private sessionManager: SessionManager;
constructor(sessionManager: SessionManager) {
this.sessionManager = sessionManager;
}
async handleIncomingRequest(
messages: JSONRPCMessage[],
response: ServerResponse,
sessionId?: string
): Promise<void> {
const streamId = crypto.randomUUID();
// Validate session if provided
if (sessionId && !await this.sessionManager.validateSession(sessionId)) {
response.writeHead(401).end(JSON.stringify({
jsonrpc: "2.0",
error: { code: -32000, message: "Invalid session" },
id: null
}));
return;
}
const requests = messages.filter(isJSONRPCRequest);
const notifications = messages.filter(isJSONRPCNotification);
// Handle notifications immediately
for (const notification of notifications) {
this.handleNotification(notification, sessionId);
}
if (requests.length === 0) {
// Only notifications, return 202 Accepted
response.writeHead(202).end();
return;
}
// Set up streaming response for requests
await this.setupStreamingResponse(requests, response, streamId, sessionId);
}
private async setupStreamingResponse(
requests: JSONRPCRequest[],
response: ServerResponse,
streamId: string,
sessionId?: string
): Promise<void> {
// Configure response headers
const headers: Record<string, string> = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
};
if (sessionId) {
headers["mcp-session-id"] = sessionId;
}
response.writeHead(200, headers);
// Register stream and request mappings
this.streams.set(streamId, response);
for (const request of requests) {
this.requestToStreamMapping.set(request.id, streamId);
}
// Handle connection cleanup
response.on("close", () => {
this.cleanup(streamId, requests.map(r => r.id));
});
// Process requests
for (const request of requests) {
this.processRequest(request, sessionId);
}
}
async sendMessage(
message: JSONRPCMessage,
options?: { relatedRequestId?: RequestId }
): Promise<void> {
const requestId = options?.relatedRequestId ??
(isJSONRPCResponse(message) || isJSONRPCError(message) ? message.id : undefined);
// Compare against undefined: a JSON-RPC id of 0 is valid but falsy
if (requestId === undefined) {
// Broadcast to all streams (server-initiated messages)
await this.broadcastToAllStreams(message);
return;
}
const streamId = this.requestToStreamMapping.get(requestId);
if (!streamId) {
throw new McpError(
ErrorCode.InternalError,
`No stream found for request ID: ${requestId}`
);
}
const response = this.streams.get(streamId);
if (!response) {
throw new McpError(
ErrorCode.InternalError,
`Stream connection lost for: ${streamId}`
);
}
// Write SSE event
this.writeSSEEvent(response, message);
// Check if all responses for this stream are complete
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
this.pendingResponses.set(requestId, message);
await this.checkStreamCompletion(streamId);
}
}
private writeSSEEvent(
response: ServerResponse,
message: JSONRPCMessage,
eventId?: string
): void {
let eventData = `event: message\n`;
if (eventId) {
eventData += `id: ${eventId}\n`;
}
eventData += `data: ${JSON.stringify(message)}\n\n`;
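// write() returning false signals backpressure (the buffer is full), not a failed write;
// the chunk is still queued. Wait for 'drain' before sending large bursts.
if (!response.write(eventData)) {
console.warn("SSE write buffer is full - slow consumer on this stream");
}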
}
private async checkStreamCompletion(streamId: string): Promise<void> {
// Get all requests for this stream
const relatedRequests = Array.from(this.requestToStreamMapping.entries())
.filter(([_, sid]) => sid === streamId)
.map(([requestId]) => requestId);
// Check if all requests have responses
const allResponsesReady = relatedRequests.every(
requestId => this.pendingResponses.has(requestId)
);
if (allResponsesReady) {
const response = this.streams.get(streamId);
if (response) {
response.end(); // Close SSE stream
}
// Clean up
this.cleanup(streamId, relatedRequests);
}
}
private cleanup(streamId: string, requestIds: RequestId[]): void {
this.streams.delete(streamId);
for (const requestId of requestIds) {
this.requestToStreamMapping.delete(requestId);
this.pendingResponses.delete(requestId);
}
}
}
```
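`writeSSEEvent` above builds the wire format inline. As a sketch of that framing in isolation, the hypothetical `formatSseEvent` function below produces one Server-Sent Events frame from a string payload. It rejects line breaks in the event ID and splits multi-line payloads into one `data:` line per line, as the SSE format requires.

```typescript
// Hypothetical helper: serialize one SSE frame with the "message" event type.
function formatSseEvent(data: string, eventId?: string): string {
  const lines: string[] = ["event: message"];
  if (eventId !== undefined) {
    // A line break inside the ID would end the field early and corrupt the frame.
    if (/[\r\n]/.test(eventId)) {
      throw new Error("Event ID must not contain line breaks");
    }
    lines.push(`id: ${eventId}`);
  }
  // Each payload line needs its own "data:" field.
  for (const line of data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n";
}

const frame = formatSseEvent(JSON.stringify({ jsonrpc: "2.0", id: 1, result: {} }), "s1:2");
```

`JSON.stringify` without an indent argument never emits raw newlines, so JSON-RPC payloads normally fit on a single `data:` line; the split matters only for pretty-printed or free-form text.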
## Advanced Message Patterns
### Request Batching and Concurrency
```typescript
// ✅ DO: Implement sophisticated message batching
class MessageBatchProcessor {
private batchQueues = new Map<string, JSONRPCMessage[]>();
private batchTimers = new Map<string, NodeJS.Timeout>();
private readonly batchSize = 10;
private readonly batchDelay = 100; // ms
constructor(private transport: Transport) {}
async processMessage(
message: JSONRPCMessage,
sessionId?: string
): Promise<void> {
const queueKey = sessionId || 'default';
// Add to batch queue
let queue = this.batchQueues.get(queueKey);
if (!queue) {
queue = [];
this.batchQueues.set(queueKey, queue);
}
queue.push(message);
// Process immediately if batch is full
if (queue.length >= this.batchSize) {
await this.processBatch(queueKey);
return;
}
// Set/reset timer for delayed processing
const existingTimer = this.batchTimers.get(queueKey);
if (existingTimer) {
clearTimeout(existingTimer);
}
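const timer = setTimeout(() => {
// processBatch is async; catch here so a failure cannot become an unhandled rejection
this.processBatch(queueKey).catch(error => {
console.error("Delayed batch processing failed:", error);
});
}, this.batchDelay);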
this.batchTimers.set(queueKey, timer);
}
private async processBatch(queueKey: string): Promise<void> {
const queue = this.batchQueues.get(queueKey);
if (!queue || queue.length === 0) {
return;
}
// Clear timer
const timer = this.batchTimers.get(queueKey);
if (timer) {
clearTimeout(timer);
this.batchTimers.delete(queueKey);
}
// Take messages from queue
const messages = queue.splice(0);
try {
// Group by message type for optimal processing
const requests = messages.filter(isJSONRPCRequest);
const notifications = messages.filter(isJSONRPCNotification);
const responses = messages.filter(
msg => isJSONRPCResponse(msg) || isJSONRPCError(msg)
);
// Process each group
await Promise.all([
this.processRequests(requests),
this.processNotifications(notifications),
this.processResponses(responses),
]);
} catch (error) {
console.error("Error processing message batch:", error);
// Send error responses for failed requests
for (const message of messages) {
if (isJSONRPCRequest(message)) {
await this.transport.send({
jsonrpc: "2.0",
id: message.id,
error: {
code: ErrorCode.InternalError,
message: "Batch processing failed",
data: error instanceof Error ? error.message : String(error),
},
});
}
}
}
}
private async processRequests(requests: JSONRPCRequest[]): Promise<void> {
// Process requests concurrently with limit
const concurrencyLimit = 5;
const semaphore = new Semaphore(concurrencyLimit);
await Promise.allSettled(
requests.map(async request => {
await semaphore.acquire();
try {
await this.processRequest(request);
} finally {
semaphore.release();
}
})
);
}
private async processNotifications(
notifications: JSONRPCNotification[]
): Promise<void> {
// Notifications can be processed in parallel
await Promise.allSettled(
notifications.map(notification => this.processNotification(notification))
);
}
private async processResponses(
responses: (JSONRPCResponse | JSONRPCError)[]
): Promise<void> {
// Responses need to be matched with pending requests
for (const response of responses) {
await this.processResponse(response);
}
}
}
// ❌ DON'T: Process messages one by one without batching
class BadMessageProcessor {
async processMessage(message: JSONRPCMessage) {
// Inefficient: no batching, no concurrency control
await this.handleSingleMessage(message);
}
}
```
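`processRequests` above relies on a `Semaphore` class that is not defined in this document and is not provided by Node.js or the MCP SDK. A minimal promise-based sketch follows. The `availablePermits` and `waiting` getters are additions for observability and are not needed by the batch processor.

```typescript
// Minimal counting semaphore for limiting concurrent async work.
class Semaphore {
  private available: number;
  private waiters: Array<() => void> = [];

  constructor(permits: number) {
    if (!Number.isInteger(permits) || permits < 1) {
      throw new RangeError("permits must be a positive integer");
    }
    this.available = permits;
  }

  // Resolves immediately when a permit is free, otherwise when one is released.
  acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return Promise.resolve();
    }
    return new Promise<void>(resolve => {
      this.waiters.push(resolve);
    });
  }

  // Hands the permit to the oldest waiter, or returns it to the pool.
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next();
    } else {
      this.available++;
    }
  }

  get availablePermits(): number {
    return this.available;
  }

  get waiting(): number {
    return this.waiters.length;
  }
}
```

Waiters are served in FIFO order, and `release()` passes the permit straight to the next waiter without incrementing the counter, so a late caller cannot jump the queue. Pair every `acquire()` with a `release()` in a `finally` block, as `processRequests` does.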
### Circuit Breaker Pattern
```typescript
// ✅ DO: Implement circuit breaker for resilient messaging
class CircuitBreaker {
private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
private failureCount = 0;
private successCount = 0; // consecutive successes while HALF_OPEN
private lastFailureTime = 0;
private nextAttemptTime = 0;
constructor(
private failureThreshold: number = 5,
private recoveryTimeout: number = 60000, // 1 minute
private successThreshold: number = 3
) {}
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttemptTime) {
throw new McpError(
ErrorCode.InternalError,
"Circuit breaker is OPEN - service unavailable"
);
}
// Recovery timeout elapsed: let trial calls through
this.state = 'HALF_OPEN';
this.successCount = 0;
}
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess(): void {
if (this.state === 'HALF_OPEN') {
// Close only after successThreshold consecutive trial successes
this.successCount++;
if (this.successCount >= this.successThreshold) {
this.state = 'CLOSED';
this.failureCount = 0;
this.successCount = 0;
}
} else {
this.failureCount = 0;
}
}
private onFailure(): void {
this.failureCount++;
this.lastFailureTime = Date.now();
// Any failure during a HALF_OPEN trial reopens the circuit immediately
if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
this.successCount = 0;
this.nextAttemptTime = Date.now() + this.recoveryTimeout;
}
}
getState(): { state: string; failureCount: number } {
return {
state: this.state,
failureCount: this.failureCount,
};
}
}
```
## Security and Performance Patterns
### Rate Limiting
```typescript
// ✅ DO: Implement comprehensive rate limiting
class RateLimiter {
private windows = new Map<string, { count: number; resetTime: number }>();
constructor(
private maxRequests: number = 100,
private windowMs: number = 60000 // 1 minute
) {}
async checkLimit(identifier: string): Promise<boolean> {
const now = Date.now();
const window = this.windows.get(identifier);
if (!window || now >= window.resetTime) {
// Create new window
this.windows.set(identifier, {
count: 1,
resetTime: now + this.windowMs,
});
return true;
}
if (window.count >= this.maxRequests) {
return false; // Rate limit exceeded
}
window.count++;
return true;
}
getRemainingRequests(identifier: string): number {
const window = this.windows.get(identifier);
if (!window || Date.now() >= window.resetTime) {
return this.maxRequests;
}
return Math.max(0, this.maxRequests - window.count);
}
cleanup(): void {
const now = Date.now();
for (const [key, window] of this.windows) {
if (now >= window.resetTime) {
this.windows.delete(key);
}
}
}
}
```
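The `RateLimiter` above uses a fixed window, which allows up to twice `maxRequests` in a short burst that straddles a window boundary. A token bucket is a common alternative that smooths such bursts. The sketch below is a different technique from the class above, not a drop-in replacement; `TokenBucket` and its injectable `now` clock are hypothetical names.

```typescript
// Hypothetical token bucket: holds up to `capacity` tokens, refilled continuously.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private capacity: number,
    private refillPerSecond: number,
    // Injectable clock so tests can control time; defaults to the wall clock.
    private now: () => number = () => Date.now()
  ) {
    this.tokens = capacity;
    this.lastRefill = this.now();
  }

  // Consumes one token and returns true, or returns false when the bucket is empty.
  tryRemove(): boolean {
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    // Refill in proportion to elapsed time, never above capacity.
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsedSeconds * this.refillPerSecond
    );
    this.lastRefill = current;
    if (this.tokens < 1) {
      return false;
    }
    this.tokens -= 1;
    return true;
  }
}
```

To limit per client, keep one bucket per identifier in a `Map`, in the same way `RateLimiter` keeps one window per identifier.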
### Message Validation
```typescript
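// ✅ DO: Implement comprehensive message validation
import { z } from 'zod';
import {
CallToolRequestSchema,
CreateMessageRequestSchema,
JSONRPCMessage,
JSONRPCMessageSchema,
JSONRPCRequest,
ListRootsRequestSchema,
ReadResourceRequestSchema,
isJSONRPCRequest
} from '@modelcontextprotocol/sdk/types.js';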
class MessageValidator {
private schemas = new Map<string, z.ZodSchema>();
constructor() {
this.registerDefaultSchemas();
}
private registerDefaultSchemas(): void {
this.schemas.set("sampling/createMessage", CreateMessageRequestSchema);
this.schemas.set("roots/list", ListRootsRequestSchema);
this.schemas.set("resources/read", ReadResourceRequestSchema);
this.schemas.set("tools/call", CallToolRequestSchema);
// Add more as needed
}
validateMessage(message: JSONRPCMessage): ValidationResult {
// Basic JSON-RPC validation
const basicValidation = JSONRPCMessageSchema.safeParse(message);
if (!basicValidation.success) {
return {
valid: false,
errors: basicValidation.error.issues,
};
}
// Method-specific validation for requests
if (isJSONRPCRequest(message)) {
return this.validateRequest(message);
}
return { valid: true };
}
private validateRequest(request: JSONRPCRequest): ValidationResult {
const schema = this.schemas.get(request.method);
if (!schema) {
return {
valid: false,
errors: [{ message: `Unknown method: ${request.method}` }],
};
}
const validation = schema.safeParse(request);
if (!validation.success) {
return {
valid: false,
errors: validation.error.issues,
};
}
return { valid: true };
}
registerSchema(method: string, schema: z.ZodSchema): void {
this.schemas.set(method, schema);
}
}
interface ValidationResult {
valid: boolean;
// Zod issue paths can contain array indices, so allow numbers as well as strings
errors?: Array<{ message: string; path?: Array<string | number> }>;
}
```
## Testing Patterns
### Advanced Testing Setup
```typescript
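// ✅ DO: Implement comprehensive test patterns
// Node built-ins used by the temp-directory test below
import { promises as fs } from 'fs';
import * as os from 'os';
import * as path from 'path';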
describe('Advanced MCP Patterns', () => {
let samplingHandler: AdvancedSamplingHandler;
let rootsManager: RootsManager;
let streamMultiplexer: StreamMultiplexer;
let mockTransport: MockTransport;
beforeEach(() => {
mockTransport = new MockTransport();
samplingHandler = new AdvancedSamplingHandler(
new MockLLMProvider(),
new MockContextManager()
);
rootsManager = new RootsManager(new MockMcpServer());
streamMultiplexer = new StreamMultiplexer(new MockSessionManager());
});
describe('Sampling Handler', () => {
it('should validate sampling requests', async () => {
const invalidRequest = {
messages: [], // Empty messages
maxTokens: -1, // Invalid maxTokens
};
await expect(
samplingHandler.createMessage(invalidRequest.messages, invalidRequest)
).rejects.toThrow(McpError);
});
it('should enhance messages with context', async () => {
const messages: SamplingMessage[] = [{
role: "user",
content: { type: "text", text: "Hello" }
}];
const result = await samplingHandler.createMessage(messages, {
maxTokens: 100,
includeContext: "thisServer"
});
expect(result.model).toBeDefined();
expect(result.content.type).toBe("text");
});
it('should handle model preferences', async () => {
const messages: SamplingMessage[] = [{
role: "user",
content: { type: "text", text: "Hello" }
}];
const context = {
maxTokens: 100,
modelPreferences: {
hints: [{ name: "fast-model" }],
speedPriority: 0.8,
costPriority: 0.2,
}
};
const result = await samplingHandler.createMessage(messages, context);
expect(result.model).toBe("fast-model");
});
});
describe('Roots Manager', () => {
it('should validate root URIs', async () => {
const invalidRoot = {
uri: "http://example.com", // Should be file://
name: "Invalid Root"
};
await expect(
rootsManager.addRoot(invalidRoot)
).rejects.toThrow(McpError);
});
it('should setup file watchers', async () => {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'mcp-test-'));
const root = {
uri: `file://${tempDir}`,
name: "Test Root",
watchForChanges: true
};
await rootsManager.addRoot(root);
// Verify watcher is active
const roots = rootsManager.getRoots();
expect(roots).toHaveLength(1);
expect(roots[0].uri).toBe(root.uri);
// Cleanup
await rootsManager.cleanup();
await fs.rm(tempDir, { recursive: true });
});
});
describe('Stream Multiplexer', () => {
it('should handle concurrent requests', async () => {
const mockResponse = new MockServerResponse();
const requests: JSONRPCRequest[] = [
{ jsonrpc: "2.0", method: "test1", params: {}, id: "1" },
{ jsonrpc: "2.0", method: "test2", params: {}, id: "2" },
];
await streamMultiplexer.handleIncomingRequest(
requests,
mockResponse
);
// Verify SSE headers
expect(mockResponse.headers["Content-Type"]).toBe("text/event-stream");
expect(mockResponse.statusCode).toBe(200);
});
it('should handle session validation', async () => {
const mockResponse = new MockServerResponse();
const request: JSONRPCRequest = {
jsonrpc: "2.0",
method: "test",
params: {},
id: "1"
};
await streamMultiplexer.handleIncomingRequest(
[request],
mockResponse,
"invalid-session"
);
expect(mockResponse.statusCode).toBe(401);
});
});
});
// Mock implementations for testing
class MockTransport implements Transport {
onmessage?: (message: JSONRPCMessage) => void;
onerror?: (error: Error) => void;
onclose?: () => void;
async start(): Promise<void> {}
async close(): Promise<void> {}
async send(message: JSONRPCMessage): Promise<void> {
this.onmessage?.(message);
}
}
class MockLLMProvider {
getDefaultModel(): string {
return "default-model";
}
async selectOptimalModel(preferences: any): Promise<string> {
return preferences.hints?.[0]?.name || "default-model";
}
async generate(params: any): Promise<any> {
return {
model: params.model,
content: "Mock response",
stopReason: "endTurn"
};
}
}
```
## Best Practices Summary
### Advanced Architecture
- Implement proper sampling with context enhancement and model selection
- Use comprehensive roots management with file system validation and watching
- Build resumable streaming with persistent event stores
- Design sophisticated message multiplexing with session management
### Performance Optimization
- Use circuit breakers for resilient service communication
- Implement message batching and concurrent processing
- Apply rate limiting and resource throttling
- Cache frequently accessed data with appropriate TTL
### Security Implementation
- Validate all incoming messages with appropriate schemas
- Implement proper session management and authentication
- Use file system access controls and path validation
- Apply security constraints for resource access
### Error Handling
- Use structured error types with appropriate error codes
- Implement comprehensive logging and monitoring
- Provide detailed error context for debugging
- Handle edge cases and recovery scenarios
### Testing Strategy
- Test all advanced patterns with comprehensive scenarios
- Use mocks for external dependencies and complex interactions
- Verify error conditions and edge cases
- Test concurrent operations and race conditions
## References
- [MCP TypeScript SDK Documentation](https://modelcontextprotocol.io/docs/sdk/typescript)
- [Streamable HTTP Transport Specification](https://spec.modelcontextprotocol.io/specification/basic/transports/#streamable-http-transport)
- [Server-Sent Events Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html)
- [JSON-RPC 2.0 Specification](https://www.jsonrpc.org/specification)
- [Circuit Breaker Pattern](https://martinfowler.com/bliki/CircuitBreaker.html)