Skip to main content
Glama

Telegram MCP Server

message-sync-service.js (13.1 kB)
import Database from 'better-sqlite3';
import path from 'path';
import fs from 'fs';
import { setTimeout as delay } from 'timers/promises';
import { normalizeChannelId } from './telegram-client.js';

const DEFAULT_DB_PATH = './data/messages.db';
const DEFAULT_TARGET_MESSAGES = 1000;

// Values stored in jobs.status.
const JOB_STATUS = Object.freeze({
  PENDING: 'pending',
  IN_PROGRESS: 'in_progress',
  IDLE: 'idle',
  ERROR: 'error',
});

/**
 * Queue-driven synchroniser that copies channel messages from a Telegram
 * client into a local SQLite database.
 *
 * Each channel is represented by one row in the `jobs` table. Processing a job
 * first fetches messages newer than the last stored id, then walks backwards
 * through history until `target_message_count` messages are stored.
 *
 * The Telegram client passed in must provide:
 *  - `getMessagesByChannelId(channelId, limit, { minId })` resolving to
 *    `{ peerTitle, peerType, messages }`
 *  - `client.resolvePeer(id)` and `client.iterHistory(peer, options)`
 *  - `_serializeMessage(message, peer)`
 */
export default class MessageSyncService {
  /**
   * @param {object} telegramClient - Client wrapper described on the class.
   * @param {object} [options]
   * @param {string} [options.dbPath] - SQLite file path (default `./data/messages.db`).
   * @param {number} [options.batchSize] - Messages requested per fetch (default 100).
   * @param {number} [options.interJobDelayMs] - Pause after each job (default 3000; 0 is honoured).
   * @param {number} [options.interBatchDelayMs] - Pause between backfill pages (default 1000; 0 is honoured).
   */
  constructor(telegramClient, options = {}) {
    this.telegramClient = telegramClient;
    this.dbPath = path.resolve(options.dbPath || DEFAULT_DB_PATH);
    // A batch size of 0 is unusable, so any falsy value falls back to the default.
    this.batchSize = options.batchSize || 100;
    // `??` rather than `||`: an explicit 0 ms delay must not be replaced by the default.
    this.interJobDelayMs = options.interJobDelayMs ?? 3000;
    this.interBatchDelayMs = options.interBatchDelayMs ?? 1000;
    this.processing = false;
    this.stopRequested = false;
    // Aborted by shutdown() so that pending delays end immediately.
    this.abortController = new AbortController();
    this._initDatabase();
  }

  /**
   * Opens the database (creating its directory if needed), creates or migrates
   * the `jobs` and `messages` tables and prepares the insert statement.
   */
  _initDatabase() {
    // `recursive: true` makes this a no-op when the directory already exists.
    fs.mkdirSync(path.dirname(this.dbPath), { recursive: true });

    this.db = new Database(this.dbPath);
    this.db.pragma('journal_mode = WAL');

    this.db.exec(`
      CREATE TABLE IF NOT EXISTS jobs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        channel_id TEXT NOT NULL UNIQUE,
        peer_title TEXT,
        peer_type TEXT,
        status TEXT NOT NULL DEFAULT '${JOB_STATUS.PENDING}',
        last_message_id INTEGER DEFAULT 0,
        oldest_message_id INTEGER,
        target_message_count INTEGER DEFAULT ${DEFAULT_TARGET_MESSAGES},
        message_count INTEGER DEFAULT 0,
        last_synced_at TEXT,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        error TEXT
      );
    `);

    // Databases whose jobs table predates these columns get them added here.
    this._ensureJobColumn('oldest_message_id', 'INTEGER');
    this._ensureJobColumn('target_message_count', `INTEGER DEFAULT ${DEFAULT_TARGET_MESSAGES}`);
    this._ensureJobColumn('message_count', 'INTEGER DEFAULT 0');

    this.db.exec(`
      CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        channel_id TEXT NOT NULL,
        message_id INTEGER NOT NULL,
        date INTEGER,
        from_id TEXT,
        text TEXT,
        raw_json TEXT,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(channel_id, message_id)
      );
    `);

    this.insertMessageStmt = this.db.prepare(`
      INSERT OR IGNORE INTO messages (channel_id, message_id, date, from_id, text, raw_json)
      VALUES (@channel_id, @message_id, @date, @from_id, @text, @raw_json)
    `);

    // Inserts a batch atomically and returns how many rows were actually
    // stored; rows ignored by the UNIQUE constraint report 0 changes.
    this.insertMessagesTx = this.db.transaction((records) => {
      let stored = 0;
      for (const record of records) {
        stored += this.insertMessageStmt.run(record).changes;
      }
      return stored;
    });
  }

