Skip to main content
Glama

Telegram MCP Server

mcp-server.js (14.5 kB)
import http from "http";
import { randomUUID } from "crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import dotenv from "dotenv";
import { z } from "zod";
import TelegramClient from "./telegram-client.js";
import MessageSyncService from "./message-sync-service.js";

dotenv.config();

// Listen address; MCP_* variables take precedence over the FASTMCP_* ones.
const HOST = process.env.MCP_HOST ?? process.env.FASTMCP_HOST ?? "127.0.0.1";
const PORT = Number(process.env.MCP_PORT ?? process.env.FASTMCP_PORT ?? "8080");

// Upper bound on the number of request-body bytes readBody() will buffer.
const MAX_BODY_BYTES = 4 * 1024 * 1024;

const telegramClient = new TelegramClient(
  process.env.TELEGRAM_API_ID,
  process.env.TELEGRAM_API_HASH,
  process.env.TELEGRAM_PHONE_NUMBER,
  "./data/session.json",
);

const messageSyncService = new MessageSyncService(telegramClient, {
  dbPath: "./data/messages.db",
  batchSize: 100,
  interJobDelayMs: 3000,
  interBatchDelayMs: 1200,
});

let telegramReady = false;

/**
 * Initializes the Telegram dialog cache and resumes pending sync jobs.
 * Subsequent calls are no-ops once initialization has succeeded.
 *
 * @returns {Promise<void>}
 * @throws {Error} When the dialog cache reports that it could not be initialized.
 */
async function initializeTelegram() {
  if (telegramReady) return;

  console.log("[startup] Initializing Telegram dialogs...");
  const dialogsReady = await telegramClient.initializeDialogCache();
  if (!dialogsReady) {
    throw new Error("Failed to initialize Telegram dialog list");
  }

  messageSyncService.resumePendingJobs();
  telegramReady = true;
}

/**
 * Represents an active MCP session – a transport plus its server instance.
 * Keyed by the session id generated for the transport; each value is a record
 * of the shape `{ server, transport, sessionId, closed }`.
 */
const sessions = new Map();

// Shared by every tool that addresses a channel.
const channelIdSchema = z
  .union([
    z.number({ invalid_type_error: "channelId must be a number" }),
    z.string({ invalid_type_error: "channelId must be a string" }).min(1),
  ])
  .describe("Numeric channel ID or username");

const listChannelsSchema = {
  limit: z
    .number()
    .int()
    .positive()
    .optional()
    .describe("Maximum number of channels to return (default: 50)"),
};

const searchChannelsSchema = {
  keywords: z
    .string()
    .min(1)
    .describe("Keywords to search for in channel titles or usernames"),
  limit: z
    .number()
    .int()
    .positive()
    .optional()
    .describe("Maximum number of results to return (default: 100)"),
};

const getChannelMessagesSchema = {
  channelId: channelIdSchema,
  limit: z
    .number()
    .int()
    .positive()
    .optional()
    .describe("Maximum number of messages to return (default: 100)"),
  filterPattern: z
    .string()
    .optional()
    .describe("Optional regex to filter message content"),
};

const scheduleMessageSyncSchema = {
  channelId: channelIdSchema,
  depth: z
    .number({ invalid_type_error: "depth must be a number" })
    .int()
    .positive()
    .max(50000)
    .optional()
    .describe("Maximum messages to retain per channel (default 1000)"),
};

const searchSyncedMessagesSchema = {
  channelId: channelIdSchema,
  pattern: z
    .string({ invalid_type_error: "pattern must be a string" })
    .min(1)
    .describe("Regular expression used to match message text"),
  limit: z
    .number({ invalid_type_error: "limit must be a number" })
    .int()
    .positive()
    .max(200)
    .optional()
    .describe("Maximum number of matches to return (default 50)"),
  caseInsensitive: z
    .boolean({ invalid_type_error: "caseInsensitive must be a boolean" })
    .optional()
    .describe("Whether the pattern should be case-insensitive (default true)"),
};

/**
 * Wraps a value as an MCP tool result with a single pretty-printed JSON text item.
 *
 * @param {unknown} payload - Value to serialize.
 * @returns {{ content: Array<{ type: "text", text: string }> }}
 */
