fast_index.py (7.97 kB)
#!/usr/bin/env python3
"""Fast parallel indexer for ickyMCP."""
import sys
import os

os.environ['HF_HUB_DISABLE_SYMLINKS_WARNING'] = '1'
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Optional

from src.config import CHUNK_SIZE, CHUNK_OVERLAP, DB_PATH
from src.database import VectorDatabase
from src.parsers import parse_document, is_supported
from src.chunker import TextChunker
from src.embedder import get_embedder


@dataclass
class FileResult:
    path: str
    chunks: list
    page_count: int
    file_type: str
    file_size: int
    modified_time: float
    error: Optional[str] = None


def parse_file(file_path: Path) -> FileResult:
    """Parse a single file - runs in the thread pool."""
    stat = file_path.stat()
    try:
        parsed = parse_document(file_path)
        if parsed is None or not parsed.text.strip():
            return FileResult(
                path=str(file_path),
                chunks=[],
                page_count=0,
                file_type="unknown",
                file_size=stat.st_size,
                modified_time=stat.st_mtime,
                error="No content extracted"
            )

        # Chunk the text
        chunker = TextChunker(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        chunks = chunker.chunk_text(parsed.text)

        return FileResult(
            path=str(file_path),
            chunks=chunks,
            page_count=parsed.page_count,
            file_type=parsed.file_type,
            file_size=stat.st_size,
            modified_time=stat.st_mtime
        )
    except Exception as e:
        return FileResult(
            path=str(file_path),
            chunks=[],
            page_count=0,
            file_type="unknown",
            file_size=stat.st_size,
            modified_time=stat.st_mtime,
            error=str(e)
        )


def fast_index(target_dir: str, patterns: Optional[list] = None, max_workers: int = 8, batch_size: int = 64):
    """Fast parallel indexing with a thread pool for parsing and batched embeddings."""
    if patterns is None:
        patterns = ["*.pdf"]

    target_path = Path(target_dir)

    print("=== ickyMCP Fast Indexer ===")
    print(f"Target: {target_dir}")
    print(f"Workers: {max_workers}, Batch size: {batch_size}")
    print()

    # Collect files
    files = []
    for pattern in patterns:
        files.extend(target_path.rglob(pattern))
    files = [f for f in files if f.is_file() and is_supported(f)]
    print(f"Found {len(files)} files to index")
    print()

    # Initialize components
    print("Loading embedding model...")
    start_time = time.time()
    embedder = get_embedder()
    if hasattr(embedder, 'load_model'):
        embedder.load_model()
    model_load_time = time.time() - start_time
    print(f"Model loaded in {model_load_time:.1f}s")
    print()

    # Clear and initialize database
    db = VectorDatabase(DB_PATH)
    db.connect()
    db.delete_all()

    # Phase 1: Parse all files in parallel
    print(f"Phase 1: Parsing {len(files)} files with {max_workers} workers...")
    parse_start = time.time()

    results = []
    errors = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {executor.submit(parse_file, f): f for f in files}

        completed = 0
        for future in as_completed(future_to_file):
            result = future.result()
            completed += 1

            if result.error:
                errors.append(f"{result.path}: {result.error}")
            elif result.chunks:
                results.append(result)

            if completed % 50 == 0:
                print(f"  Parsed {completed}/{len(files)} files...")

    parse_time = time.time() - parse_start
    print(f"Parsing complete: {len(results)} successful, {len(errors)} errors in {parse_time:.1f}s")
    print()

    # Phase 2: Collect all chunks and embed in batches
    print("Phase 2: Generating embeddings...")
    embed_start = time.time()

    # Flatten all chunks with metadata
    all_chunks = []
    for result in results:
        for chunk in result.chunks:
            all_chunks.append({'result': result, 'chunk': chunk})

    print(f"  Total chunks: {len(all_chunks)}")

    # Embed in batches
    all_texts = [item['chunk'].text for item in all_chunks]
    all_embeddings = []

    for i in range(0, len(all_texts), batch_size):
        batch = all_texts[i:i + batch_size]
        batch_embeddings = embedder.embed_documents(batch, show_progress=False)
        all_embeddings.extend(batch_embeddings)

        if (i // batch_size) % 10 == 0:
            print(f"  Embedded {min(i + batch_size, len(all_texts))}/{len(all_texts)} chunks...")

    embed_time = time.time() - embed_start
    print(f"Embedding complete in {embed_time:.1f}s")
    print()

    # Phase 3: Bulk insert into database
    print("Phase 3: Storing in database...")
    store_start = time.time()

    # Group chunks by document
    doc_chunks = {}
    for item, embedding in zip(all_chunks, all_embeddings):
        path = item['result'].path
        if path not in doc_chunks:
            doc_chunks[path] = {'result': item['result'], 'chunks_data': []}
        doc_chunks[path]['chunks_data'].append({'chunk': item['chunk'], 'embedding': embedding})

    # Insert documents and chunks
    for path, data in doc_chunks.items():
        result = data['result']
        doc_id = db.add_document(
            path=path,
            file_type=result.file_type,
            file_size=result.file_size,
            modified_time=result.modified_time,
            page_count=result.page_count
        )

        for chunk_data in data['chunks_data']:
            chunk = chunk_data['chunk']
            embedding = chunk_data['embedding']
            db.add_chunk(
                document_id=doc_id,
                chunk_index=chunk.chunk_index,
                chunk_text=chunk.text,
                token_count=chunk.token_count,
                embedding=embedding,
                page_number=chunk.page_number,
                start_char=chunk.start_char,
                end_char=chunk.end_char
            )

        db.update_document_chunk_count(doc_id, len(data['chunks_data']))

    db.conn.commit()
    store_time = time.time() - store_start
    print(f"Storage complete in {store_time:.1f}s")
    print()

    # Summary
    total_time = time.time() - start_time
    stats = db.get_stats()

    print("=== INDEXING COMPLETE ===")
    print(f"Documents indexed: {stats['total_documents']}")
    print(f"Total chunks: {stats['total_chunks']}")
    print(f"Database size: {stats['index_size_mb']:.1f} MB")
    print(f"Errors: {len(errors)}")
    print()
    print("Time breakdown:")
    print(f"  Model load: {model_load_time:.1f}s")
    print(f"  Parsing: {parse_time:.1f}s")
    print(f"  Embedding: {embed_time:.1f}s")
    print(f"  Storage: {store_time:.1f}s")
    print(f"  TOTAL: {total_time:.1f}s")

    if errors and len(errors) <= 10:
        print()
        print("Errors:")
        for e in errors:
            print(f"  - {e[:100]}")

    db.close()
    return stats


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Fast parallel indexer for ickyMCP")
    parser.add_argument("target_dir", help="Directory to index")
    parser.add_argument("--workers", type=int, default=8, help="Number of parallel workers")
    parser.add_argument("--batch", type=int, default=64, help="Embedding batch size")
    parser.add_argument("--patterns", nargs="+", default=["*.pdf"], help="File patterns to match")
    args = parser.parse_args()

    fast_index(args.target_dir, args.patterns, args.workers, args.batch)
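The entry point at the bottom wires fast_index() to a small argparse CLI: a positional target directory plus --workers, --batch, and --patterns. The function can also be called directly from Python. A minimal sketch, assuming fast_index.py and its src package are importable from the working directory and using a placeholder path:

from fast_index import fast_index

# Index every PDF under a directory (placeholder path) with 8 parser threads
# and 64-chunk embedding batches, then report the stats dict that fast_index() returns.
stats = fast_index("/path/to/documents", patterns=["*.pdf"], max_workers=8, batch_size=64)
print(f"Indexed {stats['total_documents']} documents, {stats['total_chunks']} chunks")

Whichever way it is launched, indexing runs in three phases: files are parsed in a thread pool (Phase 1), all chunks are embedded in fixed-size batches (Phase 2), and documents and chunks are written to the vector database in one bulk pass (Phase 3) before the timing summary is printed.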

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dl1683/ickyMCP'
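
The same endpoint can be queried from Python as well; this is a minimal standard-library sketch (the response schema is not shown here, so the JSON is simply pretty-printed):

import json
import urllib.request

# Fetch the directory's metadata record for the dl1683/ickyMCP server
url = "https://glama.ai/api/mcp/v1/servers/dl1683/ickyMCP"
with urllib.request.urlopen(url) as resp:
    print(json.dumps(json.load(resp), indent=2))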

If you have feedback or need assistance with the MCP directory API, please join our Discord server.