We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/SajidAlamQB/kedro-mcp-rag'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
Slack Bot Integration for Kedro RAG Assistant
This bot connects to Slack and uses the Kedro RAG system to answer questions.
"""
import os
import asyncio
import logging
from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler
from kedro_rag import KedroRAG
import tempfile
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize Slack app
app = AsyncApp(token=os.environ.get("SLACK_BOT_TOKEN"))
# Global RAG instance
rag_system = None
async def get_rag():
"""Initialize or get existing RAG system"""
global rag_system
if rag_system is None:
try:
# Use the same directory as your MCP server
persist_directory = os.path.join(os.path.dirname(__file__), "kedro_knowledge_db")
os.makedirs(persist_directory, exist_ok=True)
logger.info(f"Initializing RAG system at {persist_directory}")
rag_system = KedroRAG(persist_directory=persist_directory)
# Check if we need to build knowledge base
collection_count = rag_system.collection.count()
if collection_count == 0:
logger.info("Building knowledge base from scratch...")
await rag_system.build_knowledge_base()
logger.info(f"Knowledge base built with {rag_system.collection.count()} chunks")
else:
logger.info(f"Using existing knowledge base with {collection_count} chunks")
except Exception as e:
logger.error(f"Failed to initialize RAG system: {e}")
raise
return rag_system
@app.event("app_mention")
async def handle_mention(event, say):
"""Handle when the bot is mentioned in a channel"""
try:
# Get the user's question (remove the bot mention)
text = event['text']
# Remove the bot mention from the text
words = text.split()
if words and words[0].startswith('<@'):
question = ' '.join(words[1:])
else:
question = text
if not question.strip():
await say("Hi! Ask me anything about Kedro and I'll help you find the answer!")
return
logger.info(f"Received question: {question}")
# Get RAG system and search for answer
rag = await get_rag()
# Search for relevant information
results = rag.search(question, top_k=3)
if not results:
await say("I couldn't find relevant information to answer your question. Could you try rephrasing it?")
return
# Format the response
answer_parts = []
answer_parts.append(f"**Answer to: {question}**\n")
for i, result in enumerate(results, 1):
content = result['content'][:500] + "..." if len(result['content']) > 500 else result['content']
source = result.get('source', 'Unknown')
answer_parts.append(f"**{i}.** {content}")
if source != 'Unknown':
answer_parts.append(f" *Source: {source}*")
answer_parts.append("")
response = "\n".join(answer_parts)
await say(response)
logger.info("Response sent successfully")
except Exception as e:
logger.error(f"Error handling mention: {e}")
await say("Sorry, I encountered an error while processing your request. Please try again.")
@app.event("message")
async def handle_message(event, say):
"""Handle direct messages to the bot"""
# Only respond to direct messages (not channel messages)
if event.get('channel_type') == 'im':
question = event['text']
if not question.strip():
await say("Hi! Ask me anything about Kedro and I'll help you find the answer!")
return
try:
logger.info(f"Received DM: {question}")
# Get RAG system and search for answer
rag = await get_rag()
# Search for relevant information
results = rag.search(question, top_k=3)
if not results:
await say("I couldn't find relevant information to answer your question. Could you try rephrasing it?")
return
# Format the response
answer_parts = []
answer_parts.append(f"**Answer to: {question}**\n")
for i, result in enumerate(results, 1):
content = result['content'][:500] + "..." if len(result['content']) > 500 else result['content']
source = result.get('source', 'Unknown')
answer_parts.append(f"**{i}.** {content}")
if source != 'Unknown':
answer_parts.append(f" *Source: {source}*")
answer_parts.append("")
response = "\n".join(answer_parts)
await say(response)
logger.info("DM response sent successfully")
except Exception as e:
logger.error(f"Error handling DM: {e}")
await say("Sorry, I encountered an error while processing your request. Please try again.")
@app.command("/kedro")
async def handle_kedro_command(ack, respond, command):
"""Handle the /kedro slash command"""
await ack()
question = command['text']
if not question.strip():
await respond("Please provide a question. Example: `/kedro how do I create a pipeline?`")
return
try:
logger.info(f"Received slash command: {question}")
# Get RAG system and search for answer
rag = await get_rag()
# Search for relevant information
results = rag.search(question, top_k=3)
if not results:
await respond("I couldn't find relevant information to answer your question. Could you try rephrasing it?")
return
# Format the response
answer_parts = []
answer_parts.append(f"**Answer to: {question}**\n")
for i, result in enumerate(results, 1):
content = result['content'][:500] + "..." if len(result['content']) > 500 else result['content']
source = result.get('source', 'Unknown')
answer_parts.append(f"**{i}.** {content}")
if source != 'Unknown':
answer_parts.append(f" *Source: {source}*")
answer_parts.append("")
response = "\n".join(answer_parts)
await respond(response)
logger.info("Slash command response sent successfully")
except Exception as e:
logger.error(f"Error handling slash command: {e}")
await respond("Sorry, I encountered an error while processing your request. Please try again.")
async def main():
"""Main function to start the bot"""
try:
# Initialize the RAG system
await get_rag()
# Start the socket mode handler
handler = AsyncSocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
await handler.start_async()
except Exception as e:
logger.error(f"Failed to start bot: {e}")
raise
if __name__ == "__main__":
# Run the bot
asyncio.run(main())