Skip to main content
Glama
waldzellai

Exa Websets MCP Server

by waldzellai
EventService.ts (16.7 kB)
/**
 * Event Service Implementation
 *
 * Provides CRUD operations for Websets events with filtering, pagination,
 * and integration with the event system components.
 */

import { BaseService } from './BaseService.js';
import { WebsetsApiClient } from '../api/WebsetsApiClient.js';
import { WebsetEvent, PaginatedResponse } from '../types/websets.js';

/** Number of milliseconds in one hour. */
const MS_PER_HOUR = 60 * 60 * 1000;

/** Number of hourly buckets reported in {@link EventStats.timeline}. */
const TIMELINE_HOURS = 24;

/** Event type names accepted by {@link EventService.validateEvent}. */
const VALID_EVENT_TYPES: ReadonlySet<string> = new Set([
  'webset.created',
  'webset.deleted',
  'webset.paused',
  'webset.idle',
  'webset.search.created',
  'webset.search.canceled',
  'webset.search.completed',
  'webset.search.updated',
  'webset.export.created',
  'webset.export.completed',
  'webset.item.created',
  'webset.item.enriched',
]);

/**
 * Returns true when `event.data` is an object whose `field` property strictly
 * equals `expected`. Non-object payloads never match (and never throw).
 */
function eventDataFieldEquals(event: WebsetEvent, field: string, expected: string): boolean {
  const data: unknown = event.data;
  if (data === null || typeof data !== 'object') {
    return false;
  }
  return (data as Record<string, unknown>)[field] === expected;
}

/**
 * List events response type
 */
export interface ListEventsResponse extends PaginatedResponse<WebsetEvent> {
  /** Events of the current page (filled in by {@link EventService.listEvents}) */
  events?: WebsetEvent[];
  /** Cursor to pass to the next request; absent when there are no more pages */
  nextCursor?: string;
}

/**
 * Event query parameters
 */
export interface EventQueryParams {
  /** Cursor for pagination */
  cursor?: string;
  /** Maximum number of events to return */
  limit?: number;
  /** Filter by event types */
  types?: string[];
  /** Stream format for real-time events */
  streamFormat?: 'jsonl' | 'json';
  /** Batch size for streaming */
  batchSize?: number;
}

/**
 * Event streaming options
 */
export interface EventStreamOptions {
  /** Event types to filter by */
  types?: string[];
  /** Stream format */
  format?: 'jsonl' | 'json';
  /** Batch size for processing */
  batchSize?: number;
  /** Maximum total events to stream */
  limit?: number;
  /** Callback for each event batch */
  onBatch?: (events: WebsetEvent[]) => void;
  /** Callback for stream completion */
  onComplete?: (totalEvents: number) => void;
  /** Callback for stream errors */
  onError?: (error: Error) => void;
}

/**
 * Event statistics
 */
export interface EventStats {
  /** Total events */
  totalEvents: number;
  /** Events by type */
  eventsByType: Record<string, number>;
  /** Recent event rate (events per hour) */
  recentEventRate: number;
  /** Most active event types */
  mostActiveTypes: Array<{ type: string; count: number }>;
  /** Event timeline (last 24 hours) */
  timeline: Array<{ hour: number; count: number }>;
}

/**
 * Event filter criteria
 */
export interface EventFilter {
  /** Filter by event types */
  types?: string[];
  /** Filter by date range */
  dateRange?: {
    start: Date;
    end: Date;
  };
  /** Filter by webset ID */
  websetId?: string;
  /** Filter by search ID */
  searchId?: string;
  /** Filter by enrichment ID */
  enrichmentId?: string;
  /** Filter by item ID */
  itemId?: string;
  /** Text search in event data */
  textSearch?: string;
}

/**
 * Service for managing Websets events
 */
export class EventService extends BaseService {
  constructor(apiClient: WebsetsApiClient) {
    super(apiClient);
  }

