import os
import json
import ssl
from typing import Dict, Any, Optional
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
import certifi
# MCP Server
from mcp.server.fastmcp import FastMCP, Context
# Slack SDK
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.errors import SlackApiError
# Environment setup
from dotenv import load_dotenv
# Populate the process environment via python-dotenv before the
# module-level settings below are read.
load_dotenv()

# Slack token handed to the Slack client; None when the variable is unset.
SLACK_USER_TOKEN = os.getenv("SLACK_USER_TOKEN")
# Default number of search results; falls back to 20 when not configured.
DEFAULT_SEARCH_COUNT = int(os.getenv("DEFAULT_SEARCH_COUNT", "20"))
# Application Context
@dataclass
class AppContext:
    """State shared with request handlers while the server is running.

    Attributes:
        slack_client: Asynchronous Slack Web API client.
        config: Server configuration as a nested mapping.
    """

    slack_client: AsyncWebClient
    config: Dict[str, Any]
def _merge_config(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively overlay ``override`` onto ``base`` in place; return ``base``."""
    for key, value in override.items():
        current = base.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            # Merge nested sections key by key so that defaults which the
            # override does not mention are preserved.
            _merge_config(current, value)
        else:
            base[key] = value
    return base


@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Manage the server lifecycle.

    Builds the Slack client and the configuration once at startup and
    yields them as an ``AppContext`` for the lifetime of the server.

    Args:
        server: The FastMCP server instance (not used by this function).

    Yields:
        The ``AppContext`` holding the Slack client and the configuration.

    Raises:
        ValueError: If ``config.json`` exists but its top-level JSON value
            is not an object.
    """
    # Function-scope import so this block does not depend on the file's
    # import section being changed.
    import sys

    # Diagnostics go to stderr: this server is run over the stdio transport,
    # where stdout is reserved for protocol messages.
    print("Initializing Slack Search MCP Server...", file=sys.stderr)
    if not SLACK_USER_TOKEN:
        print(
            "Warning: SLACK_USER_TOKEN is not set; "
            "Slack API requests will not be authenticated.",
            file=sys.stderr,
        )

    # Create SSL context (to avoid SSL certificate errors)
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    # Initialize Slack client with SSL settings
    slack_client = AsyncWebClient(token=SLACK_USER_TOKEN, ssl=ssl_context)

    # Built-in defaults
    config: Dict[str, Any] = {
        "slack": {
            "default_channels": ["general", "random"],
            "excluded_channels": [],
            "max_results_per_search": 100,
        }
    }

    # Overlay settings from the config file if it exists
    config_file = "config.json"
    if os.path.exists(config_file):
        with open(config_file, "r", encoding="utf-8") as f:
            file_config = json.load(f)
        if not isinstance(file_config, dict):
            raise ValueError(
                f"{config_file} must contain a JSON object at the top level"
            )
        # Deep merge: a file that overrides a single key under "slack" must
        # not discard the remaining defaults of that section.
        _merge_config(config, file_config)

    try:
        yield AppContext(
            slack_client=slack_client,
            config=config,
        )
    finally:
        print("Shutting down Slack Search MCP Server...", file=sys.stderr)
def _filter_match(match: dict) -> dict:
    """Keep only the fields of a single search match that are exposed."""
    # ``or {}`` also covers a "channel" key that is present but null, which
    # a plain ``.get("channel", {})`` would pass through as None.
    channel = match.get("channel") or {}
    return {
        "text": match.get("text", ""),
        "ts": match.get("ts", ""),
        "permalink": match.get("permalink", ""),
        "user": match.get("user", ""),
        "username": match.get("username", ""),
        "channel": {
            "id": channel.get("id", ""),
            "name": channel.get("name", ""),
        },
    }


def parse_slack_response(raw_response: dict) -> dict:
    """Parse a Slack search response to include only necessary information.

    Args:
        raw_response: Response payload of a Slack message search.

    Returns:
        ``raw_response`` itself, unchanged, when its ``ok`` flag is missing
        or falsy. Otherwise a new dict of the form
        ``{"messages": {"matches": [...], "total": ..., "pagination": ...}}``
        in which every match is reduced to ``text``, ``ts``, ``permalink``,
        ``user``, ``username`` and ``channel`` (``id`` and ``name``).
        Missing fields default to ``""``, ``total`` to ``0`` and
        ``pagination`` to ``{}``.
    """
    if not raw_response.get("ok", False):
        return raw_response

    # Tolerate "messages" / "matches" being absent or null.
    messages = raw_response.get("messages") or {}
    matches = messages.get("matches") or []

    return {
        "messages": {
            "matches": [_filter_match(match) for match in matches],
            "total": messages.get("total", 0),
            "pagination": messages.get("pagination", {}),
        }
    }
# Build the MCP server object; app_lifespan is registered as its lifespan
# handler.
mcp = FastMCP(
    "SlackSearchServer",
    lifespan=app_lifespan,
)
# Tool: Get Slack search results
@mcp.tool()
async def get_slack_search_results(
    query: str,
    ctx: Context,
    count: int = 20,
    cursor: Optional[str] = None,
    highlight: bool = True
) -> dict:
    """
    Call the Slack search API and return the search results.

    Results are requested sorted by timestamp, newest first.

    Args:
        query: Search query.
        ctx: MCP context; gives access to the lifespan state (Slack client)
            and to client-facing logging.
        count: Number of results to fetch.
        cursor: Cursor for pagination.
        highlight: Whether to enable markup of the matching parts.

    Returns:
        On success, the filtered response built by ``parse_slack_response``.
        On failure, a dict with ``ok`` set to ``False`` plus ``error`` and
        ``error_message`` keys; ``error`` is ``"unknown_error"`` for
        non-Slack exceptions.
    """
    app_ctx = ctx.request_context.lifespan_context
    slack_client = app_ctx.slack_client
    try:
        # Call Slack search.messages API
        response = await slack_client.search_messages(
            query=query,
            count=count,
            cursor=cursor,
            highlight=highlight,
            sort="timestamp",
            sort_dir="desc"
        )
        # Parse and filter the response to include only necessary information
        return parse_slack_response(response.data)
    except SlackApiError as e:
        # Read the error code once; fall back instead of raising a KeyError
        # from inside the handler if the payload carries no "error" field.
        error_code = e.response.get("error", "unknown_error")

        # Log specific errors. The Context logging helpers are coroutines and
        # must be awaited, otherwise nothing is sent. The warning-level helper
        # is named ``warning``; ``warn`` does not exist and would raise here,
        # preventing the error response from being returned.
        if error_code == "invalid_auth":
            await ctx.error("Authentication error: Please check your Slack token")
        elif error_code == "missing_scope":
            await ctx.error("Missing required scope: search:read")
        elif error_code == "ratelimited":
            await ctx.warning("Rate limited by Slack API")

        return {
            "ok": False,
            "error": error_code,
            "error_message": str(e)
        }
    except Exception as e:
        # Tool boundary: report unexpected failures to the caller as data
        # instead of letting them propagate.
        return {
            "ok": False,
            "error": "unknown_error",
            "error_message": str(e)
        }
# Script entry point: when executed directly, serve over the stdio transport
if __name__ == "__main__":
    mcp.run(transport="stdio")