Skip to main content
Glama
app.py • 16 kB
import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Optional

import discord
import jwt
import uvicorn
from discord.ext import commands
from dotenv import load_dotenv
from fastapi import FastAPI
from fastmcp import FastMCP
from fastmcp.server.auth import BearerAuthProvider
from fastmcp.server.auth.providers.bearer import RSAKeyPair
from fastmcp.server.dependencies import get_access_token

# Load environment variables
load_dotenv()

# --------------------------------
# Setup Audit Logging
# --------------------------------
logging.basicConfig(
    filename="audit.log",
    level=logging.INFO,
    format="%(message)s"
)


def log_audit(tool: str, user_id: str, args: dict, success: bool, error: Optional[str] = None):
    """Append one audit record for a tool invocation to the root logger.

    Args:
        tool: Name of the MCP tool that was invoked.
        user_id: Identifier of the authenticated caller.
        args: Arguments the tool was called with.
        success: Whether the tool call succeeded.
        error: Optional error description; only recorded when truthy.
    """
    entry = {
        # Timezone-aware UTC so the record is unambiguous
        # (datetime.utcnow() is naive and deprecated).
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tool": tool,
        "user_id": user_id,
        "args": args,
        "success": success
    }
    if error:
        entry["error"] = error
    logging.info(entry)


def _audit_user_id(access_token) -> str:
    """Return the caller's client id for audit records, or "unknown" if unavailable."""
    client_id = getattr(access_token, "client_id", None)
    return client_id if client_id else "unknown"


def _audit_failure(tool_name: str, access_token, args: dict, message: str) -> dict:
    """Record a failed tool call in the audit log and build the error response."""
    log_audit(tool_name, _audit_user_id(access_token), args, success=False, error=message)
    return {"error": message}


# --------------------------------
# Temporary bot helpers
# --------------------------------
# Maximum time to wait for the gateway READY event before giving up.
BOT_READY_TIMEOUT_SECONDS = 30.0

# How many recent messages search_messages scans for matches.
SEARCH_SCAN_LIMIT = 100

# Background gateway tasks of live temporary bots, keyed by bot, so that
# cleanup_bot() can reap them and no task is garbage-collected mid-flight.
_gateway_tasks: dict = {}