  /**
   * List events with filtering and pagination
   * @param params Query parameters
   * @returns Promise resolving to the API response, with `events` set to the
   *          response's `data` array (or an empty array when absent)
   * @throws Rethrows any error raised by the underlying request after logging it
   */
  async listEvents(params: EventQueryParams = {}): Promise<ListEventsResponse> {
    try {
      this.logOperation('Listing events', params);

      const sanitizedParams = this.sanitizeParams({
        cursor: params.cursor,
        limit: params.limit,
        types: params.types,
        streamFormat: params.streamFormat,
        batchSize: params.batchSize,
      });

      const response = await this.handleGetRequest<ListEventsResponse>('/events', sanitizedParams);

      this.logOperation('Listed events successfully', {
        eventCount: response.data?.length ?? 0,
        hasMore: !!response.nextCursor,
      });

      return {
        ...response,
        events: response.data ?? [],
      };
    } catch (error) {
      this.logOperation('Failed to list events', { error, params });
      throw error;
    }
  }

  /**
   * Get a specific event by ID
   * @param eventId Event ID
   * @returns Promise resolving to event
   * @throws Rethrows validation or request errors after logging them
   */
  async getEvent(eventId: string): Promise<WebsetEvent> {
    try {
      this.validateRequired({ eventId }, ['eventId']);
      this.logOperation('Getting event', { eventId });

      const endpoint = this.buildEndpoint('/events/{eventId}', { eventId });
      const event = await this.handleGetRequest<WebsetEvent>(endpoint);

      this.logOperation('Retrieved event successfully', {
        eventId,
        eventType: event.type,
      });

      return event;
    } catch (error) {
      this.logOperation('Failed to get event', { error, eventId });
      throw error;
    }
  }

  /**
   * Stream events by paging through {@link listEvents} until the pages run out
   * or `options.limit` events have been delivered.
   *
   * Exceptions thrown by `onBatch` / `onComplete` / `onError` are logged and
   * swallowed so that a faulty callback cannot abort the stream.
   *
   * @param options Streaming options. A `limit` that is missing, zero or
   *                negative means "no cap".
   * @returns Promise resolving when streaming completes
   * @throws Rethrows request errors after invoking `options.onError`
   */
  async streamEvents(options: EventStreamOptions = {}): Promise<void> {
    try {
      this.logOperation('Starting event stream', options);

      let totalEvents = 0;
      let cursor: string | undefined;
      const batchSize = options.batchSize || 25;
      const maxEvents = options.limit && options.limit > 0 ? options.limit : undefined;

      while (true) {
        const remaining = maxEvents === undefined ? undefined : maxEvents - totalEvents;
        // Never ask the API for more than is still needed to reach the cap.
        const pageSize = remaining === undefined ? batchSize : Math.min(batchSize, remaining);

        const response = await this.listEvents({
          cursor,
          limit: pageSize,
          types: options.types,
          streamFormat: options.format,
          batchSize,
        });

        if (!response.events || response.events.length === 0) {
          break;
        }

        // Truncate defensively in case the server returned more than requested,
        // so the caller never receives more than `limit` events in total.
        const batch =
          remaining !== undefined && response.events.length > remaining
            ? response.events.slice(0, remaining)
            : response.events;

        // Process batch
        if (options.onBatch) {
          try {
            options.onBatch(batch);
          } catch (batchError) {
            this.logOperation('Error in batch callback', { batchError });
          }
        }

        totalEvents += batch.length;

        // Check if we've reached the limit
        if (maxEvents !== undefined && totalEvents >= maxEvents) {
          break;
        }

        // Check if there are more events
        if (!response.nextCursor) {
          break;
        }

        cursor = response.nextCursor;
      }

      if (options.onComplete) {
        try {
          options.onComplete(totalEvents);
        } catch (completeError) {
          this.logOperation('Error in completion callback', { completeError });
        }
      }

      this.logOperation('Event streaming completed', { totalEvents });
    } catch (error) {
      this.logOperation('Failed to stream events', { error, options });

      if (options.onError) {
        try {
          // Callers are promised an Error; wrap anything else that was thrown.
          options.onError(error instanceof Error ? error : new Error(String(error)));
        } catch (errorCallbackError) {
          this.logOperation('Error in error callback', { errorCallbackError });
        }
      }

      throw error;
    }
  }

