# document_processor.py
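"""Embed text, Markdown, and Move documents as chunk-level sentence-transformer vectors."""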
import os
import re
import glob
import numpy as np
from typing import List, Dict, Any, Optional
from sentence_transformers import SentenceTransformer
from tqdm import tqdm


class DocumentProcessor:
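    """
    Embed plain-text, Markdown, and Move source files with a sentence-transformers
    model, splitting each file into retrieval-sized chunks with metadata.

    Minimal usage sketch (the directory path below is illustrative):

        processor = DocumentProcessor()
        docs = processor.process_documents("docs/")
        if docs:
            print(docs[0]["id"], docs[0]["embedding"].shape)
    """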

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the document processor.

        Args:
            model_name: Name of the sentence-transformers model to use
        """
        self.model = SentenceTransformer(model_name)

    def get_embedding(self, text: str) -> np.ndarray:
        """
        Get embedding for a text string.

        Args:
            text: The text to embed

        Returns:
            Embedding vector as numpy array
        """
        return self.model.encode(text, show_progress_bar=False)

    def process_documents(self, directory: str) -> List[Dict[str, Any]]:
        """
        Process all documents in a directory.

        Args:
            directory: Directory containing text documents

        Returns:
            List of document dictionaries with embeddings and metadata
        """
        documents = []

        # Get all text files, including .move files
        files = glob.glob(os.path.join(directory, "**/*.txt"), recursive=True)
        files.extend(glob.glob(os.path.join(directory, "**/*.md"), recursive=True))
        files.extend(glob.glob(os.path.join(directory, "**/*.move"), recursive=True))

        if not files:
            print(f"No text files found in {directory}")
            return documents

        print(f"Processing {len(files)} documents...")
        for file_path in tqdm(files):
            try:
                # Read content
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read().strip()

                if not content:
                    print(f"Skipping empty file: {file_path}")
                    continue

                # Extract file extension
                ext = os.path.splitext(file_path)[1].lower()

                # Process differently based on file type
                if ext == '.move':
                    chunks = self._process_move_file(content, file_path)
                else:
                    # Default text processing for other file types
                    chunks = self._chunk_text(content)

                # Process each chunk as a separate document
                for i, chunk in enumerate(chunks):
                    # Skip empty chunks
                    if not chunk.strip():
                        continue

                    # Create document with metadata
                    doc = {
                        'id': f"{os.path.basename(file_path)}_{i}",
                        'path': file_path,
                        'chunk_index': i,
                        'content': chunk,
                        'file_type': ext[1:] if ext else 'txt',  # Remove the dot
                        'embedding': self.get_embedding(chunk)
                    }
                    documents.append(doc)
            except Exception as e:
                print(f"Error processing {file_path}: {str(e)}")

        print(f"Processed {len(documents)} document chunks")
        return documents

    def _process_move_file(self, content: str, file_path: str) -> List[str]:
        """
        Process a Move language file, extracting modules, structs, and functions.

        Args:
            content: The file content
            file_path: Path to the file

        Returns:
            List of content chunks with semantic meaning
        """
        chunks = []

        # Extract repo and file information for context
        repo_info = file_path.split('docs/move_files/')[-1] if 'docs/move_files/' in file_path else file_path
        file_context = f"File: {repo_info}\n\n"

        # Extract module declaration
        module_match = re.search(r'module\s+([a-zA-Z0-9_:]+)\s*{', content)
        module_name = module_match.group(1) if module_match else "unknown_module"

        # Add file header as a chunk with module info
        header_pattern = r'(\/\/.*?|\s*\/\*[\s\S]*?\*\/\s*)*module'
        header_match = re.search(header_pattern, content)
        if header_match:
            header = header_match.group(0).replace('module', '')
            if header.strip():
                header_chunk = f"{file_context}Module: {module_name}\n\nHeader Comments:\n{header.strip()}"
                chunks.append(header_chunk)

        # Add module overview
        chunks.append(f"{file_context}Move Module: {module_name}")

        # Record how many chunks exist before extracting uses, structs, and
        # functions, so the fallback below can tell whether any structured
        # content was actually found.
        structural_baseline = len(chunks)

        # Extract use statements for dependencies
        use_statements = re.findall(r'use\s+([^;]+);', content)
        if use_statements:
            use_chunk = f"{file_context}Module: {module_name}\n\nDependencies:\n"
            for stmt in use_statements:
                use_chunk += f"use {stmt};\n"
            chunks.append(use_chunk)

        # Extract structs with their fields (non-nested bodies only)
        struct_matches = re.finditer(r'struct\s+([a-zA-Z0-9_]+)(?:\s*<[^>]+>)?\s*{([^}]+)}', content)
        for struct_match in struct_matches:
            struct_name = struct_match.group(1)
            struct_body = struct_match.group(2)
            struct_chunk = f"{file_context}Module: {module_name}\n\nStruct: {struct_name}\n\n{struct_body.strip()}"
            chunks.append(struct_chunk)

        # Extract functions with their bodies (non-nested bodies only)
        function_matches = re.finditer(
            r'(public\s+)?(inline\s+)?(fun\s+([a-zA-Z0-9_]+)(?:\s*<[^>]+>)?\s*\([^)]*\)(?:\s*:[^{]+)?\s*{([^}]+)})',
            content
        )
        for func_match in function_matches:
            func_full = func_match.group(0)
            func_name = func_match.group(4)
            func_body = func_match.group(5)
            func_chunk = f"{file_context}Module: {module_name}\n\nFunction: {func_name}\n\n{func_full.strip()}"
            chunks.append(func_chunk)

        # If no structured content was found, fall back to normal chunking.
        # (The module overview is always appended above, so a plain emptiness
        # check would never trigger this fallback.)
        if len(chunks) == structural_baseline:
            text_chunks = self._chunk_text(content)
            for chunk in text_chunks:
                chunks.append(f"{file_context}{chunk}")

        return chunks

    def _chunk_text(self, text: str, max_chunk_size: int = 512) -> List[str]:
        """
        Split text into chunks based on paragraphs and size.

        Args:
            text: Text to split
            max_chunk_size: Maximum size of each chunk

        Returns:
            List of text chunks
        """
        # Split by paragraphs first
        paragraphs = re.split(r'\n\s*\n', text)
        chunks = []
        current_chunk = ""

        for para in paragraphs:
            # If paragraph is already too big, split it further
            if len(para) > max_chunk_size:
                if current_chunk:
                    chunks.append(current_chunk)
                    current_chunk = ""

                # Split large paragraph into sentences
                sentences = re.split(r'(?<=[.!?])\s+', para)
                temp_chunk = ""
                for sentence in sentences:
                    if len(temp_chunk) + len(sentence) <= max_chunk_size:
                        temp_chunk += (" " if temp_chunk else "") + sentence
                    else:
                        if temp_chunk:
                            chunks.append(temp_chunk)
                        temp_chunk = sentence
                if temp_chunk:
                    current_chunk = temp_chunk
            else:
                # Check if adding this paragraph exceeds the chunk size
                if len(current_chunk) + len(para) <= max_chunk_size:
                    current_chunk += ("\n\n" if current_chunk else "") + para
                else:
                    chunks.append(current_chunk)
                    current_chunk = para

        if current_chunk:
            chunks.append(current_chunk)

        return chunks
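

if __name__ == "__main__":
    # Smoke-test sketch: the Move snippet and paths below are illustrative only,
    # written so the extraction regexes above (which assume non-nested braces)
    # can match them; they do not come from any real repository.
    sample_move = """\
/* Simple counter example */
module example::counter {
    use std::signer;

    struct Counter { value: u64 }

    public fun value_of(counter: &Counter): u64 { counter.value }
}
"""

    processor = DocumentProcessor()

    # Exercise the Move chunker directly to inspect the module, dependency,
    # struct, and function chunks it produces.
    for chunk in processor._process_move_file(sample_move, "docs/move_files/example/counter.move"):
        print("---")
        print(chunk)

    # Embed a single query string to confirm the model loads and returns a vector.
    vector = processor.get_embedding("How does the counter module expose its value?")
    print("Embedding dimensions:", vector.shape)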