doc_mcp_server.py
#!/usr/bin/env python3
import json

import redis
import numpy as np
from sentence_transformers import SentenceTransformer

from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("doc-embeddings")

# Initialize Redis and model
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
model = SentenceTransformer('all-MiniLM-L6-v2')
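
# Expected Redis layout (inferred from how the tools below read data; the
# indexing pipeline that writes these keys is assumed, not shown here):
#   doc_ids        SET of document IDs
#   doc:{doc_id}   HASH with fields: heading, file_path, embedding
#                  (embedding stored as a JSON-encoded list of floats)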


def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


@mcp.tool()
async def fetch_file(file_path: str) -> str:
    """Fetch the complete content of a file.

    Args:
        file_path: Path to the file to retrieve
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return f"File: {file_path}\n\n{content}"
    except FileNotFoundError:
        return f"File not found: {file_path}"
    except Exception as e:
        return f"Error reading file {file_path}: {e}"


@mcp.tool()
async def search_docs(query: str, top_k: int = 5) -> str:
    """Search documentation using semantic similarity.

    Args:
        query: Search query
        top_k: Number of results to return (default: 5)
    """
    # Embed the query with the same model used to index the documents
    query_embedding = model.encode(query).tolist()

    doc_ids = r.smembers('doc_ids')
    if not doc_ids:
        return "No documents found in database"

    # Score every stored document against the query
    similarities = []
    for doc_id in doc_ids:
        doc_data = r.hgetall(f"doc:{doc_id}")
        if doc_data and 'embedding' in doc_data:
            doc_embedding = json.loads(doc_data['embedding'])
            similarity = cosine_similarity(query_embedding, doc_embedding)
            similarities.append((float(similarity), doc_id, doc_data))

    # Highest-scoring documents first
    similarities.sort(key=lambda x: x[0], reverse=True)

    results = []
    for i, (score, doc_id, doc_data) in enumerate(similarities[:top_k]):
        results.append(
            f"Result {i+1} (Score: {score:.3f}):\n"
            f"Heading: {doc_data['heading']}\n"
            f"File: {doc_data['file_path']}\n"
        )
    return "\n---\n".join(results)


@mcp.tool()
async def list_documents() -> str:
    """List all available headings in the database."""
    doc_ids = r.smembers('doc_ids')
    if not doc_ids:
        return "No headings found in database"

    # Count how many indexed headings each source file contributes
    files = {}
    for doc_id in doc_ids:
        doc_data = r.hgetall(f"doc:{doc_id}")
        if doc_data:
            file_path = doc_data['file_path']
            if file_path not in files:
                files[file_path] = 0
            files[file_path] += 1

    result = f"Found {len(doc_ids)} headings from {len(files)} files:\n\n"
    for file_path, count in files.items():
        result += f"• {file_path} ({count} headings)\n"
    return result


if __name__ == "__main__":
    mcp.run(transport="stdio")
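
# Hypothetical indexing snippet (not part of this server) illustrating how a
# document chunk could be stored in the layout the tools above expect; the
# doc ID, heading, and file path below are made-up examples:
#
#   embedding = model.encode("Getting started").tolist()
#   r.hset("doc:example-0", mapping={
#       "heading": "Getting started",
#       "file_path": "docs/README.md",
#       "embedding": json.dumps(embedding),
#   })
#   r.sadd("doc_ids", "example-0")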