  /**
   * Get events by type
   * @param eventType Event type to filter by
   * @param limit Maximum number of events to return
   * @returns Promise resolving to events
   * @throws Rethrows validation or request errors after logging them
   */
  async getEventsByType(eventType: string, limit: number = 100): Promise<WebsetEvent[]> {
    try {
      this.validateRequired({ eventType }, ['eventType']);
      this.logOperation('Getting events by type', { eventType, limit });

      const response = await this.listEvents({
        types: [eventType],
        limit,
      });

      this.logOperation('Retrieved events by type', {
        eventType,
        eventCount: response.events?.length ?? 0,
      });

      return response.events ?? [];
    } catch (error) {
      this.logOperation('Failed to get events by type', { error, eventType });
      throw error;
    }
  }

  /**
   * Get recent events
   *
   * Fetches a single page of `limit * 2` events and filters it client-side by
   * `createdAt`, so fewer than `limit` events may be returned even if more
   * recent events exist beyond that page.
   * NOTE(review): whether the API accepts a page size of `limit * 2` for large
   * limits is not visible here — confirm against the API's maximum.
   *
   * @param hours Number of hours to look back
   * @param limit Maximum number of events to return
   * @returns Promise resolving to recent events
   * @throws Rethrows request errors after logging them
   */
  async getRecentEvents(hours: number = 24, limit: number = 100): Promise<WebsetEvent[]> {
    try {
      this.logOperation('Getting recent events', { hours, limit });

      // Get events and filter by time (API doesn't support time filtering directly)
      const response = await this.listEvents({ limit: limit * 2 }); // Get more to account for filtering

      const cutoffTime = new Date(Date.now() - hours * MS_PER_HOUR);
      const recentEvents = (response.events ?? [])
        .filter((event: WebsetEvent) => new Date(event.createdAt) >= cutoffTime)
        .slice(0, limit);

      this.logOperation('Retrieved recent events', {
        hours,
        eventCount: recentEvents.length,
      });

      return recentEvents;
    } catch (error) {
      this.logOperation('Failed to get recent events', { error, hours });
      throw error;
    }
  }

  /**
   * Search events with advanced filtering
   *
   * Only `filter.types` is sent to the API; every other criterion is applied
   * client-side to a single page of `limit * 2` events.
   *
   * @param filter Filter criteria
   * @param limit Maximum number of events to return
   * @returns Promise resolving to filtered events
   * @throws Rethrows request errors after logging them
   */
  async searchEvents(filter: EventFilter, limit: number = 100): Promise<WebsetEvent[]> {
    try {
      this.logOperation('Searching events', { filter, limit });

      // Start with basic type filtering if provided
      const response = await this.listEvents({
        types: filter.types,
        limit: limit * 2, // Get more to account for additional filtering
      });

      let events: WebsetEvent[] = response.events ?? [];

      // Apply additional filters
      if (filter.dateRange) {
        const { start, end } = filter.dateRange;
        events = events.filter((event: WebsetEvent) => {
          const eventDate = new Date(event.createdAt);
          return eventDate >= start && eventDate <= end;
        });
      }

      // ID filters all compare one field of the event payload against the filter value.
      const idFilters: Array<[field: string, expected: string | undefined]> = [
        ['websetId', filter.websetId],
        ['searchId', filter.searchId],
        ['enrichmentId', filter.enrichmentId],
        ['itemId', filter.itemId],
      ];
      for (const [field, expected] of idFilters) {
        if (expected) {
          events = events.filter((event: WebsetEvent) =>
            eventDataFieldEquals(event, field, expected)
          );
        }
      }

      if (filter.textSearch) {
        // Case-insensitive substring match against the serialized event.
        const searchTerm = filter.textSearch.toLowerCase();
        events = events.filter((event: WebsetEvent) =>
          JSON.stringify(event).toLowerCase().includes(searchTerm)
        );
      }

      // Limit results
      events = events.slice(0, limit);

      this.logOperation('Searched events successfully', {
        filter,
        eventCount: events.length,
      });

      return events;
    } catch (error) {
      this.logOperation('Failed to search events', { error, filter });
      throw error;
    }
  }

