import WebSocket from "ws";
export interface Subscription {
id: string;
type: string;
channel: string;
assets?: string[];
markets?: string[];
callbackType: "notification" | "log";
createdAt: string;
eventsReceived: number;
lastEventAt: string | null;
}
const CLOB_WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market";
const INITIAL_RECONNECT_DELAY = 1000;
const MAX_RECONNECT_DELAY = 60000;
class WebSocketManager {
private ws: WebSocket | null = null;
private subscriptions = new Map<string, Subscription>();
private reconnectAttempts = 0;
private shouldRun = false;
private totalEvents = 0;
private eventsByType: Record<string, number> = {};
private connectionErrors = 0;
private reconnectCount = 0;
private messageBuffer: Array<{ subscription_id: string; data: unknown }> = [];
private maxBuffer = 200;
connect(): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) return;
this.shouldRun = true;
this._connect();
}
private _connect(): void {
try {
this.ws = new WebSocket(CLOB_WS_URL);
this.ws.on("open", () => {
this.reconnectAttempts = 0;
// Resubscribe all
for (const sub of this.subscriptions.values()) {
this._sendSubscription(sub);
}
});
this.ws.on("message", (raw) => {
try {
const data = JSON.parse(raw.toString());
this._handleMessage(data);
} catch {
// ignore parse errors
}
});
this.ws.on("close", () => {
if (this.shouldRun) this._reconnect();
});
this.ws.on("error", () => {
this.connectionErrors++;
});
} catch {
this.connectionErrors++;
if (this.shouldRun) this._reconnect();
}
}
private _reconnect(): void {
this.reconnectCount++;
const delay = Math.min(
INITIAL_RECONNECT_DELAY * 2 ** this.reconnectAttempts,
MAX_RECONNECT_DELAY,
);
this.reconnectAttempts++;
setTimeout(() => {
if (this.shouldRun) this._connect();
}, delay);
}
disconnect(): void {
this.shouldRun = false;
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
subscribe(params: {
type: string;
assets?: string[];
markets?: string[];
callbackType?: "notification" | "log";
}): string {
const id = crypto.randomUUID();
const sub: Subscription = {
id,
type: params.type,
channel: "market",
assets: params.assets,
markets: params.markets,
callbackType: params.callbackType ?? "notification",
createdAt: new Date().toISOString(),
eventsReceived: 0,
lastEventAt: null,
};
this.subscriptions.set(id, sub);
// Ensure connected
this.connect();
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this._sendSubscription(sub);
}
return id;
}
private _sendSubscription(sub: Subscription): void {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
const msg: Record<string, unknown> = {
type: sub.type,
};
if (sub.assets) msg.assets_id = sub.assets;
if (sub.markets) msg.markets = sub.markets;
this.ws.send(JSON.stringify(msg));
}
unsubscribe(subscriptionId: string): boolean {
return this.subscriptions.delete(subscriptionId);
}
private _handleMessage(data: any): void {
const eventType = data.event_type ?? data.type ?? "unknown";
this.totalEvents++;
this.eventsByType[eventType] = (this.eventsByType[eventType] ?? 0) + 1;
// Match to subscriptions and buffer
for (const sub of this.subscriptions.values()) {
let matches = true;
if (sub.assets && data.asset_id) {
matches = sub.assets.includes(data.asset_id);
}
if (sub.markets && data.market) {
matches = matches && sub.markets.includes(data.market);
}
if (matches) {
sub.eventsReceived++;
sub.lastEventAt = new Date().toISOString();
if (this.messageBuffer.length < this.maxBuffer) {
this.messageBuffer.push({ subscription_id: sub.id, data });
}
}
}
}
getBufferedMessages(subscriptionId?: string): Array<{ subscription_id: string; data: unknown }> {
if (subscriptionId) {
const msgs = this.messageBuffer.filter((m) => m.subscription_id === subscriptionId);
this.messageBuffer = this.messageBuffer.filter((m) => m.subscription_id !== subscriptionId);
return msgs;
}
const msgs = [...this.messageBuffer];
this.messageBuffer = [];
return msgs;
}
getStatus(): Record<string, unknown> {
return {
connected: this.ws?.readyState === WebSocket.OPEN,
subscriptions: {
total: this.subscriptions.size,
active: Array.from(this.subscriptions.values()).map((s) => ({
id: s.id,
type: s.type,
createdAt: s.createdAt,
eventsReceived: s.eventsReceived,
lastEventAt: s.lastEventAt,
})),
},
statistics: {
totalEvents: this.totalEvents,
eventsByType: this.eventsByType,
connectionErrors: this.connectionErrors,
reconnectCount: this.reconnectCount,
bufferedMessages: this.messageBuffer.length,
},
};
}
}
export const wsManager = new WebSocketManager();