// observability-integration.ts (30.2 kB)
/**
* GEPA Observability Integration System
*
* Comprehensive metrics collection, distributed tracing, log aggregation,
* and real-time dashboards for complete system visibility.
*/
import { EventEmitter } from 'events';
import { ErrorEvent } from './error-tracking-system';
import { AnomalyEvent } from './anomaly-detection';
import { Incident, IncidentSeverity } from './incident-response';
import { PerformanceTracker } from '../../services/performance-tracker';
import { MemoryLeakIntegration } from '../memory-leak-detector';
// Core Observability Types
/**
* Static description of a metric. `ObservabilityIntegration.defineMetric`
* stores definitions in a map keyed by `name`.
*/
export interface MetricDefinition {
/** Metric identifier; also the key under which the definition is stored. */
name: string;
/** Kind of metric (see MetricType). */
type: MetricType;
/** Human-readable explanation of what the metric represents. */
description: string;
/** Unit of the recorded values; the built-in metrics use 'count', 'milliseconds' and 'bytes'. */
unit: string;
/**
* Label names associated with the metric.
* NOTE(review): nothing in this file validates recorded samples against this list.
*/
labels: string[];
/**
* Aggregation associated with the metric.
* NOTE(review): queries carry their own aggregation; this field is not read in this file.
*/
aggregation: AggregationType;
}
/**
* Kinds of metric a MetricDefinition can declare (string-valued enum).
* The built-in definitions in this file use COUNTER, HISTOGRAM and GAUGE.
*/
export enum MetricType {
COUNTER = 'counter',
GAUGE = 'gauge',
HISTOGRAM = 'histogram',
SUMMARY = 'summary'
}
/**
* Aggregations a query or metric definition can request (string-valued enum).
* In this file only SUM, AVERAGE and COUNT are actually computed; RATE and
* PERCENTILE fall through to the default branch of `aggregateMetrics`, which
* returns the matching samples unaggregated.
*/
export enum AggregationType {
SUM = 'sum',
AVERAGE = 'average',
COUNT = 'count',
RATE = 'rate',
PERCENTILE = 'percentile'
}
/** One recorded value of a metric, as created by `recordMetric`. */
export interface MetricSample {
/** Name of the metric the value belongs to. */
metric: string;
/** Recorded value. */
value: number;
/** `Date.now()` at the time the sample was recorded (milliseconds since the epoch). */
timestamp: number;
/** Label key/value pairs attached to the sample; queries filter and group on these. */
labels: Record<string, string>;
}
/** A single span of a trace, created by `startTrace` and closed by `finishSpan`. */
export interface TraceSpan {
/** Identifier of the trace this span belongs to. */
traceId: string;
/** Identifier of this span. */
spanId: string;
/** Span id of the parent; spans without it are treated as the root span of their trace. */
parentSpanId?: string;
/** Name of the traced operation. */
operationName: string;
/** `Date.now()` when the span was started. */
startTime: number;
/** `Date.now()` when the span was finished; absent while the span is open. */
endTime?: number;
/** `endTime - startTime`, set together with `endTime`. */
duration?: number;
/** Free-form tags; `finishSpan` adds `error_message` and `error_stack` when given an Error. */
tags: Record<string, any>;
/** Log records attached to the span through `addSpanLog`. */
logs: TraceLog[];
/** Outcome of the span; starts as SpanStatus.OK and is overwritten by `finishSpan`. */
status: SpanStatus;
/** Set to true when `finishSpan` is called with an Error. */
error?: boolean;
}
/**
* Outcome of a span (string-valued enum). New spans start as OK;
* `handleErrorEvent` finishes spans with ERROR. TIMEOUT and CANCELLED are
* only ever set by callers of `finishSpan`.
*/
export enum SpanStatus {
OK = 'ok',
ERROR = 'error',
TIMEOUT = 'timeout',
CANCELLED = 'cancelled'
}
/** A log record attached to a span by `addSpanLog`. */
export interface TraceLog {
/** `Date.now()` when the record was added. */
timestamp: number;
/** Severity; note there is no 'fatal' level here, unlike LogEntry. */
level: 'debug' | 'info' | 'warn' | 'error';
/** Log message text. */
message: string;
/** Optional structured data; the property is omitted when no fields were supplied. */
fields?: Record<string, any>;
}
/** A log entry stored by `recordLog` and returned by `queryLogs`. */
export interface LogEntry {
/** `Date.now()` when the entry was recorded. */
timestamp: number;
/** Severity of the entry. */
level: 'debug' | 'info' | 'warn' | 'error' | 'fatal';
/** Log message text; `queryLogs` matches its `search` filter against this field. */
message: string;
/** Name of the component that produced the entry, e.g. 'anomaly-detection'. */
source: string;
/** Trace the entry is associated with, if any. */
traceId?: string;
/** Span the entry is associated with, if any. */
spanId?: string;
/** String key/value labels attached to the entry. */
labels: Record<string, string>;
/** Additional structured data attached to the entry. */
fields: Record<string, any>;
}
/** A dashboard: a named collection of panels, stored by id in `createDashboard`. */
export interface Dashboard {
/** Unique identifier; used as the storage key. */
id: string;
/** Display name. */
name: string;
/** Human-readable description. */
description: string;
/** Panels evaluated by `getDashboardData`, in order. */
panels: DashboardPanel[];
/** Filter controls declared for the dashboard. NOTE(review): not applied by any query in this file. */
filters: DashboardFilter[];
/** Refresh period; the built-in dashboards use 30000 and 15000 — presumably milliseconds, confirm with the consumer. */
refreshInterval: number;
/** Default time range, used by `getDashboardData` when the caller passes none. */
timeRange: TimeRange;
/** Free-form extra data; not interpreted in this file. */
metadata: Record<string, any>;
}
/** One panel of a dashboard: a metric query plus presentation settings. */
export interface DashboardPanel {
/** Panel identifier. */
id: string;
/** Title shown for the panel. */
title: string;
/** Panel kind; decides how `executePanelQuery` shapes the returned data. */
type: PanelType;
/** Metric query evaluated for the panel. */
query: QueryDefinition;
/** Presentation settings; passed through untouched by this file. */
visualization: VisualizationConfig;
/** Placement of the panel — presumably grid units; the layout consumer is not in this file. */
position: { x: number; y: number; width: number; height: number };
/** Alert rules declared on the panel. NOTE(review): not read anywhere in this file. */
alerts?: AlertRule[];
}
/**
* Kinds of dashboard panel (string-valued enum). In `executePanelQuery`,
* SINGLE_STAT yields the value of the last returned sample (or 0 when there is
* none); every other kind yields the query's sample list.
*/
export enum PanelType {
TIME_SERIES = 'time_series',
SINGLE_STAT = 'single_stat',
TABLE = 'table',
HEATMAP = 'heatmap',
GAUGE = 'gauge',
BAR_CHART = 'bar_chart',
PIE_CHART = 'pie_chart'
}
/** A metric query, as evaluated by `queryMetrics`. */
export interface QueryDefinition {
/** Metric name; samples must match it exactly. */
metric: string;
/** Aggregation applied to the matching samples (per group when `groupBy` is set). */
aggregation: AggregationType;
/** Label names to group by; when absent or empty, all matching samples form one group. */
groupBy?: string[];
/** Label equality filters; a sample must match every listed key/value pair. */
filters?: Record<string, string>;
/** NOTE(review): declared but not read anywhere in this file. */
timeWindow?: string;
}
/**
* Presentation settings of a panel. This file only stores and returns these
* values; how they are rendered is up to the consumer.
*/
export interface VisualizationConfig {
/** Name of a colour scheme, e.g. 'red' or 'blue' in the built-in dashboards. */
colorScheme?: string;
/** Value/colour pairs, e.g. 0 -> green, 10 -> yellow, 50 -> red in the built-in error dashboard. */
thresholds?: Array<{ value: number; color: string }>;
/** Unit label, e.g. 'count' or 'ms'. */
unit?: string;
/** Presumably the number of decimal places to display — confirm with the renderer. */
decimals?: number;
/** Presumably the lower bound of the displayed scale — confirm with the renderer. */
min?: number;
/** Presumably the upper bound of the displayed scale — confirm with the renderer. */
max?: number;
}
/**
* Declaration of a filter control on a dashboard.
* NOTE(review): no code in this file reads these fields; semantics below are
* taken from the field names and should be confirmed with the UI consumer.
*/
export interface DashboardFilter {
/** Name of the field the filter applies to. */
field: string;
/** Label shown for the control. */
label: string;
/** Kind of control. */
type: 'select' | 'multiselect' | 'text' | 'time';
/** Selectable options — presumably for the select kinds. */
options?: string[];
/** Initial value of the control. */
defaultValue?: any;
}
/**
* Inclusive time window for queries. A number is used as-is and compared with
* `Date.now()`-based timestamps; a string is resolved by `parseTimeString`,
* which understands 'now' and 'now-<N><m|h|d>' (e.g. 'now-1h').
*/
export interface TimeRange {
/** Start of the window. */
from: number | string;
/** End of the window. */
to: number | string;
}
/** An alert rule evaluated against incoming metric samples by `checkAlertRules`. */
export interface AlertRule {
/** Rule identifier. */
id: string;
/** Display name of the rule. */
name: string;
/** Query whose `metric` selects the samples the rule is evaluated against. */
query: QueryDefinition;
/** Comparison that decides whether a sample triggers the rule. */
condition: AlertCondition;
/**
* Actions associated with the rule.
* NOTE(review): this file only emits an 'alert-triggered' event; it does not dispatch actions itself.
*/
actions: AlertAction[];
/** Disabled rules are skipped during evaluation. */
enabled: boolean;
}
/** Threshold comparison applied to a sample's value. */
export interface AlertCondition {
/** Comparison applied as `sample.value <operator> threshold`. */
operator: 'greater_than' | 'less_than' | 'equals' | 'not_equals';
/** Value the sample is compared against. */
threshold: number;
// Duration the condition must be met.
// NOTE(review): `checkAlertRules` evaluates each sample on its own and never
// reads this field; its unit is not stated anywhere in this file.
duration: number;
}
/**
* Describes where an alert should be delivered.
* NOTE(review): no delivery code exists in this file; consumers of the
* 'alert-triggered' event are expected to act on these.
*/
export interface AlertAction {
/** Delivery channel. */
type: 'webhook' | 'email' | 'slack';
/** Destination — presumably a URL, address or channel depending on `type`. */
target: string;
/** Optional message template; its format is not defined in this file. */
template?: string;
}
/**
* Settings of ObservabilityIntegration. The constructor accepts a partial
* object and fills in the defaults quoted below.
*/
export interface ObservabilityConfig {
/** When false, `recordMetric` is a no-op. Default: true. */
enableMetricsCollection: boolean;
/** When false, `startTrace` throws. Default: true. */
enableDistributedTracing: boolean;
/** When false, `recordLog` is a no-op. Default: true. */
enableLogAggregation: boolean;
/** Default: true. NOTE(review): not read anywhere in this file. */
enableRealTimeDashboards: boolean;
/** Metric samples older than this many days are dropped. Default: 30. */
metricsRetentionDays: number;
/** Completed traces whose root span started more than this many days ago are dropped. Default: 7. */
tracesRetentionDays: number;
/** Log entries older than this many days are dropped. Default: 14. */
logsRetentionDays: number;
/** Span count per trace above which `startTrace` logs a warning (the span is still added). Default: 1000. */
maxSpansPerTrace: number;
/** Default: 0.1. NOTE(review): not read anywhere in this file — no sampling is applied. */
samplingRate: number;
/** When true, every recorded metric sample is checked against the alert rules. Default: true. */
alertingEnabled: boolean;
}
/**
 * Comprehensive Observability System
 *
 * Keeps metric samples, trace spans, log entries, dashboards and alert rules
 * in memory and offers query helpers over them. Emits the events
 * 'metric-recorded', 'log-recorded', 'trace-completed' and 'alert-triggered'.
 *
 * The constructor starts recurring housekeeping timers. They are unref'd so
 * they never keep the process alive on their own; call `dispose()` to stop
 * them when an instance is no longer needed.
 */
