// ring-buffer.ts
import { EventEmitter } from 'events';
import { DebugEvent } from '../schemas/index';
/**
 * Event Ring Buffer with Cursor Pagination
 *
 * A bounded, in-memory ring buffer for debug events with cursor-based
 * pagination and truncation semantics.
 *
 * Layout invariants (maintained by every mutating method):
 * - `tail` is the physical slot of the oldest retained event.
 * - `head` is the physical slot the next event will be written to.
 * - `count` is the number of retained events, so
 *   `head === (tail + count) % capacity` always holds.
 *
 * All read paths address events by *logical offset* (0 = oldest retained
 * event, `count - 1` = newest) and translate to a physical slot in one place,
 * so they behave the same whether or not the buffer has wrapped.
 *
 * Emitted events:
 * - `'event'` with the stored event, after every `add`.
 * - `'truncated'` with `{ removed: number }`, once per truncation that
 *   actually drops events.
 * - `'cleared'` after `clear`.
 */
export class EventRingBuffer extends EventEmitter {
  /** Fraction of `maxSize` at which old events start being shed. */
  private static readonly TRUNCATE_THRESHOLD = 0.9;
  /** Fraction of `maxSize` that `add` truncates down to. */
  private static readonly ADD_TRUNCATE_TARGET = 0.5;
  /** Fraction of `maxSize` that `setMaxSize` truncates down to. */
  private static readonly RESIZE_TRUNCATE_TARGET = 0.8;
  /** Number of most recent events considered when a cursor is unknown. */
  private static readonly UNKNOWN_CURSOR_WINDOW = 50;

  private events: (DebugEvent | undefined)[];
  private head = 0;
  private tail = 0;
  private count = 0;
  private capacity: number;
  private maxSize: number;
  private cursorSequence = 0;

  /**
   * @param capacity Number of physical slots; must be a positive integer.
   * @param maxSize Soft limit used by the automatic truncation in `add` and
   *   `setMaxSize`.
   * @throws RangeError if `capacity` is not a positive integer.
   */
  constructor(capacity: number = 1000, maxSize: number = 10000) {
    super();
    EventRingBuffer.assertValidCapacity(capacity);
    this.capacity = capacity;
    this.maxSize = maxSize;
    this.events = new Array<DebugEvent | undefined>(capacity);
  }

  /**
   * Add an event to the buffer.
   *
   * The passed object is stored by reference and is assigned a fresh
   * `event_id` and `cursor`. When the buffer is full the oldest event is
   * overwritten.
   */
  add(event: DebugEvent): void {
    // Shed old events when approaching maxSize. truncate() emits 'truncated'
    // itself, so no second emission happens here.
    if (this.count >= this.maxSize * EventRingBuffer.TRUNCATE_THRESHOLD) {
      this.truncate(Math.floor(this.maxSize * EventRingBuffer.ADD_TRUNCATE_TARGET));
    }

    this.cursorSequence++;
    const eventId = this.generateEventId();
    event.event_id = eventId;
    event.cursor = this.generateCursor(eventId);

    // `head` is always the next write slot. When the buffer is full it
    // coincides with `tail`, so this write replaces the oldest event.
    this.events[this.head] = event;
    this.head = (this.head + 1) % this.capacity;
    if (this.count < this.capacity) {
      this.count++;
    } else {
      this.tail = (this.tail + 1) % this.capacity;
    }

    this.emit('event', event);
  }