function jsonResult(payload) {
  return {
    content: [
      {
        type: "text",
        text: JSON.stringify(payload, null, 2),
      },
    ],
  };
}

/**
 * Writes a JSON-RPC 2.0 error response (with `id: null`) and ends the response.
 *
 * @param {http.ServerResponse} res - Response to write to.
 * @param {number} status - HTTP status code.
 * @param {number} code - JSON-RPC error code.
 * @param {string} message - JSON-RPC error message.
 */
function sendJsonRpcError(res, status, code, message) {
  res.writeHead(status, { "Content-Type": "application/json" }).end(
    JSON.stringify({
      jsonrpc: "2.0",
      error: { code, message },
      id: null,
    }),
  );
}

/**
 * Builds a fresh McpServer with all Telegram tools registered.
 * One instance is created per MCP session.
 *
 * @returns {McpServer}
 */
function createServerInstance() {
  const server = new McpServer({
    name: "example-mcp-server",
    version: "1.0.0",
  });

  server.tool(
    "listChannels",
    "Lists available Telegram dialogs for the authenticated account.",
    listChannelsSchema,
    async ({ limit }) => {
      await telegramClient.ensureLogin();
      const dialogs = await telegramClient.listDialogs(limit ?? 50);
      return jsonResult(dialogs);
    },
  );

  server.tool(
    "searchChannels",
    "Searches dialogs by title or username.",
    searchChannelsSchema,
    async ({ keywords, limit }) => {
      await telegramClient.ensureLogin();
      const matches = await telegramClient.searchDialogs(keywords, limit ?? 100);
      return jsonResult(matches);
    },
  );

  server.tool(
    "getChannelMessages",
    "Retrieves recent messages for a channel by numeric ID or username.",
    getChannelMessagesSchema,
    async ({ channelId, limit, filterPattern }) => {
      await telegramClient.ensureLogin();
      const { peerTitle, messages } = await telegramClient.getMessagesByChannelId(
        channelId,
        limit ?? 100,
      );

      let formatted = messages.map((msg) => ({
        id: msg.id,
        // msg.date is multiplied by 1000 before conversion, i.e. treated as seconds.
        date: msg.date ? new Date(msg.date * 1000).toISOString() : "unknown",
        text: msg.text ?? msg.message ?? "",
        from_id: msg.from_id ?? "unknown",
      }));

      if (filterPattern) {
        // NOTE(review): filterPattern is caller-supplied and compiled as-is; a
        // pathological pattern could be slow to evaluate (ReDoS).
        try {
          const regex = new RegExp(filterPattern);
          formatted = formatted.filter((msg) => msg.text && regex.test(msg.text));
        } catch (error) {
          throw new Error(`Invalid filterPattern: ${error.message}`);
        }
      }

      return jsonResult({
        peerTitle,
        totalFetched: messages.length,
        returned: formatted.length,
        messages: formatted,
      });
    },
  );

  server.tool(
    "scheduleMessageSync",
    "Schedules a background job to archive channel messages locally.",
    scheduleMessageSyncSchema,
    async ({ channelId, depth }) => {
      await telegramClient.ensureLogin();
      const job = messageSyncService.addJob(channelId, { depth });
      // Fire-and-forget: the tool returns the job immediately instead of
      // waiting for the queue to be processed.
      void messageSyncService.processQueue();
      return jsonResult(job);
    },
  );

  server.tool(
    "searchSyncedMessages",
    "Searches stored messages for a channel using a regular expression.",
    searchSyncedMessagesSchema,
    async ({ channelId, pattern, limit, caseInsensitive }) => {
      const results = messageSyncService.searchMessages({
        channelId,
        pattern,
        limit,
        caseInsensitive,
      });
      return jsonResult(results);
    },
  );

  server.tool(
    "getSyncedMessageStats",
    "Returns summary statistics for stored messages in a channel.",
    { channelId: channelIdSchema },
    async ({ channelId }) => {
      const stats = messageSyncService.getMessageStats(channelId);
      return jsonResult(stats);
    },
  );

  server.tool(
    "listMessageSyncJobs",
    "Lists tracked message sync jobs and their current status.",
    {},
    async () => {
      const jobs = messageSyncService.listJobs();
      return jsonResult(jobs);
    },
  );

  return server;
}