  /**
   * Get event statistics
   *
   * The timeline always has 24 hourly buckets: index 23 holds events from the
   * most recent hour and index 0 those from 23-24 hours ago.
   *
   * @param hours Number of hours to analyze (default: 24)
   * @returns Promise resolving to event statistics; `recentEventRate` is 0
   *          when `hours` is not a positive number
   * @throws Rethrows request errors after logging them
   */
  async getEventStats(hours: number = 24): Promise<EventStats> {
    try {
      this.logOperation('Getting event statistics', { hours });

      const events = await this.getRecentEvents(hours, 1000); // Get more events for better stats

      const stats: EventStats = {
        totalEvents: events.length,
        eventsByType: {},
        recentEventRate: 0,
        mostActiveTypes: [],
        timeline: [],
      };

      // Count events by type
      for (const event of events) {
        stats.eventsByType[event.type] = (stats.eventsByType[event.type] ?? 0) + 1;
      }

      // Calculate event rate (events per hour); avoid Infinity/NaN for hours <= 0
      stats.recentEventRate = hours > 0 ? events.length / hours : 0;

      // Get most active types
      stats.mostActiveTypes = Object.entries(stats.eventsByType)
        .map(([type, count]) => ({ type, count }))
        .sort((a, b) => b.count - a.count)
        .slice(0, 10);

      // Create timeline (hourly buckets)
      const timeline = new Array<number>(TIMELINE_HOURS).fill(0);
      const nowMs = Date.now();

      for (const event of events) {
        const eventMs = new Date(event.createdAt).getTime();
        const hoursAgo = Math.floor((nowMs - eventMs) / MS_PER_HOUR);

        // Events with future or unparseable timestamps fall outside every bucket.
        if (hoursAgo >= 0 && hoursAgo < TIMELINE_HOURS) {
          timeline[TIMELINE_HOURS - 1 - hoursAgo]++;
        }
      }

      stats.timeline = timeline.map((count, index) => ({ hour: index, count }));

      this.logOperation('Generated event statistics', {
        totalEvents: stats.totalEvents,
        typeCount: Object.keys(stats.eventsByType).length,
        eventRate: stats.recentEventRate,
      });

      return stats;
    } catch (error) {
      this.logOperation('Failed to get event statistics', { error, hours });
      throw error;
    }
  }

  /**
   * Get events for a specific webset
   * @param websetId Webset ID
   * @param limit Maximum number of events to return
   * @returns Promise resolving to webset events
   * @throws Rethrows validation or request errors after logging them
   */
  async getWebsetEvents(websetId: string, limit: number = 100): Promise<WebsetEvent[]> {
    try {
      this.validateRequired({ websetId }, ['websetId']);
      this.logOperation('Getting webset events', { websetId, limit });

      const events = await this.searchEvents({ websetId }, limit);

      this.logOperation('Retrieved webset events', {
        websetId,
        eventCount: events.length,
      });

      return events;
    } catch (error) {
      this.logOperation('Failed to get webset events', { error, websetId });
      throw error;
    }
  }

