azure-blob-streamer.ts
/**
 * Azure Blob Streamer Module
 * Streams blobs from Azure Storage via HTTPS (no SDK dependency)
 * Part of DXP-110 implementation
 */

import * as https from 'https';
import * as zlib from 'zlib';
import { URL } from 'url';
import OutputLogger from './output-logger';

// Type definitions
interface StreamOptions {
  debug?: boolean;
}

interface StreamStats {
  bytesDownloaded: number;
  linesProcessed: number;
  duration: number;
  throughput: number;
}

interface BlobCountResult {
  count: number;
  estimated: boolean;
  pages: number;
}

interface BlobPageResult {
  blobNames: string[];
  nextMarker: string | null;
}

interface FilterOptions {
  minutesBack?: number;
  startDateTime?: string;
  endDateTime?: string;
  debug?: boolean; // DXP-189: Enhanced debug logging for date filtering
}

type LineHandler = (line: string) => Promise<void>;

class AzureBlobStreamer {
  /**
   * Stream a blob and process line-by-line
   */
  static async streamBlob(sasUrl: string, lineHandler: LineHandler, options: StreamOptions = {}): Promise<StreamStats> {
    const { debug = false } = options;

    return new Promise((resolve, reject) => {
      const startTime = Date.now();
      let bytesDownloaded = 0;
      let linesProcessed = 0;
      let buffer = '';

      const parsedUrl = new URL(sasUrl);
      const requestOptions = {
        hostname: parsedUrl.hostname,
        path: parsedUrl.pathname + parsedUrl.search,
        method: 'GET',
        headers: { 'Accept-Encoding': 'gzip' }
      };

      const req = https.request(requestOptions, (res) => {
        if (res.statusCode !== 200) {
          reject(new Error(`HTTP ${res.statusCode}: ${res.statusMessage}`));
          return;
        }

        // Handle gzip compression
        const isGzipped = res.headers['content-encoding'] === 'gzip';
        const stream = isGzipped ? res.pipe(zlib.createGunzip()) : res;

        stream.on('data', async (chunk: Buffer) => {
          bytesDownloaded += chunk.length;
          buffer += chunk.toString('utf8');

          // Process complete lines
          const lines = buffer.split('\n');
          buffer = lines.pop() || ''; // Keep incomplete line in buffer

          for (const line of lines) {
            if (line.trim()) {
              try {
                await lineHandler(line);
                linesProcessed++;
              } catch (error) {
                if (debug) {
                  OutputLogger.debug(`Error processing line: ${(error as Error).message}`);
                }
              }
            }
          }
        });

        stream.on('end', async () => {
          // Process final line if exists
          if (buffer.trim()) {
            try {
              await lineHandler(buffer);
              linesProcessed++;
            } catch (error) {
              if (debug) {
                OutputLogger.debug(`Error processing final line: ${(error as Error).message}`);
              }
            }
          }

          const duration = Date.now() - startTime;
          resolve({
            bytesDownloaded,
            linesProcessed,
            duration,
            throughput: Math.round(bytesDownloaded / (duration / 1000))
          });
        });

        stream.on('error', reject);
      });

      req.on('error', reject);
      req.end();
    });
  }

  /**
   * Count blobs in a container (quick diagnostic)
   * Fetches only first few pages to estimate size
   */
  static async countBlobs(containerSasUrl: string, maxPages: number = 3): Promise<BlobCountResult> {
    const parsedUrl = new URL(containerSasUrl);
    let count = 0;
    let marker: string | null = null;
    let pageCount = 0;

    OutputLogger.info(`🔢 Counting blobs (max ${maxPages} pages)...`);

    try {
      do {
        pageCount++;

        // DXP-179: Fix double ?? bug - parsedUrl.search already includes leading ?
        let listUrl = `${parsedUrl.protocol}//${parsedUrl.hostname}${parsedUrl.pathname}${parsedUrl.search}&restype=container&comp=list`;
        if (marker) {
          listUrl += `&marker=${encodeURIComponent(marker)}`;
        }

        const { blobNames, nextMarker } = await this._fetchBlobPage(listUrl);
        count += blobNames.length;
        marker = nextMarker;

        OutputLogger.info(`📄 Page ${pageCount}: ${blobNames.length} blobs (total: ${count}, hasMore: ${!!nextMarker})`);

        if (pageCount >= maxPages) {
          break;
        }
      } while (marker);

      const estimated = marker !== null; // Still has more pages
      OutputLogger.info(`✅ ${estimated ? 'Estimated' : 'Exact'} count: ${count}${estimated ? '+' : ''} blobs across ${pageCount} pages`);

      return { count, estimated, pages: pageCount };
    } catch (error) {
      throw new Error(`Failed to count blobs: ${(error as Error).message}`);
    }
  }

  /**
   * List blobs in a container and get their SAS URLs
   * DXP-117: Added pagination support to handle containers with >5000 blobs
   */
  static async listBlobs(containerSasUrl: string): Promise<string[]> {
    const MAX_PAGES = 20; // Limit to 100K blobs (5000 per page)
    const MAX_PAGES_FOR_SHORT_QUERIES = 5; // For minutesBack < 60, only fetch 25K blobs
    const parsedUrl = new URL(containerSasUrl);
    const allBlobNames: string[] = [];
    let marker: string | null = null;
    let pageCount = 0;

    OutputLogger.info(`📋 Starting blob list operation...`);
    const startTime = Date.now();

    try {
      do {
        pageCount++;
        const pageStartTime = Date.now();
        OutputLogger.info(`📄 Fetching page ${pageCount}...`);

        // Build URL with optional marker for pagination
        // DXP-179: Fix double ?? bug - parsedUrl.search already includes leading ?
        let listUrl = `${parsedUrl.protocol}//${parsedUrl.hostname}${parsedUrl.pathname}${parsedUrl.search}&restype=container&comp=list`;
        if (marker) {
          listUrl += `&marker=${encodeURIComponent(marker)}`;
        }

        // Fetch page of blobs
        const { blobNames, nextMarker } = await this._fetchBlobPage(listUrl);
        const pageDuration = Date.now() - pageStartTime;

        allBlobNames.push(...blobNames);
        marker = nextMarker;

        // DXP-117: Log pagination progress with timing
        OutputLogger.info(`📄 Fetched page ${pageCount}: ${blobNames.length} blobs in ${pageDuration}ms (total: ${allBlobNames.length}, hasMore: ${!!nextMarker})`);

        // Performance safeguard: prevent excessive pagination
        if (pageCount >= MAX_PAGES) {
          OutputLogger.warn(`⚠️ Hit max page limit (${MAX_PAGES} pages = ${MAX_PAGES * 5000} blobs). Stopping pagination.`);
          break;
        }

        // Early exit warning for large containers
        if (pageCount >= MAX_PAGES_FOR_SHORT_QUERIES && marker) {
          OutputLogger.warn(`⚠️ Large container detected (${allBlobNames.length}+ blobs). Consider using longer time ranges for better performance.`);
        }
      } while (marker);

      // Log completion
      const totalDuration = Date.now() - startTime;
      if (pageCount > 1) {
        OutputLogger.info(`✅ Fetched ${allBlobNames.length.toLocaleString()} blobs across ${pageCount} pages in ${totalDuration}ms`);
      } else {
        OutputLogger.info(`✅ Fetched ${allBlobNames.length.toLocaleString()} blobs (single page, no pagination needed) in ${totalDuration}ms`);
      }

      // Generate SAS URLs for all blobs
      OutputLogger.info(`🔗 Generating ${allBlobNames.length} SAS URLs...`);
      const urlStartTime = Date.now();
      const blobUrls = allBlobNames.map(name => {
        // DXP-179: Fix double ?? bug - parsedUrl.search already includes leading ?
        return `${parsedUrl.protocol}//${parsedUrl.hostname}${parsedUrl.pathname}/${name}${parsedUrl.search}`;
      });
      const urlDuration = Date.now() - urlStartTime;
      OutputLogger.info(`✅ Generated ${blobUrls.length} SAS URLs in ${urlDuration}ms`);

      return blobUrls;
    } catch (error) {
      throw new Error(`Failed to list blobs: ${(error as Error).message}`);
    }
  }

  /**
   * Fetch a single page of blobs from Azure Storage
   * DXP-117: Helper method for pagination support
   * @private
   */
  static _fetchBlobPage(listUrl: string): Promise<BlobPageResult> {
    return new Promise((resolve, reject) => {
      https.get(listUrl, (res) => {
        let data = '';

        res.on('data', (chunk) => {
          data += chunk;
        });

        res.on('end', () => {
          if (res.statusCode !== 200) {
            reject(new Error(`HTTP ${res.statusCode}: ${res.statusMessage}`));
            return;
          }

          try {
            // Parse XML response for blob names
            const blobNames: string[] = [];
            const nameMatches = data.matchAll(/<Name>([^<]+)<\/Name>/g);
            for (const match of nameMatches) {
              blobNames.push(match[1]);
            }

            // Extract NextMarker for pagination
            // Azure returns <NextMarker>value</NextMarker> when more results exist
            const markerMatch = data.match(/<NextMarker>([^<]*)<\/NextMarker>/);
            const nextMarker = markerMatch ? markerMatch[1] : null;

            resolve({ blobNames, nextMarker });
          } catch (error) {
            reject(new Error(`Failed to parse blob list XML: ${(error as Error).message}`));
          }
        });

        res.on('error', reject);
      }).on('error', reject);
    });
  }

  /**
   * Filter blobs by date/time range
   */
  static filterBlobsByDate(blobUrls: string[], options: FilterOptions = {}): string[] {
    const { minutesBack, startDateTime, endDateTime, debug = false } = options;

    // Calculate time range - ALWAYS use UTC for consistency
    let startTime: Date, endTime: Date;

    if (minutesBack) {
      // DXP-114 FIX: Create dates in UTC to match blob timestamps
      const nowUtc = new Date();
      endTime = nowUtc;
      startTime = new Date(nowUtc.getTime() - minutesBack * 60 * 1000);

      // DXP-114 FIX: Log timezone info for debugging
      OutputLogger.info(`⏰ Current time (UTC): ${endTime.toISOString()}`);
      OutputLogger.info(`⏰ Filtering for last ${minutesBack} minutes: ${startTime.toISOString()} to ${endTime.toISOString()}`);
    } else if (startDateTime && endDateTime) {
      startTime = new Date(startDateTime);
      endTime = new Date(endDateTime);
      OutputLogger.info(`⏰ Time range: ${startTime.toISOString()} to ${endTime.toISOString()}`);
    } else {
      // No filtering
      return blobUrls;
    }

    // Extract timestamps from blob names (Azure format: y=YYYY/m=MM/d=DD/h=HH/m=MM)
    // DEBUG: Log blob count and sample
    OutputLogger.info(`📊 Total blobs before filtering: ${blobUrls.length}`);
    if (blobUrls.length > 0) {
      OutputLogger.info(`🔍 Sample blob URL: ${blobUrls[0]}`);

      // Check if container has archived exports mixed with streaming logs
      const hasArchives = blobUrls.some(url => url.includes('.zip') || url.includes('.gz'));
      const archiveCount = blobUrls.filter(url => url.includes('.zip') || url.includes('.gz')).length;
      if (hasArchives) {
        OutputLogger.warn(`⚠️ Found ${archiveCount} archived log exports (.zip/.gz) - these will be excluded from streaming analysis`);
        OutputLogger.info(`   Only processing standard Azure Application Insights streaming logs`);
      }
    }

    return blobUrls.filter(url => {
      // Exclude archived log exports (ZIP/GZ files)
      if (url.includes('.zip') || url.includes('.gz')) {
        OutputLogger.debug(`⏭️ Excluding archived export: ${url}`);
        return false;
      }

      // DXP-114: Parse path segments sequentially to avoid m= ambiguity
      // Split URL path and find segments by prefix
      const pathParts = url.split('/');

      // Try new format first: y=YYYY/m=MM/d=DD/h=HH/m=MM
      const yearPart = pathParts.find(p => p.startsWith('y='));
      const dayPart = pathParts.find(p => p.startsWith('d='));
      const hourPart = pathParts.find(p => p.startsWith('h='));

      // Find BOTH m= parts (month first, then minute)
      const monthIndex = pathParts.findIndex(p => p.startsWith('m='));
      const monthPart = monthIndex >= 0 ? pathParts[monthIndex] : null;

      if (yearPart && monthPart && dayPart && hourPart) {
        const year = yearPart.substring(2);
        const month = monthPart.substring(2);
        const day = dayPart.substring(2);
        const hour = hourPart.substring(2);

        // DXP-114 FIX: Use hour-based overlap filtering
        // Azure blobs contain full hour of logs (PT1H.json), not just specific minute
        // Include blob if its hour overlaps with requested time range
        const blobHourStart = new Date(`${year}-${month}-${day}T${hour}:00:00Z`);
        const blobHourEnd = new Date(`${year}-${month}-${day}T${hour}:59:59Z`);

        // Check if blob's hour overlaps with requested range
        const overlaps = (blobHourEnd >= startTime && blobHourStart <= endTime);

        // DXP-189: Enhanced debug logging for filtering decisions
        if (debug) {
          if (overlaps) {
            OutputLogger.debug(`✅ Including blob: ${year}-${month}-${day}T${hour}:XX (overlaps ${startTime.toISOString()} to ${endTime.toISOString()})`);
          } else {
            OutputLogger.debug(`❌ Filtered out blob: ${url}`);
            OutputLogger.debug(`   Blob hour: ${blobHourStart.toISOString()} - ${blobHourEnd.toISOString()}`);
            OutputLogger.debug(`   Requested: ${startTime.toISOString()} - ${endTime.toISOString()}`);
            const blobTooOld = blobHourEnd < startTime;
            const blobTooNew = blobHourStart > endTime;
            OutputLogger.debug(`   Reason: ${blobTooOld ? 'Blob ends before requested range starts' : blobTooNew ? 'Blob starts after requested range ends' : 'No overlap'}`);
          }
        }

        return overlaps;
      }

      // Fallback to old format for backwards compatibility: YYYY/MM/DD/HH/
      const match = url.match(/\/(\d{4})\/(\d{2})\/(\d{2})\/(\d{2})\//);
      if (match) {
        const [, year, month, day, hour] = match;
        const blobHourStart = new Date(`${year}-${month}-${day}T${hour}:00:00Z`);
        const blobHourEnd = new Date(`${year}-${month}-${day}T${hour}:59:59Z`);
        const overlaps = (blobHourEnd >= startTime && blobHourStart <= endTime);

        // DXP-189: Enhanced debug logging for old format
        if (debug) {
          if (overlaps) {
            OutputLogger.debug(`✅ Including blob (old format): ${year}-${month}-${day}T${hour}:00`);
          } else {
            OutputLogger.debug(`❌ Filtered out blob (old format): ${url}`);
            OutputLogger.debug(`   Blob hour: ${blobHourStart.toISOString()} - ${blobHourEnd.toISOString()}`);
            OutputLogger.debug(`   Requested: ${startTime.toISOString()} - ${endTime.toISOString()}`);
            const blobTooOld = blobHourEnd < startTime;
            const blobTooNew = blobHourStart > endTime;
            OutputLogger.debug(`   Reason: ${blobTooOld ? 'Blob ends before requested range starts' : blobTooNew ? 'Blob starts after requested range ends' : 'No overlap'}`);
          }
        }

        return overlaps;
      }

      // Exclude if can't parse date (likely old format or unexpected naming)
      OutputLogger.debug(`⚠️ Excluding unparseable blob: ${url}`);
      return false;
    });
  }
}

export default AzureBlobStreamer;
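Usage sketch

The three public methods are designed to compose: list the container, narrow by time, then stream each surviving blob. The following is a minimal illustrative sketch, not part of the module above; the `tailRecentLogs` function, the container SAS URL argument, and the per-line JSON shape (a `level` field) are assumptions for the example.

// usage-sketch.ts - hypothetical example, assuming a valid container SAS URL
import AzureBlobStreamer from './azure-blob-streamer';

async function tailRecentLogs(containerSasUrl: string): Promise<void> {
  // 1. List every blob in the container (pagination handled internally)
  const allBlobs = await AzureBlobStreamer.listBlobs(containerSasUrl);

  // 2. Keep only blobs whose hour overlaps the last 30 minutes
  const recent = AzureBlobStreamer.filterBlobsByDate(allBlobs, { minutesBack: 30, debug: true });

  // 3. Stream each blob line-by-line; each line is assumed to be one JSON log record
  for (const blobUrl of recent) {
    const stats = await AzureBlobStreamer.streamBlob(blobUrl, async (line) => {
      const record = JSON.parse(line); // record shape depends on the log source (assumption)
      if (record.level === 'Error') {
        console.log(record);
      }
    });
    console.log(`Processed ${stats.linesProcessed} lines (${stats.bytesDownloaded} bytes) in ${stats.duration}ms`);
  }
}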