export class ObservabilityIntegration extends EventEmitter {
  private config: Required<ObservabilityConfig>;
  private metricDefinitions: Map<string, MetricDefinition> = new Map();
  private metricSamples: MetricSample[] = [];
  private activeTraces: Map<string, TraceSpan[]> = new Map();
  private completedTraces: Map<string, TraceSpan[]> = new Map();
  private logEntries: LogEntry[] = [];
  private dashboards: Map<string, Dashboard> = new Map();
  private alertRules: Map<string, AlertRule> = new Map();
  private performanceTracker: PerformanceTracker | undefined;
  /** Handles of every recurring timer started by this instance, kept so `dispose()` can stop them. */
  private backgroundTimers: Array<ReturnType<typeof setInterval>> = [];

  /**
   * @param config Partial settings; missing values fall back to the defaults
   *   listed on ObservabilityConfig.
   * @param performanceTracker Optional tracker; when present, the instance
   *   records its own size metrics every 30 seconds.
   */
  constructor(
    config: Partial<ObservabilityConfig> = {},
    performanceTracker?: PerformanceTracker
  ) {
    super();
    this.config = {
      enableMetricsCollection: config.enableMetricsCollection ?? true,
      enableDistributedTracing: config.enableDistributedTracing ?? true,
      enableLogAggregation: config.enableLogAggregation ?? true,
      enableRealTimeDashboards: config.enableRealTimeDashboards ?? true,
      metricsRetentionDays: config.metricsRetentionDays ?? 30,
      tracesRetentionDays: config.tracesRetentionDays ?? 7,
      logsRetentionDays: config.logsRetentionDays ?? 14,
      maxSpansPerTrace: config.maxSpansPerTrace ?? 1000,
      samplingRate: config.samplingRate ?? 0.1,
      alertingEnabled: config.alertingEnabled ?? true
    };
    this.performanceTracker = performanceTracker;
    this.initializeStandardMetrics();
    this.initializeDefaultDashboards();
    this.initializeMemoryIntegration();
    this.startBackgroundProcesses();
  }

