Skip to main content
Glama
kenjisekino

Claude RAG MCP Pipeline

by kenjisekino
mcp_server.py3.21 kB
#!/usr/bin/env python3 import asyncio import json import sys import os from typing import Any, Sequence # Set working directory and paths os.chdir(os.path.dirname(os.path.abspath(__file__))) sys.path.append(os.path.join(os.path.dirname(__file__), 'src')) print("Starting MCP server...", file=sys.stderr) from mcp.server import Server from mcp.types import Tool, TextContent import mcp.server.stdio from src.rag_system import ConversationalRAGSystem # Initialize RAG system print("Initializing RAG system...", file=sys.stderr) rag_system = ConversationalRAGSystem(embedding_provider="openai") print("RAG system ready", file=sys.stderr) # Create server server = Server("personal-documents") @server.list_tools() async def list_tools() -> list[Tool]: print("Tool list requested", file=sys.stderr) return [ Tool( name="search_documents", description="Search through personal document collection", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query or question" } }, "required": ["query"] } ), Tool( name="document_stats", description="Get statistics about document collection", inputSchema={ "type": "object", "properties": {} } ) ] @server.call_tool() async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]: print(f"Tool called: {name}", file=sys.stderr) if name == "search_documents": query = arguments.get("query", "") print(f"Searching for: {query}", file=sys.stderr) try: result = rag_system.query(query, n_results=5) response = f"**Answer:** {result['answer']}\n\n" if result['sources']: sources = [s['source'] for s in result['sources']] response += f"**Sources:** {', '.join(sources)}" return [TextContent(type="text", text=response)] except Exception as e: error_msg = f"Error searching documents: {str(e)}" print(error_msg, file=sys.stderr) return [TextContent(type="text", text=error_msg)] elif name == "document_stats": try: stats = rag_system.get_system_stats() response = f"Document collection contains {stats['total_documents']} chunks" return [TextContent(type="text", text=response)] except Exception as e: return [TextContent(type="text", text=f"Error getting stats: {str(e)}")] return [TextContent(type="text", text=f"Unknown tool: {name}")] async def main(): print("Starting stdio server...", file=sys.stderr) async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): print("MCP server connected and running", file=sys.stderr) await server.run(read_stream, write_stream, server.create_initialization_options()) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kenjisekino/claude-rag-mcp-pipeline'

If you have feedback or need assistance with the MCP directory API, please join our Discord server