streaming.ts (13.3 kB)
/**
 * Streaming utilities for handling large result sets
 * Implements streaming responses for better performance and memory usage
 */

// Node.js streaming imports removed - not currently used but available for future enhancement
import { sanitizeHtml } from "./validation/security.js";

export interface StreamingOptions {
  batchSize?: number;
  delay?: number;
  transformItem?: (item: unknown) => unknown;
  filterItem?: (item: unknown) => boolean;
}

export interface StreamingResult<T> {
  data: T[];
  hasMore: boolean;
  cursor?: string | undefined;
  total?: number | undefined;
  processed: number;
}

/**
 * Creates a streaming response for large datasets
 */
export class DataStreamer<T> {
  private batchSize: number;
  private delay: number;
  private transformItem: ((item: T) => unknown) | undefined;
  private filterItem: ((item: T) => boolean) | undefined;

  constructor(options: StreamingOptions = {}) {
    this.batchSize = options.batchSize || 50;
    this.delay = options.delay || 0;
    this.transformItem = options.transformItem || undefined;
    this.filterItem = options.filterItem || undefined;
  }

  /**
   * Processes data in batches with streaming
   */
  async *streamBatches<U>(
    data: T[],
    processor: (batch: T[]) => Promise<U[]>,
  ): AsyncGenerator<StreamingResult<U>, void, unknown> {
    const total = data.length;
    let processed = 0;

    for (let i = 0; i < data.length; i += this.batchSize) {
      const batch = data.slice(i, i + this.batchSize);

      // Apply filtering if provided
      const filteredBatch = this.filterItem ? batch.filter(this.filterItem) : batch;

      // Process the batch
      const processedBatch = await processor(filteredBatch);

      // Apply transformation if provided
      const transformedBatch = this.transformItem
        ? processedBatch.map((item: unknown) => this.transformItem!(item as T))
        : processedBatch;

      processed += batch.length;
      const hasMore = processed < total;

      yield {
        data: transformedBatch as U[],
        hasMore,
        cursor: hasMore ? String(i + this.batchSize) : undefined,
        total,
        processed,
      };

      // Add delay between batches if specified
      if (this.delay > 0 && hasMore) {
        await new Promise((resolve) => setTimeout(resolve, this.delay));
      }
    }
  }

  /**
   * Processes large datasets with pagination
   */
  async *streamPages<U>(
    fetcher: (page: number, perPage: number) => Promise<{ data: T[]; hasMore: boolean }>,
    processor: (items: T[]) => Promise<U[]>,
  ): AsyncGenerator<StreamingResult<U>, void, unknown> {
    let page = 1;
    let totalProcessed = 0;

    while (true) {
      const result = await fetcher(page, this.batchSize);

      if (result.data.length === 0) {
        break;
      }

      // Apply filtering if provided
      const filteredData = this.filterItem ? result.data.filter(this.filterItem) : result.data;

      // Process the data
      const processedData = await processor(filteredData);

      // Apply transformation if provided
      const transformedData = this.transformItem
        ? processedData.map((item: unknown) => this.transformItem!(item as T))
        : processedData;

      totalProcessed += result.data.length;

      yield {
        data: transformedData as U[],
        hasMore: result.hasMore,
        cursor: result.hasMore ? String(page + 1) : undefined,
        total: undefined, // Unknown for paginated results
        processed: totalProcessed,
      };

      if (!result.hasMore) {
        break;
      }

      page++;

      // Add delay between pages if specified
      if (this.delay > 0) {
        await new Promise((resolve) => setTimeout(resolve, this.delay));
      }
    }
  }
}
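// --- Usage sketch (illustrative, not part of the original module) -------------
// A minimal example of driving `streamBatches` with an in-memory array. The
// dataset size, batch size, delay, and doubling processor are assumptions
// chosen only to demonstrate the generator's progress metadata.
export async function exampleStreamBatches(): Promise<void> {
  const streamer = new DataStreamer<number>({ batchSize: 100, delay: 25 });
  const data = Array.from({ length: 450 }, (_, i) => i);

  // Each yielded result carries the processed batch plus progress metadata.
  for await (const result of streamer.streamBatches(data, async (batch) => batch.map((n) => n * 2))) {
    console.log(`batch of ${result.data.length}, ${result.processed}/${result.total}, hasMore=${result.hasMore}`);
  }
}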
/**
 * Streaming formatter for WordPress data
 */
export class WordPressDataStreamer {
  /**
   * Streams WordPress posts with author and taxonomy information
   */
  static async *streamPosts(
    posts: unknown[],
    options: {
      includeAuthor?: boolean;
      includeCategories?: boolean;
      includeTags?: boolean;
      batchSize?: number;
    } = {},
  ): AsyncGenerator<StreamingResult<unknown>, void, unknown> {
    const streamer = new DataStreamer<unknown>({
      batchSize: options.batchSize || 20,
      transformItem: (post) => {
        const p = post as Record<string, unknown>;
        const title = p.title as Record<string, unknown> | undefined;
        const excerpt = p.excerpt as Record<string, unknown> | undefined;
        return {
          id: p.id,
          title: title?.rendered || "Untitled",
          excerpt: excerpt?.rendered
            ? sanitizeHtml(String(excerpt.rendered)).substring(0, 150) + "..."
            : "No excerpt",
          status: p.status,
          date: new Date(String(p.date)).toLocaleDateString(),
          link: p.link,
          author: options.includeAuthor ? p.author : undefined,
          categories: options.includeCategories ? p.categories : undefined,
          tags: options.includeTags ? p.tags : undefined,
        };
      },
      filterItem: (post) => {
        const p = post as Record<string, unknown>;
        return p.status !== "trash";
      }, // Filter out trashed posts
    });

    const processor = async (batch: unknown[]) => {
      // Simulate processing time for large datasets
      await new Promise((resolve) => setTimeout(resolve, 10));
      return batch;
    };

    for await (const result of streamer.streamBatches(posts, processor)) {
      yield result;
    }
  }

  /**
   * Streams WordPress users with role information
   */
  static async *streamUsers(
    users: unknown[],
    options: {
      includeRoles?: boolean;
      includeCapabilities?: boolean;
      batchSize?: number;
    } = {},
  ): AsyncGenerator<StreamingResult<unknown>, void, unknown> {
    const streamer = new DataStreamer<unknown>({
      batchSize: options.batchSize || 30,
      transformItem: (user) => {
        const u = user as Record<string, unknown>;
        return {
          id: u.id,
          name: u.name || "No name",
          username: u.slug || "unknown",
          email: u.email || "No email",
          roles: options.includeRoles ? u.roles : undefined,
          capabilities: options.includeCapabilities
            ? Object.keys((u.capabilities as Record<string, unknown>) || {})
            : undefined,
          registeredDate: u.registered_date
            ? new Date(String(u.registered_date)).toLocaleDateString()
            : "Unknown",
        };
      },
    });

    const processor = async (batch: unknown[]) => {
      // Add user processing logic here if needed
      return batch;
    };

    for await (const result of streamer.streamBatches(users, processor)) {
      yield result;
    }
  }

  /**
   * Streams WordPress comments with moderation status
   */
  static async *streamComments(
    comments: unknown[],
    options: {
      includeAuthor?: boolean;
      includePost?: boolean;
      batchSize?: number;
    } = {},
  ): AsyncGenerator<StreamingResult<unknown>, void, unknown> {
    const streamer = new DataStreamer<unknown>({
      batchSize: options.batchSize || 40,
      transformItem: (comment) => {
        const c = comment as Record<string, unknown>;
        const content = c.content as Record<string, unknown> | undefined;
        return {
          id: c.id,
          content: content?.rendered
            ? sanitizeHtml(String(content.rendered)).substring(0, 200) + "..."
            : "No content",
          status: c.status,
          date: new Date(String(c.date)).toLocaleDateString(),
          author: options.includeAuthor
            ? {
                name: c.author_name,
                email: c.author_email,
                url: c.author_url,
              }
            : undefined,
          post: options.includePost ? c.post : undefined,
        };
      },
      filterItem: (comment) => {
        const c = comment as Record<string, unknown>;
        return c.status !== "spam";
      }, // Filter out spam comments
    });

    const processor = async (batch: unknown[]) => {
      // Add comment processing logic here if needed
      return batch;
    };

    for await (const result of streamer.streamBatches(comments, processor)) {
      yield result;
    }
  }
}
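// --- Usage sketch (illustrative, not part of the original module) -------------
// Streaming posts that were already fetched from the WordPress REST API. The
// `fetchedPosts` parameter is a hypothetical placeholder for a real API
// response; the option values are assumptions for the sketch.
export async function examplePostStream(fetchedPosts: unknown[]): Promise<void> {
  const results: StreamingResult<unknown>[] = [];

  for await (const result of WordPressDataStreamer.streamPosts(fetchedPosts, {
    includeAuthor: true,
    batchSize: 10,
  })) {
    results.push(result);
  }

  // Collected results can be rendered with the formatter defined below.
  console.log(StreamingUtils.formatStreamingResponse(results, "posts"));
}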
/**
 * Utility functions for streaming responses
 */
export class StreamingUtils {
  /**
   * Formats streaming results for display
   */
  static formatStreamingResponse(
    results: StreamingResult<unknown>[],
    type: "posts" | "users" | "comments" | "media" = "posts",
  ): string {
    const allData = results.flatMap((result) => result.data);
    const totalProcessed = results[results.length - 1]?.processed || 0;
    const hasMore = results[results.length - 1]?.hasMore || false;

    const typeEmojis = {
      posts: "📄",
      users: "👥",
      comments: "💬",
      media: "📎",
    };

    const emoji = typeEmojis[type];
    const typeName = type.charAt(0).toUpperCase() + type.slice(1);

    let response = `${emoji} **${typeName} Results** (Streamed)\n\n`;
    response += `📊 **Summary**: ${allData.length} items displayed, ${totalProcessed} processed total\n`;

    if (hasMore) {
      response += `⏳ **Status**: More data available (streaming in progress)\n`;
    } else {
      response += `✅ **Status**: Complete\n`;
    }

    response += `🕐 **Retrieved**: ${new Date().toLocaleString()}\n\n`;

    // Format individual items
    allData.forEach((item, index) => {
      // Type-safe property access
      const itemObj = item as Record<string, unknown>;
      const title = itemObj.title || itemObj.name || (itemObj.content as string)?.substring(0, 50) || "Item";
      response += `${index + 1}. **${title}**\n`;

      if (itemObj.excerpt) response += `   📝 ${itemObj.excerpt}\n`;
      if (itemObj.email) response += `   📧 ${itemObj.email}\n`;
      if (itemObj.status) response += `   🏷️ Status: ${itemObj.status}\n`;
      if (itemObj.date) response += `   📅 Date: ${itemObj.date}\n`;
      response += "\n";
    });

    return response;
  }

  /**
   * Implements progressive loading for large datasets
   */
  static async loadProgressively<T>(
    fetcher: (offset: number, limit: number) => Promise<T[]>,
    options: {
      initialLoad?: number;
      batchSize?: number;
      maxItems?: number;
      onProgress?: (loaded: number, total?: number) => void;
    } = {},
  ): Promise<T[]> {
    const initialLoad = options.initialLoad || 50;
    const batchSize = options.batchSize || 25;
    const maxItems = options.maxItems || 1000;

    const results: T[] = [];
    let offset = 0;

    // Initial load
    const initialBatch = await fetcher(offset, initialLoad);
    results.push(...initialBatch);
    offset += initialLoad;

    if (options.onProgress) {
      options.onProgress(results.length);
    }

    // Progressive loading
    while (results.length < maxItems) {
      const batch = await fetcher(offset, batchSize);

      if (batch.length === 0) {
        break;
      }

      results.push(...batch);
      offset += batchSize;

      if (options.onProgress) {
        options.onProgress(results.length, maxItems);
      }

      // Small delay to prevent overwhelming the server
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    return results.slice(0, maxItems);
  }
}
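// --- Usage sketch (illustrative, not part of the original module) -------------
// Progressive loading against a paged fetcher. `fetchPage` is a stand-in for a
// real REST call that accepts an offset and a limit; the 300-item fake dataset
// and the option values are assumptions for the sketch.
export async function exampleProgressiveLoad(): Promise<void> {
  const fetchPage = async (offset: number, limit: number): Promise<number[]> => {
    const source = Array.from({ length: 300 }, (_, i) => i); // fake dataset
    return source.slice(offset, offset + limit);
  };

  const items = await StreamingUtils.loadProgressively(fetchPage, {
    initialLoad: 50,
    batchSize: 25,
    maxItems: 200,
    onProgress: (loaded, total) => console.log(`loaded ${loaded}${total ? `/${total}` : ""}`),
  });

  console.log(`finished with ${items.length} items`);
}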
/**
 * Memory-efficient data processor
 */
export class MemoryEfficientProcessor {
  /**
   * Processes large datasets with memory monitoring
   */
  static async processLargeDataset<T, U>(
    dataProvider: () => AsyncGenerator<T[], void, unknown>,
    processor: (items: T[]) => Promise<U[]>,
    options: {
      maxMemoryUsage?: number; // in MB
      batchSize?: number;
      onProgress?: (processed: number) => void;
    } = {},
  ): Promise<U[]> {
    const maxMemory = options.maxMemoryUsage || 100; // 100MB default
    const batchSize = options.batchSize || 50;
    const results: U[] = [];
    let processed = 0;

    for await (const batch of dataProvider()) {
      // Check memory usage
      const memoryUsage = process.memoryUsage();
      const memoryUsageMB = memoryUsage.heapUsed / 1024 / 1024;

      if (memoryUsageMB > maxMemory) {
        // If memory usage is too high, process in smaller batches
        const smallerBatches = this.chunkArray(batch, Math.floor(batchSize / 2));

        for (const smallBatch of smallerBatches) {
          const processed_batch = await processor(smallBatch);
          results.push(...processed_batch);
          processed += smallBatch.length;

          if (options.onProgress) {
            options.onProgress(processed);
          }

          // Force garbage collection if available
          if (global.gc) {
            global.gc();
          }
        }
      } else {
        // Normal processing
        const processed_batch = await processor(batch);
        results.push(...processed_batch);
        processed += batch.length;

        if (options.onProgress) {
          options.onProgress(processed);
        }
      }
    }

    return results;
  }

  /**
   * Helper method to split arrays into chunks
   */
  private static chunkArray<T>(array: T[], chunkSize: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += chunkSize) {
      chunks.push(array.slice(i, i + chunkSize));
    }
    return chunks;
  }
}
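// --- Usage sketch (illustrative, not part of the original module) -------------
// Feeding `processLargeDataset` from an async generator. The generator sizes
// and the identity processor are assumptions for the sketch; note that the
// opportunistic global.gc() calls above only take effect when Node is started
// with --expose-gc.
export async function exampleMemoryEfficientRun(): Promise<void> {
  async function* provideBatches(): AsyncGenerator<number[], void, unknown> {
    for (let i = 0; i < 10; i++) {
      yield Array.from({ length: 1000 }, (_, j) => i * 1000 + j);
    }
  }

  const output = await MemoryEfficientProcessor.processLargeDataset(
    provideBatches,
    async (items) => items, // identity processor for the sketch
    { maxMemoryUsage: 200, onProgress: (n) => console.log(`${n} items processed`) },
  );

  console.log(`total: ${output.length}`);
}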