  /**
   * Stop all recurring timers started by this instance. Collected data and
   * event listeners are left untouched. Safe to call more than once.
   */
  dispose(): void {
    for (const timer of this.backgroundTimers) {
      clearInterval(timer);
    }
    this.backgroundTimers = [];
  }

  /**
   * Define a new metric. A definition with the same name is replaced.
   */
  defineMetric(definition: MetricDefinition): void {
    this.metricDefinitions.set(definition.name, definition);
  }

  /**
   * Record a metric sample stamped with the current time.
   *
   * Does nothing when metrics collection is disabled. Emits 'metric-recorded'
   * and, when alerting is enabled, evaluates the registered alert rules.
   */
  recordMetric(name: string, value: number, labels: Record<string, string> = {}): void {
    if (!this.config.enableMetricsCollection) return;
    const sample: MetricSample = {
      metric: name,
      value,
      timestamp: Date.now(),
      labels
    };
    this.metricSamples.push(sample);
    this.maintainMetricsRetention();
    // Emit for real-time processing
    this.emit('metric-recorded', sample);
    // Check alert rules
    if (this.config.alertingEnabled) {
      this.checkAlertRules(sample);
    }
  }

  /**
   * Register an alert rule so that recorded samples are checked against it.
   * A rule with the same id is replaced.
   */
  addAlertRule(rule: AlertRule): void {
    this.alertRules.set(rule.id, rule);
  }

  /**
   * Remove a previously registered alert rule.
   *
   * @returns true when a rule with that id existed.
   */
  removeAlertRule(ruleId: string): boolean {
    return this.alertRules.delete(ruleId);
  }

