Skip to main content
Glama
ticker.js (22.2 kB)
"use strict"; var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; Object.defineProperty(exports, "__esModule", { value: true }); exports.KiteTicker = void 0; const ws_1 = __importDefault(require("ws")); const utils_1 = __importDefault(require("./utils")); /** * Read timeout duration in seconds. Default: 5 seconds. * @type {number} */ let read_timeout = 5; /** * Maximum delay for reconnection attempts. Default: 0 (no delay). * @type {number} */ let reconnect_max_delay = 0; /** * Maximum number of reconnection attempts. Default: 0 (no retries). * @type {number} */ let reconnect_max_tries = 0; /** * Outgoing message flags. * @type {string} */ let mSubscribe = 'subscribe', mUnSubscribe = 'unsubscribe', mSetMode = 'mode'; /** * Public constants. * @type {string} */ const modeFull = 'full', modeQuote = 'quote', modeLTP = 'ltp'; /** * WebSocket connection instance. * @type {(WebSocket | null)} */ let ws = null; /** * Event triggers and their associated callbacks. * @type {Object} */ let triggers = { 'connect': [], 'ticks': [], 'disconnect': [], 'error': [], 'close': [], 'reconnect': [], 'noreconnect': [], 'message': [], 'order_update': [] }; /** * Timer for reading data. * @type {any} */ let read_timer = null; /** * Timestamp of the last read operation. * @type {any} */ let last_read = 0; /** * Flag indicating whether auto-reconnect is enabled. * @type {boolean} */ let auto_reconnect = false; /** * Flag to control reconnection behavior. * @type {boolean} */ let should_reconnect = true; /** * Current count of reconnection attempts. * @type {number} */ let current_reconnection_count = 0; /** * Last interval used for reconnecting. * @type {any} */ let last_reconnect_interval = 0; /** * Current WebSocket URL in use. * @type {string} */ let current_ws_url = ''; /** * Default maximum delay for reconnection attempts in seconds. 
* @type {number} */ const defaultReconnectMaxDelay = 60; /** * Default maximum number of reconnection attempts. * @type {number} */ const defaultReconnectMaxRetries = 50; /** * Maximum allowed value for the number of reconnection attempts. * @type {number} */ const maximumReconnectMaxRetries = 300; /** * Minimum allowed value for the maximum delay for reconnection attempts in seconds. * @type {number} */ const minimumReconnectMaxDelay = 5; // segment constants /** * Constants representing different market segments. * @type {number} */ const NseCM = 1, NseFO = 2, NseCD = 3, BseCM = 4, BseFO = 5, BseCD = 6, McxFO = 7, McxSX = 8, Indices = 9; /** * @classdesc * Ticker client class. The WebSocket client for connecting to Kite connect streaming quotes service. * * Getting started: * --------------------------- * * ~~~~ * import { KiteTicker } from "kiteconnect"; * * const apiKey = 'your_api_key'; * const accessToken = 'generated_access_token'; * * const ticker = new KiteTicker({ * api_key: apiKey, * access_token: accessToken * }); * * ticker.connect(); * ticker.on('ticks', onTicks); * ticker.on('connect', subscribe); * ticker.on('disconnect', onDisconnect); * ticker.on('error', onError); * ticker.on('close', onClose); * ticker.on('order_update', onTrade); * * function onTicks(ticks: any[]): void { * console.log("Ticks", ticks); * } * * function subscribe(): void { * const tokens = [738561, 256265]; * ticker.subscribe(tokens); * ticker.setMode(ticker.modeFull, tokens); * } * * function onDisconnect(error: Error): void { * console.log("Closed connection on disconnect", error); * } * * function onError(error: Error): void { * console.log("Closed connection on error", error); * } * * function onClose(reason: string): void { * console.log("Closed connection on close", reason); * } * * function onTrade(order: any): void { * console.log("Order update", order); * } * ~~~~ * * ------------- * ~~~~ * [{ * tradable: true, * mode: 'full', * instrument_token: 738561, * last_price: 
2940.7, * last_traded_quantity: 1, * average_traded_price: 2933.55, * volume_traded: 2827705, * total_buy_quantity: 213779, * total_sell_quantity: 425119, * ohlc: { open: 2915, high: 2949, low: 2910.35, close: 2913.35 }, * change: 0.9387818147493404, * last_trade_time: 2024-06-12T07:16:09.000Z, * exchange_timestamp: 2024-06-12T07:16:09.000Z, * oi: 0, * oi_day_high: 0, * oi_day_low: 0, * depth: { buy: [Array], sell: [Array] } * }, * { * tradable: false, * mode: 'full', * instrument_token: 256265, * last_price: 23406.85, * ohlc: { high: 23441.95, low: 23295.95, open: 23344.45, close: 23264.85 }, * change: 0.6103628435171514, * exchange_timestamp: 2024-06-12T07:16:09.000Z * } * ] * ~~~~ * * Auto reconnection is enabled by default and it can be disabled by passing `reconnect` param while initialising `KiteTicker`. Auto reonnection mechanism is based on [Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) algorithm in which * next retry interval will be increased exponentially. `max_delay` and `max_tries` params can be used to tweak * the alogrithm where `max_delay` is the maximum delay after which subsequent reconnection interval will become constant and * `max_tries` is maximum number of retries before it quits reconnection. * For example if `max_delay` is 60 seconds and `max_tries` is 50 then the first reconnection interval starts from * minimum interval which is 2 seconds and keep increasing up to 60 seconds after which it becomes constant and when reconnection attempt * is reached upto 50 then it stops reconnecting. * Callback `reconnect` will be called with current reconnect attempt and next reconnect interval and * `on_noreconnect` is called when reconnection attempts reaches max retries. * * Here is an example demonstrating auto reconnection. 
* ------------- * ~~~~ * import { KiteTicker } from "kiteconnect"; * * const apiKey = 'your_api_key'; * const accessToken = 'generated_access_token'; * const ticker = new KiteTicker({ * api_key: 'api_key', * access_token: 'access_token' * }) * ticker.autoReconnect(true, 10, 5); * ticker.connect(); * ticker.on('ticks', onTicks); * ticker.on('connect', subscribe); * ticker.on('noreconnect', () => { * console.log('noreconnect') * }) * ticker.on('reconnect', (reconnect_count:any, reconnect_interval:any) => { * console.log('Reconnecting: attempt - ', reconnect_count, ' interval - ', reconnect_interval) * }) * * function onTicks(ticks: any[]) { * console.log('Ticks', ticks) * } * * function subscribe() { * const items = [738561] * ticker.subscribe(items) * ticker.setMode(ticker.modeFull, items) * } * ~~~~ * * @constructor * @name KiteTicker * @param {Object} params * @param {string} params.api_key API key issued you. * @param {string} params.access_token Access token obtained after successful login flow. * @param {bool} [params.reconnect] Enable/Disable auto reconnect. Enabled by default. * @param {number} [params.max_retry=50] is maximum number re-connection attempts. Defaults to 50 attempts and maximum up to 300 attempts. * @param {number} [params.max_delay=60] in seconds is the maximum delay after which subsequent re-connection interval will become constant. Defaults to 60s and minimum acceptable value is 5s. * @param {string} [params.root='wss://websocket.kite.trade/'] Kite websocket root. */ class KiteTicker { /** * Creates an instance of KiteTicker. 
* * @constructor * @param {KiteTickerParams} params */ constructor(params) { this.root = params.root || 'wss://ws.kite.trade/'; this.api_key = params.api_key; this.access_token = params.access_token; this.modeFull = modeFull; this.modeQuote = modeQuote; this.modeLTP = modeLTP; // Set reconnect to true for undefined if (params.reconnect === undefined) { params.reconnect = true; } this.autoReconnect(params.reconnect, params.max_retry, params.max_delay); } /** * @param {bool} t * @param {number} [max_retry=50] * @param {number} [max_delay=60] */ autoReconnect(t, max_retry, max_delay) { auto_reconnect = t; // Set default values max_retry = max_retry || defaultReconnectMaxRetries; max_delay = max_delay || defaultReconnectMaxDelay; // Set reconnect constraints reconnect_max_tries = max_retry >= maximumReconnectMaxRetries ? maximumReconnectMaxRetries : max_retry; reconnect_max_delay = max_delay <= minimumReconnectMaxDelay ? minimumReconnectMaxDelay : max_delay; } /** * Establishes a WebSocket connection to the server. * * This method creates a WebSocket connection using the provided credentials and options. * If a connection is already established or in the process of being established, this method does nothing. * * @returns {void} */ connect() { // Skip if its already connected if (ws && (ws.readyState === ws.CONNECTING || ws.readyState === ws.OPEN)) return; const url = this.root + '?api_key=' + this.api_key + '&access_token=' + this.access_token + '&uid=' + (new Date().getTime().toString()); ws = new ws_1.default(url, { headers: { 'X-Kite-Version': '3', 'User-Agent': utils_1.default.getUserAgent() } }); // Set binaryType to arraybuffer ws.binaryType = 'arraybuffer'; ws.onopen = () => { // Reset last reconnect interval last_reconnect_interval = null; // Reset current_reconnection_count attempt current_reconnection_count = 0; // Store current open connection url to check for auto re-connection. 
if (!current_ws_url) current_ws_url = url; // Trigger on connect event trigger('connect'); // If there isn't an incoming message in n seconds, assume disconnection. clearInterval(read_timer); last_read = new Date(); read_timer = setInterval(() => { // @ts-ignore if ((new Date() - last_read) / 1000 >= read_timeout) { // reset current_ws_url incase current connection times out // This is determined when last heart beat received time interval // exceeds read_timeout value current_ws_url = ''; if (ws) ws.close(); clearInterval(read_timer); this.triggerDisconnect(); } }, read_timeout * 1000); }; ws.onmessage = function (e) { // Binary tick data. if (e.data instanceof ArrayBuffer) { // Trigger on message event when binary message is received trigger('message', [e.data]); if (e.data.byteLength > 2) { const d = parseBinary(e.data); if (d) trigger('ticks', [d]); } } else { parseTextMessage(e.data); } // Set last read time to check for connection timeout last_read = new Date(); }; ws.onerror = function (e) { trigger('error', [e]); // Force close to avoid ghost connections if (this && this.readyState == this.OPEN) this.close(); }; ws.onclose = (e) => { trigger('close', [e]); // the ws id doesn't match the current global id, // meaning it's a ghost close event. just ignore. if (current_ws_url && (url != current_ws_url)) return; this.triggerDisconnect(e); }; } attemptReconnection() { // Try reconnecting only so many times. 
// Or if reconnection is not allowed if ((current_reconnection_count > reconnect_max_tries) || !should_reconnect) { trigger('noreconnect'); process.exit(1); } if (current_reconnection_count > 0) { last_reconnect_interval = Math.pow(2, current_reconnection_count); } else if (!last_reconnect_interval) { last_reconnect_interval = 1; } if (last_reconnect_interval > reconnect_max_delay) { last_reconnect_interval = reconnect_max_delay; } current_reconnection_count++; trigger('reconnect', [current_reconnection_count, last_reconnect_interval]); setTimeout(() => { this.connect(); }, last_reconnect_interval * 1000); } /** * @param {?WebSocket.CloseEvent} [e] * @returns {void} */ triggerDisconnect(e) { ws = null; trigger('disconnect', [e]); if (auto_reconnect) this.attemptReconnection(); } /** * This method closes the WebSocket connection if it is currently open. * It checks the readyState to ensure that the connection is not * already in the process of closing or closed. */ disconnect() { if (ws && ws.readyState !== ws_1.default.CLOSING && ws.readyState !== ws_1.default.CLOSED) { // Stop reconnection mechanism should_reconnect = false; // Close and clear the ws object ws.close(); ws = null; } } /** * Checks if the WebSocket connection is currently open. * * This method returns a boolean value indicating whether a WebSocket connection is currently open. * * @returns {boolean} A boolean value indicating whether the WebSocket connection is open. 
*/ connected() { return (ws !== null && ws.readyState === ws.OPEN); } /** * * @param {string} e * @param {Function} callback */ on(e, callback) { if (triggers.hasOwnProperty(e)) { triggers[e].push(callback); } } ; /** * * @param {(string[] | number[])} tokens * @returns {{}} */ subscribe(tokens) { if (tokens.length > 0) { send({ 'a': mSubscribe, 'v': tokens }); } return tokens; } ; /** * * @param {(string[] | number[])} tokens * @returns {{}} */ unsubscribe(tokens) { if (tokens.length > 0) { send({ 'a': mUnSubscribe, 'v': tokens }); } return tokens; } ; /** * * @param {string} mode * @param {(string[] | number[])} tokens * @returns {{}} */ setMode(mode, tokens) { if (tokens.length > 0) { send({ 'a': mSetMode, 'v': [mode, tokens] }); } return tokens; } ; /** * * * @param {ArrayBuffer} binpacks * @returns {{}} */ parseBinary(binpacks) { return parseBinary(binpacks); } } exports.KiteTicker = KiteTicker; // send a message via the socket // automatically encodes json if possible /** * @param {(AnyObject | string)} message */ function send(message) { if (!ws || ws.readyState != ws.OPEN) return; try { if (typeof (message) == 'object') { message = JSON.stringify(message); } ws.send(message); } catch (e) { ws.close(); } ; } // trigger event callbacks /** * @param {string} e * @param {?any[]} [args] * @returns {void} */ function trigger(e, args) { if (!triggers[e]) return; for (let n = 0; n < triggers[e].length; n++) { triggers[e][n].apply(triggers[e][n], args ? args : []); } } /** * @param {(string | AnyObject)} data */ function parseTextMessage(data) { try { data = JSON.parse(data); } catch (e) { return; } if (data.type === 'order') { trigger('order_update', [data.data]); } } // parse received binary message. each message is a combination of multiple tick packets // [2-bytes num packets][size1][tick1][size2][tick2] ... 
/**
 * Decodes one binary message into tick objects.
 *
 * The message is first split into packets; each packet is decoded according
 * to its byte length (8: LTP, 28/32: index quote/full, 44/184: quote/full).
 * Packets of any other length are skipped.
 *
 * @param {ArrayBuffer} binpacks
 * @returns {Tick[]}
 */
function parseBinary(binpacks) {
    const ticks = [];
    for (const packet of splitPackets(binpacks)) {
        const size = packet.byteLength;
        // Integer stored in bytes [from, to) of this packet.
        const readInt = (from, to) => buf2long(packet.slice(from, to));
        const instrument_token = readInt(0, 4);
        // The segment is carried in the low byte of the instrument token.
        const segment = instrument_token & 0xff;
        const tradable = segment !== Indices;
        // Price divisor depends on the segment
        let divisor;
        switch (segment) {
            case NseCD:
                divisor = 10000000.0;
                break;
            case BseCD:
                divisor = 10000.0;
                break;
            default:
                divisor = 100.0;
        }
        // Price stored in bytes [from, to) of this packet, scaled by the divisor.
        const readPrice = (from, to) => readInt(from, to) / divisor;

        // LTP packet
        if (size === 8) {
            ticks.push({
                tradable: tradable,
                mode: modeLTP,
                instrument_token,
                last_price: readPrice(4, 8)
            });
            continue;
        }

        // Index packet: quote (28 bytes) or full (32 bytes)
        if (size === 28 || size === 32) {
            const isFull = size === 32;
            const tick = {
                tradable,
                mode: isFull ? modeFull : modeQuote,
                instrument_token,
                last_price: readPrice(4, 8),
                ohlc: {
                    high: readPrice(8, 12),
                    low: readPrice(12, 16),
                    open: readPrice(16, 20),
                    close: readPrice(20, 24)
                },
                change: readInt(24, 28)
            };
            // Percentage change of last price against close, when close is known
            if (tick.ohlc.close !== 0) {
                tick.change = (tick.last_price - tick.ohlc.close) * 100 / tick.ohlc.close;
            }
            // Full mode carries a timestamp in seconds
            if (isFull) {
                const seconds = readInt(28, 32);
                tick.exchange_timestamp = seconds ? new Date(seconds * 1000) : null;
            }
            ticks.push(tick);
            continue;
        }

        // Regular packet: quote (44 bytes) or full (184 bytes)
        if (size === 44 || size === 184) {
            const isFull = size === 184;
            const tick = {
                tradable,
                mode: isFull ? modeFull : modeQuote,
                instrument_token,
                last_price: readPrice(4, 8),
                last_traded_quantity: readInt(8, 12),
                average_traded_price: readPrice(12, 16),
                volume_traded: readInt(16, 20),
                total_buy_quantity: readInt(20, 24),
                total_sell_quantity: readInt(24, 28),
                ohlc: {
                    open: readPrice(28, 32),
                    high: readPrice(32, 36),
                    low: readPrice(36, 40),
                    close: readPrice(40, 44)
                },
                // Filled in just below
                change: 0
            };
            // Percentage change of last price against close, when close is known
            if (tick.ohlc.close !== 0) {
                tick.change = (tick.last_price - tick.ohlc.close) * 100 / tick.ohlc.close;
            }
            if (isFull) {
                // Last trade time and exchange timestamp are in seconds; 0 maps to null
                const tradedAt = readInt(44, 48);
                tick.last_trade_time = tradedAt ? new Date(tradedAt * 1000) : null;
                const stampedAt = readInt(60, 64);
                tick.exchange_timestamp = stampedAt ? new Date(stampedAt * 1000) : null;
                // Open interest
                tick.oi = readInt(48, 52);
                tick.oi_day_high = readInt(52, 56);
                tick.oi_day_low = readInt(56, 60);
                // Market depth: ten 12-byte entries from byte 64; first five are
                // bids, last five are offers.
                tick.depth = { buy: [], sell: [] };
                for (let level = 0; level < 10; level++) {
                    const base = 64 + level * 12;
                    const side = level < 5 ? tick.depth.buy : tick.depth.sell;
                    side.push({
                        quantity: readInt(base, base + 4),
                        price: readPrice(base + 4, base + 8),
                        orders: readInt(base + 8, base + 10)
                    });
                }
            }
            ticks.push(tick);
        }
    }
    return ticks;
}
// split one long binary message into individual tick packets
/**
 * The first two bytes hold the packet count; every packet is preceded by a
 * two-byte length.
 *
 * @param {ArrayBuffer} bin
 * @returns {ArrayBuffer[]} One buffer per packet, in message order.
 */
function splitPackets(bin) {
    const count = buf2long(bin.slice(0, 2));
    const packets = [];
    let offset = 2;
    for (let k = 0; k < count; k++) {
        // two-byte length prefix, then the packet body
        const bodyStart = offset + 2;
        const bodyEnd = bodyStart + buf2long(bin.slice(offset, bodyStart));
        packets.push(bin.slice(bodyStart, bodyEnd));
        offset = bodyEnd;
    }
    return packets;
}
// Big endian byte array to long.
/**
 * Interprets the bytes of `buf` as a big-endian integer.
 *
 * Each byte is moved into place with the 32-bit `<<` operator, so a 4-byte
 * value whose top bit is set comes out negative, and an empty buffer yields 0.
 *
 * @param {ArrayBuffer} buf
 * @returns {number}
 */
function buf2long(buf) {
    const bytes = new Uint8Array(buf);
    const lastIndex = bytes.length - 1;
    // The byte at position idx sits (lastIndex - idx) bytes above the least significant one.
    return bytes.reduce((total, byte, idx) => total + (byte << ((lastIndex - idx) * 8)), 0);
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yogendhra9/ZerodhaMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.