  /**
   * Adds `column` to the jobs table when it is not present yet.
   *
   * @param {string} column - Column name (internal constant, interpolated into SQL).
   * @param {string} definition - Column type/default clause.
   */
  _ensureJobColumn(column, definition) {
    const existing = this.db.prepare('PRAGMA table_info(jobs)').all();
    if (!existing.some((col) => col.name === column)) {
      this.db.exec(`ALTER TABLE jobs ADD COLUMN ${column} ${definition}`);
    }
  }

  /**
   * Creates the job for a channel, or resets an existing one to pending.
   *
   * @param {string|number} channelId - Channel identifier; passed through normalizeChannelId.
   * @param {object} [options]
   * @param {number} [options.depth] - Number of messages to keep; non-positive
   *   or non-finite values fall back to the default target.
   * @returns {object} The inserted or updated jobs row.
   */
  addJob(channelId, options = {}) {
    const normalizedId = String(normalizeChannelId(channelId));
    // Truncate so a fractional depth never becomes a fractional fetch limit.
    const depth = Math.trunc(Number(options.depth));
    const target = Number.isFinite(depth) && depth > 0 ? depth : DEFAULT_TARGET_MESSAGES;

    const stmt = this.db.prepare(`
      INSERT INTO jobs (channel_id, status, error, target_message_count, updated_at)
      VALUES (?, '${JOB_STATUS.PENDING}', NULL, ?, CURRENT_TIMESTAMP)
      ON CONFLICT(channel_id) DO UPDATE SET
        status='${JOB_STATUS.PENDING}',
        error=NULL,
        target_message_count=?,
        updated_at=CURRENT_TIMESTAMP
      RETURNING *;
    `);
    return stmt.get(normalizedId, target, target);
  }

  /**
   * @returns {object[]} All jobs, most recently updated first.
   */
  listJobs() {
    return this.db.prepare(`
      SELECT id, channel_id, peer_title, peer_type, status, last_message_id,
             oldest_message_id, target_message_count, message_count,
             last_synced_at, created_at, updated_at, error
      FROM jobs
      ORDER BY updated_at DESC
    `).all();
  }

  /**
   * Processes pending and in-progress jobs one at a time until none remain or
   * a stop was requested. Returns immediately when a run is already active.
   */
  async processQueue() {
    if (this.processing || this.stopRequested) {
      return;
    }

    this.processing = true;
    try {
      while (!this.stopRequested) {
        const job = this._getNextJob();
        if (!job) {
          break;
        }
        await this._processJob(job);
        await this._sleep(this.interJobDelayMs);
      }
    } finally {
      this.processing = false;
    }
  }

  /**
   * Starts queue processing in the background without awaiting it.
   */
  resumePendingJobs() {
    // Fire-and-forget by design; a rejection must still be observed, otherwise
    // it surfaces as an unhandled promise rejection.
    this.processQueue().catch((error) => {
      console.error('Message sync queue failed:', error);
    });
  }

  /**
   * Requests a stop, cancels pending delays, waits for the active run to
   * finish and closes the database.
   */
  async shutdown() {
    this.stopRequested = true;
    this.abortController.abort();
    while (this.processing) {
      await delay(100);
    }
    if (this.db?.open) {
      this.db.close();
    }
  }

  /**
   * Waits `ms` milliseconds, returning early once shutdown() was called.
   *
   * @param {number} ms - Delay in milliseconds.
   */
  async _sleep(ms) {
    if (this.stopRequested) {
      return;
    }
    try {
      await delay(ms, undefined, { signal: this.abortController.signal });
    } catch (error) {
      // An abort only means shutdown is in progress; anything else is unexpected.
      if (error?.name !== 'AbortError') {
        throw error;
      }
    }
  }

  /**
   * @returns {object|undefined} The least recently updated pending or
   *   in-progress job, or undefined when there is none.
   */
  _getNextJob() {
    return this.db.prepare(`
      SELECT * FROM jobs
      WHERE status IN ('${JOB_STATUS.PENDING}', '${JOB_STATUS.IN_PROGRESS}')
      ORDER BY updated_at ASC
      LIMIT 1
    `).get();
  }