  /**
   * Poll events with cursor-based pagination.
   *
   * Start position, in order of precedence:
   * - `cursor`: the event right after the one carrying that cursor. If the
   *   cursor is unknown, the window of the 50 most recent events is used.
   * - `since`: the first retained event whose timestamp is `>= since`;
   *   nothing is returned when no retained event qualifies.
   * - otherwise the oldest retained event.
   *
   * `session_id` is applied before `limit`, so a page is only short when the
   * buffer runs out of matching events. Returned events are shallow copies
   * that carry their stored cursor; pass the last one's `cursor` to continue.
   *
   * @returns Matching events in insertion order (oldest first).
   */
  poll(
    options: {
      since?: number;
      limit?: number;
      cursor?: string;
      session_id?: string;
    } = {}
  ): DebugEvent[] {
    const { since, limit, cursor, session_id: sessionId } = options;

    let start = 0;
    if (cursor) {
      const cursorOffset = this.findOffset(event => event.cursor === cursor);
      start =
        cursorOffset >= 0
          ? cursorOffset + 1
          : Math.max(0, this.count - EventRingBuffer.UNKNOWN_CURSOR_WINDOW);
    } else if (since != null) {
      const sinceOffset = this.findOffset(event => event.timestamp >= since);
      start = sinceOffset >= 0 ? sinceOffset : this.count;
    }

    // A missing, zero, negative or NaN limit means "no limit".
    const maxResults = limit !== undefined && limit > 0 ? limit : Infinity;

    const page: DebugEvent[] = [];
    for (let offset = start; offset < this.count && page.length < maxResults; offset++) {
      const event = this.at(offset);
      if (!event) {
        continue;
      }
      if (sessionId && event.session_id !== sessionId) {
        continue;
      }
      page.push({ ...event });
    }
    return page;
  }

  /**
   * Truncate the buffer to at most `targetSize` events by dropping the oldest.
   *
   * Negative targets are treated as 0 and fractional targets are rounded
   * down. Emits `'truncated'` only when at least one event is removed.
   */
  truncate(targetSize: number): void {
    // Written as a negated `<` so that a NaN target is a no-op as well.
    if (!(targetSize < this.count)) {
      return;
    }
    const target = Math.max(0, Math.floor(targetSize));
    const removeCount = this.count - target;

    // Drop the references so evicted events can be garbage collected.
    for (let offset = 0; offset < removeCount; offset++) {
      this.events[this.slotOf(offset)] = undefined;
    }
    this.tail = (this.tail + removeCount) % this.capacity;
    this.count = target;
    this.emit('truncated', { removed: removeCount });
  }

  /**
   * Clear all events and emit `'cleared'`.
   *
   * The cursor sequence is intentionally not reset: cursors handed out
   * before the clear must not match events added afterwards.
   */
  clear(): void {
    this.events = new Array<DebugEvent | undefined>(this.capacity);
    this.head = 0;
    this.tail = 0;
    this.count = 0;
    this.emit('cleared');
  }

  /**
   * Get buffer statistics.
   *
   * `oldestEvent` and `newestEvent` are shallow copies and are only present
   * when the buffer is non-empty.
   */
  getStats(): {
    capacity: number;
    size: number;
    maxSize: number;
    utilization: number;
    oldestEvent?: DebugEvent;
    newestEvent?: DebugEvent;
  } {
    const stats: {
      capacity: number;
      size: number;
      maxSize: number;
      utilization: number;
      oldestEvent?: DebugEvent;
      newestEvent?: DebugEvent;
    } = {
      capacity: this.capacity,
      size: this.count,
      maxSize: this.maxSize,
      utilization: this.count / this.capacity,
    };

    const oldest = this.count > 0 ? this.at(0) : undefined;
    const newest = this.count > 0 ? this.at(this.count - 1) : undefined;
    if (oldest) {
      stats.oldestEvent = { ...oldest };
    }
    if (newest) {
      stats.newestEvent = { ...newest };
    }
    return stats;
  }

  /**
   * Get shallow copies of all retained events, oldest first.
   */
  getAll(): DebugEvent[] {
    return this.getEventsInRange(0, this.count);
  }

  /**
   * Get shallow copies of the retained events of one session, oldest first.
   */
  getForSession(sessionId: string): DebugEvent[] {
    const matches: DebugEvent[] = [];
    for (let offset = 0; offset < this.count; offset++) {
      const event = this.at(offset);
      if (event && event.session_id === sessionId) {
        matches.push({ ...event });
      }
    }
    return matches;
  }

  /**
   * Check whether an event with the given ID is currently retained.
   */
  has(eventId: string): boolean {
    return this.findOffset(event => event.event_id === eventId) >= 0;
  }

