Skip to main content
Glama

Bucket Feature Flags MCP Server

Official
by reflagcom
sse.ts7.33 kB
import { forgetAuthToken, getAuthToken, rememberAuthToken, } from "./feedback/promptStorage"; import { HttpClient } from "./httpClient"; import { Logger, loggerWithPrefix } from "./logger"; interface AblyTokenDetails { token: string; expires: number; } interface AblyTokenRequest { keyName: string; } const ABLY_TOKEN_ERROR_MIN = 40000; const ABLY_TOKEN_ERROR_MAX = 49999; export class AblySSEChannel { private isOpen: boolean = false; private eventSource: EventSource | null = null; private retryInterval: ReturnType<typeof setInterval> | null = null; private logger: Logger; constructor( private userId: string, private channel: string, private sseBaseUrl: string, private messageHandler: (message: any) => void, private httpClient: HttpClient, logger: Logger, ) { this.logger = loggerWithPrefix(logger, "[SSE]"); if (!this.sseBaseUrl.endsWith("/")) { this.sseBaseUrl += "/"; } } private async refreshTokenRequest() { const params = new URLSearchParams({ userId: this.userId }); const res = await this.httpClient.get({ path: `/feedback/prompting-auth`, params, }); if (res.ok) { const body = await res.json(); if (body.success) { delete body.success; const tokenRequest: AblyTokenRequest = body; this.logger.debug("obtained new token request", tokenRequest); return tokenRequest; } } this.logger.error("server did not release a token request", res); return; } private async refreshToken() { const cached = getAuthToken(this.userId); if (cached && cached.channel === this.channel) { this.logger.debug("using existing token", cached.channel, cached.token); return cached.token; } const tokenRequest = await this.refreshTokenRequest(); if (!tokenRequest) { return; } const url = new URL( `keys/${encodeURIComponent(tokenRequest.keyName)}/requestToken`, this.sseBaseUrl, ); const res = await fetch(url, { method: "post", headers: { "Content-Type": "application/json", }, body: JSON.stringify(tokenRequest), }); if (res.ok) { const details: AblyTokenDetails = await res.json(); this.logger.debug("obtained new token", details); rememberAuthToken( this.userId, this.channel, details.token, new Date(details.expires), ); return details.token; } this.logger.error("server did not release a token"); return; } private async onError(e: Event) { if (e instanceof MessageEvent) { let errorCode: number | undefined; try { const errorPayload = JSON.parse(e.data); errorCode = errorPayload?.code && Number(errorPayload.code); } catch (error: any) { this.logger.warn("received unparsable error message", error, e); } if ( errorCode && errorCode >= ABLY_TOKEN_ERROR_MIN && errorCode <= ABLY_TOKEN_ERROR_MAX ) { this.logger.warn("event source token expired, refresh required"); forgetAuthToken(this.userId); } } else { const connectionState = (e as any)?.target?.readyState; if (connectionState === 2) { this.logger.debug("event source connection closed", e); } else if (connectionState === 1) { this.logger.warn("event source connection failed to open", e); } else { this.logger.warn("event source unexpected error occurred", e); } } this.disconnect(); } private onMessage(e: MessageEvent) { let payload: any; try { if (e.data) { const message = JSON.parse(e.data); if (message.data) { payload = JSON.parse(message.data); } } } catch (error: any) { this.logger.warn("received unparsable message", error, e); return; } if (payload) { this.logger.debug("received message", payload); try { this.messageHandler(payload); } catch (error: any) { this.logger.warn("failed to handle message", error, payload); } return; } this.logger.warn("received invalid message", e); } private onOpen(e: Event) { this.logger.debug("event source connection opened", e); } public async connect() { if (this.isOpen) { this.logger.warn("channel connection already open"); return; } this.isOpen = true; try { const token = await this.refreshToken(); if (!token) return; const url = new URL("sse", this.sseBaseUrl); url.searchParams.append("v", "1.2"); url.searchParams.append("accessToken", token); url.searchParams.append("channels", this.channel); url.searchParams.append("rewind", "1"); this.eventSource = new EventSource(url); this.eventSource.addEventListener("error", (e) => this.onError(e)); this.eventSource.addEventListener("open", (e) => this.onOpen(e)); this.eventSource.addEventListener("message", (m) => this.onMessage(m)); this.logger.debug("channel connection opened"); } finally { this.isOpen = !!this.eventSource; } } public disconnect() { if (!this.isOpen) { this.logger.warn("channel connection already closed"); return; } if (this.eventSource) { this.eventSource.close(); this.eventSource = null; this.logger.debug("channel connection closed"); } this.isOpen = false; } public open(options?: { retryInterval?: number; retryCount?: number }) { const retryInterval = options?.retryInterval ?? 1000 * 30; const retryCount = options?.retryCount ?? 3; let retriesRemaining = retryCount; const tryConnect = async () => { try { await this.connect(); retriesRemaining = retryCount; } catch (e) { if (retriesRemaining > 0) { this.logger.warn( `failed to connect, ${retriesRemaining} retries remaining`, e, ); } else { this.logger.warn(`failed to connect, no retries remaining`, e); } } }; void tryConnect(); this.retryInterval = setInterval(() => { if (!this.isConnected() && this.retryInterval) { if (retriesRemaining <= 0) { clearInterval(this.retryInterval); this.retryInterval = null; return; } retriesRemaining--; void tryConnect(); } }, retryInterval); } public close() { if (this.retryInterval) { clearInterval(this.retryInterval); this.retryInterval = null; } this.disconnect(); } public isActive() { return !!this.retryInterval; } public isConnected() { return this.isOpen && !!this.eventSource; } } export function openAblySSEChannel({ userId, channel, callback, httpClient, sseBaseUrl, logger, }: { userId: string; channel: string; callback: (req: object) => void; httpClient: HttpClient; logger: Logger; sseBaseUrl: string; }) { const sse = new AblySSEChannel( userId, channel, sseBaseUrl, callback, httpClient, logger, ); sse.open(); return sse; } export function closeAblySSEChannel(channel: AblySSEChannel) { channel.close(); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reflagcom/bucket-javascript-sdk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server