/**
 * Resolves the session record for a POST request, creating a new session when
 * the request carries no session id and its body is an initialize request.
 *
 * When no record can be returned, an error response has already been written:
 * 404 for an unknown session id, 400 for a non-initialize request without one.
 *
 * @param {http.IncomingMessage} req - Incoming request (reads `mcp-session-id`).
 * @param {http.ServerResponse} res - Response used for error replies.
 * @param {unknown} body - Parsed JSON request body.
 * @returns {Promise<object | null>} The session record, or null if a response was sent.
 */
async function ensureSession(req, res, body) {
  const sessionId = req.headers["mcp-session-id"];

  if (sessionId && typeof sessionId === "string") {
    const existing = sessions.get(sessionId);
    if (existing) {
      return existing;
    }
    sendJsonRpcError(res, 404, -32001, "Session not found");
    return null;
  }

  if (!isInitializeRequest(body)) {
    sendJsonRpcError(res, 400, -32000, "Bad Request: No valid session ID provided");
    return null;
  }

  const record = { server: null, transport: null, sessionId: null, closed: false };

  // Both onsessionclosed and onclose may fire for the same session, so the
  // server is closed at most once.
  // NOTE(review): server.close() presumably closes the transport as well, which
  // would fire onclose again; the flag stops that re-entry. Confirm against the
  // SDK version in use.
  const closeServerOnce = (describeFailure) => {
    if (record.closed) return;
    record.closed = true;
    void record.server?.close().catch((error) => {
      console.error(describeFailure(error));
    });
  };

  const transport = new StreamableHTTPServerTransport({
    sessionIdGenerator: () => randomUUID(),
    onsessioninitialized: (newSessionId) => {
      record.sessionId = newSessionId;
      sessions.set(newSessionId, record);
    },
    onsessionclosed: (closedSessionId) => {
      sessions.delete(closedSessionId);
      closeServerOnce(
        (error) => `[server] error closing session ${closedSessionId}: ${error.message}`,
      );
    },
  });
  record.transport = transport;

  transport.onerror = (error) => {
    console.error(`[transport] error: ${error.message}`);
  };

  transport.onclose = () => {
    if (record.sessionId) {
      sessions.delete(record.sessionId);
    }
    closeServerOnce(
      (error) => `[server] error closing transport session: ${error.message}`,
    );
  };

  const serverInstance = createServerInstance();
  record.server = serverInstance;
  await serverInstance.connect(transport);

  return record;
}

/**
 * Buffers the request body and parses it as JSON.
 *
 * @param {http.IncomingMessage} req - Request stream to read.
 * @returns {Promise<unknown>} Parsed JSON value, or `{}` for an empty body.
 *   Rejects with a SyntaxError for invalid JSON, with an Error carrying
 *   `statusCode: 413` when the body exceeds MAX_BODY_BYTES, or with the
 *   stream's own error.
 */
function readBody(req) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let received = 0;
    let overflowed = false;

    req
      .on("data", (chunk) => {
        // After an overflow the listener stays attached so the stream keeps
        // draining, but nothing more is buffered.
        if (overflowed) return;

        received += chunk.length;
        if (received > MAX_BODY_BYTES) {
          overflowed = true;
          chunks.length = 0;
          reject(Object.assign(new Error("Request body too large"), { statusCode: 413 }));
          return;
        }
        chunks.push(chunk);
      })
      .on("end", () => {
        if (overflowed) return;
        try {
          const raw = Buffer.concat(chunks).toString("utf8");
          resolve(raw.length ? JSON.parse(raw) : {});
        } catch (error) {
          reject(error);
        }
      })
      .on("error", (error) => reject(error));
  });
}

/**
 * Handles `POST /mcp`: parses the body, resolves or creates the session and
 * hands the request to that session's transport.
 *
 * Body problems are answered here (400 for invalid JSON, 413 for an oversized
 * body) instead of surfacing as a generic 500.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 * @returns {Promise<void>}
 */
