Skip to main content
Glama

Telegram Bot MCP

by SmartManoj
webhook_server.py11.7 kB
#!/usr/bin/env python3 """ Telegram Bot Webhook Server using FastAPI This server handles Telegram webhooks and integrates with the MCP server for production deployment. """ import os import asyncio import logging from datetime import datetime from typing import Dict, Any, Optional from fastapi import FastAPI, Request, HTTPException, BackgroundTasks from fastapi.responses import JSONResponse import uvicorn from telegram import Bot, Update from telegram.ext import Application from dotenv import load_dotenv import httpx # Import our bot logic from bot_runner import TelegramBotRunner # Load environment variables load_dotenv() # Configure logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Configuration TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "your-secret-key") SERVER_HOST = os.getenv("SERVER_HOST", "0.0.0.0") SERVER_PORT = int(os.getenv("SERVER_PORT", 8000)) MCP_PORT = int(os.getenv("MCP_PORT", 8001)) if not TELEGRAM_BOT_TOKEN: raise ValueError("TELEGRAM_BOT_TOKEN environment variable is required") # Initialize FastAPI app app = FastAPI( title="Telegram Bot MCP Webhook Server", description="Production webhook server for Telegram bot with MCP integration", version="1.0.0" ) # Global instances bot_runner: Optional[TelegramBotRunner] = None application: Optional[Application] = None mcp_client: Optional[httpx.AsyncClient] = None @app.on_event("startup") async def startup_event(): """Initialize the bot and MCP client on startup""" global bot_runner, application, mcp_client logger.info("Starting webhook server...") # Initialize bot runner bot_runner = TelegramBotRunner() await bot_runner.initialize() application = bot_runner.application # Initialize MCP client mcp_client = httpx.AsyncClient( base_url=f"http://localhost:{MCP_PORT}", timeout=httpx.Timeout(30.0) ) # Initialize the application await application.initialize() logger.info("Webhook server initialized successfully") @app.on_event("shutdown") async def shutdown_event(): """Clean up on shutdown""" global application, mcp_client logger.info("Shutting down webhook server...") if application: await application.shutdown() if mcp_client: await mcp_client.aclose() logger.info("Webhook server shutdown complete") @app.get("/") async def root(): """Root endpoint with server information""" return { "service": "Telegram Bot MCP Webhook Server", "version": "1.0.0", "status": "running", "timestamp": datetime.now().isoformat(), "endpoints": { "webhook": "/webhook", "health": "/health", "bot_info": "/bot/info", "mcp_status": "/mcp/status" } } @app.get("/health") async def health_check(): """Health check endpoint""" try: # Check bot status bot_status = "ok" if application and application.bot else "error" # Check MCP status mcp_status = "unknown" if mcp_client: try: response = await mcp_client.get("/health", timeout=5.0) mcp_status = "ok" if response.status_code == 200 else "error" except: mcp_status = "offline" return { "status": "healthy" if bot_status == "ok" else "unhealthy", "timestamp": datetime.now().isoformat(), "services": { "telegram_bot": bot_status, "mcp_server": mcp_status } } except Exception as e: logger.error(f"Health check failed: {e}") raise HTTPException(status_code=500, detail="Health check failed") @app.post("/webhook") async def webhook(request: Request, background_tasks: BackgroundTasks): """Handle Telegram webhook updates""" try: # Get the update data update_data = await request.json() # Verify webhook (optional, but recommended for production) # You can implement signature verification here # Create Update object update = Update.de_json(update_data, application.bot) if update: # Process update in background background_tasks.add_task(process_update, update) return JSONResponse( content={"status": "ok", "message": "Update received"}, status_code=200 ) else: logger.warning("Received invalid update data") return JSONResponse( content={"status": "error", "message": "Invalid update"}, status_code=400 ) except Exception as e: logger.error(f"Webhook error: {e}") raise HTTPException(status_code=500, detail="Webhook processing failed") async def process_update(update: Update): """Process a Telegram update""" try: # Process the update through the application await application.process_update(update) # Log the update for monitoring if update.message: user_id = update.message.from_user.id if update.message.from_user else "unknown" text = update.message.text or "non-text message" logger.info(f"Processed message from user {user_id}: {text[:50]}...") except Exception as e: logger.error(f"Error processing update: {e}") @app.get("/bot/info") async def get_bot_info(): """Get bot information""" try: if not application or not application.bot: raise HTTPException(status_code=503, detail="Bot not initialized") me = await application.bot.get_me() return { "bot_info": { "id": me.id, "username": me.username, "first_name": me.first_name, "is_bot": me.is_bot, "can_join_groups": me.can_join_groups, "can_read_all_group_messages": me.can_read_all_group_messages, "supports_inline_queries": me.supports_inline_queries }, "webhook_info": await get_webhook_info() } except Exception as e: logger.error(f"Error getting bot info: {e}") raise HTTPException(status_code=500, detail=str(e)) async def get_webhook_info(): """Get webhook information""" try: webhook_info = await application.bot.get_webhook_info() return { "url": webhook_info.url, "has_custom_certificate": webhook_info.has_custom_certificate, "pending_update_count": webhook_info.pending_update_count, "last_error_date": webhook_info.last_error_date.isoformat() if webhook_info.last_error_date else None, "last_error_message": webhook_info.last_error_message, "max_connections": webhook_info.max_connections, "allowed_updates": webhook_info.allowed_updates } except Exception as e: logger.warning(f"Could not get webhook info: {e}") return {"error": str(e)} @app.get("/mcp/status") async def get_mcp_status(): """Get MCP server status""" try: if not mcp_client: return {"status": "not_initialized"} # Try to ping the MCP server response = await mcp_client.get("/", timeout=5.0) if response.status_code == 200: return { "status": "connected", "server_url": str(mcp_client.base_url), "response_time": response.elapsed.total_seconds() } else: return { "status": "error", "status_code": response.status_code } except httpx.TimeoutException: return {"status": "timeout"} except httpx.ConnectError: return {"status": "connection_failed"} except Exception as e: return {"status": "error", "details": str(e)} @app.post("/mcp/tool/{tool_name}") async def call_mcp_tool(tool_name: str, request: Request): """Proxy tool calls to MCP server""" try: if not mcp_client: raise HTTPException(status_code=503, detail="MCP client not initialized") # Forward the request to MCP server data = await request.json() response = await mcp_client.post(f"/tools/{tool_name}", json=data) return response.json() except Exception as e: logger.error(f"MCP tool call error: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/admin/set_webhook") async def set_webhook(webhook_url: str, secret_token: Optional[str] = None): """Set the webhook URL for the bot (admin endpoint)""" try: if not application or not application.bot: raise HTTPException(status_code=503, detail="Bot not initialized") await application.bot.set_webhook( url=webhook_url, secret_token=secret_token or WEBHOOK_SECRET ) return { "status": "success", "message": f"Webhook set to: {webhook_url}", "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error setting webhook: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.delete("/admin/delete_webhook") async def delete_webhook(): """Delete the webhook (admin endpoint)""" try: if not application or not application.bot: raise HTTPException(status_code=503, detail="Bot not initialized") await application.bot.delete_webhook() return { "status": "success", "message": "Webhook deleted", "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error deleting webhook: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/stats") async def get_stats(): """Get server and bot statistics""" try: # Get basic stats from bot_runner if available stats = { "server": { "uptime": "unknown", # You could track this "requests_processed": "unknown", "timestamp": datetime.now().isoformat() }, "bot": { "status": "running" if application else "stopped" }, "mcp": await get_mcp_status() } return stats except Exception as e: logger.error(f"Error getting stats: {e}") raise HTTPException(status_code=500, detail=str(e)) # Main execution if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Telegram Bot Webhook Server") parser.add_argument("--host", default=SERVER_HOST, help="Server host") parser.add_argument("--port", type=int, default=SERVER_PORT, help="Server port") parser.add_argument("--reload", action="store_true", help="Enable auto-reload") parser.add_argument("--log-level", default="info", help="Log level") args = parser.parse_args() logger.info(f"Starting webhook server on {args.host}:{args.port}") uvicorn.run( "webhook_server:app", host=args.host, port=args.port, reload=args.reload, log_level=args.log_level, access_log=True )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SmartManoj/Telegram-Bot-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server