import json
import os
import sys
from datetime import datetime

# Make the project root importable so rag_pipeline can be found
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from fastmcp import FastMCP
from rag_pipeline import VectorStore, Retriever

# Initialize MCP server
mcp = FastMCP("research-assistant")

# Initialize RAG components
vector_store = VectorStore(use_local=True)
retriever = Retriever()
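
# Interface assumed from how these objects are used below (rag_pipeline is
# project-local, so these signatures are inferred, not guaranteed):
#   retriever.retrieve(query, top_k) -> {"results": [chunk, ...],
#                                        "query_time_ms": float,
#                                        "total_results": int}
#   vector_store.search(text, top_k) -> [chunk, ...]
# where each chunk is {"text": str, "metadata": dict, "score": float}.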


@mcp.tool()
def search_documents(query: str, top_k: int = 5) -> str:
    """
    Search across your research library using semantic search.

    Args:
        query: Natural language query
        top_k: Number of results to return (default 5)

    Returns:
        JSON string with search results, sources, and scores
    """
    try:
        results = retriever.retrieve(query, top_k)

        # Log query for metrics tracking
        log_query(query, results)

        # Format results, truncating long passages for readability
        formatted = {
            "query": query,
            "query_time_ms": results["query_time_ms"],
            "results": [
                {
                    "text": r["text"][:500] + "..." if len(r["text"]) > 500 else r["text"],
                    "source": r["metadata"].get("title", "Unknown"),
                    "page": r["metadata"].get("page", "N/A"),
                    "score": r["score"],
                }
                for r in results["results"]
            ],
        }
        return json.dumps(formatted, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})


@mcp.tool()
def get_document_summary(source_path: str) -> str:
    """
    Get a quick overview of a specific document.

    Args:
        source_path: Path or title of the document

    Returns:
        JSON with document metadata and summary
    """
    try:
        # Search for the document by path or title
        results = vector_store.search(source_path, top_k=1)
        if not results:
            return json.dumps({"error": "Document not found"})

        doc = results[0]
        text = doc["text"]
        summary = {
            "title": doc["metadata"].get("title", "Unknown"),
            "author": doc["metadata"].get("author", "Unknown"),
            "page_count": doc["metadata"].get("page_count", "N/A"),
            "keywords": doc["metadata"].get("keywords", ""),
            # Only append an ellipsis when the preview is actually truncated
            "preview": text[:500] + "..." if len(text) > 500 else text,
        }
        return json.dumps(summary, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})


@mcp.tool()
def find_related_papers(query: str, top_k: int = 5) -> str:
    """
    Find papers related to a topic or concept.

    Args:
        query: Topic or concept to find related papers for
        top_k: Number of papers to return

    Returns:
        JSON with related papers
    """
    try:
        results = retriever.retrieve(query, top_k)

        # Deduplicate by source title; results are assumed sorted by score,
        # so the first chunk seen for a source is its best match
        unique_sources = {}
        for r in results["results"]:
            source = r["metadata"].get("title", "Unknown")
            if source not in unique_sources:
                snippet = r["text"]
                unique_sources[source] = {
                    "title": source,
                    "relevance_score": r["score"],
                    "snippet": snippet[:300] + "..." if len(snippet) > 300 else snippet,
                }
        return json.dumps(list(unique_sources.values()), indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})


def log_query(query: str, results: dict):
    """Log query for metrics tracking."""
    log_dir = "./data/query_logs"
    os.makedirs(log_dir, exist_ok=True)

    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "query": query,
        "results_count": results["total_results"],
        "query_time_ms": results["query_time_ms"],
        "top_score": results["results"][0]["score"] if results["results"] else 0,
    }

    # Append to a daily JSONL file, e.g. queries_20250101.jsonl
    log_file = os.path.join(log_dir, f"queries_{datetime.now().strftime('%Y%m%d')}.jsonl")
    with open(log_file, "a") as f:
        f.write(json.dumps(log_entry) + "\n")


if __name__ == "__main__":
    # Run the MCP server (FastMCP defaults to stdio transport)
    mcp.run()
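
# Illustrative client setup (not part of this module; the path below is a
# placeholder). One common way to expose a stdio MCP server to a client such
# as Claude Desktop is an entry like this in claude_desktop_config.json:
#
#   {
#     "mcpServers": {
#       "research-assistant": {
#         "command": "python",
#         "args": ["/path/to/this/server.py"]
#       }
#     }
#   }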