MCP DuckDuckGo Search Server

by spences10
Verified
  • service_handlers
from typing import Dict, Optional import aiohttp import logging from .base import BaseServiceHandler logger = logging.getLogger(__name__) class SlackHandler(BaseServiceHandler): SLACK_API_URL = "https://slack.com/api" async def validate_config(self, config: Dict[str, str]) -> bool: """Validate Slack configuration""" required_fields = ['bot_token', 'channel'] return all(field in config and config[field] for field in required_fields) async def initialize(self) -> bool: """Initialize Slack client""" try: self._client = aiohttp.ClientSession(headers={ 'Authorization': f'Bearer {self.config["bot_token"]}', 'Content-Type': 'application/json; charset=utf-8' }) return True except Exception as e: logger.error(f"Slack initialization failed: {str(e)}") return False async def test_connection(self) -> bool: """Test Slack connection by checking auth""" if not self._client: return False try: async with self._client.get(f"{self.SLACK_API_URL}/auth.test") as response: data = await response.json() if data.get('ok'): logger.info(f"Slack connection successful - authenticated as {data.get('user')}") return True else: logger.error(f"Slack connection failed - {data.get('error')}") return False except Exception as e: logger.error(f"Slack connection test failed: {str(e)}") return False async def send_message(self, text: str, thread_ts: str = None, blocks: list = None) -> Optional[dict]: """Send a message to the configured channel""" if not self.is_initialized: if not await self.setup(): return None try: data = { "channel": self.config['channel'], "text": text } if thread_ts: data["thread_ts"] = thread_ts if blocks: data["blocks"] = blocks async with self._client.post(f"{self.SLACK_API_URL}/chat.postMessage", json=data) as response: result = await response.json() if result.get('ok'): return result else: logger.error(f"Failed to send message - {result.get('error')}") return None except Exception as e: logger.error(f"Error sending message: {str(e)}") return None async def get_channel_info(self) -> Optional[dict]: """Get information about the configured channel""" if not self.is_initialized: if not await self.setup(): return None try: channel = self.config['channel'] # Remove the # if present if channel.startswith('#'): channel = channel[1:] async with self._client.get( f"{self.SLACK_API_URL}/conversations.info", params={"channel": channel} ) as response: result = await response.json() if result.get('ok'): return result.get('channel') else: logger.error(f"Failed to get channel info - {result.get('error')}") return None except Exception as e: logger.error(f"Error getting channel info: {str(e)}") return None async def upload_file(self, file_content: bytes, filename: str, filetype: str = None, thread_ts: str = None) -> Optional[dict]: """Upload a file to the configured channel""" if not self.is_initialized: if not await self.setup(): return None try: data = aiohttp.FormData() data.add_field('channels', self.config['channel']) data.add_field('file', file_content, filename=filename) if filetype: data.add_field('filetype', filetype) if thread_ts: data.add_field('thread_ts', thread_ts) async with self._client.post(f"{self.SLACK_API_URL}/files.upload", data=data) as response: result = await response.json() if result.get('ok'): return result else: logger.error(f"Failed to upload file - {result.get('error')}") return None except Exception as e: logger.error(f"Error uploading file: {str(e)}") return None async def close(self): """Close the aiohttp session""" if self._client: await self._client.close() self._client = None self._initialized = False