  /**
   * Searches the stored messages of a channel with a regular expression,
   * newest message first.
   *
   * NOTE(review): `pattern` is compiled as given and evaluated synchronously;
   * callers exposing this to untrusted input should consider pathological
   * patterns.
   *
   * @param {object} params
   * @param {string|number} params.channelId - Channel identifier.
   * @param {string} params.pattern - Regular expression source.
   * @param {number} [params.limit=50] - Maximum number of matches; a
   *   non-positive limit yields an empty array.
   * @param {boolean} [params.caseInsensitive=true] - Adds the `i` flag.
   * @returns {{messageId: number, date: string|null, fromId: string|null, text: string}[]}
   * @throws {Error} When `pattern` is not a valid regular expression.
   */
  searchMessages({ channelId, pattern, limit = 50, caseInsensitive = true }) {
    const normalizedId = String(normalizeChannelId(channelId));
    const flags = caseInsensitive ? 'i' : '';

    let regex;
    try {
      regex = new RegExp(pattern, flags);
    } catch (error) {
      throw new Error(`Invalid pattern: ${error.message}`);
    }

    const matches = [];
    if (!(limit > 0)) {
      return matches;
    }

    const statement = this.db.prepare(`
      SELECT message_id, date, from_id, text
      FROM messages
      WHERE channel_id = ?
      ORDER BY message_id DESC
    `);

    // Stream rows instead of materialising the whole channel; leaving the loop
    // early closes the underlying iterator.
    for (const row of statement.iterate(normalizedId)) {
      const text = row.text || '';
      if (!regex.test(text)) {
        continue;
      }
      matches.push({
        messageId: row.message_id,
        // Stored dates are treated as Unix seconds.
        date: row.date ? new Date(row.date * 1000).toISOString() : null,
        fromId: row.from_id,
        text,
      });
      if (matches.length >= limit) {
        break;
      }
    }

    return matches;
  }

  /**
   * Summarises the stored messages of a channel.
   *
   * @param {string|number} channelId - Channel identifier.
   * @returns {{total: number, oldestMessageId: number|null, newestMessageId: number|null, oldestDate: string|null, newestDate: string|null}}
   */
  getMessageStats(channelId) {
    const normalizedId = String(normalizeChannelId(channelId));
    const summary = this.db.prepare(`
      SELECT
        COUNT(*) AS total,
        MIN(message_id) AS oldestMessageId,
        MAX(message_id) AS newestMessageId,
        MIN(date) AS oldestDate,
        MAX(date) AS newestDate
      FROM messages
      WHERE channel_id = ?
    `).get(normalizedId);

    const toIso = (seconds) => (seconds ? new Date(seconds * 1000).toISOString() : null);

    return {
      total: summary.total || 0,
      oldestMessageId: summary.oldestMessageId || null,
      newestMessageId: summary.newestMessageId || null,
      oldestDate: toIso(summary.oldestDate),
      newestDate: toIso(summary.newestDate),
    };
  }

  /**
   * Runs one sync pass for a job: newer messages first, then history backfill,
   * then the job row is updated. A job with remaining work goes back to
   * pending, otherwise it becomes idle. Errors whose message matches
   * "wait of N seconds is required" keep the job pending and pause for N
   * seconds; any other error marks the job as failed.
   *
   * @param {object} job - Row from the jobs table.
   */
  async _processJob(job) {
    this._updateJobStatus(job.id, JOB_STATUS.IN_PROGRESS);

    try {
      const newerDetails = await this._syncNewerMessages(job);
      const backfillResult = await this._backfillHistory(
        job,
        newerDetails.totalMessages,
        newerDetails.targetCount,
        newerDetails.lastMessageId,
      );

      const shouldContinue = newerDetails.hasMoreNewer || backfillResult.hasMoreOlder;

      this._updateJobRecord(job.id, {
        status: shouldContinue ? JOB_STATUS.PENDING : JOB_STATUS.IDLE,
        peerTitle: newerDetails.peerTitle,
        peerType: newerDetails.peerType,
        lastMessageId: newerDetails.lastMessageId ?? job.last_message_id ?? 0,
        oldestMessageId:
          backfillResult.oldestMessageId ?? newerDetails.oldestMessageId ?? job.oldest_message_id,
        messageCount: backfillResult.finalCount,
        targetCount: newerDetails.targetCount,
      });
    } catch (error) {
      const waitMatch = /wait of (\d+) seconds is required/i.exec(error?.message || '');
      if (!waitMatch) {
        this._markJobError(job.id, error);
        return;
      }

      const waitSeconds = Number(waitMatch[1]);
      this.db.prepare(`
        UPDATE jobs
        SET status = ?, error = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
      `).run(JOB_STATUS.PENDING, `Rate limited, waiting ${waitSeconds}s`, job.id);
      // Abortable so that shutdown() is not held up by a long rate-limit wait.
      await this._sleep(waitSeconds * 1000);
    }
  }