  /**
   * Start a new span. Without `parentSpanId` the span becomes the root of a
   * fresh trace; otherwise it joins the trace that contains the parent.
   *
   * NOTE(review): when the parent span is unknown, a fresh trace id is
   * generated but `parentSpanId` is still recorded, so that trace has no root
   * span and `finishSpan` never moves it out of the active set.
   *
   * @throws Error when distributed tracing is disabled.
   */
  startTrace(operationName: string, parentSpanId?: string): TraceSpan {
    if (!this.config.enableDistributedTracing) {
      throw new Error('Distributed tracing is disabled');
    }
    const traceId = parentSpanId ? this.findTraceId(parentSpanId) : this.generateTraceId();
    const span: TraceSpan = {
      traceId,
      spanId: this.generateSpanId(),
      ...(parentSpanId ? { parentSpanId } : {}),
      operationName,
      startTime: Date.now(),
      tags: {},
      logs: [],
      status: SpanStatus.OK
    };
    let spans = this.activeTraces.get(traceId);
    if (!spans) {
      spans = [];
      this.activeTraces.set(traceId, spans);
    }
    spans.push(span);
    // The limit is advisory: the span is kept, only a warning is logged.
    if (spans.length > this.config.maxSpansPerTrace) {
      // eslint-disable-next-line no-console
      console.warn(`Trace ${traceId} exceeded maximum spans (${this.config.maxSpansPerTrace})`);
    }
    return span;
  }

  /**
   * Finish an active span: stamp end time and duration, set its status and
   * record the 'trace_span_duration' (and, on error, 'trace_span_errors')
   * metrics. Finishing a root span moves the whole trace to the completed
   * set and emits 'trace-completed'. Unknown span ids are ignored.
   */
  finishSpan(spanId: string, status: SpanStatus = SpanStatus.OK, error?: Error): void {
    const span = this.findActiveSpan(spanId);
    if (!span) return;
    span.endTime = Date.now();
    span.duration = span.endTime - span.startTime;
    span.status = status;
    if (error) {
      span.error = true;
      span.tags.error_message = error.message;
      span.tags.error_stack = error.stack;
    }
    // Move to completed traces if this is the root span
    if (!span.parentSpanId) {
      const traceSpans = this.activeTraces.get(span.traceId);
      if (traceSpans) {
        this.completedTraces.set(span.traceId, traceSpans);
        this.activeTraces.delete(span.traceId);
        this.emit('trace-completed', { traceId: span.traceId, spans: traceSpans });
      }
    }
    // Record tracing metrics
    this.recordMetric('trace_span_duration', span.duration, {
      operation: span.operationName,
      status: span.status
    });
    if (span.error) {
      this.recordMetric('trace_span_errors', 1, {
        operation: span.operationName
      });
    }
  }

  /**
   * Add a tag to an active span; unknown span ids are ignored.
   */
  addSpanTag(spanId: string, key: string, value: any): void {
    const span = this.findActiveSpan(spanId);
    if (span) {
      span.tags[key] = value;
    }
  }

  /**
   * Append a log record to an active span; unknown span ids are ignored.
   */
  addSpanLog(spanId: string, level: 'debug' | 'info' | 'warn' | 'error', message: string, fields?: Record<string, any>): void {
    const span = this.findActiveSpan(spanId);
    if (span) {
      span.logs.push({
        timestamp: Date.now(),
        level,
        message,
        ...(fields ? { fields } : {})
      });
    }
  }

  /**
   * Record a log entry stamped with the current time and emit 'log-recorded'.
   * Does nothing when log aggregation is disabled. When both `traceId` and
   * `spanId` are given (and the level is not 'fatal', which span logs do not
   * support) the message is also attached to the span.
   */
  recordLog(
    level: 'debug' | 'info' | 'warn' | 'error' | 'fatal',
    message: string,
    source: string,
    traceId?: string,
    spanId?: string,
    labels: Record<string, string> = {},
    fields: Record<string, any> = {}
  ): void {
    if (!this.config.enableLogAggregation) return;
    const logEntry: LogEntry = {
      timestamp: Date.now(),
      level,
      message,
      source,
      ...(traceId ? { traceId } : {}),
      ...(spanId ? { spanId } : {}),
      labels,
      fields
    };
    this.logEntries.push(logEntry);
    this.maintainLogsRetention();
    // Emit for real-time processing
    this.emit('log-recorded', logEntry);
    // Auto-correlate with active spans
    if (traceId && spanId && level !== 'fatal') {
      this.addSpanLog(spanId, level, message, fields);
    }
  }

  /**
   * Handle error events with observability integration: counts the error,
   * logs it, and — when the error context names a trace and span — tags that
   * span as failed and finishes it with SpanStatus.ERROR.
   */
  handleErrorEvent(error: ErrorEvent): void {
    // Record error metrics
    this.recordMetric('errors_total', 1, {
      level: error.level,
      category: error.category,
      source: error.source
    });
    // Record log entry
    this.recordLog(
      error.level === 'critical' || error.level === 'fatal' ? 'error' : 'warn',
      error.message,
      error.source,
      undefined,
      undefined,
      { error_id: error.id, error_category: error.category },
      { stack: error.stack, context: error.context }
    );
    // Correlate with active traces if context provides trace info
    if (error.context.traceId && error.context.spanId) {
      this.addSpanTag(error.context.spanId, 'error', true);
      this.addSpanTag(error.context.spanId, 'error_message', error.message);
      this.finishSpan(error.context.spanId, SpanStatus.ERROR, new Error(error.message));
    }
  }

  /**
   * Handle anomaly events with observability integration: records the
   * 'anomalies_total' and 'anomaly_confidence' metrics and a 'warn' log entry.
   */
  handleAnomalyEvent(anomaly: AnomalyEvent): void {
    // Record anomaly metrics
    this.recordMetric('anomalies_total', 1, {
      type: anomaly.type,
      severity: anomaly.severity,
      method: anomaly.detectionMethod
    });
    this.recordMetric('anomaly_confidence', anomaly.confidence, {
      type: anomaly.type
    });
    // Record log entry
    this.recordLog(
      'warn',
      `Anomaly detected: ${anomaly.description}`,
      'anomaly-detection',
      undefined,
      undefined,
      {
        anomaly_id: anomaly.id,
        anomaly_type: anomaly.type,
        severity: anomaly.severity
      },
      {
        confidence: anomaly.confidence,
        evidence: anomaly.evidence,
        affected_systems: anomaly.affectedSystems
      }
    );
  }

