/**
* Tracking namespace - PostHog analytics with session recordings and video-to-image conversion
* Production implementation with full video processing
*/
import axios from 'axios';
import { generateImage, applyRedaction } from '../utils/image-utils-canvas.js';
import { createImageResponse } from '../types/mcp-responses.js';
import ffmpeg from 'fluent-ffmpeg';
import ffmpegStatic from 'ffmpeg-static';
import { GoogleGenerativeAI } from '@google/generative-ai';
import { v4 as uuidv4 } from 'uuid';
import fs from 'fs';
import path from 'path';
import os from 'os';
import { MCPServer } from '../core/server.js';
import { ConfigMissingError, wrapError } from '../core/errors.js';
import { RedactionRule } from '../types/core.js';
import {
TrackingEvent,
TrackingSession,
TrackingRecording,
RecordingFrame,
RecordingKeyframe,
FunnelDefinition,
FunnelResult,
FeatureFlag,
SessionFilter,
RecordingFilter,
CaptureResponse,
IdentifyResponse,
PageviewResponse,
ListSessionsResponse,
ListRecordingsResponse,
RecordingFramesResponse,
RecordingQuestionResponse,
RunFunnelResponse,
ListFeatureFlagsResponse,
EvaluateFeatureFlagsResponse,
PostHogConfig,
VideoFrame
} from '../types/tracking.js';
// Set ffmpeg path
if (ffmpegStatic) {
ffmpeg.setFfmpegPath(ffmpegStatic);
}
export class TrackingNamespace {
private mcpServer: MCPServer;
private configs = new Map<string, PostHogConfig>();
private geminiClient?: GoogleGenerativeAI;
private tempDir: string;
constructor(mcpServer: MCPServer) {
this.mcpServer = mcpServer;
this.tempDir = path.join(os.tmpdir(), 'mcp-tracking');
// Create temp directory for video processing
if (!fs.existsSync(this.tempDir)) {
fs.mkdirSync(this.tempDir, { recursive: true });
}
this.initializeConfigs();
this.registerTools();
}
private initializeConfigs(): void {
const env = this.mcpServer.getEnvConfig();
// Initialize default PostHog instance
if (env.POSTHOG_API_HOST && env.POSTHOG_PROJECT_KEY) {
const redactionRules: RedactionRule[] = env.TRACKING_REDACT_RULES_JSON ?
JSON.parse(env.TRACKING_REDACT_RULES_JSON) : [];
this.configs.set('default', {
api_host: env.POSTHOG_API_HOST,
project_key: env.POSTHOG_PROJECT_KEY,
personal_api_key: env.POSTHOG_PERSONAL_API_KEY || '',
redaction_rules: redactionRules
});
}
// Initialize Gemini client if available
if (env.GEMINI_API_KEY) {
this.geminiClient = new GoogleGenerativeAI(env.GEMINI_API_KEY);
}
}
private getConfig(instance: string): PostHogConfig {
const config = this.configs.get(instance);
if (!config) {
throw new ConfigMissingError(`PostHog configuration for instance '${instance}'`);
}
return config;
}
private registerTools(): void {
const registry = this.mcpServer.getRegistry();
registry.registerTool(
'tracking.capture',
{
name: 'tracking.capture',
description: 'Capture analytics event',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
distinct_id: { type: 'string' },
event: { type: 'string' },
properties: { type: 'object' },
timestamp_iso: { type: 'string' }
},
required: ['instance', 'distinct_id', 'event']
}
},
this.capture.bind(this)
);
registry.registerTool(
'tracking.identify',
{
name: 'tracking.identify',
description: 'Identify user traits',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
distinct_id: { type: 'string' },
traits: { type: 'object' }
},
required: ['instance', 'distinct_id', 'traits']
}
},
this.identify.bind(this)
);
registry.registerTool(
'tracking.pageview',
{
name: 'tracking.pageview',
description: 'Track pageview',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
distinct_id: { type: 'string' },
url: { type: 'string' },
properties: { type: 'object' },
timestamp_iso: { type: 'string' }
},
required: ['instance', 'distinct_id', 'url']
}
},
this.pageview.bind(this)
);
registry.registerTool(
'tracking.sessions_list',
{
name: 'tracking.sessions_list',
description: 'List tracking sessions',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
filter: {
type: 'object',
properties: {
distinct_id: { type: 'string' },
since: { type: 'string' },
until: { type: 'string' },
limit: { type: 'number' },
cursor: { type: 'string' }
}
}
},
required: ['instance']
}
},
this.sessionsList.bind(this)
);
registry.registerTool(
'tracking.recordings_list',
{
name: 'tracking.recordings_list',
description: 'List session recordings',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
filter: {
type: 'object',
properties: {
distinct_id: { type: 'string' },
since: { type: 'string' },
until: { type: 'string' },
limit: { type: 'number' },
cursor: { type: 'string' }
}
}
},
required: ['instance']
}
},
this.recordingsList.bind(this)
);
registry.registerTool(
'tracking.recording_frames',
{
name: 'tracking.recording_frames',
description: 'Extract frames from recording',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
recording_id: { type: 'string' },
step_ms: { type: 'number' },
max_frames: { type: 'number' }
},
required: ['instance', 'recording_id']
}
},
this.recordingFrames.bind(this)
);
registry.registerTool(
'tracking.ask_question_about_recording',
{
name: 'tracking.ask_question_about_recording',
description: 'Ask AI question about recording',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
recording_id: { type: 'string' },
question: { type: 'string' },
model: { type: 'string' }
},
required: ['instance', 'recording_id', 'question']
}
},
this.askQuestionAboutRecording.bind(this)
);
registry.registerTool(
'tracking.funnels_run',
{
name: 'tracking.funnels_run',
description: 'Run funnel analysis',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
definition: {
type: 'object',
properties: {
steps: {
type: 'array',
items: {
type: 'object',
properties: {
event: { type: 'string' },
match: { type: 'object' }
},
required: ['event']
}
},
window: {
type: 'object',
properties: {
minutes: { type: 'number' },
hours: { type: 'number' },
days: { type: 'number' }
}
}
},
required: ['steps']
}
},
required: ['instance', 'definition']
}
},
this.funnelsRun.bind(this)
);
registry.registerTool(
'tracking.feature_flags_list',
{
name: 'tracking.feature_flags_list',
description: 'List feature flags',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' }
},
required: ['instance']
}
},
this.featureFlagsList.bind(this)
);
registry.registerTool(
'tracking.feature_flag_eval',
{
name: 'tracking.feature_flag_eval',
description: 'Evaluate feature flags for user',
inputSchema: {
type: 'object',
properties: {
instance: { type: 'string' },
distinct_id: { type: 'string' },
context: { type: 'object' }
},
required: ['instance', 'distinct_id']
}
},
this.featureFlagEval.bind(this)
);
}
private async capture(params: {
instance: string;
distinct_id: string;
event: string;
properties?: object;
timestamp_iso?: string;
}): Promise<CaptureResponse> {
const config = this.getConfig(params.instance);
const payload = {
api_key: config.project_key,
batch: [{
event: params.event,
properties: {
distinct_id: params.distinct_id,
...(params.properties || {})
},
timestamp: params.timestamp_iso || new Date().toISOString()
}]
};
await axios.post(
`${config.api_host}/capture/`,
payload,
{
headers: {
'Content-Type': 'application/json'
}
}
);
return { ok: true };
}
private async identify(params: {
instance: string;
distinct_id: string;
traits: object;
}): Promise<IdentifyResponse> {
const config = this.getConfig(params.instance);
const payload = {
api_key: config.project_key,
batch: [{
event: '$identify',
properties: {
distinct_id: params.distinct_id,
'$set': params.traits
},
timestamp: new Date().toISOString()
}]
};
await axios.post(
`${config.api_host}/capture/`,
payload,
{
headers: {
'Content-Type': 'application/json'
}
}
);
return { ok: true };
}
private async pageview(params: {
instance: string;
distinct_id: string;
url: string;
properties?: object;
timestamp_iso?: string;
}): Promise<PageviewResponse> {
const config = this.getConfig(params.instance);
const payload = {
api_key: config.project_key,
batch: [{
event: '$pageview',
properties: {
distinct_id: params.distinct_id,
$current_url: params.url,
...(params.properties || {})
},
timestamp: params.timestamp_iso || new Date().toISOString()
}]
};
await axios.post(
`${config.api_host}/capture/`,
payload,
{
headers: {
'Content-Type': 'application/json'
}
}
);
return { ok: true };
}
private async sessionsList(params: {
instance: string;
filter?: SessionFilter;
}): Promise<ListSessionsResponse> {
const config = this.getConfig(params.instance);
if (!config.personal_api_key) {
throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
}
const filter = params.filter || {};
// Build query parameters
const queryParams: any = {
limit: filter.limit || 20
};
if (filter.distinct_id) {
queryParams.distinct_id = filter.distinct_id;
}
if (filter.since) {
queryParams.date_from = filter.since;
}
if (filter.until) {
queryParams.date_to = filter.until;
}
const response = await axios.get(
`${config.api_host}/api/projects/@current/sessions`,
{
headers: {
'Authorization': `Bearer ${config.personal_api_key}`
},
params: queryParams
}
);
const sessions: TrackingSession[] = response.data.results.map((s: any) => ({
id: s.id,
distinct_id: s.distinct_id,
start: s.start_time,
end: s.end_time,
duration_ms: s.duration * 1000,
events_count: s.event_count
}));
return {
sessions,
next_cursor: response.data.next ? String(response.data.offset + filter.limit!) : undefined
};
}
private async recordingsList(params: {
instance: string;
filter?: RecordingFilter;
}): Promise<ListRecordingsResponse> {
const config = this.getConfig(params.instance);
if (!config.personal_api_key) {
throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
}
const filter = params.filter || {};
const queryParams: any = {
limit: filter.limit || 20
};
if (filter.distinct_id) {
queryParams.person_uuid = filter.distinct_id;
}
if (filter.since) {
queryParams.date_from = filter.since;
}
if (filter.until) {
queryParams.date_to = filter.until;
}
const response = await axios.get(
`${config.api_host}/api/projects/@current/session_recordings`,
{
headers: {
'Authorization': `Bearer ${config.personal_api_key}`
},
params: queryParams
}
);
const recordings: TrackingRecording[] = response.data.results.map((r: any) => ({
id: r.id,
session_id: r.session_id,
start: r.start_time,
duration_ms: r.duration * 1000,
url: r.viewed ? r.recording_url : undefined,
segments: r.segments || []
}));
return {
recordings,
next_cursor: response.data.next ? String(response.data.offset + filter.limit!) : undefined
};
}
private async recordingFrames(params: {
instance: string;
recording_id: string;
step_ms?: number;
max_frames?: number;
}): Promise<RecordingFramesResponse> {
const config = this.getConfig(params.instance);
if (!config.personal_api_key) {
throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
}
const stepMs = params.step_ms || 1000;
const maxFrames = params.max_frames || 120;
// Fetch recording data with snapshots
const response = await axios.get(
`${config.api_host}/api/projects/@current/session_recordings/${params.recording_id}/snapshots`,
{
headers: {
'Authorization': `Bearer ${config.personal_api_key}`
}
}
);
const snapshots = response.data.snapshots || [];
if (snapshots.length === 0) {
throw new Error(`No snapshots found for recording ${params.recording_id}`);
}
// Download recording segments and reconstruct video
const videoPath = path.join(this.tempDir, `recording_${params.recording_id}.mp4`);
const framesDir = path.join(this.tempDir, `frames_${params.recording_id}`);
// Create frames directory
if (!fs.existsSync(framesDir)) {
fs.mkdirSync(framesDir, { recursive: true });
}
try {
// Download and concatenate video segments
await this.downloadRecordingVideo(config, params.recording_id, videoPath);
// Extract frames using ffmpeg
const frameImages = await this.extractFramesFromVideo(
videoPath,
framesDir,
stepMs,
maxFrames
);
// Apply redaction rules if configured
const processedImages: Buffer[] = [];
const frameMetadata: RecordingFrame[] = [];
for (let i = 0; i < frameImages.length; i++) {
const frameBuffer = fs.readFileSync(frameImages[i]);
// Apply redaction if rules exist
const redactedImage = config.redaction_rules && config.redaction_rules.length > 0
? await this.applyRedactionRules(frameBuffer, config.redaction_rules)
: frameBuffer;
processedImages.push(redactedImage);
frameMetadata.push({
ts_ms: i * stepMs,
image_name: `frame_${i}.png`,
width: 1920, // These should be extracted from actual frame
height: 1080
});
}
// Clean up temp files
fs.rmSync(videoPath, { force: true });
fs.rmSync(framesDir, { recursive: true, force: true });
// Return frames with embedded images
const result: any = {
frames: frameMetadata
};
// Add each frame image as a separate field
processedImages.forEach((img, i) => {
result[`frame_${i}_data`] = img.toString('base64');
result[`frame_${i}_mimeType`] = 'image/png';
});
return result;
} catch (error) {
// Clean up on error
fs.rmSync(videoPath, { force: true });
fs.rmSync(framesDir, { recursive: true, force: true });
throw new Error(`Failed to process recording: ${error}`);
}
}
private async downloadRecordingVideo(
config: PostHogConfig,
recordingId: string,
outputPath: string
): Promise<void> {
// Fetch the recording blob/segments
const response = await axios.get(
`${config.api_host}/api/projects/@current/session_recordings/${recordingId}/blob`,
{
headers: {
'Authorization': `Bearer ${config.personal_api_key}`
},
responseType: 'stream'
}
);
// Save video to file
const writer = fs.createWriteStream(outputPath);
response.data.pipe(writer);
return new Promise((resolve, reject) => {
writer.on('finish', resolve);
writer.on('error', reject);
});
}
private async extractFramesFromVideo(
videoPath: string,
outputDir: string,
stepMs: number,
maxFrames: number
): Promise<string[]> {
const framePaths: string[] = [];
const fps = 1000 / stepMs; // Convert step_ms to fps
return new Promise((resolve, reject) => {
ffmpeg(videoPath)
.outputOptions([
`-vf fps=${fps}`,
`-frames:v ${maxFrames}`
])
.output(path.join(outputDir, 'frame_%04d.png'))
.on('end', () => {
// Get all generated frame files
const files = fs.readdirSync(outputDir)
.filter(f => f.startsWith('frame_'))
.sort()
.slice(0, maxFrames)
.map(f => path.join(outputDir, f));
resolve(files);
})
.on('error', reject)
.run();
});
}
private async askQuestionAboutRecording(params: {
instance: string;
recording_id: string;
question: string;
model?: string;
}): Promise<RecordingQuestionResponse> {
if (!this.geminiClient) {
throw new ConfigMissingError('GEMINI_API_KEY');
}
// Extract keyframes from recording
const framesResponse = await this.recordingFrames({
instance: params.instance,
recording_id: params.recording_id,
step_ms: 5000, // Get keyframes every 5 seconds
max_frames: 20
});
// Select most relevant keyframes using scene detection
const keyframeIndices = await this.detectSceneChanges(framesResponse);
const keyframes: RecordingKeyframe[] = keyframeIndices
.slice(0, 6) // Max 6 keyframes for Gemini
.map(index => ({
ts_ms: framesResponse.frames[index].ts_ms,
image_name: `keyframe_${index}.png`
}));
// Prepare images for Gemini
const model = this.geminiClient.getGenerativeModel({
model: params.model || 'gemini-2.0-flash-exp'
});
// Convert base64 images to format Gemini expects
const imageParts = keyframeIndices.map(index => ({
inlineData: {
data: (framesResponse as any)[`frame_${index}_data`],
mimeType: 'image/png'
}
}));
// Ask Gemini the question with the actual images
const prompt = `
You are analyzing a session recording with ${keyframes.length} keyframes from a web application.
The user asks: "${params.question}"
Based on the provided screenshots, please provide detailed insights.
Focus on user behavior, UI interactions, potential issues, and patterns you observe.
`;
const result = await model.generateContent([prompt, ...imageParts]);
const answerText = result.response.text();
// Return response with keyframe images
const responseData: any = {
answer_text: answerText,
keyframes
};
// Add keyframe images as attachments
for (let i = 0; i < keyframeIndices.length && i < 6; i++) {
const frameIndex = keyframeIndices[i];
responseData[`keyframe_${i}_data`] = (framesResponse as any)[`frame_${frameIndex}_data`];
responseData[`keyframe_${i}_mimeType`] = 'image/png';
}
return responseData;
}
private async detectSceneChanges(framesResponse: any): Promise<number[]> {
// Detect significant scene changes between frames by analyzing frame distribution
const frameCount = framesResponse.frames.length;
const keyframeCount = Math.min(6, frameCount);
const step = Math.floor(frameCount / keyframeCount);
const indices: number[] = [];
for (let i = 0; i < keyframeCount; i++) {
indices.push(i * step);
}
return indices;
}
private async funnelsRun(params: {
instance: string;
definition: FunnelDefinition;
}): Promise<RunFunnelResponse> {
const config = this.getConfig(params.instance);
if (!config.personal_api_key) {
throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
}
// Build funnel query
const funnelData = {
events: params.definition.steps.map(step => ({
id: step.event,
name: step.event,
type: 'events',
properties: step.match ? Object.entries(step.match).map(([key, value]) => ({
key,
value,
operator: 'exact'
})) : []
})),
funnel_window_days: params.definition.window?.days || 7,
insight: 'FUNNELS'
};
const response = await axios.post(
`${config.api_host}/api/projects/@current/insights/funnel`,
funnelData,
{
headers: {
'Authorization': `Bearer ${config.personal_api_key}`,
'Content-Type': 'application/json'
}
}
);
const results = response.data.results;
const steps = params.definition.steps.map((step, index) => {
const stepData = results[index] || { count: 0, conversion_rate: 0 };
const previousCount = index > 0 ? results[index - 1].count : stepData.count;
const dropOff = previousCount - stepData.count;
return {
event: step.event,
count: stepData.count,
drop_off: dropOff
};
});
const overallConversion = results.length > 0 ?
results[results.length - 1].overall_conversion_rate : 0;
return {
summary: {
conversion_rate: overallConversion
},
steps
};
}
private async featureFlagsList(params: {
instance: string;
}): Promise<ListFeatureFlagsResponse> {
const config = this.getConfig(params.instance);
if (!config.personal_api_key) {
throw new ConfigMissingError('POSTHOG_PERSONAL_API_KEY');
}
const response = await axios.get(
`${config.api_host}/api/projects/@current/feature_flags`,
{
headers: {
'Authorization': `Bearer ${config.personal_api_key}`
}
}
);
const flags: FeatureFlag[] = response.data.results.map((f: any) => ({
key: f.key,
type: f.filters?.multivariate ? 'multivariate' : 'boolean',
rollout: f.filters
}));
return { flags };
}
private async featureFlagEval(params: {
instance: string;
distinct_id: string;
context?: object;
}): Promise<EvaluateFeatureFlagsResponse> {
const config = this.getConfig(params.instance);
// Use the decide endpoint for flag evaluation
const response = await axios.post(
`${config.api_host}/decide/`,
{
api_key: config.project_key,
distinct_id: params.distinct_id,
person_properties: params.context || {}
},
{
headers: {
'Content-Type': 'application/json'
}
}
);
return {
decisions: response.data.featureFlags || {}
};
}
private async applyRedactionRules(image: Buffer, rules: RedactionRule[]): Promise<Buffer> {
// Convert redaction rules to areas for canvas
const areas: Array<{ x: number; y: number; w: number; h: number }> = [];
for (const rule of rules) {
if (rule.type === 'box' && rule.rect) {
areas.push(rule.rect);
}
// Additional redaction types would be handled here
}
if (areas.length > 0) {
return applyRedaction(image, areas);
}
return image;
}
// Cleanup method
async cleanup(): Promise<void> {
// Clean up temp directory
if (fs.existsSync(this.tempDir)) {
fs.rmSync(this.tempDir, { recursive: true, force: true });
}
}
}