"""Personal RAG MCP Server with FastMCP."""
import os
import uuid
import hashlib
from typing import List, Optional
from fastmcp import FastMCP
from .storage.sqlite_store import SQLiteStore
from .storage.qdrant_store import QdrantStore
from .storage.schema import DocumentMetadata
from .utils.embeddings import EmbeddingClient
from .utils.chunking import TextChunker
from .pipeline.retriever import VectorRetriever
from .pipeline.generator import LLMGenerator
from .pipeline.pipeline import RAGPipeline

# Initialize the MCP server
mcp = FastMCP("Personal RAG")

# Global state for storage and pipeline components,
# initialized lazily on the first tool call.
_sqlite_store: Optional[SQLiteStore] = None
_qdrant_store: Optional[QdrantStore] = None
_embedding_client: Optional[EmbeddingClient] = None
_chunker: Optional[TextChunker] = None
_pipeline: Optional[RAGPipeline] = None


async def get_stores():
    """Lazily initialize storage, embedding, chunking, and pipeline components.

    Returns:
        Tuple of (sqlite_store, qdrant_store, embedding_client, chunker, pipeline).
    """
    global _sqlite_store, _qdrant_store, _embedding_client, _chunker, _pipeline
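    # NOTE: initialization is not lock-guarded; this assumes the runtime does
    # not issue concurrent first calls (a duplicate init would be wasteful,
    # not fatal).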
if _sqlite_store is None:
# Get config from environment
sqlite_path = os.getenv("SQLITE_PATH", "/app/data/documents.db")
qdrant_url = os.getenv("QDRANT_URL", "http://qdrant:6333")
ollama_url = os.getenv("OLLAMA_URL", "http://ollama:11434")
litellm_url = os.getenv("LITELLM_URL", "http://litellm:4000")
# Initialize stores
_sqlite_store = SQLiteStore(sqlite_path)
await _sqlite_store.initialize()
_qdrant_store = QdrantStore(qdrant_url)
await _qdrant_store.initialize()
# Initialize embedding client
_embedding_client = EmbeddingClient(ollama_url)
# Initialize chunker
_chunker = TextChunker(chunk_size=512, chunk_overlap=50)
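        # (Units of chunk_size/chunk_overlap depend on TextChunker; 512 with
        # an overlap of 50 is a typical token-based setting.)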
# Initialize pipeline
retriever = VectorRetriever(
qdrant_store=_qdrant_store,
sqlite_store=_sqlite_store,
embedding_client=_embedding_client
)
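        # Generation goes through the LiteLLM proxy; the "ollama/llama3" model
        # name is presumably routed by LiteLLM to the local Ollama backend.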
generator = LLMGenerator(
api_base=litellm_url,
model="ollama/llama3"
)
_pipeline = RAGPipeline(
retriever=retriever,
generator=generator,
top_k=5
)
return _sqlite_store, _qdrant_store, _embedding_client, _chunker, _pipeline


@mcp.tool()
async def store_memory(
text: str,
namespace: str = "notes/personal",
tags: Optional[List[str]] = None,
title: Optional[str] = None,
category: Optional[str] = None,
content_type: str = "note"
) -> str:
"""Store a note or memory in the knowledge base.
Args:
text: The text content to store
namespace: Hierarchical namespace (e.g., 'notes/personal', 'documents/work')
tags: List of tags for categorization
title: Optional title
category: Optional category ('work', 'personal', 'family')
content_type: Type of content ('note', 'document', 'snippet')
Returns:
Confirmation message with document ID
"""
sqlite, qdrant, embeddings, chunker, _ = await get_stores()
# Generate document ID
doc_id = str(uuid.uuid4())
# Create metadata
metadata = DocumentMetadata(
id=doc_id,
namespace=namespace,
content_type=content_type,
category=category,
tags=tags or [],
source="manual",
source_hash=hashlib.sha256(text.encode()).hexdigest(),
title=title,
ingestion_method="mcp_tool"
)
# Store full text in SQLite
await sqlite.store_document(doc_id, text, metadata)
    # Long texts are split and embedded per chunk; short texts get a single
    # vector keyed by the document ID itself.
    if chunker.needs_chunking(text):
# Chunk the text
chunks = chunker.chunk_text(text)
        # Embed and store each chunk
        for i, chunk_text in enumerate(chunks):
            chunk_id = f"{doc_id}_chunk_{i}"
# Generate embedding
embedding = await embeddings.embed_text(chunk_text)
# Store in Qdrant
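            # NOTE: Qdrant natively accepts only UUIDs or unsigned integers as
            # point IDs; QdrantStore is assumed to translate this string ID.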
await qdrant.upsert_vector(
point_id=chunk_id,
vector=embedding,
payload={
"document_id": doc_id,
"chunk_id": chunk_id,
"chunk_index": i,
"namespace": namespace,
"content_type": content_type,
"title": title
}
)
# Store chunk in SQLite
await sqlite.store_chunk(
chunk_id=chunk_id,
document_id=doc_id,
chunk_index=i,
chunk_text=chunk_text,
qdrant_point_id=chunk_id
)
return f"Stored document '{title or doc_id}' with {len(chunks)} chunks. ID: {doc_id}"
else:
# Single document, no chunking
embedding = await embeddings.embed_text(text)
await qdrant.upsert_vector(
point_id=doc_id,
vector=embedding,
payload={
"document_id": doc_id,
"chunk_id": None,
"namespace": namespace,
"content_type": content_type,
"title": title
}
)
return f"Stored document '{title or doc_id}'. ID: {doc_id}"


@mcp.tool()
async def search_memory(
query: str,
namespace: Optional[str] = None,
limit: int = 5,
content_type: Optional[str] = None
) -> str:
"""Search the knowledge base for relevant content.
Args:
query: Search query
namespace: Optional namespace filter
limit: Maximum number of results (default: 5)
content_type: Optional content type filter
Returns:
Formatted search results
"""
_, _, _, _, pipeline = await get_stores()
# Perform search
results = await pipeline.search(
query=query,
limit=limit,
namespace=namespace,
content_type=content_type
)
if not results:
return "No results found."
# Format results
output = f"Found {len(results)} result(s):\n\n"
for i, result in enumerate(results, 1):
output += f"[{i}] {result.metadata.title or result.document_id}\n"
output += f" Score: {result.score:.3f}\n"
output += f" Namespace: {result.metadata.namespace}\n"
output += f" Type: {result.metadata.content_type}\n"
        # Show a snippet (truncate long text to 200 characters)
        snippet = (result.text[:200] + "...") if len(result.text) > 200 else result.text
output += f" Snippet: {snippet}\n\n"
return output


@mcp.tool()
async def ask_with_context(
question: str,
namespace: Optional[str] = None,
limit: int = 5
) -> str:
"""Ask a question using RAG (retrieval-augmented generation).
Args:
question: The question to answer
namespace: Optional namespace filter
limit: Maximum context chunks to retrieve (default: 5)
Returns:
Answer with source citations
"""
    _, _, _, _, pipeline = await get_stores()
    # Temporarily override top_k for this call; restore it even if the query
    # raises, so later calls keep the configured default
    original_top_k = pipeline.top_k
    pipeline.top_k = limit
    try:
        result = await pipeline.query(
            question=question,
            namespace=namespace
        )
    finally:
        pipeline.top_k = original_top_k
# Format response
answer = result["answer"]
sources = result["sources"]
output = f"{answer}\n\n"
if sources:
output += "Sources:\n"
for i, source in enumerate(sources, 1):
title = source["title"] or source["document_id"]
output += f"[{i}] {title} (score: {source['score']:.3f})\n"
output += f" {source['namespace']}\n"
return output


# Health check resource
@mcp.resource("health://status")
async def health_check() -> str:
"""Health check endpoint."""
try:
sqlite, qdrant, _, _, _ = await get_stores()
# Check stores
qdrant_info = await qdrant.get_collection_info()
return f"""Health Status: OK
SQLite: Connected
Qdrant: Connected ({qdrant_info['points_count']} points)
"""
except Exception as e:
return f"Health Status: ERROR\n{str(e)}"


def main():
    """Main entry point with dual transport support."""
    transport = os.getenv("TRANSPORT", "stdio")
    if transport == "http":
        # HTTP SSE transport for web clients
        import uvicorn

        port = int(os.getenv("PORT", "8765"))
        http_app = mcp.http_app(transport="sse")
        uvicorn.run(http_app, host="0.0.0.0", port=port)
    else:
        # stdio transport (default). Avoid print() in this mode: stdout
        # carries the MCP protocol, so stray output would corrupt the stream.
        mcp.run()


if __name__ == "__main__":
    main()