  /**
   * Handle incident events with observability integration: records
   * 'incidents_total', the resolution time for resolved incidents, and a log
   * entry ('error' for critical incidents, 'warn' otherwise).
   */
  handleIncidentEvent(incident: Incident): void {
    // Record incident metrics
    this.recordMetric('incidents_total', 1, {
      severity: incident.severity,
      status: incident.status,
      priority: incident.priority
    });
    if (incident.resolvedAt) {
      const resolutionTime = incident.resolvedAt - incident.createdAt;
      this.recordMetric('incident_resolution_time', resolutionTime, {
        severity: incident.severity
      });
    }
    // Record log entry
    this.recordLog(
      incident.severity === IncidentSeverity.CRITICAL ? 'error' : 'warn',
      `Incident ${incident.status}: ${incident.title}`,
      'incident-response',
      undefined,
      undefined,
      {
        incident_id: incident.id,
        severity: incident.severity,
        status: incident.status
      },
      {
        affected_systems: incident.affectedSystems,
        assignee: incident.assignee,
        team: incident.team
      }
    );
  }

  /**
   * Create a new dashboard. A dashboard with the same id is replaced.
   */
  createDashboard(dashboard: Dashboard): void {
    this.dashboards.set(dashboard.id, dashboard);
  }

  /**
   * Update an existing dashboard in place by shallow-merging `updates`.
   *
   * @returns false when no dashboard with that id exists.
   */
  updateDashboard(dashboardId: string, updates: Partial<Dashboard>): boolean {
    const dashboard = this.dashboards.get(dashboardId);
    if (!dashboard) return false;
    Object.assign(dashboard, updates);
    return true;
  }

  /**
   * Get dashboard data: the dashboard itself plus every panel extended with a
   * `data` property holding its query result.
   *
   * @param timeRange Overrides the dashboard's own time range when given.
   * @returns null when no dashboard with that id exists.
   */
  getDashboardData(dashboardId: string, timeRange?: TimeRange): any {
    const dashboard = this.dashboards.get(dashboardId);
    if (!dashboard) return null;
    const range = timeRange || dashboard.timeRange;
    return {
      dashboard,
      panels: dashboard.panels.map(panel => ({
        ...panel,
        data: this.executePanelQuery(panel, range)
      }))
    };
  }

  /**
   * Query metrics: select the samples of `query.metric` inside the inclusive
   * time range that match every label filter, then aggregate them (per group
   * when `groupBy` is non-empty).
   */
  queryMetrics(query: QueryDefinition, timeRange: TimeRange): MetricSample[] {
    const start = this.resolveTimestamp(timeRange.from);
    const end = this.resolveTimestamp(timeRange.to);
    const labelFilters = query.filters;
    const matching = this.metricSamples.filter(sample =>
      sample.metric === query.metric &&
      sample.timestamp >= start &&
      sample.timestamp <= end &&
      this.matchesLabelFilters(sample.labels, labelFilters)
    );
    // Apply grouping and aggregation
    if (query.groupBy && query.groupBy.length > 0) {
      return this.aggregateMetricsByGroup(matching, query.groupBy, query.aggregation);
    }
    return this.aggregateMetrics(matching, query.aggregation);
  }

  /**
   * Query completed traces. A trace matches when its root span started inside
   * the time range and every supplied filter holds. `operationName` and
   * `tags` match if any span of the trace satisfies them; the duration bounds
   * apply to the root span.
   */
  queryTraces(filters: {
    timeRange: TimeRange;
    operationName?: string;
    minDuration?: number;
    maxDuration?: number;
    hasError?: boolean;
    tags?: Record<string, any>;
  }): TraceSpan[][] {
    const start = this.resolveTimestamp(filters.timeRange.from);
    const end = this.resolveTimestamp(filters.timeRange.to);
    const { operationName, minDuration, maxDuration, hasError } = filters;
    const tagFilters = filters.tags ? Object.entries(filters.tags) : [];
    return Array.from(this.completedTraces.values()).filter(spans => {
      const rootSpan = spans.find(s => !s.parentSpanId);
      if (!rootSpan) return false;
      // Time range filter
      if (rootSpan.startTime < start || rootSpan.startTime > end) return false;
      // Operation name filter
      if (operationName && !spans.some(s => s.operationName.includes(operationName))) {
        return false;
      }
      // Duration filters. Compare against undefined explicitly: 0 is a valid
      // duration and a valid bound, so truthiness checks would skip them.
      const duration = rootSpan.duration;
      if (duration !== undefined) {
        if (minDuration !== undefined && duration < minDuration) return false;
        if (maxDuration !== undefined && duration > maxDuration) return false;
      }
      // Error filter
      if (hasError !== undefined && hasError !== spans.some(s => Boolean(s.error))) {
        return false;
      }
      // Tags filter
      if (tagFilters.length > 0) {
        const matchesTags = spans.some(span =>
          tagFilters.every(([key, value]) => span.tags[key] === value)
        );
        if (!matchesTags) return false;
      }
      return true;
    });
  }