  /**
   * Sets the status of a job and refreshes its `updated_at`.
   *
   * @param {number} id - Job id.
   * @param {string} status - One of the JOB_STATUS values.
   */
  _updateJobStatus(id, status) {
    this.db.prepare(`
      UPDATE jobs
      SET status = ?, updated_at = CURRENT_TIMESTAMP
      WHERE id = ?
    `).run(status, id);
  }

  /**
   * Writes the outcome of a sync pass to the job row, clears its error and
   * stamps `last_synced_at`.
   *
   * @param {number} id - Job id.
   * @param {object} fields
   * @param {string} fields.status
   * @param {string} [fields.peerTitle]
   * @param {string} [fields.peerType]
   * @param {number} [fields.lastMessageId] - Stored as 0 when nullish.
   * @param {number|null} [fields.oldestMessageId]
   * @param {number} [fields.messageCount] - Stored as 0 when nullish.
   * @param {number} [fields.targetCount] - Falls back to the default target when nullish.
   */
  _updateJobRecord(id, {
    status,
    peerTitle,
    peerType,
    lastMessageId,
    oldestMessageId,
    messageCount,
    targetCount,
  }) {
    this.db.prepare(`
      UPDATE jobs
      SET status = ?,
          peer_title = ?,
          peer_type = ?,
          last_message_id = ?,
          oldest_message_id = ?,
          message_count = ?,
          target_message_count = ?,
          last_synced_at = CURRENT_TIMESTAMP,
          error = NULL,
          updated_at = CURRENT_TIMESTAMP
      WHERE id = ?
    `).run(
      status,
      peerTitle,
      peerType,
      lastMessageId ?? 0,
      oldestMessageId ?? null,
      messageCount ?? 0,
      targetCount ?? DEFAULT_TARGET_MESSAGES,
      id,
    );
  }

  /**
   * Marks a job as failed and stores the error text.
   *
   * @param {number} id - Job id.
   * @param {*} error - Thrown value; its `message` is used when present.
   */
  _markJobError(id, error) {
    this.db.prepare(`
      UPDATE jobs
      SET status = ?, error = ?, updated_at = CURRENT_TIMESTAMP
      WHERE id = ?
    `).run(JOB_STATUS.ERROR, error?.message || String(error), id);
  }

  /**
   * @param {string|number} channelId
   * @returns {number} Number of messages stored for the channel.
   */
  _countMessages(channelId) {
    return this.db.prepare(`
      SELECT COUNT(*) AS cnt FROM messages WHERE channel_id = ?
    `).get(String(channelId)).cnt;
  }

  /**
   * @param {string|number} channelId
   * @returns {number|null} Lowest stored message id for the channel, or null
   *   when nothing is stored.
   */
  _getOldestStoredMessageId(channelId) {
    const row = this.db.prepare(`
      SELECT MIN(message_id) AS oldest FROM messages WHERE channel_id = ?
    `).get(String(channelId));
    return row.oldest ?? null;
  }

  /**
   * Fetches one batch of messages with an id above the job's
   * `last_message_id` and stores them.
   *
   * @param {object} job - Row from the jobs table.
   * @returns {Promise<{peerTitle: *, peerType: *, lastMessageId: number, oldestMessageId: number|null, totalMessages: number, targetCount: number, hasMoreNewer: boolean}>}
   *   `hasMoreNewer` is true when the batch was full, i.e. another pass may
   *   find further messages.
   */
  async _syncNewerMessages(job) {
    const minId = job.last_message_id || 0;
    const { peerTitle, peerType, messages } = await this.telegramClient.getMessagesByChannelId(
      job.channel_id,
      this.batchSize,
      { minId },
    );

    const newMessages = messages
      .filter((msg) => msg.id > minId)
      .sort((a, b) => a.id - b.id);

    let lastMessageId = minId;
    let oldestMessageId = job.oldest_message_id || null;

    if (newMessages.length) {
      const records = newMessages.map((msg) => ({
        channel_id: String(job.channel_id),
        message_id: msg.id,
        date: msg.date ?? null,
        from_id: msg.from_id ?? null,
        text: msg.text ?? null,
        raw_json: JSON.stringify(msg),
      }));
      this.insertMessagesTx(records);

      // newMessages is sorted ascending: first is the oldest, last the newest.
      const firstId = newMessages[0].id;
      lastMessageId = newMessages[newMessages.length - 1].id;
      oldestMessageId = oldestMessageId ? Math.min(oldestMessageId, firstId) : firstId;
    }

    return {
      peerTitle,
      peerType,
      lastMessageId,
      oldestMessageId,
      totalMessages: this._countMessages(job.channel_id),
      targetCount: job.target_message_count || DEFAULT_TARGET_MESSAGES,
      hasMoreNewer: newMessages.length >= this.batchSize,
    };
  }

