Skip to main content
Glama
service.py (27.1 kB)
"""
Slack Bot API Service using Official Slack SDK
A wrapper around the official slack-sdk for MCP integration
"""

from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.errors import SlackApiError
from typing import Dict, List, Optional, Union, Any
from dataclasses import dataclass
import logging
import httpx
import io
import os
import urllib.parse
from urllib.parse import urlparse
from pathlib import Path
from src.services.slack.schemas.slack import SlackResponse
from fastmcp import Context
from src.core.storeage_manager import TokenStorageManager
from src.middleware.auth import extract_user_from_context
from fastmcp.server.dependencies import get_context
from src.utils.env_handler import SLACK_REDIRECT_URI as ENV_SLACK_REDIRECT_URI, SLACK_CLIENT_ID as ENV_SLACK_CLIENT_ID
from src.utils.config_handler import load_config

storageManager = TokenStorageManager()
logger = logging.getLogger(__name__)


class SlackBotAPIService:
    """
    Slack Bot API Service using Official Slack SDK
    Wrapper around AsyncWebClient for MCP integration

    Every public API method forwards to the same-named ``AsyncWebClient``
    method through ``_safe_api_call`` and returns a ``SlackResponse``
    instead of raising.
    """

    # ==================== CONSTRUCTION / AUTH ====================

    @classmethod
    async def from_context(cls, ctx: Context) -> Union["SlackBotAPIService", Dict[str, Any]]:
        """
        Build a service for the user identified by the request context.

        Args:
            ctx: Request context; the user id is obtained with
                ``extract_user_from_context``.

        Returns:
            A ``SlackBotAPIService`` when a stored access token exists.
            Otherwise a dict with ``requires_auth`` set to True: either an
            error payload (no user id in the context) or the OAuth data
            produced by ``get_oauth_url``. Callers must check which one
            they received.

        Raises:
            ValueError: If the stored token does not start with ``xoxb-``.
        """
        storage = TokenStorageManager()
        jwt_client_id = extract_user_from_context(ctx)

        # If there is no client id (JWT missing), instruct to add JWT token
        if not jwt_client_id:
            await ctx.error("Missing JWT token. Please add JWT token.")
            return {
                "requires_auth": True,
                "error": "Missing JWT token. Please add JWT token.",
                "message": "Add a valid JWT token in the Authorization header: 'Bearer <token>'"
            }

        token_data = storage.read_token(jwt_client_id)
        access_token = token_data.get("access_token") if token_data else None

        if not access_token:
            await ctx.info("No valid Slack OAuth token found. Generating OAuth URL for authorization.")
            oauth_data = await cls.get_oauth_url()
            # Flag the payload so the caller knows authentication is needed
            oauth_data["requires_auth"] = True
            return oauth_data

        return cls(access_token)

    @classmethod
    async def get_oauth_url(cls) -> Dict[str, Any]:
        """
        Build the Slack OAuth v2 authorization URL for the current user.

        Scopes come from the ``slack.scopes`` config entry; client id and
        redirect URI come from the environment handler. The ``sub`` claim of
        the ``jwt_payload`` context state is embedded in ``state`` as
        ``client_id:<sub>``.

        Returns:
            Dict with ``oauth_url``, ``instructions``, ``callback_url``,
            ``scopes`` and ``state``.
        """
        config = load_config().get("slack", {})
        slack_redirect_uri = ENV_SLACK_REDIRECT_URI
        slack_client_id = ENV_SLACK_CLIENT_ID

        ctx = get_context()
        jwt_payload = ctx.get_state("jwt_payload") if ctx else None
        jwt_client_id = jwt_payload.get('sub') if jwt_payload else ""
        # The context is treated as optional above, so do not assume it here
        if ctx:
            await ctx.info("Generating OAuth URL")

        scopes = config.get("scopes", [])
        state = f"client_id:{jwt_client_id}"

        # urlencode escapes every parameter, so reserved characters in the
        # client id, scopes or redirect URI cannot corrupt the query string
        query = urllib.parse.urlencode({
            "client_id": slack_client_id,
            "scope": ",".join(scopes),
            "redirect_uri": slack_redirect_uri,
            "state": state,
        })
        oauth_url = f"https://slack.com/oauth/v2/authorize?{query}"

        return {
            "oauth_url": oauth_url,
            "instructions": "Visit this URL to authorize the application with your Slack workspace",
            "callback_url": slack_redirect_uri,
            "scopes": scopes,
            "state": state
        }

    def __init__(self, bot_token: str):
        """
        Initialize with bot token using official Slack SDK

        Args:
            bot_token: Slack bot token (xoxb-...)

        Raises:
            ValueError: If the token does not start with ``xoxb-``.
        """
        if not bot_token.startswith('xoxb-'):
            raise ValueError("Bot token must start with 'xoxb-'")

        self.bot_token = bot_token
        self.client = AsyncWebClient(token=bot_token)

    # ==================== INTERNAL HELPERS ====================

    @staticmethod
    def _with_optional(required: Dict[str, Any], **optional: Any) -> Dict[str, Any]:
        """Return a copy of ``required`` plus every ``optional`` entry whose value is truthy."""
        merged = dict(required)
        merged.update({key: value for key, value in optional.items() if value})
        return merged

    def _upload_kwargs(
        self,
        channels: str,
        file: Any,
        filename: str,
        title: str,
        initial_comment: Optional[str],
        thread_ts: Optional[str]
    ) -> Dict[str, Any]:
        """Build the keyword arguments shared by all ``files_upload_v2`` calls."""
        return self._with_optional(
            {"channel": channels, "file": file, "filename": filename, "title": title},
            initial_comment=initial_comment,
            thread_ts=thread_ts,
        )

    def _handle_response(self, response) -> SlackResponse:
        """Convert Slack SDK response to our standard format"""
        return SlackResponse(
            ok=response.get("ok", False),
            data=response.data,
            error=response.get("error"),
            warning=response.get("warning")
        )

    async def _safe_api_call(self, method_name: str, **kwargs) -> SlackResponse:
        """
        Safely call Slack API with error handling

        Args:
            method_name: Name of the ``AsyncWebClient`` method to call.
            **kwargs: Passed through to that method.

        Returns:
            The converted response on success; on any exception a
            ``SlackResponse`` with ``ok=False`` and a non-empty ``error``.
        """
        try:
            method = getattr(self.client, method_name)
            response = await method(**kwargs)
            return self._handle_response(response)
        except SlackApiError as e:
            # Fall back to the exception text so a failure never reports error=None
            error = e.response.get("error") or str(e)
            logger.error("Slack API Error in %s: %s", method_name, error)
            # NOTE(review): data receives the SDK response object itself, not
            # its .data payload as in _handle_response - confirm this is intended.
            return SlackResponse(
                ok=False,
                data=e.response,
                error=error
            )
        except Exception as e:
            logger.error("Unexpected error in %s: %s", method_name, e)
            return SlackResponse(
                ok=False,
                data={},
                error=str(e)
            )

    def _is_url(self, path: str) -> bool:
        """Check if the given string is a URL (has both a scheme and a network location)"""
        try:
            result = urlparse(path)
        except (ValueError, TypeError, AttributeError):
            # Malformed input (e.g. a bad IPv6 literal) or a non-string value
            return False
        return all([result.scheme, result.netloc])

    def _is_file_path(self, path: str) -> bool:
        """
        Check if the given string names an existing filesystem entry

        Uses ``os.path.exists`` because it returns False instead of raising
        for strings the OS rejects (over-long names, embedded NUL bytes),
        which matters when the argument is really text content. It also
        returns False for the empty string.
        """
        return os.path.exists(path)

    # ==================== MESSAGES ====================

    async def send_message(
        self,
        channel: str,
        text: Optional[str] = None,
        blocks: Optional[List[Dict]] = None,
        attachments: Optional[List[Dict]] = None,
        thread_ts: Optional[str] = None,
        username: Optional[str] = None,
        icon_emoji: Optional[str] = None,
        icon_url: Optional[str] = None
    ) -> SlackResponse:
        """Send a message to a channel; falsy optional arguments are omitted"""
        kwargs = self._with_optional(
            {"channel": channel},
            text=text,
            blocks=blocks,
            attachments=attachments,
            thread_ts=thread_ts,
            username=username,
            icon_emoji=icon_emoji,
            icon_url=icon_url,
        )
        return await self._safe_api_call("chat_postMessage", **kwargs)

    async def update_message(
        self,
        channel: str,
        ts: str,
        text: Optional[str] = None,
        blocks: Optional[List[Dict]] = None,
        attachments: Optional[List[Dict]] = None
    ) -> SlackResponse:
        """Update an existing message"""
        kwargs = self._with_optional(
            {"channel": channel, "ts": ts},
            text=text,
            blocks=blocks,
            attachments=attachments,
        )
        return await self._safe_api_call("chat_update", **kwargs)

    async def delete_message(self, channel: str, ts: str) -> SlackResponse:
        """Delete a message"""
        return await self._safe_api_call("chat_delete", channel=channel, ts=ts)

    async def schedule_message(
        self,
        channel: str,
        post_at: int,
        text: Optional[str] = None,
        blocks: Optional[List[Dict]] = None,
        attachments: Optional[List[Dict]] = None
    ) -> SlackResponse:
        """Schedule a message to be sent later"""
        kwargs = self._with_optional(
            {"channel": channel, "post_at": post_at},
            text=text,
            blocks=blocks,
            attachments=attachments,
        )
        return await self._safe_api_call("chat_scheduleMessage", **kwargs)

    # ==================== CHANNELS ====================

    async def list_channels(
        self,
        exclude_archived: bool = True,
        limit: int = 100,
        cursor: Optional[str] = None,
        types: str = "public_channel,private_channel"
    ) -> SlackResponse:
        """List all channels"""
        kwargs = self._with_optional(
            {"exclude_archived": exclude_archived, "limit": limit, "types": types},
            cursor=cursor,
        )
        return await self._safe_api_call("conversations_list", **kwargs)

    async def get_channel_info(self, channel: str) -> SlackResponse:
        """Get information about a channel"""
        return await self._safe_api_call("conversations_info", channel=channel)

    async def create_channel(
        self,
        name: str,
        is_private: bool = False
    ) -> SlackResponse:
        """Create a new channel"""
        return await self._safe_api_call("conversations_create", name=name, is_private=is_private)

    async def join_channel(self, channel: str) -> SlackResponse:
        """Join a channel"""
        return await self._safe_api_call("conversations_join", channel=channel)

    async def leave_channel(self, channel: str) -> SlackResponse:
        """Leave a channel"""
        return await self._safe_api_call("conversations_leave", channel=channel)

    async def invite_to_channel(self, channel: str, users: Union[str, List[str]]) -> SlackResponse:
        """Invite users to a channel; a list of users is sent comma-joined"""
        if isinstance(users, list):
            users = ",".join(users)
        return await self._safe_api_call("conversations_invite", channel=channel, users=users)

    async def kick_from_channel(self, channel: str, user: str) -> SlackResponse:
        """Remove a user from a channel"""
        return await self._safe_api_call("conversations_kick", channel=channel, user=user)

    async def set_channel_topic(self, channel: str, topic: str) -> SlackResponse:
        """Set channel topic"""
        return await self._safe_api_call("conversations_setTopic", channel=channel, topic=topic)

    async def set_channel_purpose(self, channel: str, purpose: str) -> SlackResponse:
        """Set channel purpose"""
        return await self._safe_api_call("conversations_setPurpose", channel=channel, purpose=purpose)

    async def archive_channel(self, channel: str) -> SlackResponse:
        """Archive a channel"""
        return await self._safe_api_call("conversations_archive", channel=channel)

    async def unarchive_channel(self, channel: str) -> SlackResponse:
        """Unarchive a channel"""
        return await self._safe_api_call("conversations_unarchive", channel=channel)

    # ==================== CHANNEL HISTORY ====================

    async def get_channel_history(
        self,
        channel: str,
        limit: int = 100,
        cursor: Optional[str] = None,
        latest: Optional[str] = None,
        oldest: Optional[str] = None
    ) -> SlackResponse:
        """Get channel message history"""
        kwargs = self._with_optional(
            {"channel": channel, "limit": limit},
            cursor=cursor,
            latest=latest,
            oldest=oldest,
        )
        return await self._safe_api_call("conversations_history", **kwargs)

    async def get_thread_replies(
        self,
        channel: str,
        ts: str,
        limit: int = 100,
        cursor: Optional[str] = None
    ) -> SlackResponse:
        """Get replies to a threaded message"""
        kwargs = self._with_optional(
            {"channel": channel, "ts": ts, "limit": limit},
            cursor=cursor,
        )
        return await self._safe_api_call("conversations_replies", **kwargs)

    # ==================== USERS ====================

    async def list_users(
        self,
        limit: int = 100,
        cursor: Optional[str] = None
    ) -> SlackResponse:
        """List all users in workspace"""
        kwargs = self._with_optional({"limit": limit}, cursor=cursor)
        return await self._safe_api_call("users_list", **kwargs)

    async def get_user_info(self, user: str) -> SlackResponse:
        """Get information about a user"""
        return await self._safe_api_call("users_info", user=user)

    async def get_user_profile(self, user: str) -> SlackResponse:
        """Get user's profile information"""
        return await self._safe_api_call("users_profile_get", user=user)

    async def set_user_presence(self, presence: str) -> SlackResponse:
        """Set bot's presence (auto or away)"""
        return await self._safe_api_call("users_setPresence", presence=presence)

    # ==================== FILES (DYNAMIC UPLOAD) ====================

    async def upload_file(
        self,
        channels: Union[str, List[str]],
        file_source: str,
        filename: Optional[str] = None,
        title: Optional[str] = None,
        initial_comment: Optional[str] = None,
        thread_ts: Optional[str] = None
    ) -> SlackResponse:
        """
        Dynamic file upload - automatically detects if source is URL, file path, or content

        Args:
            channels: Channel(s) to upload to
            file_source: Can be URL, local file path, or text content
            filename: Optional filename (required for text content)
            title: File title
            initial_comment: Comment to add with file
            thread_ts: Thread timestamp if uploading to thread

        Returns:
            The upload result, or an ``ok=False`` response when text content
            is given without a filename.
        """
        if isinstance(channels, list):
            channels = ",".join(channels)

        # Auto-detect source type and route to appropriate method
        if self._is_url(file_source):
            return await self._upload_from_url(
                channels, file_source, filename, title, initial_comment, thread_ts
            )
        if self._is_file_path(file_source):
            return await self._upload_from_path(
                channels, file_source, filename, title, initial_comment, thread_ts
            )

        # Treat as text content
        if not filename:
            return SlackResponse(
                ok=False,
                data={},
                error="filename is required when uploading text content"
            )
        return await self._upload_content(
            channels, file_source, filename, title, initial_comment, thread_ts
        )

    async def _upload_from_url(
        self,
        channels: str,
        file_url: str,
        filename: Optional[str],
        title: Optional[str],
        initial_comment: Optional[str],
        thread_ts: Optional[str]
    ) -> SlackResponse:
        """
        Download file from URL and upload to Slack

        When no filename is given it is taken from the URL path; if that has
        no extension, a generic name is chosen from the response content type.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(file_url)
                response.raise_for_status()

            # Get filename from URL if not provided
            if not filename:
                filename = os.path.basename(urlparse(file_url).path)
                if not filename or '.' not in filename:
                    # Try to get extension from content-type
                    content_type = response.headers.get('content-type', '')
                    if 'pdf' in content_type:
                        filename = "downloaded_file.pdf"
                    elif 'image' in content_type:
                        filename = "downloaded_image.jpg"
                    else:
                        filename = "downloaded_file"

            # Create file-like object from downloaded content
            file_data = io.BytesIO(response.content)
            kwargs = self._upload_kwargs(
                channels,
                file_data,
                filename,
                title if title else f"File from {file_url}",
                initial_comment,
                thread_ts,
            )
            return await self._safe_api_call("files_upload_v2", **kwargs)

        except httpx.HTTPError as e:
            return SlackResponse(
                ok=False,
                data={},
                error=f"Failed to download file from URL: {str(e)}"
            )
        except Exception as e:
            return SlackResponse(
                ok=False,
                data={},
                error=f"Upload failed: {str(e)}"
            )

    async def _upload_from_path(
        self,
        channels: str,
        file_path: str,
        filename: Optional[str],
        title: Optional[str],
        initial_comment: Optional[str],
        thread_ts: Optional[str]
    ) -> SlackResponse:
        """Upload file from local file system path; filename defaults to the path's basename"""
        try:
            if not filename:
                filename = os.path.basename(file_path)

            kwargs = self._upload_kwargs(
                channels,
                file_path,
                filename,
                title if title else f"File: {filename}",
                initial_comment,
                thread_ts,
            )
            return await self._safe_api_call("files_upload_v2", **kwargs)

        except Exception as e:
            return SlackResponse(
                ok=False,
                data={},
                error=f"Failed to upload file from path: {str(e)}"
            )

    async def _upload_content(
        self,
        channels: str,
        content: str,
        filename: str,
        title: Optional[str],
        initial_comment: Optional[str],
        thread_ts: Optional[str]
    ) -> SlackResponse:
        """Upload text content as a file (UTF-8 encoded)"""
        try:
            # Create file-like object from content
            file_data = io.BytesIO(content.encode('utf-8'))
            kwargs = self._upload_kwargs(
                channels,
                file_data,
                filename,
                title if title else f"Text file: {filename}",
                initial_comment,
                thread_ts,
            )
            return await self._safe_api_call("files_upload_v2", **kwargs)

        except Exception as e:
            return SlackResponse(
                ok=False,
                data={},
                error=f"Failed to upload content: {str(e)}"
            )

    # Convenience methods for specific upload types.
    # These bypass auto-detection on purpose: the caller has already said
    # what the argument is, so it must not be re-interpreted (e.g. text
    # content that happens to match an existing file name).

    async def upload_file_from_url(
        self,
        channels: Union[str, List[str]],
        file_url: str,
        filename: Optional[str] = None,
        title: Optional[str] = None,
        initial_comment: Optional[str] = None,
        thread_ts: Optional[str] = None
    ) -> SlackResponse:
        """Upload file from URL (explicit method)"""
        if isinstance(channels, list):
            channels = ",".join(channels)
        return await self._upload_from_url(
            channels, file_url, filename, title, initial_comment, thread_ts
        )

    async def upload_file_from_path(
        self,
        channels: Union[str, List[str]],
        file_path: str,
        filename: Optional[str] = None,
        title: Optional[str] = None,
        initial_comment: Optional[str] = None,
        thread_ts: Optional[str] = None
    ) -> SlackResponse:
        """Upload file from local path (explicit method)"""
        if isinstance(channels, list):
            channels = ",".join(channels)
        return await self._upload_from_path(
            channels, file_path, filename, title, initial_comment, thread_ts
        )

    async def upload_file_content(
        self,
        channels: Union[str, List[str]],
        content: str,
        filename: str,
        title: Optional[str] = None,
        initial_comment: Optional[str] = None,
        thread_ts: Optional[str] = None
    ) -> SlackResponse:
        """Upload text content as file (explicit method)"""
        if isinstance(channels, list):
            channels = ",".join(channels)
        if not filename:
            return SlackResponse(
                ok=False,
                data={},
                error="filename is required when uploading text content"
            )
        return await self._upload_content(
            channels, content, filename, title, initial_comment, thread_ts
        )

    async def list_files(
        self,
        user: Optional[str] = None,
        channel: Optional[str] = None,
        ts_from: Optional[str] = None,
        ts_to: Optional[str] = None,
        types: Optional[str] = None,
        count: int = 100,
        page: int = 1
    ) -> SlackResponse:
        """List files in workspace"""
        kwargs = self._with_optional(
            {"count": count, "page": page},
            user=user,
            channel=channel,
            ts_from=ts_from,
            ts_to=ts_to,
            types=types,
        )
        return await self._safe_api_call("files_list", **kwargs)

    async def get_file_info(self, file: str) -> SlackResponse:
        """Get information about a file"""
        return await self._safe_api_call("files_info", file=file)

    async def delete_file(self, file: str) -> SlackResponse:
        """Delete a file"""
        return await self._safe_api_call("files_delete", file=file)

    # ==================== REACTIONS ====================

    async def add_reaction(self, name: str, channel: str, timestamp: str) -> SlackResponse:
        """Add emoji reaction to a message"""
        return await self._safe_api_call("reactions_add", name=name, channel=channel, timestamp=timestamp)

    async def remove_reaction(self, name: str, channel: str, timestamp: str) -> SlackResponse:
        """Remove emoji reaction from a message"""
        return await self._safe_api_call("reactions_remove", name=name, channel=channel, timestamp=timestamp)

    async def get_reactions(self, channel: str, timestamp: str) -> SlackResponse:
        """Get reactions for a message"""
        return await self._safe_api_call("reactions_get", channel=channel, timestamp=timestamp)

    # ==================== PINS ====================

    async def pin_message(self, channel: str, timestamp: str) -> SlackResponse:
        """Pin a message to channel"""
        return await self._safe_api_call("pins_add", channel=channel, timestamp=timestamp)

    async def unpin_message(self, channel: str, timestamp: str) -> SlackResponse:
        """Unpin a message from channel"""
        return await self._safe_api_call("pins_remove", channel=channel, timestamp=timestamp)

    async def list_pins(self, channel: str) -> SlackResponse:
        """List pinned items in channel"""
        return await self._safe_api_call("pins_list", channel=channel)

    # ==================== BOOKMARKS ====================

    async def add_bookmark(
        self,
        channel_id: str,
        title: str,
        type: str,
        link: Optional[str] = None,
        emoji: Optional[str] = None
    ) -> SlackResponse:
        """Add a bookmark to channel"""
        kwargs = self._with_optional(
            {"channel_id": channel_id, "title": title, "type": type},
            link=link,
            emoji=emoji,
        )
        return await self._safe_api_call("bookmarks_add", **kwargs)

    async def remove_bookmark(self, channel_id: str, bookmark_id: str) -> SlackResponse:
        """Remove a bookmark from channel"""
        return await self._safe_api_call("bookmarks_remove", channel_id=channel_id, bookmark_id=bookmark_id)

    async def list_bookmarks(self, channel_id: str) -> SlackResponse:
        """List bookmarks in channel"""
        return await self._safe_api_call("bookmarks_list", channel_id=channel_id)

    # ==================== USER GROUPS ====================

    async def create_usergroup(
        self,
        name: str,
        handle: Optional[str] = None,
        description: Optional[str] = None,
        channels: Optional[List[str]] = None
    ) -> SlackResponse:
        """Create a user group; channels are sent comma-joined"""
        kwargs = self._with_optional(
            {"name": name},
            handle=handle,
            description=description,
            channels=",".join(channels) if channels else None,
        )
        return await self._safe_api_call("usergroups_create", **kwargs)

    async def list_usergroups(self, include_disabled: bool = False) -> SlackResponse:
        """List user groups"""
        return await self._safe_api_call("usergroups_list", include_disabled=include_disabled)

    async def update_usergroup(
        self,
        usergroup: str,
        name: Optional[str] = None,
        handle: Optional[str] = None,
        description: Optional[str] = None
    ) -> SlackResponse:
        """Update a user group"""
        kwargs = self._with_optional(
            {"usergroup": usergroup},
            name=name,
            handle=handle,
            description=description,
        )
        return await self._safe_api_call("usergroups_update", **kwargs)

    async def disable_usergroup(self, usergroup: str) -> SlackResponse:
        """Disable a user group"""
        return await self._safe_api_call("usergroups_disable", usergroup=usergroup)

    # ==================== TEAM INFO ====================

    async def get_team_info(self) -> SlackResponse:
        """Get team information"""
        return await self._safe_api_call("team_info")

    async def get_team_profile(self) -> SlackResponse:
        """Get team profile fields"""
        return await self._safe_api_call("team_profile_get")

    # ==================== EMOJI ====================

    async def list_emoji(self) -> SlackResponse:
        """List custom emoji for team"""
        return await self._safe_api_call("emoji_list")

    # ==================== DND (Do Not Disturb) ====================

    async def get_dnd_info(self, user: Optional[str] = None) -> SlackResponse:
        """Get Do Not Disturb info for user"""
        kwargs = self._with_optional({}, user=user)
        return await self._safe_api_call("dnd_info", **kwargs)

    async def get_dnd_team_info(self, users: Optional[List[str]] = None) -> SlackResponse:
        """Get DND info for multiple users; users are sent comma-joined"""
        kwargs = self._with_optional({}, users=",".join(users) if users else None)
        return await self._safe_api_call("dnd_teamInfo", **kwargs)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bernerspace/bernerspace'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.