mcp_api.py•3.27 kB
from fastapi import APIRouter, HTTPException, Body
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Get index file from environment
INDEX_FILE = os.getenv("INDEX_FILE", "./data/faiss_index.bin")
# Initialize router
router = APIRouter(prefix="/mcp", tags=["MCP"])
class MCPAction(BaseModel):
"""Model for an MCP action request"""
action_type: str
payload: Dict[str, Any]
class MCPResponse(BaseModel):
"""Model for an MCP response"""
status: str
data: Optional[Any] = None
error: Optional[str] = None
# Reference to the vector store - will be set from main.py
vector_store = None
doc_processor = None
def configure(vs, dp):
"""Configure the router with the vector store and document processor"""
global vector_store, doc_processor
vector_store = vs
doc_processor = dp
@router.post("/action", response_model=MCPResponse)
async def handle_action(action: MCPAction = Body(...)):
"""Handle MCP action requests"""
if vector_store is None or doc_processor is None:
raise HTTPException(status_code=500, detail="Server not properly initialized")
try:
if action.action_type == "retrieve_documents":
return await retrieve_documents(action.payload)
elif action.action_type == "index_documents":
return await index_documents(action.payload)
else:
return MCPResponse(
status="error",
error=f"Unknown action type: {action.action_type}"
)
except Exception as e:
return MCPResponse(
status="error",
error=str(e)
)
async def retrieve_documents(payload: Dict[str, Any]) -> MCPResponse:
"""Retrieve documents based on a query"""
query = payload.get("query")
top_k = payload.get("top_k", 5)
if not query:
return MCPResponse(
status="error",
error="Query is required for retrieve_documents action"
)
# Get query embedding
query_embedding = doc_processor.get_embedding(query)
# Search vector store
results = vector_store.search(query_embedding, top_k)
return MCPResponse(
status="success",
data={
"documents": results,
"query": query,
"total_results": len(results)
}
)
async def index_documents(payload: Dict[str, Any]) -> MCPResponse:
"""Index documents from a directory"""
directory = payload.get("directory", "docs")
# Process documents
documents = doc_processor.process_documents(directory)
if not documents:
return MCPResponse(
status="warning",
data={"indexed_count": 0},
error="No documents found to index"
)
# Index documents
vector_store.index_documents(documents)
# Save index
os.makedirs(os.path.dirname(INDEX_FILE), exist_ok=True)
vector_store.save(INDEX_FILE)
return MCPResponse(
status="success",
data={
"indexed_count": len(documents),
"message": f"Successfully indexed {len(documents)} documents"
}
)