tracking.ts
/**
 * Tracking namespace - PostHog analytics with session recordings and video-to-image conversion
 * Production implementation with full video processing
 */
import axios from 'axios';
import { generateImage, applyRedaction } from '../utils/image-utils-canvas.js';
import { createImageResponse } from '../types/mcp-responses.js';
import ffmpeg from 'fluent-ffmpeg';
import ffmpegStatic from 'ffmpeg-static';
import { GoogleGenerativeAI } from '@google/generative-ai';
import { v4 as uuidv4 } from 'uuid';
import fs from 'fs';
import path from 'path';
import os from 'os';
import { MCPServer } from '../core/server.js';
import { ConfigMissingError, wrapError } from '../core/errors.js';
import { RedactionRule } from '../types/core.js';
import {
  TrackingEvent,
  TrackingSession,
  TrackingRecording,
  RecordingFrame,
  RecordingKeyframe,
  FunnelDefinition,
  FunnelResult,
  FeatureFlag,
  SessionFilter,
  RecordingFilter,
  CaptureResponse,
  IdentifyResponse,
  PageviewResponse,
  ListSessionsResponse,
  ListRecordingsResponse,
  RecordingFramesResponse,
  RecordingQuestionResponse,
  RunFunnelResponse,
  ListFeatureFlagsResponse,
  EvaluateFeatureFlagsResponse,
  PostHogConfig,
  VideoFrame
} from '../types/tracking.js';

// Set ffmpeg path
if (ffmpegStatic) {
  ffmpeg.setFfmpegPath(ffmpegStatic);
}

export class TrackingNamespace {
  private mcpServer: MCPServer;
  private configs = new Map<string, PostHogConfig>();
  private geminiClient?: GoogleGenerativeAI;
  private tempDir: string;

  constructor(mcpServer: MCPServer) {
    this.mcpServer = mcpServer;
    this.tempDir = path.join(os.tmpdir(), 'mcp-tracking');

    // Create temp directory for video processing
    if (!fs.existsSync(this.tempDir)) {
      fs.mkdirSync(this.tempDir, { recursive: true });
    }

    this.initializeConfigs();
    this.registerTools();
  }

  private initializeConfigs(): void {
    const env = this.mcpServer.getEnvConfig();

    // Initialize default PostHog instance
    if (env.POSTHOG_API_HOST && env.POSTHOG_PROJECT_KEY) {
      const redactionRules: RedactionRule[] = env.TRACKING_REDACT_RULES_JSON
        ? JSON.parse(env.TRACKING_REDACT_RULES_JSON)
        : [];

      this.configs.set('default', {
        api_host: env.POSTHOG_API_HOST,
        project_key: env.POSTHOG_PROJECT_KEY,
        personal_api_key: env.POSTHOG_PERSONAL_API_KEY || '',
        redaction_rules: redactionRules
      });
    }

    // Initialize Gemini client if available
    if (env.GEMINI_API_KEY) {
      this.geminiClient = new GoogleGenerativeAI(env.GEMINI_API_KEY);
    }
  }

  private getConfig(instance: string): PostHogConfig {
    const config = this.configs.get(instance);
    if (!config) {
      throw new ConfigMissingError(`PostHog configuration for instance '${instance}'`);
    }
    return config;
  }

  private registerTools(): void {
    const registry = this.mcpServer.getRegistry();

    registry.registerTool(
      'tracking.capture',
      {
        name: 'tracking.capture',
        description: 'Capture analytics event',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            distinct_id: { type: 'string' },
            event: { type: 'string' },
            properties: { type: 'object' },
            timestamp_iso: { type: 'string' }
          },
          required: ['instance', 'distinct_id', 'event']
        }
      },
      this.capture.bind(this)
    );

    registry.registerTool(
      'tracking.identify',
      {
        name: 'tracking.identify',
        description: 'Identify user traits',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            distinct_id: { type: 'string' },
            traits: { type: 'object' }
          },
          required: ['instance', 'distinct_id', 'traits']
        }
      },
      this.identify.bind(this)
    );

    registry.registerTool(
      'tracking.pageview',
      {
        name: 'tracking.pageview',
        description: 'Track pageview',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            distinct_id: { type: 'string' },
            url: { type: 'string' },
            properties: { type: 'object' },
            timestamp_iso: { type: 'string' }
          },
          required: ['instance', 'distinct_id', 'url']
        }
      },
      this.pageview.bind(this)
    );

    registry.registerTool(
      'tracking.sessions_list',
      {
        name: 'tracking.sessions_list',
        description: 'List tracking sessions',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            filter: {
              type: 'object',
              properties: {
                distinct_id: { type: 'string' },
                since: { type: 'string' },
                until: { type: 'string' },
                limit: { type: 'number' },
                cursor: { type: 'string' }
              }
            }
          },
          required: ['instance']
        }
      },
      this.sessionsList.bind(this)
    );

    registry.registerTool(
      'tracking.recordings_list',
      {
        name: 'tracking.recordings_list',
        description: 'List session recordings',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            filter: {
              type: 'object',
              properties: {
                distinct_id: { type: 'string' },
                since: { type: 'string' },
                until: { type: 'string' },
                limit: { type: 'number' },
                cursor: { type: 'string' }
              }
            }
          },
          required: ['instance']
        }
      },
      this.recordingsList.bind(this)
    );

    registry.registerTool(
      'tracking.recording_frames',
      {
        name: 'tracking.recording_frames',
        description: 'Extract frames from recording',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            recording_id: { type: 'string' },
            step_ms: { type: 'number' },
            max_frames: { type: 'number' }
          },
          required: ['instance', 'recording_id']
        }
      },
      this.recordingFrames.bind(this)
    );

    registry.registerTool(
      'tracking.ask_question_about_recording',
      {
        name: 'tracking.ask_question_about_recording',
        description: 'Ask AI question about recording',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            recording_id: { type: 'string' },
            question: { type: 'string' },
            model: { type: 'string' }
          },
          required: ['instance', 'recording_id', 'question']
        }
      },
      this.askQuestionAboutRecording.bind(this)
    );

    registry.registerTool(
      'tracking.funnels_run',
      {
        name: 'tracking.funnels_run',
        description: 'Run funnel analysis',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            definition: {
              type: 'object',
              properties: {
                steps: {
                  type: 'array',
                  items: {
                    type: 'object',
                    properties: {
                      event: { type: 'string' },
                      match: { type: 'object' }
                    },
                    required: ['event']
                  }
                },
                window: {
                  type: 'object',
                  properties: {
                    minutes: { type: 'number' },
                    hours: { type: 'number' },
                    days: { type: 'number' }
                  }
                }
              },
              required: ['steps']
            }
          },
          required: ['instance', 'definition']
        }
      },
      this.funnelsRun.bind(this)
    );

    registry.registerTool(
      'tracking.feature_flags_list',
      {
        name: 'tracking.feature_flags_list',
        description: 'List feature flags',
        inputSchema: {
          type: 'object',
          properties: { instance: { type: 'string' } },
          required: ['instance']
        }
      },
      this.featureFlagsList.bind(this)
    );

    registry.registerTool(
      'tracking.feature_flag_eval',
      {
        name: 'tracking.feature_flag_eval',
        description: 'Evaluate feature flags for user',
        inputSchema: {
          type: 'object',
          properties: {
            instance: { type: 'string' },
            distinct_id: { type: 'string' },
            context: { type: 'object' }
          },
          required: ['instance', 'distinct_id']
        }
      },
      this.featureFlagEval.bind(this)
    );
  }

  private async capture(params: {
    instance: string;
    distinct_id: string;
    event: string;
    properties?: object;
    timestamp_iso?: string;
  }): Promise<CaptureResponse> {
    const config = this.getConfig(params.instance);

    const payload = {
      api_key: config.project_key,
      batch: [{
        event: params.event,
        properties: {
          distinct_id: params.distinct_id,
          ...(params.properties || {})
        },
        timestamp: params.timestamp_iso || new Date().toISOString()
      }]
    };

    await axios.post(
      `${config.api_host}/capture/`,
      payload,
      { headers: { 'Content-Type': 'application/json' } }
    );

    return { ok: true };
  }

  private async identify(params: {
    instance: string;
    distinct_id: string;
    traits: object;
  }): Promise<IdentifyResponse> {
    const config = this.getConfig(params.instance);

    const payload = {
      api_key: config.project_key,
      batch: [{
        event: '$identify',
        properties: {
          distinct_id: params.distinct_id,
          '$set': params.traits
        },
        timestamp: new Date().toISOString()
      }]
    };

    await axios.post(
      `${config.api_host}/capture/`,
      payload,
      { headers: { 'Content-Type': 'application/json' } }
    );

    return { ok: true };
  }

  private async pageview(params: {
    instance: string;
    distinct_id: string;
    url: string;
    properties?: object;
    timestamp_iso?: string;
  }): Promise<PageviewResponse> {
    const config = this.getConfig(params.instance);

    const payload = {
      api_key: config.project_key,
      batch: [{
        event: '$pageview',
        properties: {
          distinct_id: params.distinct_id,
          $current_url: params.url,
          ...(params.properties || {})
        },
        timestamp: params.timestamp_iso || new Date().toISOString()
      }]
    };

    await axios.post(
      `${config.api_host}/capture/`,
      payload,
      { headers: { 'Content-Type': 'application/json' } }
    );

    return { ok: true };
  }

  private async sessionsList(params: {
    instance: string;
    filter?: SessionFilter;
  }): Promise<ListSessionsResponse> {
    const config = this.getConfig(params.instance);
    if (!config.personal_api_key) {
      throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
    }

    const filter = params.filter || {};

    // Build query parameters
    const queryParams: any = { limit: filter.limit || 20 };
    if (filter.distinct_id) {
      queryParams.distinct_id = filter.distinct_id;
    }
    if (filter.since) {
      queryParams.date_from = filter.since;
    }
    if (filter.until) {
      queryParams.date_to = filter.until;
    }

    const response = await axios.get(
      `${config.api_host}/api/projects/@current/sessions`,
      {
        headers: { 'Authorization': `Bearer ${config.personal_api_key}` },
        params: queryParams
      }
    );

    const sessions: TrackingSession[] = response.data.results.map((s: any) => ({
      id: s.id,
      distinct_id: s.distinct_id,
      start: s.start_time,
      end: s.end_time,
      duration_ms: s.duration * 1000,
      events_count: s.event_count
    }));

    return {
      sessions,
      next_cursor: response.data.next ? String(response.data.offset + filter.limit!) : undefined
    };
  }

  private async recordingsList(params: {
    instance: string;
    filter?: RecordingFilter;
  }): Promise<ListRecordingsResponse> {
    const config = this.getConfig(params.instance);
    if (!config.personal_api_key) {
      throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
    }

    const filter = params.filter || {};

    const queryParams: any = { limit: filter.limit || 20 };
    if (filter.distinct_id) {
      queryParams.person_uuid = filter.distinct_id;
    }
    if (filter.since) {
      queryParams.date_from = filter.since;
    }
    if (filter.until) {
      queryParams.date_to = filter.until;
    }

    const response = await axios.get(
      `${config.api_host}/api/projects/@current/session_recordings`,
      {
        headers: { 'Authorization': `Bearer ${config.personal_api_key}` },
        params: queryParams
      }
    );

    const recordings: TrackingRecording[] = response.data.results.map((r: any) => ({
      id: r.id,
      session_id: r.session_id,
      start: r.start_time,
      duration_ms: r.duration * 1000,
      url: r.viewed ? r.recording_url : undefined,
      segments: r.segments || []
    }));

    return {
      recordings,
      next_cursor: response.data.next ? String(response.data.offset + filter.limit!) : undefined
    };
  }

  private async recordingFrames(params: {
    instance: string;
    recording_id: string;
    step_ms?: number;
    max_frames?: number;
  }): Promise<RecordingFramesResponse> {
    const config = this.getConfig(params.instance);
    if (!config.personal_api_key) {
      throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
    }

    const stepMs = params.step_ms || 1000;
    const maxFrames = params.max_frames || 120;

    // Fetch recording data with snapshots
    const response = await axios.get(
      `${config.api_host}/api/projects/@current/session_recordings/${params.recording_id}/snapshots`,
      { headers: { 'Authorization': `Bearer ${config.personal_api_key}` } }
    );

    const snapshots = response.data.snapshots || [];
    if (snapshots.length === 0) {
      throw new Error(`No snapshots found for recording ${params.recording_id}`);
    }

    // Download recording segments and reconstruct video
    const videoPath = path.join(this.tempDir, `recording_${params.recording_id}.mp4`);
    const framesDir = path.join(this.tempDir, `frames_${params.recording_id}`);

    // Create frames directory
    if (!fs.existsSync(framesDir)) {
      fs.mkdirSync(framesDir, { recursive: true });
    }

    try {
      // Download and concatenate video segments
      await this.downloadRecordingVideo(config, params.recording_id, videoPath);

      // Extract frames using ffmpeg
      const frameImages = await this.extractFramesFromVideo(
        videoPath,
        framesDir,
        stepMs,
        maxFrames
      );

      // Apply redaction rules if configured
      const processedImages: Buffer[] = [];
      const frameMetadata: RecordingFrame[] = [];

      for (let i = 0; i < frameImages.length; i++) {
        const frameBuffer = fs.readFileSync(frameImages[i]);

        // Apply redaction if rules exist
        const redactedImage = config.redaction_rules && config.redaction_rules.length > 0
          ? await this.applyRedactionRules(frameBuffer, config.redaction_rules)
          : frameBuffer;

        processedImages.push(redactedImage);
        frameMetadata.push({
          ts_ms: i * stepMs,
          image_name: `frame_${i}.png`,
          width: 1920, // These should be extracted from actual frame
          height: 1080
        });
      }

      // Clean up temp files
      fs.rmSync(videoPath, { force: true });
      fs.rmSync(framesDir, { recursive: true, force: true });

      // Return frames with embedded images
      const result: any = { frames: frameMetadata };

      // Add each frame image as a separate field
      processedImages.forEach((img, i) => {
        result[`frame_${i}_data`] = img.toString('base64');
        result[`frame_${i}_mimeType`] = 'image/png';
      });

      return result;
    } catch (error) {
      // Clean up on error
      fs.rmSync(videoPath, { force: true });
      fs.rmSync(framesDir, { recursive: true, force: true });
      throw new Error(`Failed to process recording: ${error}`);
    }
  }

  private async downloadRecordingVideo(
    config: PostHogConfig,
    recordingId: string,
    outputPath: string
  ): Promise<void> {
    // Fetch the recording blob/segments
    const response = await axios.get(
      `${config.api_host}/api/projects/@current/session_recordings/${recordingId}/blob`,
      {
        headers: { 'Authorization': `Bearer ${config.personal_api_key}` },
        responseType: 'stream'
      }
    );

    // Save video to file
    const writer = fs.createWriteStream(outputPath);
    response.data.pipe(writer);

    return new Promise((resolve, reject) => {
      writer.on('finish', resolve);
      writer.on('error', reject);
    });
  }

  private async extractFramesFromVideo(
    videoPath: string,
    outputDir: string,
    stepMs: number,
    maxFrames: number
  ): Promise<string[]> {
    const framePaths: string[] = [];
    const fps = 1000 / stepMs; // Convert step_ms to fps

    return new Promise((resolve, reject) => {
      ffmpeg(videoPath)
        .outputOptions([
          `-vf fps=${fps}`,
          `-frames:v ${maxFrames}`
        ])
        .output(path.join(outputDir, 'frame_%04d.png'))
        .on('end', () => {
          // Get all generated frame files
          const files = fs.readdirSync(outputDir)
            .filter(f => f.startsWith('frame_'))
            .sort()
            .slice(0, maxFrames)
            .map(f => path.join(outputDir, f));
          resolve(files);
        })
        .on('error', reject)
        .run();
    });
  }

  private async askQuestionAboutRecording(params: {
    instance: string;
    recording_id: string;
    question: string;
    model?: string;
  }): Promise<RecordingQuestionResponse> {
    if (!this.geminiClient) {
      throw new ConfigMissingError('GEMINI_API_KEY');
    }

    // Extract keyframes from recording
    const framesResponse = await this.recordingFrames({
      instance: params.instance,
      recording_id: params.recording_id,
      step_ms: 5000, // Get keyframes every 5 seconds
      max_frames: 20
    });

    // Select most relevant keyframes using scene detection
    const keyframeIndices = await this.detectSceneChanges(framesResponse);
    const keyframes: RecordingKeyframe[] = keyframeIndices
      .slice(0, 6) // Max 6 keyframes for Gemini
      .map(index => ({
        ts_ms: framesResponse.frames[index].ts_ms,
        image_name: `keyframe_${index}.png`
      }));

    // Prepare images for Gemini
    const model = this.geminiClient.getGenerativeModel({
      model: params.model || 'gemini-2.0-flash-exp'
    });

    // Convert base64 images to format Gemini expects
    const imageParts = keyframeIndices.map(index => ({
      inlineData: {
        data: (framesResponse as any)[`frame_${index}_data`],
        mimeType: 'image/png'
      }
    }));

    // Ask Gemini the question with the actual images
    const prompt = `
You are analyzing a session recording with ${keyframes.length} keyframes from a web application.
The user asks: "${params.question}"

Based on the provided screenshots, please provide detailed insights.
Focus on user behavior, UI interactions, potential issues, and patterns you observe.
`;

    const result = await model.generateContent([prompt, ...imageParts]);
    const answerText = result.response.text();

    // Return response with keyframe images
    const responseData: any = {
      answer_text: answerText,
      keyframes
    };

    // Add keyframe images as attachments
    for (let i = 0; i < keyframeIndices.length && i < 6; i++) {
      const frameIndex = keyframeIndices[i];
      responseData[`keyframe_${i}_data`] = (framesResponse as any)[`frame_${frameIndex}_data`];
      responseData[`keyframe_${i}_mimeType`] = 'image/png';
    }

    return responseData;
  }

  private async detectSceneChanges(framesResponse: any): Promise<number[]> {
    // Detect significant scene changes between frames by analyzing frame distribution
    const frameCount = framesResponse.frames.length;
    const keyframeCount = Math.min(6, frameCount);
    const step = Math.floor(frameCount / keyframeCount);

    const indices: number[] = [];
    for (let i = 0; i < keyframeCount; i++) {
      indices.push(i * step);
    }

    return indices;
  }

  private async funnelsRun(params: {
    instance: string;
    definition: FunnelDefinition;
  }): Promise<RunFunnelResponse> {
    const config = this.getConfig(params.instance);
    if (!config.personal_api_key) {
      throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
    }

    // Build funnel query
    const funnelData = {
      events: params.definition.steps.map(step => ({
        id: step.event,
        name: step.event,
        type: 'events',
        properties: step.match
          ? Object.entries(step.match).map(([key, value]) => ({ key, value, operator: 'exact' }))
          : []
      })),
      funnel_window_days: params.definition.window?.days || 7,
      insight: 'FUNNELS'
    };

    const response = await axios.post(
      `${config.api_host}/api/projects/@current/insights/funnel`,
      funnelData,
      {
        headers: {
          'Authorization': `Bearer ${config.personal_api_key}`,
          'Content-Type': 'application/json'
        }
      }
    );

    const results = response.data.results;
    const steps = params.definition.steps.map((step, index) => {
      const stepData = results[index] || { count: 0, conversion_rate: 0 };
      const previousCount = index > 0 ? results[index - 1].count : stepData.count;
      const dropOff = previousCount - stepData.count;

      return {
        event: step.event,
        count: stepData.count,
        drop_off: dropOff
      };
    });

    const overallConversion = results.length > 0
      ? results[results.length - 1].overall_conversion_rate
      : 0;

    return {
      summary: { conversion_rate: overallConversion },
      steps
    };
  }

  private async featureFlagsList(params: {
    instance: string;
  }): Promise<ListFeatureFlagsResponse> {
    const config = this.getConfig(params.instance);
    if (!config.personal_api_key) {
      throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
    }

    const response = await axios.get(
      `${config.api_host}/api/projects/@current/feature_flags`,
      { headers: { 'Authorization': `Bearer ${config.personal_api_key}` } }
    );

    const flags: FeatureFlag[] = response.data.results.map((f: any) => ({
      key: f.key,
      type: f.filters?.multivariate ? 'multivariate' : 'boolean',
      rollout: f.filters
    }));

    return { flags };
  }

  private async featureFlagEval(params: {
    instance: string;
    distinct_id: string;
    context?: object;
  }): Promise<EvaluateFeatureFlagsResponse> {
    const config = this.getConfig(params.instance);

    // Use the decide endpoint for flag evaluation
    const response = await axios.post(
      `${config.api_host}/decide/`,
      {
        api_key: config.project_key,
        distinct_id: params.distinct_id,
        person_properties: params.context || {}
      },
      { headers: { 'Content-Type': 'application/json' } }
    );

    return { decisions: response.data.featureFlags || {} };
  }

  private async applyRedactionRules(image: Buffer, rules: RedactionRule[]): Promise<Buffer> {
    // Convert redaction rules to areas for canvas
    const areas: Array<{ x: number; y: number; w: number; h: number }> = [];

    for (const rule of rules) {
      if (rule.type === 'box' && rule.rect) {
        areas.push(rule.rect);
      }
      // Additional redaction types would be handled here
    }

    if (areas.length > 0) {
      return applyRedaction(image, areas);
    }

    return image;
  }

  // Cleanup method
  async cleanup(): Promise<void> {
    // Clean up temp directory
    if (fs.existsSync(this.tempDir)) {
      fs.rmSync(this.tempDir, { recursive: true, force: true });
    }
  }
}
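
For reference, here is a minimal sketch of the batch payload shape that capture() above posts to PostHog's /capture/ endpoint, exercised directly with axios. The host URL, project key, distinct_id, and event name are placeholder assumptions for illustration, not values taken from this repository.

// Sketch only: sends one event using the same `{ api_key, batch: [...] }` shape
// that TrackingNamespace.capture() builds. Replace the placeholders with your
// own PostHog host and project key (assumed values, not from this repo).
import axios from 'axios';

async function sendTestEvent(): Promise<void> {
  const payload = {
    api_key: '<your-posthog-project-key>', // placeholder
    batch: [{
      event: 'signup_completed', // hypothetical event name
      properties: { distinct_id: 'user-123', plan: 'free' },
      timestamp: new Date().toISOString()
    }]
  };

  // Same endpoint the namespace targets: `${api_host}/capture/`
  await axios.post('https://app.posthog.com/capture/', payload, {
    headers: { 'Content-Type': 'application/json' }
  });
}

sendTestEvent().catch(err => console.error('capture failed:', err));

Note that the write-path tools (capture, identify, pageview, feature_flag_eval) only need the project key, while the read-path tools in the class (sessions, recordings, funnels, flag listing) additionally require POSTHOG_PERSONAL_API_KEY.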