  /**
   * Get a shallow copy of the event with the given ID, or `null` if it is
   * not retained.
   */
  getById(eventId: string): DebugEvent | null {
    const offset = this.findOffset(event => event.event_id === eventId);
    const event = offset >= 0 ? this.at(offset) : undefined;
    return event ? { ...event } : null;
  }

  /**
   * Set capacity, dropping the oldest events first if they no longer fit.
   *
   * @throws RangeError if `newCapacity` is not a positive integer.
   */
  setCapacity(newCapacity: number): void {
    EventRingBuffer.assertValidCapacity(newCapacity);
    if (newCapacity < this.count) {
      this.truncate(newCapacity);
    }

    // Copy in logical order while `this.capacity` still describes the old
    // array, so the new array starts unwrapped at slot 0.
    const resized = new Array<DebugEvent | undefined>(newCapacity);
    for (let offset = 0; offset < this.count; offset++) {
      resized[offset] = this.at(offset);
    }

    this.events = resized;
    this.capacity = newCapacity;
    this.tail = 0;
    // Wraps to 0 when the new array is exactly full.
    this.head = this.count % newCapacity;
  }

  /**
   * Set max size, truncating to 80% of it when the buffer already holds more
   * than 90% of it.
   */
  setMaxSize(newMaxSize: number): void {
    this.maxSize = newMaxSize;
    if (this.count > newMaxSize * EventRingBuffer.TRUNCATE_THRESHOLD) {
      this.truncate(Math.floor(newMaxSize * EventRingBuffer.RESIZE_TRUNCATE_TARGET));
    }
  }

  /**
   * Get the `limit` most recent events, oldest first (for debugging).
   */
  getRecent(limit: number = 10): DebugEvent[] {
    const wanted = Math.max(0, limit);
    return this.getEventsInRange(this.count - wanted, this.count);
  }

  /**
   * Get the `limit` oldest events, oldest first (for debugging).
   */
  getOldest(limit: number = 10): DebugEvent[] {
    return this.getEventsInRange(0, Math.min(limit, this.count));
  }

  /**
   * Copy the events at logical offsets `[startOffset, endOffset)`.
   * The range is clamped to the retained events; an empty or inverted range
   * yields an empty array.
   */
  private getEventsInRange(startOffset: number, endOffset: number): DebugEvent[] {
    const from = Math.max(0, Math.ceil(startOffset));
    const to = Math.min(this.count, endOffset);
    const copies: DebugEvent[] = [];
    for (let offset = from; offset < to; offset++) {
      const event = this.at(offset);
      if (event) {
        copies.push({ ...event });
      }
    }
    return copies;
  }

  /**
   * Logical offset of the first retained event matching `predicate`
   * (scanning oldest to newest), or -1 when none matches.
   */
  private findOffset(predicate: (event: DebugEvent) => boolean): number {
    for (let offset = 0; offset < this.count; offset++) {
      const event = this.at(offset);
      if (event && predicate(event)) {
        return offset;
      }
    }
    return -1;
  }

  /** Physical slot for a logical offset (0 = oldest retained event). */
  private slotOf(offset: number): number {
    return (this.tail + offset) % this.capacity;
  }

  /** Stored event at a logical offset, if any. */
  private at(offset: number): DebugEvent | undefined {
    return this.events[this.slotOf(offset)];
  }

  /**
   * Generate an event ID from the current time plus a random base-36 suffix.
   */
  private generateEventId(): string {
    return `event_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Generate the cursor for an event that is being added. Distinctness comes
   * from the current `cursorSequence` value, which is embedded in the cursor.
   */
  private generateCursor(eventId: string): string {
    return `cursor_${this.cursorSequence}_${eventId.slice(0, 8)}`;
  }

  /** Reject capacities that would break the modulo slot arithmetic. */
  private static assertValidCapacity(capacity: number): void {
    if (!Number.isInteger(capacity) || capacity <= 0) {
      throw new RangeError(
        `EventRingBuffer capacity must be a positive integer, got ${capacity}`
      );
    }
  }
}