  /**
   * Query logs inside the inclusive time range, newest first.
   *
   * `search` is treated as a case-insensitive regular expression; if it is
   * not a valid pattern it is matched as a case-insensitive substring instead
   * of throwing. A falsy `limit` (absent or 0) means no limit.
   */
  queryLogs(filters: {
    timeRange: TimeRange;
    levels?: string[];
    sources?: string[];
    traceId?: string;
    search?: string;
    limit?: number;
  }): LogEntry[] {
    const start = this.resolveTimestamp(filters.timeRange.from);
    const end = this.resolveTimestamp(filters.timeRange.to);
    const { levels, sources, traceId } = filters;
    const matchesSearch = filters.search ? this.buildSearchMatcher(filters.search) : undefined;
    const logs = this.logEntries.filter(log =>
      log.timestamp >= start &&
      log.timestamp <= end &&
      (!levels || levels.includes(log.level)) &&
      (!sources || sources.includes(log.source)) &&
      (!traceId || log.traceId === traceId) &&
      (!matchesSearch || matchesSearch(log.message))
    );
    // Sort by timestamp (newest first); `logs` is a fresh array, so this does
    // not reorder the stored entries.
    logs.sort((a, b) => b.timestamp - a.timestamp);
    // Apply limit
    return filters.limit ? logs.slice(0, filters.limit) : logs;
  }

  /**
   * Get system observability metrics: current counts of stored items plus the
   * rough memory estimate from `estimateMemoryUsage`.
   */
  getObservabilityMetrics(): {
    metricsCount: number;
    activeTraces: number;
    completedTraces: number;
    logsCount: number;
    dashboardsCount: number;
    alertRulesCount: number;
    memoryUsage: {
      metrics: number;
      traces: number;
      logs: number;
      total: number;
    };
  } {
    const memoryUsage = this.estimateMemoryUsage();
    return {
      metricsCount: this.metricSamples.length,
      activeTraces: this.activeTraces.size,
      completedTraces: this.completedTraces.size,
      logsCount: this.logEntries.length,
      dashboardsCount: this.dashboards.size,
      alertRulesCount: this.alertRules.size,
      memoryUsage
    };
  }

  /**
   * Clear all collected samples, traces and logs. Metric definitions,
   * dashboards and alert rules are kept.
   */
  clearData(): void {
    this.metricSamples = [];
    this.activeTraces.clear();
    this.completedTraces.clear();
    this.logEntries = [];
  }

  // Private methods

  /** Start a recurring task and remember its handle for `dispose()`. */
  private scheduleRecurring(task: () => void, intervalMs: number): void {
    const timer = setInterval(task, intervalMs);
    // Housekeeping must not keep the Node.js process alive by itself.
    (timer as { unref?: () => void }).unref?.();
    this.backgroundTimers.push(timer);
  }

  /** Resolve one bound of a TimeRange to epoch milliseconds. */
  private resolveTimestamp(bound: number | string): number {
    return typeof bound === 'number' ? bound : this.parseTimeString(bound);
  }

  /** True when `labels` carries every key/value pair of `filters` (or no filters are given). */
  private matchesLabelFilters(
    labels: Record<string, string>,
    filters: Record<string, string> | undefined
  ): boolean {
    if (!filters) return true;
    return Object.entries(filters).every(([key, value]) => labels[key] === value);
  }

  /**
   * Build the predicate used for log message search. Falls back to a plain
   * case-insensitive substring match when `search` is not a valid regular
   * expression, so arbitrary user input cannot make a query throw.
   */
  private buildSearchMatcher(search: string): (message: string) => boolean {
    try {
      const pattern = new RegExp(search, 'i');
      return message => pattern.test(message);
    } catch {
      const needle = search.toLowerCase();
      return message => message.toLowerCase().includes(needle);
    }
  }

  /** Find a span by id among the active traces only. */
  private findActiveSpan(spanId: string): TraceSpan | null {
    for (const spans of this.activeTraces.values()) {
      const span = spans.find(s => s.spanId === spanId);
      if (span) return span;
    }
    return null;
  }

  /**
   * Find the trace a span belongs to, looking at active traces first and
   * completed traces second; generates a fresh trace id when the span is
   * unknown.
   */
  private findTraceId(spanId: string): string {
    for (const traces of [this.activeTraces, this.completedTraces]) {
      for (const [traceId, spans] of traces) {
        if (spans.some(s => s.spanId === spanId)) {
          return traceId;
        }
      }
    }
    return this.generateTraceId();
  }

