#!/usr/bin/env python3
"""
Vector Store Manager for YTPipe
Manages vector storage across multiple database backends.
Supported Backends:
- ChromaDB (local, persistent)
- Pinecone (cloud, production)
- Weaviate (cloud/local, enterprise)
- FAISS (local, high-performance)
- Qdrant (cloud/local, production)
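
Example usage (a minimal sketch; `chunks` and `query_embedding` are assumed
to come from the pipeline's embedding step):

    config = VectorConfig(backend="chromadb")
    store = VectorStoreManager(config)
    store.add_chunks(chunks, video_id="bVxBrzSe3kA")
    hits = store.search(query_embedding, top_k=5)

Cloud backends take credentials via VectorConfig (placeholder values):

    VectorConfig(backend="pinecone", api_key="...", environment="us-east-1")
    VectorConfig(backend="qdrant", url="https://xyz.example.io", api_key="...")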
"""
import json
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import numpy as np
@dataclass
class VectorConfig:
"""Vector database configuration."""
    backend: str = "chromadb"  # chromadb, pinecone, weaviate, faiss, qdrant
    persist_directory: str = "./vector_store"
    collection_name: str = "youtube_transcripts"
    embedding_dimension: int = 384  # all-MiniLM-L6-v2 output size
    # Cloud credentials (optional)
    api_key: Optional[str] = None
    environment: Optional[str] = None
    url: Optional[str] = None
class VectorStoreManager:
"""
Manages vector storage across multiple backends.
"""
def __init__(self, config: VectorConfig):
"""
Initialize vector store manager.
Args:
config: Vector database configuration
"""
self.config = config
self.client = None
self.collection = None
# Initialize the selected backend
self._initialize_backend()
def _initialize_backend(self):
"""Initialize the selected vector database backend."""
backend = self.config.backend.lower()
if backend == "chromadb":
self._init_chromadb()
elif backend == "pinecone":
self._init_pinecone()
elif backend == "weaviate":
self._init_weaviate()
elif backend == "faiss":
self._init_faiss()
elif backend == "qdrant":
self._init_qdrant()
else:
raise ValueError(f"Unsupported backend: {backend}")
def _init_chromadb(self):
"""Initialize ChromaDB (local, persistent)."""
try:
import chromadb
from chromadb.config import Settings
persist_dir = Path(self.config.persist_directory) / "chromadb"
persist_dir.mkdir(parents=True, exist_ok=True)
self.client = chromadb.PersistentClient(
path=str(persist_dir),
settings=Settings(
anonymized_telemetry=False,
allow_reset=True
)
)
        self.collection = self.client.get_or_create_collection(
            name=self.config.collection_name,
            metadata={
                "description": "YouTube video transcripts with embeddings",
                "hnsw:space": "cosine"  # so 1 - distance in search() is cosine similarity
            }
        )
print(f"✅ ChromaDB initialized at {persist_dir}")
except ImportError:
print("❌ ChromaDB not installed. Install with: pip3 install chromadb")
raise
def _init_pinecone(self):
"""Initialize Pinecone (cloud, production)."""
try:
from pinecone import Pinecone, ServerlessSpec
if not self.config.api_key:
raise ValueError("Pinecone requires api_key in config")
pc = Pinecone(api_key=self.config.api_key)
        # Pinecone index names allow only lowercase letters, digits, and hyphens,
        # so sanitize the collection name (e.g. youtube_transcripts -> youtube-transcripts)
        index_name = self.config.collection_name.lower().replace("_", "-")
        # Create the index if it doesn't exist
        if index_name not in pc.list_indexes().names():
            pc.create_index(
                name=index_name,
                dimension=self.config.embedding_dimension,
                metric='cosine',
                spec=ServerlessSpec(
                    cloud='aws',
                    region=self.config.environment or 'us-east-1'
                )
            )
        self.collection = pc.Index(index_name)
print(f"✅ Pinecone initialized: {index_name}")
except ImportError:
print("❌ Pinecone not installed. Install with: pip3 install pinecone-client")
raise
def _init_weaviate(self):
"""Initialize Weaviate (cloud/local, enterprise)."""
try:
import weaviate
from weaviate.classes.init import Auth
if self.config.url:
# Cloud Weaviate
            # connect_to_wcs is deprecated in the v4 client
            self.client = weaviate.connect_to_weaviate_cloud(
                cluster_url=self.config.url,
                auth_credentials=Auth.api_key(self.config.api_key)
            )
else:
# Local Weaviate
self.client = weaviate.connect_to_local()
# Create collection
if not self.client.collections.exists(self.config.collection_name):
self.collection = self.client.collections.create(
name=self.config.collection_name,
description="YouTube video transcripts"
)
else:
self.collection = self.client.collections.get(self.config.collection_name)
print(f"✅ Weaviate initialized: {self.config.collection_name}")
except ImportError:
print("❌ Weaviate not installed. Install with: pip3 install weaviate-client")
raise
def _init_faiss(self):
"""Initialize FAISS (local, high-performance)."""
try:
import faiss
persist_dir = Path(self.config.persist_directory) / "faiss"
persist_dir.mkdir(parents=True, exist_ok=True)
index_path = persist_dir / f"{self.config.collection_name}.index"
metadata_path = persist_dir / f"{self.config.collection_name}.json"
# Load or create index
if index_path.exists():
self.collection = faiss.read_index(str(index_path))
with open(metadata_path, 'r') as f:
self.metadata = json.load(f)
else:
# Create new index (cosine similarity)
                dimension = self.config.embedding_dimension
                # Inner product over L2-normalized vectors == cosine similarity
                self.collection = faiss.IndexFlatIP(dimension)
self.metadata = []
self.index_path = index_path
self.metadata_path = metadata_path
print(f"✅ FAISS initialized at {persist_dir}")
except ImportError:
print("❌ FAISS not installed. Install with: pip3 install faiss-cpu")
raise
def _init_qdrant(self):
"""Initialize Qdrant (cloud/local, production)."""
try:
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
if self.config.url:
# Cloud Qdrant
self.client = QdrantClient(
url=self.config.url,
api_key=self.config.api_key
)
else:
# Local Qdrant
persist_dir = Path(self.config.persist_directory) / "qdrant"
persist_dir.mkdir(parents=True, exist_ok=True)
self.client = QdrantClient(path=str(persist_dir))
# Create collection if doesn't exist
collections = self.client.get_collections().collections
collection_names = [c.name for c in collections]
if self.config.collection_name not in collection_names:
self.client.create_collection(
collection_name=self.config.collection_name,
vectors_config=VectorParams(
                        size=self.config.embedding_dimension,
distance=Distance.COSINE
)
)
self.collection = self.config.collection_name
print(f"✅ Qdrant initialized: {self.config.collection_name}")
except ImportError:
print("❌ Qdrant not installed. Install with: pip3 install qdrant-client")
raise
def add_chunks(self, chunks: List[Dict[str, Any]], video_id: str):
"""
Add chunks with embeddings to vector store.
Args:
chunks: List of chunks with embeddings
video_id: YouTube video ID
"""
backend = self.config.backend.lower()
if backend == "chromadb":
self._add_chromadb(chunks, video_id)
elif backend == "pinecone":
self._add_pinecone(chunks, video_id)
elif backend == "weaviate":
self._add_weaviate(chunks, video_id)
elif backend == "faiss":
self._add_faiss(chunks, video_id)
elif backend == "qdrant":
self._add_qdrant(chunks, video_id)
def _add_chromadb(self, chunks: List[Dict[str, Any]], video_id: str):
"""Add chunks to ChromaDB."""
ids = [f"{video_id}_{c['id']}" for c in chunks]
embeddings = [c['embedding'] for c in chunks]
documents = [c['text'] for c in chunks]
metadatas = [{
'video_id': video_id,
'chunk_id': c['id'],
'word_count': c.get('word_count', 0),
'quality_score': c.get('quality_score', 0)
} for c in chunks]
self.collection.add(
ids=ids,
embeddings=embeddings,
documents=documents,
metadatas=metadatas
)
print(f"✅ Added {len(chunks)} chunks to ChromaDB")
def _add_pinecone(self, chunks: List[Dict[str, Any]], video_id: str):
"""Add chunks to Pinecone."""
vectors = [
(
f"{video_id}_{c['id']}",
c['embedding'],
{
'video_id': video_id,
'chunk_id': c['id'],
'text': c['text'][:1000], # Pinecone metadata limit
'word_count': c.get('word_count', 0)
}
)
for c in chunks
]
self.collection.upsert(vectors=vectors)
print(f"✅ Added {len(chunks)} chunks to Pinecone")
def _add_weaviate(self, chunks: List[Dict[str, Any]], video_id: str):
"""Add chunks to Weaviate."""
        from weaviate.classes.data import DataObject
        from weaviate.util import generate_uuid5
        # The v4 client takes per-object vectors via DataObject rather than a
        # separate vectors argument; deterministic UUIDs make re-runs idempotent
        objects = [
            DataObject(
                uuid=generate_uuid5(f"{video_id}_{c['id']}"),
                properties={
                    'video_id': video_id,
                    'chunk_id': c['id'],
                    'text': c['text'],
                    'word_count': c.get('word_count', 0),
                    'quality_score': c.get('quality_score', 0)
                },
                vector=c['embedding']
            )
            for c in chunks
        ]
        self.collection.data.insert_many(objects)
print(f"✅ Added {len(chunks)} chunks to Weaviate")
def _add_faiss(self, chunks: List[Dict[str, Any]], video_id: str):
"""Add chunks to FAISS."""
import faiss
# Normalize vectors for cosine similarity
embeddings = np.array([c['embedding'] for c in chunks], dtype='float32')
faiss.normalize_L2(embeddings)
# Add to index
self.collection.add(embeddings)
# Store metadata separately
for i, c in enumerate(chunks):
self.metadata.append({
'id': f"{video_id}_{c['id']}",
'video_id': video_id,
'chunk_id': c['id'],
'text': c['text'],
'word_count': c.get('word_count', 0)
})
# Save index and metadata
faiss.write_index(self.collection, str(self.index_path))
with open(self.metadata_path, 'w') as f:
json.dump(self.metadata, f)
print(f"✅ Added {len(chunks)} chunks to FAISS")
def _add_qdrant(self, chunks: List[Dict[str, Any]], video_id: str):
"""Add chunks to Qdrant."""
from qdrant_client.models import PointStruct
        # Qdrant point IDs must be unsigned integers or UUIDs, so derive a
        # deterministic UUID from the string key and keep the key in the payload
        points = [
            PointStruct(
                id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{video_id}_{c['id']}")),
                vector=c['embedding'],
                payload={
                    'id': f"{video_id}_{c['id']}",
                    'video_id': video_id,
                    'chunk_id': c['id'],
                    'text': c['text'],
                    'word_count': c.get('word_count', 0),
                    'quality_score': c.get('quality_score', 0)
                }
            )
            for c in chunks
        ]
self.client.upsert(
collection_name=self.collection,
points=points
)
print(f"✅ Added {len(chunks)} chunks to Qdrant")
def search(self, query_embedding: List[float], top_k: int = 5) -> List[Dict[str, Any]]:
"""
Search for similar chunks.
Args:
query_embedding: Query vector
top_k: Number of results to return
Returns:
List of similar chunks with scores
"""
backend = self.config.backend.lower()
if backend == "chromadb":
return self._search_chromadb(query_embedding, top_k)
elif backend == "pinecone":
return self._search_pinecone(query_embedding, top_k)
elif backend == "weaviate":
return self._search_weaviate(query_embedding, top_k)
elif backend == "faiss":
return self._search_faiss(query_embedding, top_k)
elif backend == "qdrant":
return self._search_qdrant(query_embedding, top_k)
def _search_chromadb(self, query_embedding: List[float], top_k: int) -> List[Dict[str, Any]]:
"""Search ChromaDB."""
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=top_k
)
return [
{
'id': results['ids'][0][i],
'text': results['documents'][0][i],
                'score': 1 - results['distances'][0][i],  # cosine distance -> similarity
'metadata': results['metadatas'][0][i]
}
for i in range(len(results['ids'][0]))
]
def _search_pinecone(self, query_embedding: List[float], top_k: int) -> List[Dict[str, Any]]:
"""Search Pinecone."""
results = self.collection.query(
vector=query_embedding,
top_k=top_k,
include_metadata=True
)
return [
{
'id': match.id,
'score': match.score,
'metadata': match.metadata
}
for match in results.matches
]
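    def _search_weaviate(self, query_embedding: List[float], top_k: int) -> List[Dict[str, Any]]:
        """Search Weaviate.

        The search() dispatcher references this backend, so a minimal sketch is
        provided here using the v4 near_vector query; distance is requested
        explicitly and converted to a similarity-style score.
        """
        from weaviate.classes.query import MetadataQuery
        response = self.collection.query.near_vector(
            near_vector=query_embedding,
            limit=top_k,
            return_metadata=MetadataQuery(distance=True)
        )
        return [
            {
                'id': str(obj.uuid),
                'text': obj.properties.get('text', ''),
                'score': 1 - obj.metadata.distance,  # cosine distance -> similarity
                'metadata': obj.properties
            }
            for obj in response.objects
        ]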
def _search_faiss(self, query_embedding: List[float], top_k: int) -> List[Dict[str, Any]]:
"""Search FAISS."""
import faiss
# Normalize query
query = np.array([query_embedding], dtype='float32')
faiss.normalize_L2(query)
# Search
scores, indices = self.collection.search(query, top_k)
        # FAISS pads results with -1 indices when the index holds fewer than
        # top_k vectors, so filter those out
        return [
            {
                'id': self.metadata[idx]['id'],
                'text': self.metadata[idx]['text'],
                'score': float(scores[0][i]),
                'metadata': self.metadata[idx]
            }
            for i, idx in enumerate(indices[0])
            if idx != -1
        ]
def _search_qdrant(self, query_embedding: List[float], top_k: int) -> List[Dict[str, Any]]:
"""Search Qdrant."""
results = self.client.search(
collection_name=self.collection,
query_vector=query_embedding,
limit=top_k
)
return [
{
'id': hit.id,
'score': hit.score,
'metadata': hit.payload
}
for hit in results
]
def main():
"""Example usage."""
import sys
if len(sys.argv) < 2:
print("Usage: python3 vector_store_manager.py <video_id> [backend]")
print("\nBackends: chromadb (default), pinecone, weaviate, faiss, qdrant")
print("\nExample:")
print(" python3 vector_store_manager.py bVxBrzSe3kA chromadb")
sys.exit(1)
video_id = sys.argv[1]
backend = sys.argv[2] if len(sys.argv) > 2 else "chromadb"
# Load chunks
chunks_file = Path(f"KNOWLEDGE_YOUTUBE/{video_id}/exports/chunks.jsonl")
if not chunks_file.exists():
print(f"❌ Chunks file not found: {chunks_file}")
sys.exit(1)
chunks = []
with open(chunks_file, 'r') as f:
for line in f:
chunks.append(json.loads(line))
print(f"📊 Loaded {len(chunks)} chunks from {chunks_file}")
# Initialize vector store
config = VectorConfig(backend=backend)
store = VectorStoreManager(config)
# Add chunks
store.add_chunks(chunks, video_id)
print(f"\n✅ Vector storage complete!")
print(f" Backend: {backend}")
print(f" Collection: {config.collection_name}")
print(f" Chunks: {len(chunks)}")
if __name__ == '__main__':
main()