MCP RAG Server

document_processor.py (8.35 kB)
import os
import re
import glob
import numpy as np
from typing import List, Dict, Any, Optional
from sentence_transformers import SentenceTransformer
from tqdm import tqdm


class DocumentProcessor:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the document processor.

        Args:
            model_name: Name of the sentence-transformers model to use
        """
        self.model = SentenceTransformer(model_name)

    def get_embedding(self, text: str) -> np.ndarray:
        """
        Get embedding for a text string.

        Args:
            text: The text to embed

        Returns:
            Embedding vector as numpy array
        """
        return self.model.encode(text, show_progress_bar=False)

    def process_documents(self, directory: str) -> List[Dict[str, Any]]:
        """
        Process all documents in a directory.

        Args:
            directory: Directory containing text documents

        Returns:
            List of document dictionaries with embeddings and metadata
        """
        documents = []

        # Get all text files, including .move files
        files = glob.glob(os.path.join(directory, "**/*.txt"), recursive=True)
        files.extend(glob.glob(os.path.join(directory, "**/*.md"), recursive=True))
        files.extend(glob.glob(os.path.join(directory, "**/*.move"), recursive=True))

        if not files:
            print(f"No text files found in {directory}")
            return documents

        print(f"Processing {len(files)} documents...")

        for file_path in tqdm(files):
            try:
                # Read content
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read().strip()

                if not content:
                    print(f"Skipping empty file: {file_path}")
                    continue

                # Extract file extension
                ext = os.path.splitext(file_path)[1].lower()

                # Process differently based on file type
                if ext == '.move':
                    chunks = self._process_move_file(content, file_path)
                else:
                    # Default text processing for other file types
                    chunks = self._chunk_text(content)

                # Process each chunk as a separate document
                for i, chunk in enumerate(chunks):
                    # Skip empty chunks
                    if not chunk.strip():
                        continue

                    # Create document with metadata
                    doc = {
                        'id': f"{os.path.basename(file_path)}_{i}",
                        'path': file_path,
                        'chunk_index': i,
                        'content': chunk,
                        'file_type': ext[1:] if ext else 'txt',  # Remove the dot
                        'embedding': self.get_embedding(chunk)
                    }
                    documents.append(doc)
            except Exception as e:
                print(f"Error processing {file_path}: {str(e)}")

        print(f"Processed {len(documents)} document chunks")
        return documents

    def _process_move_file(self, content: str, file_path: str) -> List[str]:
        """
        Process a Move language file, extracting modules, structs, and functions.

        Args:
            content: The file content
            file_path: Path to the file

        Returns:
            List of content chunks with semantic meaning
        """
        chunks = []

        # Extract repo and file information for context
        repo_info = file_path.split('docs/move_files/')[-1] if 'docs/move_files/' in file_path else file_path
        file_context = f"File: {repo_info}\n\n"

        # Extract module declaration
        module_match = re.search(r'module\s+([a-zA-Z0-9_:]+)\s*{', content)
        module_name = module_match.group(1) if module_match else "unknown_module"

        # Add file header as a chunk with module info
        header_pattern = r'(\/\/.*?|\s*\/\*[\s\S]*?\*\/\s*)*module'
        header_match = re.search(header_pattern, content)
        if header_match:
            header = header_match.group(0).replace('module', '')
            if header.strip():
                header_chunk = f"{file_context}Module: {module_name}\n\nHeader Comments:\n{header.strip()}"
                chunks.append(header_chunk)

        # Add module overview
        chunks.append(f"{file_context}Move Module: {module_name}")

        # Extract use statements for dependencies
        use_statements = re.findall(r'use\s+([^;]+);', content)
        if use_statements:
            use_chunk = f"{file_context}Module: {module_name}\n\nDependencies:\n"
            for stmt in use_statements:
                use_chunk += f"use {stmt};\n"
            chunks.append(use_chunk)

        # Extract structs with their fields and methods
        struct_matches = re.finditer(r'struct\s+([a-zA-Z0-9_]+)(?:\s*<[^>]+>)?\s*{([^}]+)}', content)
        for struct_match in struct_matches:
            struct_name = struct_match.group(1)
            struct_body = struct_match.group(2)
            struct_chunk = f"{file_context}Module: {module_name}\n\nStruct: {struct_name}\n\n{struct_body.strip()}"
            chunks.append(struct_chunk)

        # Extract functions with their bodies
        function_matches = re.finditer(r'(public\s+)?(inline\s+)?(fun\s+([a-zA-Z0-9_]+)(?:\s*<[^>]+>)?\s*\([^)]*\)(?:\s*:[^{]+)?\s*{([^}]+)})', content)
        for func_match in function_matches:
            func_full = func_match.group(0)
            func_name = func_match.group(4)
            func_body = func_match.group(5)
            func_chunk = f"{file_context}Module: {module_name}\n\nFunction: {func_name}\n\n{func_full.strip()}"
            chunks.append(func_chunk)

        # If no structured content was found, fall back to normal chunking
        if not chunks:
            text_chunks = self._chunk_text(content)
            for chunk in text_chunks:
                chunks.append(f"{file_context}{chunk}")

        return chunks

    def _chunk_text(self, text: str, max_chunk_size: int = 512) -> List[str]:
        """
        Split text into chunks based on paragraphs and size.

        Args:
            text: Text to split
            max_chunk_size: Maximum size of each chunk

        Returns:
            List of text chunks
        """
        # Split by paragraphs first
        paragraphs = re.split(r'\n\s*\n', text)
        chunks = []
        current_chunk = ""

        for para in paragraphs:
            # If paragraph is already too big, split it further
            if len(para) > max_chunk_size:
                if current_chunk:
                    chunks.append(current_chunk)
                    current_chunk = ""

                # Split large paragraph into sentences
                sentences = re.split(r'(?<=[.!?])\s+', para)
                temp_chunk = ""
                for sentence in sentences:
                    if len(temp_chunk) + len(sentence) <= max_chunk_size:
                        temp_chunk += (" " if temp_chunk else "") + sentence
                    else:
                        if temp_chunk:
                            chunks.append(temp_chunk)
                        temp_chunk = sentence

                if temp_chunk:
                    current_chunk = temp_chunk
            else:
                # Check if adding this paragraph exceeds the chunk size
                if len(current_chunk) + len(para) <= max_chunk_size:
                    current_chunk += ("\n\n" if current_chunk else "") + para
                else:
                    chunks.append(current_chunk)
                    current_chunk = para

        if current_chunk:
            chunks.append(current_chunk)

        return chunks
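
For context, a minimal usage sketch of the class above. The "docs" directory name and the printout are illustrative assumptions, not part of the server code itself.

# Minimal usage sketch (assumes the file above is saved as document_processor.py
# and that your documents live in a local "docs" directory).
from document_processor import DocumentProcessor

processor = DocumentProcessor()                 # loads all-MiniLM-L6-v2 by default
docs = processor.process_documents("docs")      # walks docs/ recursively for .txt, .md, .move

for doc in docs[:3]:
    # Each chunk carries its id, source path, file type, text, and embedding vector
    print(doc["id"], doc["file_type"], len(doc["embedding"]))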

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ProbonoBonobo/sui-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.