  /**
   * Monitor events by polling {@link searchEvents} on a timer.
   *
   * Only events whose `createdAt` is strictly later than the newest event seen
   * so far (initially: the moment monitoring started) are passed to `callback`.
   * Exceptions thrown by `callback` and polling errors are logged and do not
   * stop the monitor.
   *
   * @param callback Callback function for new events
   * @param options Monitoring options (`pollInterval` in milliseconds,
   *                default 5000; `batchSize` default 10)
   * @returns Function to stop monitoring; it cancels the pending poll and
   *          prevents an in-flight poll from delivering further events
   */
  monitorEvents(
    callback: (event: WebsetEvent) => void,
    options: {
      types?: string[];
      pollInterval?: number;
      batchSize?: number;
    } = {}
  ): () => void {
    const pollInterval = options.pollInterval || 5000; // 5 seconds
    const batchSize = options.batchSize || 10;

    let lastEventTime = new Date();
    let isMonitoring = true;
    let timer: ReturnType<typeof setTimeout> | undefined;

    const scheduleNextPoll = (): void => {
      timer = setTimeout(() => {
        void poll();
      }, pollInterval);
    };

    const poll = async (): Promise<void> => {
      if (!isMonitoring) return;

      try {
        // Get events since last poll
        const events = await this.searchEvents(
          {
            types: options.types,
            dateRange: {
              start: lastEventTime,
              end: new Date(),
            },
          },
          batchSize
        );

        // stop() may have been called while the request was in flight.
        if (!isMonitoring) return;

        // Compare every event against the watermark as it was when the poll
        // started; advance the watermark only after the whole batch is handled.
        let newestTime = lastEventTime;

        // Process new events
        for (const event of events) {
          // The callback itself may have called stop().
          if (!isMonitoring) break;

          const eventTime = new Date(event.createdAt);
          if (eventTime > lastEventTime) {
            try {
              callback(event);
            } catch (callbackError) {
              this.logOperation('Error in event monitor callback', { callbackError });
            }
          }
          if (eventTime > newestTime) {
            newestTime = eventTime;
          }
        }

        // Update last event time
        lastEventTime = newestTime;
      } catch (error) {
        this.logOperation('Error polling events', { error });
      }

      // Schedule next poll
      if (isMonitoring) {
        scheduleNextPoll();
      }
    };

    // Start polling
    scheduleNextPoll();

    this.logOperation('Started event monitoring', options);

    // Return stop function
    return () => {
      isMonitoring = false;
      if (timer !== undefined) {
        // Cancel the pending poll so the timer does not outlive the monitor.
        clearTimeout(timer);
        timer = undefined;
      }
      this.logOperation('Stopped event monitoring');
    };
  }

  /**
   * Get event types available in the system
   *
   * The list is derived from the types seen among recent events (last week,
   * up to 1000 events), so types that did not occur recently are not reported.
   *
   * @returns Promise resolving to the sorted, de-duplicated event types
   * @throws Rethrows request errors after logging them
   */
  async getEventTypes(): Promise<string[]> {
    try {
      this.logOperation('Getting available event types');

      // Get recent events to determine available types
      const events = await this.getRecentEvents(24 * 7, 1000); // Last week, up to 1000 events

      const types = new Set<string>();
      for (const event of events) {
        types.add(event.type);
      }

      const eventTypes = Array.from(types).sort();

      this.logOperation('Retrieved event types', {
        typeCount: eventTypes.length,
        types: eventTypes,
      });

      return eventTypes;
    } catch (error) {
      this.logOperation('Failed to get event types', { error });
      throw error;
    }
  }

  /**
   * Validate event data structure
   *
   * An event is valid when it is an object that has `id`, `type` and
   * `createdAt`, its `type` is one of the known event type names, and its
   * `createdAt` is a parseable date string or a valid `Date`.
   *
   * @param event Event to validate
   * @returns True if event is valid
   */
  validateEvent(event: unknown): event is WebsetEvent {
    if (!event || typeof event !== 'object') {
      return false;
    }

    const candidate = event as Record<string, unknown>;

    const requiredFields = ['id', 'type', 'createdAt'];
    for (const field of requiredFields) {
      if (!(field in candidate)) {
        return false;
      }
    }

    // Validate event type
    const { type, createdAt } = candidate;
    if (typeof type !== 'string' || !VALID_EVENT_TYPES.has(type)) {
      return false;
    }

    // Validate timestamp; other value types are rejected rather than coerced.
    if (createdAt instanceof Date) {
      return !Number.isNaN(createdAt.getTime());
    }
    if (typeof createdAt !== 'string') {
      return false;
    }
    return !Number.isNaN(Date.parse(createdAt));
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/waldzellai/exa-mcp-server-websets'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.