server.py
"""Personal RAG MCP Server with FastMCP.""" import os import uuid import hashlib from typing import List, Optional from datetime import datetime from fastmcp import FastMCP from .storage.sqlite_store import SQLiteStore from .storage.qdrant_store import QdrantStore from .storage.schema import DocumentMetadata from .utils.embeddings import EmbeddingClient from .utils.chunking import TextChunker from .pipeline.retriever import VectorRetriever from .pipeline.generator import LLMGenerator from .pipeline.pipeline import RAGPipeline # Initialize MCP server mcp = FastMCP("Personal RAG") # Global state for storage and pipeline # (initialized on first tool call) _sqlite_store: Optional[SQLiteStore] = None _qdrant_store: Optional[QdrantStore] = None _embedding_client: Optional[EmbeddingClient] = None _chunker: Optional[TextChunker] = None _pipeline: Optional[RAGPipeline] = None async def get_stores(): """Lazy initialization of storage components.""" global _sqlite_store, _qdrant_store, _embedding_client, _chunker, _pipeline if _sqlite_store is None: # Get config from environment sqlite_path = os.getenv("SQLITE_PATH", "/app/data/documents.db") qdrant_url = os.getenv("QDRANT_URL", "http://qdrant:6333") ollama_url = os.getenv("OLLAMA_URL", "http://ollama:11434") litellm_url = os.getenv("LITELLM_URL", "http://litellm:4000") # Initialize stores _sqlite_store = SQLiteStore(sqlite_path) await _sqlite_store.initialize() _qdrant_store = QdrantStore(qdrant_url) await _qdrant_store.initialize() # Initialize embedding client _embedding_client = EmbeddingClient(ollama_url) # Initialize chunker _chunker = TextChunker(chunk_size=512, chunk_overlap=50) # Initialize pipeline retriever = VectorRetriever( qdrant_store=_qdrant_store, sqlite_store=_sqlite_store, embedding_client=_embedding_client ) generator = LLMGenerator( api_base=litellm_url, model="ollama/llama3" ) _pipeline = RAGPipeline( retriever=retriever, generator=generator, top_k=5 ) return _sqlite_store, _qdrant_store, _embedding_client, _chunker, _pipeline @mcp.tool() async def store_memory( text: str, namespace: str = "notes/personal", tags: Optional[List[str]] = None, title: Optional[str] = None, category: Optional[str] = None, content_type: str = "note" ) -> str: """Store a note or memory in the knowledge base. 
Args: text: The text content to store namespace: Hierarchical namespace (e.g., 'notes/personal', 'documents/work') tags: List of tags for categorization title: Optional title category: Optional category ('work', 'personal', 'family') content_type: Type of content ('note', 'document', 'snippet') Returns: Confirmation message with document ID """ sqlite, qdrant, embeddings, chunker, _ = await get_stores() # Generate document ID doc_id = str(uuid.uuid4()) # Create metadata metadata = DocumentMetadata( id=doc_id, namespace=namespace, content_type=content_type, category=category, tags=tags or [], source="manual", source_hash=hashlib.sha256(text.encode()).hexdigest(), title=title, ingestion_method="mcp_tool" ) # Store full text in SQLite await sqlite.store_document(doc_id, text, metadata) # Check if chunking is needed if chunker.needs_chunking(text): # Chunk the text chunks = chunker.chunk_text(text) # Store each chunk chunk_ids = [] for i, chunk_text in enumerate(chunks): chunk_id = f"{doc_id}_chunk_{i}" chunk_ids.append(chunk_id) # Generate embedding embedding = await embeddings.embed_text(chunk_text) # Store in Qdrant await qdrant.upsert_vector( point_id=chunk_id, vector=embedding, payload={ "document_id": doc_id, "chunk_id": chunk_id, "chunk_index": i, "namespace": namespace, "content_type": content_type, "title": title } ) # Store chunk in SQLite await sqlite.store_chunk( chunk_id=chunk_id, document_id=doc_id, chunk_index=i, chunk_text=chunk_text, qdrant_point_id=chunk_id ) return f"Stored document '{title or doc_id}' with {len(chunks)} chunks. ID: {doc_id}" else: # Single document, no chunking embedding = await embeddings.embed_text(text) await qdrant.upsert_vector( point_id=doc_id, vector=embedding, payload={ "document_id": doc_id, "chunk_id": None, "namespace": namespace, "content_type": content_type, "title": title } ) return f"Stored document '{title or doc_id}'. ID: {doc_id}" @mcp.tool() async def search_memory( query: str, namespace: Optional[str] = None, limit: int = 5, content_type: Optional[str] = None ) -> str: """Search the knowledge base for relevant content. Args: query: Search query namespace: Optional namespace filter limit: Maximum number of results (default: 5) content_type: Optional content type filter Returns: Formatted search results """ _, _, _, _, pipeline = await get_stores() # Perform search results = await pipeline.search( query=query, limit=limit, namespace=namespace, content_type=content_type ) if not results: return "No results found." # Format results output = f"Found {len(results)} result(s):\n\n" for i, result in enumerate(results, 1): output += f"[{i}] {result.metadata.title or result.document_id}\n" output += f" Score: {result.score:.3f}\n" output += f" Namespace: {result.metadata.namespace}\n" output += f" Type: {result.metadata.content_type}\n" # Show snippet snippet = result.text[:200] + "..." if len(result.text) > 200 else result.text output += f" Snippet: {snippet}\n\n" return output @mcp.tool() async def ask_with_context( question: str, namespace: Optional[str] = None, limit: int = 5 ) -> str: """Ask a question using RAG (retrieval-augmented generation). 
Args: question: The question to answer namespace: Optional namespace filter limit: Maximum context chunks to retrieve (default: 5) Returns: Answer with source citations """ _, _, _, _, pipeline = await get_stores() # Set top_k temporarily original_top_k = pipeline.top_k pipeline.top_k = limit # Execute RAG query result = await pipeline.query( question=question, namespace=namespace ) # Restore original top_k pipeline.top_k = original_top_k # Format response answer = result["answer"] sources = result["sources"] output = f"{answer}\n\n" if sources: output += "Sources:\n" for i, source in enumerate(sources, 1): title = source["title"] or source["document_id"] output += f"[{i}] {title} (score: {source['score']:.3f})\n" output += f" {source['namespace']}\n" return output # Health check resource @mcp.resource("health://status") async def health_check() -> str: """Health check endpoint.""" try: sqlite, qdrant, _, _, _ = await get_stores() # Check stores qdrant_info = await qdrant.get_collection_info() return f"""Health Status: OK SQLite: Connected Qdrant: Connected ({qdrant_info['points_count']} points) """ except Exception as e: return f"Health Status: ERROR\n{str(e)}" def main(): """Main entry point with dual transport support.""" print("again") transport = os.getenv("TRANSPORT", "stdio") print("transport", transport) if transport == "http": # HTTP SSE transport for web clients import uvicorn port = int(os.getenv("PORT", 8765)) http_app = mcp.http_app(transport="sse") print("http_app (SSE mode)", http_app) uvicorn.run( http_app, host="0.0.0.0", port=port ) else: # stdio transport (default) mcp.run() print("HELLO") if __name__ == "__main__": main()
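
A quick way to smoke-test these tools is FastMCP's in-memory client, which can wrap the `mcp` server object directly instead of spawning a subprocess. The sketch below is not part of server.py; it assumes the FastMCP 2.x `Client` API, and the `personal_rag` package path and sample note are hypothetical, but the tool names and argument shapes match the definitions above.

# demo_client.py — a minimal smoke test, not part of server.py.
# Assumes FastMCP 2.x, whose Client accepts a FastMCP server instance
# for in-memory transport. The "personal_rag" package path is hypothetical;
# adjust the import to wherever server.py actually lives.
import asyncio

from fastmcp import Client

from personal_rag.server import mcp


async def demo():
    async with Client(mcp) as client:
        # Store a note, then ask a question that should retrieve it
        stored = await client.call_tool(
            "store_memory",
            {
                "text": "The cabin wifi password is trout-stream-42.",
                "title": "Cabin wifi",
                "tags": ["cabin"],
            },
        )
        print(stored)

        answer = await client.call_tool(
            "ask_with_context",
            {"question": "What is the cabin wifi password?"},
        )
        print(answer)


if __name__ == "__main__":
    asyncio.run(demo())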