# Helper function to create a temporary bot connection
async def create_temp_bot(token: str) -> commands.Bot:
    """Create a temporary bot instance for a single operation.

    ``Client.start()`` only returns once the client has been closed, so it
    cannot be awaited here. Instead the bot logs in, the gateway connection
    runs in a background task, and this coroutine returns as soon as the bot
    reports ready.

    Args:
        token: Discord bot token used to log in.

    Returns:
        A logged-in, ready bot. The caller must pass it to ``cleanup_bot``.

    Raises:
        TimeoutError: The bot did not become ready within
            ``BOT_READY_TIMEOUT_SECONDS``.
        RuntimeError: The gateway connection ended before the bot was ready.
        Exception: Whatever ``login()`` / ``connect()`` raised (for example an
            invalid token or missing privileged intents).
    """
    intents = discord.Intents.default()
    intents.message_content = True
    bot = commands.Bot(command_prefix="!", intents=intents)

    try:
        await bot.login(token)

        gateway_task = asyncio.create_task(bot.connect(reconnect=False))
        _gateway_tasks[bot] = gateway_task
        ready_task = asyncio.create_task(bot.wait_until_ready())
        try:
            # Wait for whichever happens first: the bot is ready, or the
            # gateway connection dies (which would otherwise hang forever).
            done, _ = await asyncio.wait(
                {gateway_task, ready_task},
                timeout=BOT_READY_TIMEOUT_SECONDS,
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            if not ready_task.done():
                ready_task.cancel()

        if ready_task in done:
            ready_task.result()  # re-raise if waiting itself failed
            return bot
        if gateway_task in done:
            gateway_task.result()  # re-raise the connection failure, if any
            raise RuntimeError("Discord gateway connection closed before the bot became ready")
        raise TimeoutError("Timed out waiting for the Discord bot to become ready")
    except BaseException:
        # Never leak a half-started client or its background task.
        await cleanup_bot(bot)
        raise


async def cleanup_bot(bot: commands.Bot):
    """Clean up a temporary bot instance.

    Closes the client if it is still open and reaps the background gateway
    task started by ``create_temp_bot``. Passing ``None`` is a no-op.
    """
    if bot is None:
        return

    gateway_task = _gateway_tasks.pop(bot, None)
    try:
        if not bot.is_closed():
            await bot.close()
    finally:
        if gateway_task is not None:
            if not gateway_task.done():
                gateway_task.cancel()
            # Collect the task's outcome so it is not reported as
            # "exception was never retrieved"; its error is irrelevant here.
            await asyncio.gather(gateway_task, return_exceptions=True)


async def _resolve_channel(bot: commands.Bot, channel_id: str):
    """Return the channel for ``channel_id`` from the cache, else via the API.

    Raises:
        ValueError: ``channel_id`` is not an integer string.
    """
    numeric_id = int(channel_id)
    return bot.get_channel(numeric_id) or await bot.fetch_channel(numeric_id)


def get_discord_token() -> str:
    """Extract Discord token from authenticated JWT.

    Returns:
        The ``discord_token`` claim of the current request's access token.

    Raises:
        ValueError: No access token is available or it carries no
            ``discord_token`` claim.
    """
    access_token = get_access_token()

    if access_token is not None:
        # Try to get discord_token from additional_claims if available
        claims = getattr(access_token, "additional_claims", None)
        if claims:
            discord_token = claims.get("discord_token")
            if discord_token:
                return discord_token

        # Fallback: decode the JWT directly to get claims.
        # NOTE(review): the signature is not re-verified here; this relies on
        # the auth provider having validated the token already - confirm.
        raw_token = getattr(access_token, "token", None)
        if raw_token:
            try:
                decoded = jwt.decode(raw_token, options={"verify_signature": False})
            except jwt.PyJWTError:
                decoded = {}
            discord_token = decoded.get("discord_token")
            if discord_token:
                return discord_token

    raise ValueError("No Discord token found in authentication")


# --------------------------------
# Setup Authentication
# --------------------------------
# Generate RSA key pair for development (use external IdP for production)
# NOTE(review): a fresh key pair is generated on every import of this module,
# so a token minted in another process (e.g. with the `python -c` command
# printed by start()) is signed with a different key than the running server
# uses. Persist the key pair or use an external IdP for real deployments.
key_pair = RSAKeyPair.generate()

# Configure Bearer Token authentication
auth = BearerAuthProvider(
    public_key=key_pair.public_key,
    issuer="https://discord-mcp-server.local",
    audience="discord-mcp-server"
)

# --------------------------------
# Set up FastMCP with Authentication
# --------------------------------
mcp = FastMCP(name="Discord MCP Server", auth=auth)


# --------------------------------
# Token Generation Utility
# --------------------------------
def generate_access_token(discord_token: str, user_id: str = "discord-user") -> str:
    """Generate a JWT access token containing the Discord token.

    Args:
        discord_token: Discord bot token to embed as the ``discord_token`` claim.
        user_id: Subject of the generated token.

    Returns:
        A JWT signed with the module's key pair, valid for one hour.
    """
    return key_pair.create_token(
        subject=user_id,
        issuer="https://discord-mcp-server.local",
        audience="discord-mcp-server",
        additional_claims={
            "discord_token": discord_token,
            "permissions": ["discord:read", "discord:write", "discord:moderate"]
        },
        expires_in_seconds=3600  # 1 hour
    )


# --------------------------------
# MCP Tool: send_message
# --------------------------------
@mcp.tool
async def send_message(channel_id: str, message: str) -> dict:
    """
    Send a message to a specific Discord channel.

    Args:
        channel_id (str): The Discord channel ID to send the message to
        message (str): The message content to send

    Returns:
        dict: Success response with message details or error message
    """
    # Bound before the try so the error path can always reference them.
    tool_name = "send_message"
    args = {"channel_id": channel_id, "message": message}
    access_token = None
    bot = None
    try:
        # Get Discord token from authenticated JWT
        access_token = get_access_token()
        discord_token = get_discord_token()

        bot = await create_temp_bot(discord_token)
        channel = await _resolve_channel(bot, channel_id)
        sent = await channel.send(message)
        result = {
            "status": "success",
            "message_id": sent.id,
            "channel_id": sent.channel.id,
            "timestamp": sent.created_at.isoformat()
        }
        log_audit(tool_name, _audit_user_id(access_token), args, success=True)
        return result
    except Exception as e:
        return _audit_failure(tool_name, access_token, args, str(e))
    finally:
        await cleanup_bot(bot)


# --------------------------------
# MCP Tool: get_messages
# --------------------------------
@mcp.tool
async def get_messages(channel_id: str, limit: int = 10) -> dict:
    """
    Fetches the last `limit` messages from a Discord channel.

    Args:
        channel_id (str): The channel to fetch messages from.
        limit (int): Number of messages to fetch (default: 10).

    Returns:
        dict: A list of message dictionaries or an error message.
    """
    tool_name = "get_messages"
    args = {"channel_id": channel_id, "limit": limit}
    access_token = None
    bot = None
    try:
        # Get Discord token from authenticated JWT
        access_token = get_access_token()
        discord_token = get_discord_token()

        bot = await create_temp_bot(discord_token)
        channel = await _resolve_channel(bot, channel_id)

        # Check if it's a text channel
        if not isinstance(channel, (discord.TextChannel, discord.Thread)):
            return _audit_failure(
                tool_name, access_token, args,
                "Invalid channel type. Must be a text or thread channel."
            )

        # history() is an async iterator; .flatten() no longer exists in
        # discord.py 2.x, so collect with an async comprehension.
        messages = [msg async for msg in channel.history(limit=limit)]

        # Build structured output
        result = {
            "messages": [
                {
                    "id": str(msg.id),
                    "author": str(msg.author),
                    "content": msg.content,
                    "timestamp": msg.created_at.isoformat()
                }
                for msg in messages
            ]
        }
        log_audit(tool_name, _audit_user_id(access_token), args, success=True)
        return result
    except discord.Forbidden:
        return _audit_failure(
            tool_name, access_token, args,
            "Missing permission to read messages in that channel."
        )
    except discord.HTTPException as e:
        return _audit_failure(tool_name, access_token, args, f"Discord API error: {str(e)}")
    except Exception as e:
        return _audit_failure(tool_name, access_token, args, f"Unexpected error: {str(e)}")
    finally:
        await cleanup_bot(bot)


# --------------------------------
# MCP Tool: get_channel_info
# --------------------------------
# Checked in order; the first matching class wins.
_CHANNEL_TYPE_LABELS = (
    (discord.TextChannel, "text"),
    (discord.VoiceChannel, "voice"),
    (discord.Thread, "thread"),
    (discord.CategoryChannel, "category"),
)


@mcp.tool
async def get_channel_info(channel_id: str) -> dict:
    """
    Retrieves metadata about a given Discord channel.

    Args:
        channel_id (str): The channel ID to query.

    Returns:
        dict: Channel metadata or error message.
    """
    tool_name = "get_channel_info"
    args = {"channel_id": channel_id}
    access_token = None
    bot = None
    try:
        # Get Discord token from authenticated JWT
        access_token = get_access_token()
        discord_token = get_discord_token()

        bot = await create_temp_bot(discord_token)
        channel = await _resolve_channel(bot, channel_id)

        # Determine type
        channel_type = next(
            (label for cls, label in _CHANNEL_TYPE_LABELS if isinstance(channel, cls)),
            "unknown",
        )

        # Build metadata
        guild = getattr(channel, "guild", None)
        info = {
            "id": str(channel.id),
            "name": channel.name,
            "type": channel_type,
            "guild_id": str(guild.id) if guild is not None else None,
            "guild_name": guild.name if guild is not None else None,
            # Threads have no `position`; report None instead of failing.
            "position": getattr(channel, "position", None),
            "created_at": channel.created_at.isoformat(),
        }

        # Optional fields (text-only)
        if isinstance(channel, discord.TextChannel):
            info.update({
                "topic": channel.topic,
                "nsfw": channel.is_nsfw(),
                "slowmode_delay": channel.slowmode_delay
            })

        log_audit(tool_name, _audit_user_id(access_token), args, success=True)
        return info
    except discord.NotFound:
        return _audit_failure(tool_name, access_token, args, "Channel not found.")
    except discord.Forbidden:
        return _audit_failure(
            tool_name, access_token, args,
            "Bot doesn't have permission to access this channel."
        )
    except discord.HTTPException as e:
        return _audit_failure(tool_name, access_token, args, f"Discord API error: {str(e)}")
    except Exception as e:
        return _audit_failure(tool_name, access_token, args, f"Unexpected error: {str(e)}")
    finally:
        await cleanup_bot(bot)


# --------------------------------
# MCP Tool: search_messages
# --------------------------------
@mcp.tool
async def search_messages(channel_id: str, query: str, limit: int = 20) -> dict:
    """
    Searches recent messages in a Discord channel containing a keyword.

    Args:
        channel_id (str): ID of the Discord channel.
        query (str): Keyword to search in message content.
        limit (int): Max messages to return (default: 20).

    Returns:
        dict: List of matched messages or error.
    """
    tool_name = "search_messages"
    args = {"channel_id": channel_id, "query": query, "limit": limit}
    access_token = None
    bot = None
    try:
        # Get Discord token from authenticated JWT
        access_token = get_access_token()
        discord_token = get_discord_token()

        bot = await create_temp_bot(discord_token)
        channel = await _resolve_channel(bot, channel_id)

        if not isinstance(channel, (discord.TextChannel, discord.Thread)):
            return _audit_failure(
                tool_name, access_token, args,
                "Invalid channel type. Must be a text or thread channel."
            )

        # Case-insensitive substring match over the most recent messages;
        # the needle is lowered once rather than per message.
        needle = query.lower()
        matches = [
            {
                "id": str(msg.id),
                "author": str(msg.author),
                "content": msg.content,
                "timestamp": msg.created_at.isoformat()
            }
            async for msg in channel.history(limit=SEARCH_SCAN_LIMIT)
            if needle in msg.content.lower()
        ]

        log_audit(tool_name, _audit_user_id(access_token), args, success=True)
        return {"matches": matches[:limit]}
    except discord.Forbidden:
        return _audit_failure(
            tool_name, access_token, args,
            "Bot lacks permission to read messages in this channel."
        )
    except discord.HTTPException as e:
        return _audit_failure(tool_name, access_token, args, f"Discord API error: {str(e)}")
    except Exception as e:
        return _audit_failure(tool_name, access_token, args, f"Unexpected error: {str(e)}")
    finally:
        await cleanup_bot(bot)


# --------------------------------
# MCP Tool: moderate_content
# --------------------------------
@mcp.tool
async def moderate_content(channel_id: str, message_ids: list[str]) -> dict:
    """
    Deletes specific messages in a channel.

    Args:
        channel_id (str): The ID of the channel.
        message_ids (list[str]): List of message IDs to delete.

    Returns:
        dict: Status of deleted messages or error.
    """
    tool_name = "moderate_content"
    args = {"channel_id": channel_id, "message_ids": message_ids}
    access_token = None
    bot = None
    try:
        # Get Discord token from authenticated JWT
        access_token = get_access_token()
        discord_token = get_discord_token()

        deleted = []
        failed = []

        bot = await create_temp_bot(discord_token)
        channel = await _resolve_channel(bot, channel_id)

        for mid in message_ids:
            try:
                msg = await channel.fetch_message(int(mid))
                await msg.delete()
                deleted.append(mid)
            except ValueError:
                # A malformed id must not abort the rest of the batch.
                failed.append({"id": mid, "error": "Invalid message ID"})
            except discord.NotFound:
                failed.append({"id": mid, "error": "Not found"})
            except discord.Forbidden:
                failed.append({"id": mid, "error": "Permission denied"})
            except discord.HTTPException as e:
                failed.append({"id": mid, "error": str(e)})

        log_audit(tool_name, _audit_user_id(access_token), args, success=True)
        return {
            "status": "partial" if failed else "success",
            "deleted": deleted,
            "failed": failed
        }
    except discord.Forbidden:
        return _audit_failure(
            tool_name, access_token, args,
            "Bot lacks permission to manage messages in this channel."
        )
    except discord.HTTPException as e:
        return _audit_failure(tool_name, access_token, args, f"Discord API error: {str(e)}")
    except Exception as e:
        return _audit_failure(tool_name, access_token, args, f"Unexpected error: {str(e)}")
    finally:
        await cleanup_bot(bot)


# --------------------------------
# Set up FastAPI and mount the MCP server
# --------------------------------
# A bare FastAPI() would expose none of the tools above: the FastMCP ASGI app
# has to be mounted, and its lifespan handed to FastAPI so the MCP session
# manager is started. The MCP endpoint is served at /mcp.
mcp_app = mcp.http_app(path="/mcp")
app = FastAPI(lifespan=mcp_app.lifespan)
app.mount("/", mcp_app)


# --------------------------------
# Start MCP Server
# --------------------------------
def start():
    """Start the MCP server with authentication"""
    print("🚀 Starting Authenticated Discord MCP Server...")
    print("🔐 Authentication: Bearer Token (JWT)")
    print("📝 Available tools:")
    print(" - send_message: Send messages to Discord channels")
    print(" - get_messages: Fetch recent messages from channels")
    print(" - get_channel_info: Get channel metadata")
    print(" - search_messages: Search for messages containing keywords")
    print(" - moderate_content: Delete specific messages")
    print("🔗 Server will run on http://localhost:8000")
    print("\n🔑 To generate an access token, use:")
    print(" python -c \"from app import generate_access_token; print(generate_access_token('YOUR_DISCORD_BOT_TOKEN'))\"")
    uvicorn.run(app, host="0.0.0.0", port=8000)


if __name__ == "__main__":
    start()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/henishshah18/Discord_MCP_Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.