Skip to main content
Glama
qdrant-openai-indexer.py15.4 kB
#!/usr/bin/env python3 """ Qdrant OpenAI Indexer for KinDash Codebase This script indexes the codebase into Qdrant using OpenAI embeddings. It's designed to work with the MCP server wrapper for semantic code search. Usage: python scripts/qdrant-openai-indexer.py [directory] Environment variables: OPENAI_API_KEY - Required for generating embeddings """ import os import sys import json import hashlib import argparse from pathlib import Path from typing import List, Dict, Any, Optional import logging from datetime import datetime # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) try: from openai import OpenAI from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams, PointStruct except ImportError as e: logger.error(f"Required package not installed: {e}") logger.error("Please install: pip install openai qdrant-client") sys.exit(1) # File extensions to index VALID_EXTENSIONS = { '.ts', '.tsx', '.js', '.jsx', '.json', '.prisma', '.md', '.mdx', '.css', '.scss', '.sql', '.sh', '.yml', '.yaml', '.env.example' } # Directories to exclude EXCLUDE_DIRS = { 'node_modules', '.git', 'dist', 'build', '.next', 'coverage', '.turbo', 'out', '.cache', '__pycache__', '.pytest_cache', '.vscode', '.idea', 'tmp', 'temp' } # Files to exclude EXCLUDE_FILES = { '.DS_Store', 'package-lock.json', 'yarn.lock', 'pnpm-lock.yaml', '.gitignore', '.env' } class CodebaseIndexer: """Indexes codebase files into Qdrant with OpenAI embeddings""" def __init__(self, collection_name: str = "kindash-codebase-openai"): self.openai_api_key = os.environ.get("OPENAI_API_KEY") if not self.openai_api_key: raise ValueError("OPENAI_API_KEY environment variable is required") self.collection_name = collection_name self.qdrant_url = os.environ.get("QDRANT_URL", "http://localhost:6333") self.embedding_model = os.environ.get("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small") # Initialize clients self.openai_client = OpenAI(api_key=self.openai_api_key) self.qdrant_client = QdrantClient(url=self.qdrant_url) # Statistics self.stats = { "files_processed": 0, "files_skipped": 0, "total_tokens": 0, "errors": 0 } # Ensure collection exists self._ensure_collection() def _ensure_collection(self): """Ensure the Qdrant collection exists with correct configuration""" try: collections = self.qdrant_client.get_collections() collection_names = [c.name for c in collections.collections] if self.collection_name not in collection_names: logger.info(f"Creating collection: {self.collection_name}") self.qdrant_client.create_collection( collection_name=self.collection_name, vectors_config=VectorParams( size=1536, # text-embedding-3-small dimension distance=Distance.COSINE ) ) logger.info(f"Collection {self.collection_name} created successfully") else: logger.info(f"Collection {self.collection_name} already exists") # Get collection info info = self.qdrant_client.get_collection(self.collection_name) logger.info(f"Current points: {info.points_count}") except Exception as e: logger.error(f"Failed to ensure collection: {e}") raise def _should_index_file(self, file_path: Path) -> bool: """Check if file should be indexed""" # Check extension if file_path.suffix not in VALID_EXTENSIONS: return False # Check if file is excluded if file_path.name in EXCLUDE_FILES: return False # Check file size (skip very large files) try: size = file_path.stat().st_size if size > 1024 * 1024: # 1MB limit logger.warning(f"Skipping large file: {file_path} ({size} bytes)") return False except: return False return True def _collect_files(self, directory: Path) -> List[Path]: """Recursively collect all files to index""" files = [] for item in directory.iterdir(): # Skip excluded directories if item.is_dir() and item.name not in EXCLUDE_DIRS: files.extend(self._collect_files(item)) elif item.is_file() and self._should_index_file(item): files.append(item) return files def _extract_content(self, file_path: Path, base_path: Path) -> str: """Extract semantic content from file""" try: content = file_path.read_text(encoding='utf-8') relative_path = file_path.relative_to(base_path) # Build semantic description description = f"File: {relative_path}\n" description += f"Type: {file_path.suffix[1:]} file\n" description += f"Directory: {relative_path.parent}\n\n" # Add file-specific context if file_path.suffix in ['.tsx', '.jsx']: # React components if 'export default' in content or 'export const' in content: description += "React component file. " if 'useState' in content or 'useEffect' in content: description += "Uses React hooks. " elif file_path.suffix in ['.ts', '.js']: # TypeScript/JavaScript if 'class ' in content: description += "Contains class definitions. " if 'interface ' in content: description += "Contains TypeScript interfaces. " if 'app.get' in content or 'app.post' in content: description += "API endpoint definitions. " elif file_path.suffix == '.prisma': # Prisma schema description += "Database schema file. " if 'model ' in content: models = [line.split()[1] for line in content.split('\n') if line.strip().startswith('model ')] description += f"Models: {', '.join(models[:5])}. " # Add first few lines of actual content lines = content.split('\n')[:20] description += "\nContent preview:\n" description += '\n'.join(lines) return description except Exception as e: logger.error(f"Failed to extract content from {file_path}: {e}") return "" def _create_metadata(self, file_path: Path, base_path: Path) -> Dict[str, Any]: """Create metadata for the file""" relative_path = file_path.relative_to(base_path) return { "file": str(relative_path), "type": file_path.suffix[1:] if file_path.suffix else "unknown", "directory": str(relative_path.parent), "size": file_path.stat().st_size, "modified": datetime.fromtimestamp(file_path.stat().st_mtime).isoformat(), "indexed_at": datetime.now().isoformat() } def _generate_file_id(self, file_path: Path, base_path: Path) -> str: """Generate deterministic ID for file""" relative_path = str(file_path.relative_to(base_path)) return hashlib.md5(relative_path.encode()).hexdigest() def _batch_generate_embeddings(self, texts: List[str]) -> List[List[float]]: """Generate embeddings for multiple texts""" try: response = self.openai_client.embeddings.create( model=self.embedding_model, input=texts ) # Update token count (estimate) for text in texts: self.stats["total_tokens"] += len(text.split()) * 1.3 return [data.embedding for data in response.data] except Exception as e: logger.error(f"Failed to generate embeddings: {e}") raise def index_directory(self, directory: str, batch_size: int = 10): """Index all files in a directory""" base_path = Path(directory).resolve() if not base_path.exists(): logger.error(f"Directory not found: {directory}") return logger.info(f"Indexing directory: {base_path}") # Collect files files = self._collect_files(base_path) logger.info(f"Found {len(files)} files to index") # Process in batches for i in range(0, len(files), batch_size): batch_files = files[i:i + batch_size] batch_contents = [] batch_metadata = [] batch_ids = [] # Prepare batch for file_path in batch_files: content = self._extract_content(file_path, base_path) if content: batch_contents.append(content) batch_metadata.append(self._create_metadata(file_path, base_path)) batch_ids.append(self._generate_file_id(file_path, base_path)) self.stats["files_processed"] += 1 else: self.stats["files_skipped"] += 1 if not batch_contents: continue try: # Generate embeddings logger.info(f"Generating embeddings for batch {i//batch_size + 1}/{(len(files) + batch_size - 1)//batch_size}") embeddings = self._batch_generate_embeddings(batch_contents) # Create points points = [] for j, (embedding, metadata, point_id) in enumerate(zip(embeddings, batch_metadata, batch_ids)): points.append( PointStruct( id=point_id, vector=embedding, payload=metadata ) ) # Upsert to Qdrant result = self.qdrant_client.upsert( collection_name=self.collection_name, points=points, wait=True ) logger.info(f"Indexed {len(points)} files") except Exception as e: logger.error(f"Failed to index batch: {e}") self.stats["errors"] += 1 # Print statistics self._print_stats() def index_file(self, file_path: Path) -> bool: """Index a single file""" try: if not self._should_index_file(file_path): logger.info(f"Skipping file: {file_path}") return False # Get base path (parent directory) base_path = file_path.parent # Extract content and metadata content = self._extract_content(file_path, base_path) if not content: logger.warning(f"No content extracted from: {file_path}") return False metadata = self._create_metadata(file_path, base_path) file_id = self._generate_file_id(file_path, base_path) # Generate embedding logger.info(f"Generating embedding for: {file_path}") embeddings = self._batch_generate_embeddings([content]) if not embeddings: logger.error(f"Failed to generate embedding for: {file_path}") return False # Store in Qdrant point = PointStruct( id=file_id, vector=embeddings[0], payload=metadata ) self.qdrant_client.upsert( collection_name=self.collection_name, points=[point] ) logger.info(f"Successfully indexed: {file_path}") self.stats['files_processed'] += 1 return True except Exception as e: logger.error(f"Failed to index file {file_path}: {e}") self.stats['errors'] += 1 return False def _print_stats(self): """Print indexing statistics""" logger.info("\n" + "="*50) logger.info("Indexing Complete!") logger.info("="*50) logger.info(f"Files processed: {self.stats['files_processed']}") logger.info(f"Files skipped: {self.stats['files_skipped']}") logger.info(f"Errors: {self.stats['errors']}") logger.info(f"Estimated tokens used: {self.stats['total_tokens']:,.0f}") # Estimate cost cost = (self.stats['total_tokens'] / 1_000_000) * 0.02 # $0.02 per 1M tokens logger.info(f"Estimated cost: ${cost:.4f}") # Get final collection info try: info = self.qdrant_client.get_collection(self.collection_name) logger.info(f"Total points in collection: {info.points_count}") except: pass def main(): """Main entry point""" parser = argparse.ArgumentParser( description="Index codebase into Qdrant with OpenAI embeddings" ) parser.add_argument( "directory", nargs="?", default=".", help="Directory to index (default: current directory)" ) parser.add_argument( "--collection", default="kindash-codebase-openai", help="Qdrant collection name" ) parser.add_argument( "--batch-size", type=int, default=10, help="Batch size for processing files" ) parser.add_argument( "--file", help="Index a single file (for background indexer)" ) args = parser.parse_args() # Check for API key if not os.environ.get("OPENAI_API_KEY"): logger.error("OPENAI_API_KEY environment variable is required") logger.error("Export it with: export OPENAI_API_KEY='your-key-here'") sys.exit(1) # Create indexer indexer = CodebaseIndexer(collection_name=args.collection) if args.file: # Single file mode for background indexer file_path = Path(args.file) if file_path.exists() and file_path.is_file(): success = indexer.index_file(file_path) sys.exit(0 if success else 1) else: logger.error(f"File not found: {args.file}") sys.exit(1) else: # Directory mode indexer.index_directory(args.directory, batch_size=args.batch_size) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/steiner385/qdrant-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server