import os
import pickle
from typing import Any, List
from pathlib import Path
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from mcp.server.fastmcp import FastMCP
# Initialize FastMCP server
mcp = FastMCP("eyelevel-rag")
class LocalRAG:
    """Local retrieval-augmented-generation store.

    Markdown files are split into chunks, embedded with a SentenceTransformer
    model, and indexed in a FAISS inner-product index (cosine similarity on
    L2-normalized vectors). Both the index and the chunk texts/metadata are
    persisted to disk so the knowledge base survives restarts.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Initialize the local RAG system with a sentence-transformer model.

        Args:
            model_name: Name of the sentence-transformers model to load.
        """
        self.model = SentenceTransformer(model_name)
        self.index = None        # FAISS index; built lazily by build_index()
        self.documents = []      # chunk texts, row-aligned with the index
        self.metadata = []       # per-chunk dicts, aligned with documents
        self.index_file = "rag_index.faiss"
        self.docs_file = "rag_documents.pkl"
        # Reuse a previously persisted index if one exists on disk.
        self.load_index()
        # First run (or wiped store): bootstrap from the bundled documents.
        if len(self.documents) == 0:
            self.auto_load_default_documents()

    def auto_load_default_documents(self):
        """Auto-load the default markdown documents into the knowledge base.

        Chunks from every readable default file are collected first and the
        index is built exactly once at the end — the previous version rebuilt
        the index (re-embedding every document) once per file.
        """
        default_files = [
            "ecom-doc/dict/용어정리.md",
            "ecom-doc/dict/코드_컨테이너.md",
            "ecom-doc/dict/코드_패키지_단위.md",
        ]
        all_documents = list(self.documents)
        all_metadata = list(self.metadata)
        for file_path in default_files:
            if not os.path.exists(file_path):
                continue
            try:
                chunks = self.load_markdown_file(file_path)
            except Exception:
                # Best-effort bootstrap: a STDIO MCP server must not write to
                # stdout, so unreadable files are skipped silently.
                continue
            if chunks:
                file_name = os.path.basename(file_path)
                all_documents.extend(chunks)
                all_metadata.extend(
                    {'source_file': file_name, 'chunk_id': i}
                    for i in range(len(chunks))
                )
        # Embed and persist once, and only if anything new was collected.
        if len(all_documents) > len(self.documents):
            self.build_index(all_documents, all_metadata)

    def load_markdown_file(self, file_path: str) -> List[str]:
        """Load a markdown file and split it into retrieval chunks.

        Chunking rules:
          * Each heading starts a new prose section; a section is emitted as a
            chunk once it exceeds 50 characters.
          * Each markdown table data row becomes its own chunk, prefixed with
            the current section text and the table's header row, so a single
            row stays self-describing for retrieval.
          * Separator rows (``|--...``) and blank lines are dropped.

        Args:
            file_path: Path to the UTF-8 markdown file.

        Returns:
            List of chunk strings (possibly empty).
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        chunks = []
        lines = content.split('\n')
        current_section = ""
        current_table_header = ""
        in_table = False
        for line in lines:
            line = line.strip()
            # Skip empty lines.
            if not line:
                continue
            # Headers start a new section; flush the previous one if sizable.
            if line.startswith('#'):
                if current_section and len(current_section) > 50:
                    chunks.append(current_section.strip())
                current_section = line + "\n\n"
                in_table = False
                current_table_header = ""
                continue
            # Table rows (but not the |---| separator).
            if '|' in line and not line.startswith('|--'):
                if not in_table:
                    # First table row is the header.
                    current_table_header = line
                    in_table = True
                    continue
                else:
                    # Data row: emit section context + header + this row.
                    table_chunk = f"{current_section.strip()}\n\n{current_table_header}\n{line}"
                    if len(table_chunk) > 30:  # only keep meaningful chunks
                        chunks.append(table_chunk)
                    continue
            # Table separator row.
            if line.startswith('|--'):
                continue
            # Regular prose content (also marks the end of any open table).
            if not in_table:
                current_section += line + "\n"
            else:
                in_table = False
                current_table_header = ""
                current_section += line + "\n"
        # Flush the final section if sizable.
        if current_section and len(current_section) > 50:
            chunks.append(current_section.strip())
        return chunks

    def create_embeddings(self, texts: List[str]) -> np.ndarray:
        """Return a (len(texts), dim) float numpy array of embeddings."""
        return self.model.encode(texts, convert_to_numpy=True)

    def build_index(self, documents: List[str], metadata: List[dict]):
        """Build a FAISS index over *documents* and persist it.

        Replaces any existing index; *documents* and *metadata* must be
        parallel lists.
        """
        embeddings = self.create_embeddings(documents)
        dimension = embeddings.shape[1]
        # Inner product over L2-normalized vectors == cosine similarity.
        self.index = faiss.IndexFlatIP(dimension)
        faiss.normalize_L2(embeddings)
        self.index.add(embeddings)
        self.documents = documents
        self.metadata = metadata
        self.save_index()

    def save_index(self):
        """Persist the FAISS index and the documents/metadata to disk."""
        if self.index is not None:
            faiss.write_index(self.index, self.index_file)
            with open(self.docs_file, 'wb') as f:
                pickle.dump({'documents': self.documents, 'metadata': self.metadata}, f)

    def load_index(self):
        """Load the FAISS index and documents from disk, if present.

        On any failure the in-memory state is reset to empty; errors are not
        reported because a STDIO MCP server must not write to stdout.
        """
        if os.path.exists(self.index_file) and os.path.exists(self.docs_file):
            try:
                self.index = faiss.read_index(self.index_file)
                # NOTE: pickle.load is only safe because this file is written
                # by save_index() on the same machine — never point docs_file
                # at untrusted data.
                with open(self.docs_file, 'rb') as f:
                    data = pickle.load(f)
                self.documents = data['documents']
                self.metadata = data['metadata']
            except Exception:
                self.index = None
                self.documents = []
                self.metadata = []

    def search(self, query: str, top_k: int = 5) -> List[dict]:
        """Return up to *top_k* chunks most similar to *query*.

        Args:
            query: Free-text search query.
            top_k: Maximum number of results.

        Returns:
            List of dicts with keys 'text', 'score' (cosine similarity),
            'metadata', and 'rank' (1-based), best match first. Empty when no
            index has been built.
        """
        if self.index is None or len(self.documents) == 0:
            return []
        query_embedding = self.create_embeddings([query])
        faiss.normalize_L2(query_embedding)
        # Never ask FAISS for more neighbors than there are documents.
        k = min(top_k, len(self.documents))
        scores, indices = self.index.search(query_embedding, k)
        results = []
        for rank, (score, idx) in enumerate(zip(scores[0], indices[0]), start=1):
            if idx != -1:  # FAISS pads missing neighbors with -1
                results.append({
                    'text': self.documents[idx],
                    'score': float(score),
                    'metadata': self.metadata[idx],
                    'rank': rank,
                })
        return results
# Initialize RAG system — module-level singleton shared by all MCP tool
# calls; loads any persisted index from disk and, when the store is empty,
# bootstraps the default knowledge base (downloads the embedding model on
# first use).
rag_system = LocalRAG()
@mcp.tool()
def search_doc_for_rag_context(query: str) -> str:
    """
    Searches and retrieves relevant context from a knowledge base,
    based on the user's query.
    Args:
        query: The search query supplied by the user.
    Returns:
        str: Relevant text content that can be used by the LLM to answer the query.
    """
    hits = rag_system.search(query, top_k=5)
    if not hits:
        return "No relevant documents found in the knowledge base."
    # Render each hit as a relevance-tagged snippet, joined by horizontal rules.
    snippets = [f"[Relevance: {hit['score']:.3f}]\n{hit['text']}" for hit in hits]
    return "\n\n---\n\n".join(snippets)
# # @mcp.tool()
# def ingest_markdown_file(local_file_path: str) -> str:
# """
# Ingest a markdown file into the knowledge base.
# Args:
# local_file_path: The path to the local markdown file to ingest.
# Returns:
# str: A message indicating the file has been processed and ingested.
# """
# if not os.path.exists(local_file_path):
# return f"File not found: {local_file_path}"
# if not local_file_path.endswith('.md'):
# return f"File must be a markdown (.md) file: {local_file_path}"
# try:
# # Load and chunk the markdown file
# chunks = rag_system.load_markdown_file(local_file_path)
# if not chunks:
# return f"No content could be extracted from {local_file_path}"
# # Create metadata for each chunk
# file_name = os.path.basename(local_file_path)
# metadata = [{'source_file': file_name, 'chunk_id': i} for i in range(len(chunks))]
# # Add to existing documents or create new index
# all_documents = rag_system.documents + chunks
# all_metadata = rag_system.metadata + metadata
# # Rebuild the index with all documents
# rag_system.build_index(all_documents, all_metadata)
# return f"Successfully ingested {len(chunks)} chunks from {file_name} into the knowledge base."
# except Exception as e:
# return f"Error ingesting file {local_file_path}: {str(e)}"
# # @mcp.tool()
# def list_indexed_documents() -> str:
# """
# List all documents currently in the knowledge base.
# Returns:
# str: A summary of all indexed documents.
# """
# if not rag_system.documents:
# return "No documents currently indexed in the knowledge base."
# # Group by source file
# file_counts = {}
# for meta in rag_system.metadata:
# source_file = meta.get('source_file', 'unknown')
# file_counts[source_file] = file_counts.get(source_file, 0) + 1
# summary = f"Knowledge base contains {len(rag_system.documents)} total chunks from {len(file_counts)} files:\n\n"
# for file_name, chunk_count in file_counts.items():
# summary += f"- {file_name}: {chunk_count} chunks\n"
# return summary
# # @mcp.tool()
# def clear_knowledge_base() -> str:
# """
# Clear all documents from the knowledge base.
# Returns:
# str: A confirmation message.
# """
# rag_system.documents = []
# rag_system.metadata = []
# rag_system.index = None
# # Remove saved files
# if os.path.exists(rag_system.index_file):
# os.remove(rag_system.index_file)
# if os.path.exists(rag_system.docs_file):
# os.remove(rag_system.docs_file)
# return "Knowledge base has been cleared successfully."
if __name__ == "__main__":
    # Initialize and run the server over stdio so an MCP client can spawn this
    # file as a subprocess; nothing else may write to stdout while it runs.
    mcp.run(transport='stdio')