  private generateTraceId(): string {
    return `trace_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  private generateSpanId(): string {
    return `span_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Parse a time string: 'now', a relative 'now-<N><m|h|d>' (e.g. "now-1h",
   * "now-24h"), or a decimal epoch-millisecond value. Anything else resolves
   * to the current time.
   */
  private parseTimeString(timeStr: string): number {
    if (timeStr === 'now') return Date.now();
    const match = /^now-(\d+)([mhd])$/.exec(timeStr);
    if (match && match[1] && match[2]) {
      const value = Number.parseInt(match[1], 10);
      const unit = match[2];
      const multipliers = { m: 60000, h: 3600000, d: 86400000 };
      return Date.now() - (value * multipliers[unit as keyof typeof multipliers]);
    }
    // Check for NaN rather than falsiness so that "0" means the epoch.
    const absolute = Number.parseInt(timeStr, 10);
    return Number.isNaN(absolute) ? Date.now() : absolute;
  }

  /**
   * Collapse samples into one SUM, AVERAGE or COUNT sample (stamped with the
   * last sample's time and empty labels). Any other aggregation — RATE and
   * PERCENTILE included — is not computed here and returns the samples as-is.
   */
  private aggregateMetrics(samples: MetricSample[], aggregation: AggregationType): MetricSample[] {
    if (samples.length === 0) return [];
    const firstSample = samples[0]!;
    const lastSample = samples[samples.length - 1]!;
    const collapsed = (value: number): MetricSample[] => [{
      metric: firstSample.metric,
      value,
      timestamp: lastSample.timestamp,
      labels: {}
    }];
    switch (aggregation) {
      case AggregationType.SUM:
        return collapsed(samples.reduce((sum, s) => sum + s.value, 0));
      case AggregationType.AVERAGE:
        return collapsed(samples.reduce((sum, s) => sum + s.value, 0) / samples.length);
      case AggregationType.COUNT:
        return collapsed(samples.length);
      default:
        return samples;
    }
  }

  /**
   * Aggregate samples separately for each combination of the `groupBy` label
   * values. Every returned sample is a fresh object labelled with its group's
   * (non-empty) label values, so stored samples are never modified.
   */
  private aggregateMetricsByGroup(samples: MetricSample[], groupBy: string[], aggregation: AggregationType): MetricSample[] {
    const groups = new Map<string, { labels: Record<string, string>; members: MetricSample[] }>();
    for (const sample of samples) {
      const values = groupBy.map(field => sample.labels[field] || '');
      // JSON keeps the tuple unambiguous even when a value contains a separator character.
      const key = JSON.stringify(values);
      let group = groups.get(key);
      if (!group) {
        const labels: Record<string, string> = {};
        groupBy.forEach((field, index) => {
          const value = values[index];
          if (value) {
            labels[field] = value;
          }
        });
        group = { labels, members: [] };
        groups.set(key, group);
      }
      group.members.push(sample);
    }
    const result: MetricSample[] = [];
    for (const { labels, members } of groups.values()) {
      for (const aggregated of this.aggregateMetrics(members, aggregation)) {
        // Copy: pass-through aggregations hand back the stored sample objects.
        result.push({ ...aggregated, labels: { ...labels } });
      }
    }
    return result;
  }

  /**
   * Run a panel's query. SINGLE_STAT panels yield the value of the last
   * returned sample (0 when there is none); all other panels yield the
   * sample list.
   */
  private executePanelQuery(panel: DashboardPanel, timeRange: TimeRange): any {
    const samples = this.queryMetrics(panel.query, timeRange);
    if (panel.type === PanelType.SINGLE_STAT) {
      const lastSample = samples[samples.length - 1];
      return lastSample ? lastSample.value : 0;
    }
    return samples;
  }

  /**
   * Evaluate every enabled rule for the sample's metric (and matching the
   * rule's label filters, if any) and emit 'alert-triggered' for each rule
   * whose condition holds.
   *
   * NOTE(review): each sample is judged on its own; `condition.duration` is
   * not taken into account.
   */
  private checkAlertRules(sample: MetricSample): void {
    for (const rule of this.alertRules.values()) {
      if (!rule.enabled || rule.query.metric !== sample.metric) continue;
      if (!this.matchesLabelFilters(sample.labels, rule.query.filters)) continue;
      if (this.isConditionMet(sample.value, rule.condition)) {
        this.emit('alert-triggered', {
          rule,
          sample,
          timestamp: Date.now()
        });
      }
    }
  }

  /** Simple threshold check; unknown operators never match. */
  private isConditionMet(value: number, condition: AlertCondition): boolean {
    switch (condition.operator) {
      case 'greater_than':
        return value > condition.threshold;
      case 'less_than':
        return value < condition.threshold;
      case 'equals':
        return value === condition.threshold;
      case 'not_equals':
        return value !== condition.threshold;
      default:
        return false;
    }
  }

  /** Define standard GEPA metrics. */
  private initializeStandardMetrics(): void {
    this.defineMetric({
      name: 'errors_total',
      type: MetricType.COUNTER,
      description: 'Total number of errors',
      unit: 'count',
      labels: ['level', 'category', 'source'],
      aggregation: AggregationType.SUM
    });
    this.defineMetric({
      name: 'anomalies_total',
      type: MetricType.COUNTER,
      description: 'Total number of anomalies detected',
      unit: 'count',
      labels: ['type', 'severity', 'method'],
      aggregation: AggregationType.SUM
    });
    this.defineMetric({
      name: 'incidents_total',
      type: MetricType.COUNTER,
      description: 'Total number of incidents',
      unit: 'count',
      labels: ['severity', 'status', 'priority'],
      aggregation: AggregationType.SUM
    });
    this.defineMetric({
      name: 'trace_span_duration',
      type: MetricType.HISTOGRAM,
      description: 'Duration of trace spans',
      unit: 'milliseconds',
      labels: ['operation', 'status'],
      aggregation: AggregationType.PERCENTILE
    });
    this.defineMetric({
      name: 'system_memory_usage',
      type: MetricType.GAUGE,
      description: 'System memory usage',
      unit: 'bytes',
      labels: ['component'],
      aggregation: AggregationType.AVERAGE
    });
  }

  /** Register the built-in 'error-monitoring' and 'performance' dashboards. */
  private initializeDefaultDashboards(): void {
    // Create error monitoring dashboard
    this.createDashboard({
      id: 'error-monitoring',
      name: 'Error Monitoring',
      description: 'Monitor errors and incidents across the system',
      refreshInterval: 30000,
      timeRange: { from: 'now-1h', to: 'now' },
      filters: [],
      panels: [
        {
          id: 'error-rate',
          title: 'Error Rate',
          type: PanelType.TIME_SERIES,
          query: {
            metric: 'errors_total',
            aggregation: AggregationType.RATE,
            groupBy: ['level']
          },
          visualization: {
            colorScheme: 'red',
            thresholds: [
              { value: 0, color: 'green' },
              { value: 10, color: 'yellow' },
              { value: 50, color: 'red' }
            ]
          },
          position: { x: 0, y: 0, width: 12, height: 6 }
        },
        {
          id: 'total-errors',
          title: 'Total Errors (Last Hour)',
          type: PanelType.SINGLE_STAT,
          query: {
            metric: 'errors_total',
            aggregation: AggregationType.SUM
          },
          visualization: {
            colorScheme: 'red',
            unit: 'count'
          },
          position: { x: 0, y: 6, width: 6, height: 3 }
        }
      ],
      metadata: {}
    });
    // Create performance dashboard
    this.createDashboard({
      id: 'performance',
      name: 'Performance Monitoring',
      description: 'Monitor system performance and traces',
      refreshInterval: 15000,
      timeRange: { from: 'now-30m', to: 'now' },
      filters: [],
      panels: [
        {
          id: 'response-times',
          title: 'Response Times',
          type: PanelType.TIME_SERIES,
          query: {
            metric: 'trace_span_duration',
            aggregation: AggregationType.PERCENTILE,
            groupBy: ['operation']
          },
          visualization: {
            colorScheme: 'blue',
            unit: 'ms'
          },
          position: { x: 0, y: 0, width: 12, height: 6 }
        }
      ],
      metadata: {}
    });
  }

  /**
   * Drop metric samples that have aged out of the retention window.
   *
   * Samples are appended in arrival order, so while the oldest one is still
   * inside the window there is nothing to drop. This early return keeps the
   * per-record call from `recordMetric` cheap instead of re-filtering the
   * whole buffer on every sample.
   */
  private maintainMetricsRetention(): void {
    const cutoff = Date.now() - (this.config.metricsRetentionDays * 86400000);
    const oldest = this.metricSamples[0];
    if (oldest === undefined || oldest.timestamp > cutoff) return;
    this.metricSamples = this.metricSamples.filter(s => s.timestamp > cutoff);
  }

  /**
   * Drop completed traces whose root span started before the retention
   * cutoff. Active traces are not touched.
   */
  private maintainTracesRetention(): void {
    const cutoff = Date.now() - (this.config.tracesRetentionDays * 86400000);
    for (const [traceId, spans] of this.completedTraces) {
      const rootSpan = spans.find(s => !s.parentSpanId);
      if (rootSpan && rootSpan.startTime < cutoff) {
        this.completedTraces.delete(traceId);
      }
    }
  }

  /**
   * Drop log entries that have aged out of the retention window. Uses the
   * same oldest-entry early return as the metrics retention, because this
   * runs on every `recordLog` call.
   */
  private maintainLogsRetention(): void {
    const cutoff = Date.now() - (this.config.logsRetentionDays * 86400000);
    const oldest = this.logEntries[0];
    if (oldest === undefined || oldest.timestamp > cutoff) return;
    this.logEntries = this.logEntries.filter(log => log.timestamp > cutoff);
  }

  /**
   * Rough memory estimate based on fixed per-item sizes (200 per metric
   * sample, 1024 per span, 512 per log entry); not a measurement.
   */
  private estimateMemoryUsage(): {
    metrics: number;
    traces: number;
    logs: number;
    total: number;
  } {
    const metricsSize = this.metricSamples.length * 200; // Approximate size per metric
    let spanCount = 0;
    for (const traces of [this.activeTraces, this.completedTraces]) {
      for (const spans of traces.values()) {
        spanCount += spans.length;
      }
    }
    const tracesSize = spanCount * 1024; // Approximate size per span
    const logsSize = this.logEntries.length * 512; // Approximate size per log
    return {
      metrics: metricsSize,
      traces: tracesSize,
      logs: logsSize,
      total: metricsSize + tracesSize + logsSize
    };
  }

  /**
   * Initialise the memory-leak integration and report this instance's
   * estimated memory usage to it once a minute, also recording the estimate
   * as 'system_memory_usage' samples per component.
   */
  private initializeMemoryIntegration(): void {
    MemoryLeakIntegration.initialize();
    this.scheduleRecurring(() => {
      const memoryUsage = this.estimateMemoryUsage();
      MemoryLeakIntegration.trackObservability('monitor', memoryUsage.total);
      // Record system memory metrics
      this.recordMetric('system_memory_usage', memoryUsage.total, { component: 'observability' });
      this.recordMetric('system_memory_usage', memoryUsage.metrics, { component: 'metrics' });
      this.recordMetric('system_memory_usage', memoryUsage.traces, { component: 'traces' });
      this.recordMetric('system_memory_usage', memoryUsage.logs, { component: 'logs' });
    }, 60000);
  }

  /** Start the hourly retention sweep and the 30-second self-metrics collection. */
  private startBackgroundProcesses(): void {
    // Retention cleanup every hour
    this.scheduleRecurring(() => {
      this.maintainMetricsRetention();
      this.maintainTracesRetention();
      this.maintainLogsRetention();
    }, 3600000);
    // Performance metrics collection every 30 seconds
    this.scheduleRecurring(() => {
      if (this.performanceTracker) {
        const metrics = this.getObservabilityMetrics();
        this.recordMetric('observability_metrics_count', metrics.metricsCount);
        this.recordMetric('observability_active_traces', metrics.activeTraces);
        this.recordMetric('observability_logs_count', metrics.logsCount);
      }
    }, 30000);
  }
}