# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/musnows/muxue_rag_mcp'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import os
import time
import uuid
from typing import List, Dict, Set
from .config import AppConfig
from .storage import RAGStorage
from .utils import is_text_file, read_file_content, chunk_text
from .logger import logger
class Indexer:
    """Builds and incrementally maintains the RAG index for a directory.

    Walks the target directory tree, compares filesystem mtimes against the
    metadata already stored in the vector collection, and synchronizes the
    collection: removes chunks for deleted files, re-chunks new/modified
    files, and leaves unchanged files untouched.
    """

    def __init__(self, target_dir: str, config: AppConfig):
        """
        Args:
            target_dir: Directory to index; normalized to an absolute path so
                stored ``file_path`` metadata is stable across invocations.
            config: Application configuration (provides chunking settings).
        """
        self.target_dir = os.path.abspath(target_dir)
        self.config = config
        self.storage = RAGStorage(self.target_dir, config)

    def index(self):
        """Synchronize the index with the current state of the directory.

        Steps: load existing state from storage, scan the filesystem,
        delete chunks for removed files, then (re)process new or
        modified files.
        """
        logger.info(f"Indexing directory: {self.target_dir}")
        self.storage.initialize()

        existing_files, file_ids = self._load_existing_state()
        current_files, files_to_process = self._scan_directory(existing_files)

        self._remove_deleted(existing_files, current_files, file_ids)
        self._process_files(files_to_process, file_ids)

        logger.info("Indexing complete.")

    def _load_existing_state(self) -> tuple:
        """Read indexed-file metadata back from the collection.

        Returns:
            A tuple ``(existing_files, file_ids)`` where ``existing_files``
            maps file path -> stored mtime and ``file_ids`` maps file
            path -> list of chunk document ids.
        """
        existing_files: Dict[str, float] = {}
        file_ids: Dict[str, List[str]] = {}
        existing_data = self.storage.collection.get(include=['metadatas'])
        if existing_data and existing_data['ids']:
            for doc_id, meta in zip(existing_data['ids'], existing_data['metadatas']):
                if meta and 'file_path' in meta:
                    fpath = meta['file_path']
                    existing_files[fpath] = meta.get('mtime', 0)
                    file_ids.setdefault(fpath, []).append(doc_id)
        return existing_files, file_ids

    def _scan_directory(self, existing_files: Dict[str, float]) -> tuple:
        """Walk the target directory and find files needing (re)indexing.

        Hidden directories (names starting with '.') are pruned from the
        walk. A file needs processing if it is new or its mtime differs
        from the stored value.

        Returns:
            A tuple ``(current_files, files_to_process)``: the set of all
            text files seen, and the list of new/modified paths.
        """
        current_files: Set[str] = set()
        files_to_process: List[str] = []
        for root, dirs, files in os.walk(self.target_dir):
            # Prune hidden directories in place so os.walk skips them.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for file in files:
                file_path = os.path.join(root, file)
                if not is_text_file(file_path):
                    continue
                current_files.add(file_path)
                mtime = os.path.getmtime(file_path)
                # Small epsilon guards against float round-trip noise in
                # stored mtimes.
                if (file_path not in existing_files
                        or abs(existing_files[file_path] - mtime) > 1e-6):
                    files_to_process.append(file_path)
        return current_files, files_to_process

    def _remove_deleted(self, existing_files: Dict[str, float],
                        current_files: Set[str],
                        file_ids: Dict[str, List[str]]):
        """Delete chunks belonging to files no longer present on disk."""
        files_to_delete = [f for f in existing_files if f not in current_files]
        if not files_to_delete:
            return
        logger.info(f"Removing {len(files_to_delete)} deleted files from index...")
        all_ids_to_delete = []
        for fpath in files_to_delete:
            if fpath in file_ids:
                all_ids_to_delete.extend(file_ids[fpath])
        if all_ids_to_delete:
            self.storage.collection.delete(ids=all_ids_to_delete)

    def _process_files(self, files_to_process: List[str],
                       file_ids: Dict[str, List[str]]):
        """Chunk and store each new/modified file.

        Old chunks for modified files are deleted first so the collection
        never holds stale and fresh chunks for the same file.
        """
        ids_to_remove_for_update = []
        for fpath in files_to_process:
            if fpath in file_ids:
                ids_to_remove_for_update.extend(file_ids[fpath])
        if ids_to_remove_for_update:
            self.storage.collection.delete(ids=ids_to_remove_for_update)

        logger.info(f"Processing {len(files_to_process)} files...")
        for i, file_path in enumerate(files_to_process):
            try:
                content = read_file_content(file_path)
                if not content:
                    continue
                chunks = chunk_text(content, self.config.processing.chunk_count)
                if not chunks:
                    continue
                mtime = os.path.getmtime(file_path)
                file_name = os.path.basename(file_path)
                doc_ids = [str(uuid.uuid4()) for _ in chunks]
                # BUGFIX: was chunks.index(chunk), which is O(n^2) and
                # assigns the FIRST occurrence's index to duplicate chunk
                # texts; enumerate gives each chunk its true position.
                metadatas = [
                    {
                        "file_path": file_path,
                        "file_name": file_name,
                        "mtime": mtime,
                        "chunk_index": chunk_idx,
                        "total_chunks": len(chunks),
                    }
                    for chunk_idx, _ in enumerate(chunks)
                ]
                self.storage.add_documents(chunks, metadatas, doc_ids)
                if (i + 1) % 10 == 0:
                    logger.info(f"Processed {i + 1}/{len(files_to_process)} files")
            except Exception as e:
                # Best-effort per file: log and continue so one bad file
                # doesn't abort the whole indexing run.
                logger.error(f"Error processing {file_path}: {e}")

    def clean(self):
        """Remove all indexed data for this directory from storage."""
        self.storage.clear()