async function handlePost(req, res) {
  let body;
  try {
    body = await readBody(req);
  } catch (error) {
    if (error instanceof SyntaxError) {
      sendJsonRpcError(res, 400, -32700, "Parse error: request body is not valid JSON");
      return;
    }
    if (error?.statusCode === 413) {
      sendJsonRpcError(res, 413, -32600, "Request body too large");
      return;
    }
    // Stream-level failures are left to the caller's generic error handling.
    throw error;
  }

  const sessionRecord = await ensureSession(req, res, body);
  if (!sessionRecord) {
    return;
  }

  try {
    await sessionRecord.transport.handleRequest(req, res, body);
  } catch (error) {
    console.error(`[http] POST handling failed: ${error?.message ?? error}`);
    if (!res.headersSent) {
      sendJsonRpcError(res, 500, -32603, "Internal server error");
    }
  }
}

/**
 * Handles `GET /mcp` and `DELETE /mcp`, which must address an existing session
 * through the `mcp-session-id` header.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 * @returns {Promise<void>}
 */
async function handleSessionRequest(req, res) {
  const sessionIdHeader = req.headers["mcp-session-id"];
  if (!sessionIdHeader || typeof sessionIdHeader !== "string") {
    sendJsonRpcError(res, 400, -32000, "Invalid or missing session ID");
    return;
  }

  const record = sessions.get(sessionIdHeader);
  if (!record) {
    sendJsonRpcError(res, 404, -32001, "Session not found");
    return;
  }

  await record.transport.handleRequest(req, res);
}

// Telegram must be ready before the HTTP server starts accepting requests.
await initializeTelegram().catch((error) => {
  console.error(`[startup] Telegram initialization failed: ${error?.message ?? error}`);
  process.exit(1);
});

const server = http.createServer(async (req, res) => {
  try {
    const url = new URL(req.url ?? "", `http://${req.headers.host ?? `${HOST}:${PORT}`}`);

    if (req.method === "OPTIONS") {
      res.writeHead(204).end();
      return;
    }

    if (req.method === "GET" && url.pathname === "/health") {
      res.writeHead(200, { "Content-Type": "application/json" }).end(
        JSON.stringify({ status: "ok" }),
      );
      return;
    }

    if (req.method === "POST" && url.pathname === "/mcp") {
      await handlePost(req, res);
      return;
    }

    if ((req.method === "GET" || req.method === "DELETE") && url.pathname === "/mcp") {
      await handleSessionRequest(req, res);
      return;
    }

    if (req.method === "POST") {
      sendJsonRpcError(res, 404, -32601, "Endpoint not found");
      return;
    }

    res.writeHead(405, { Allow: "GET, POST, DELETE" }).end();
  } catch (error) {
    console.error(`[http] unexpected error: ${error?.message ?? error}`);
    if (!res.headersSent) {
      sendJsonRpcError(res, 500, -32603, "Internal server error");
    }
  }
});

server.listen(PORT, HOST, () => {
  console.log(`[startup] MCP HTTP server listening on http://${HOST}:${PORT}/mcp`);
});

server.on("error", (error) => {
  console.error(`[http] server error: ${error.message}`);
});

/**
 * Stops the HTTP server, then the message sync service, then the Telegram
 * client. Failures in the last two steps are logged and do not abort the rest.
 *
 * @returns {Promise<void>}
 */
async function shutdown() {
  console.log("[shutdown] received termination signal, closing resources...");

  // closeAllConnections may be missing on this Node version, hence the optional call.
  server.closeAllConnections?.();
  server.close(() => {
    console.log("[shutdown] HTTP server closed");
  });

  try {
    await messageSyncService.shutdown();
  } catch (error) {
    console.error(`[shutdown] error while stopping message sync: ${error?.message ?? error}`);
  }

  try {
    await telegramClient.destroy();
  } catch (error) {
    console.error(`[shutdown] error while closing Telegram client: ${error?.message ?? error}`);
  }
}

process.on("SIGINT", () => {
  void shutdown().finally(() => process.exit(0));
});

process.on("SIGTERM", () => {
  void shutdown().finally(() => process.exit(0));
});

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kfastov/telegram-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.