  /**
   * Walks backwards through channel history until `targetCount` messages are
   * stored, history yields nothing more, or a stop is requested.
   *
   * Progress is measured in rows actually stored: messages already present are
   * ignored by the insert and therefore do not count towards the target.
   *
   * @param {object} job - Row from the jobs table.
   * @param {number} currentCount - Messages currently stored for the channel.
   * @param {number} targetCount - Desired number of stored messages.
   * @param {number} [newestMessageId] - Fallback start offset when no older
   *   boundary is known.
   * @returns {Promise<{finalCount: number, oldestMessageId: number|null, hasMoreOlder: boolean, insertedCount: number}>}
   */
  async _backfillHistory(job, currentCount, targetCount, newestMessageId) {
    if (currentCount >= targetCount) {
      return {
        finalCount: currentCount,
        oldestMessageId: job.oldest_message_id ?? null,
        hasMoreOlder: false,
        insertedCount: 0,
      };
    }

    const peer = await this.telegramClient.client.resolvePeer(
      normalizeChannelId(job.channel_id),
    );

    let total = currentCount;
    let currentOldest = job.oldest_message_id ?? null;
    let insertedCount = 0;
    let interrupted = false;
    // Continue below the oldest message already known. Falling back to the
    // lowest stored id avoids re-downloading the batch that the newer-message
    // sync has just written when the job has no recorded oldest id yet.
    let nextOffsetId = job.oldest_message_id
      ?? this._getOldestStoredMessageId(job.channel_id)
      ?? newestMessageId
      ?? job.last_message_id
      ?? 0;

    while (total < targetCount) {
      if (this.stopRequested) {
        interrupted = true;
        break;
      }
      if (!nextOffsetId || nextOffsetId <= 1) {
        break;
      }

      const chunkLimit = Math.min(this.batchSize, targetCount - total);
      const iterator = this.telegramClient.client.iterHistory(peer, {
        limit: chunkLimit,
        chunkSize: chunkLimit,
        reverse: false,
        offset: { id: nextOffsetId, date: 0 },
        addOffset: 0,
      });

      const records = [];
      let lowestIdInChunk = null;

      for await (const message of iterator) {
        const serialized = this.telegramClient._serializeMessage(message, peer);
        records.push({
          channel_id: String(job.channel_id),
          message_id: serialized.id,
          date: serialized.date ?? null,
          from_id: serialized.from_id ?? null,
          text: serialized.text ?? null,
          raw_json: JSON.stringify(serialized),
        });
        lowestIdInChunk = lowestIdInChunk === null
          ? serialized.id
          : Math.min(lowestIdInChunk, serialized.id);
      }

      if (!records.length) {
        break;
      }

      const storedCount = this.insertMessagesTx(records);
      total += storedCount;
      insertedCount += storedCount;
      currentOldest = currentOldest ? Math.min(currentOldest, lowestIdInChunk) : lowestIdInChunk;

      // Because only stored rows count as progress, the offset must move
      // strictly downwards or the loop could repeat the same page forever.
      if (lowestIdInChunk >= nextOffsetId) {
        break;
      }
      nextOffsetId = lowestIdInChunk;

      if (total >= targetCount) {
        break;
      }

      await this._sleep(this.interBatchDelayMs);
    }

    return {
      finalCount: this._countMessages(job.channel_id),
      oldestMessageId: currentOldest,
      // An interrupted backfill must leave the job pending so it resumes later.
      hasMoreOlder: interrupted || (insertedCount > 0 && total < targetCount),
      insertedCount,
    };
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kfastov/telegram-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.