#!/usr/bin/env python3
"""
Migration Script: Add description_colbert Named Vector to ColBERT Collection
This script migrates the existing card_framework_components_colbert collection
to add a second named vector (description_colbert) for the feedback loop.
Qdrant does NOT support adding new named vectors to existing collections,
so we must:
1. Create a new collection with both named vectors
2. Copy all existing points (with placeholder for description_colbert)
3. Rename collections (old -> backup, new -> primary)
Usage:
uv run python scripts/migrate_colbert_collection_for_feedback.py [--dry-run]
"""
import os
import sys
from typing import List
from dotenv import load_dotenv
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
load_dotenv()
# Configuration
OLD_COLLECTION = "card_framework_components_colbert"
NEW_COLLECTION = "card_framework_components_colbert_v2"
BACKUP_COLLECTION = "card_framework_components_colbert_backup"
COLBERT_DIM = 128
BATCH_SIZE = 100
def get_qdrant_client():
"""Get Qdrant client using .env credentials."""
from qdrant_client import QdrantClient
url = os.getenv("QDRANT_URL")
key = os.getenv("QDRANT_KEY")
if not url or not key:
raise ValueError("QDRANT_URL and QDRANT_KEY must be set in .env")
return QdrantClient(url=url, api_key=key, prefer_grpc=True)
def check_collection_exists(client, name: str) -> bool:
"""Check if a collection exists."""
collections = client.get_collections()
return name in [c.name for c in collections.collections]
def get_collection_info(client, name: str):
"""Get collection info."""
return client.get_collection(name)
def create_new_collection_with_both_vectors(client, dry_run: bool = False):
"""Create the new collection with both named vectors."""
from qdrant_client.models import (
Distance,
MultiVectorComparator,
MultiVectorConfig,
VectorParams,
)
print(f"\nπ¦ Creating new collection: {NEW_COLLECTION}")
print(f" Named vectors: colbert, description_colbert")
print(f" Dimension: {COLBERT_DIM}")
print(f" Multi-vector comparator: MAX_SIM")
if dry_run:
print(" [DRY RUN] Would create collection")
return
# Check if new collection already exists
if check_collection_exists(client, NEW_COLLECTION):
print(f" β οΈ Collection {NEW_COLLECTION} already exists - deleting first")
client.delete_collection(NEW_COLLECTION)
client.create_collection(
collection_name=NEW_COLLECTION,
vectors_config={
"colbert": VectorParams(
size=COLBERT_DIM,
distance=Distance.COSINE,
multivector_config=MultiVectorConfig(
comparator=MultiVectorComparator.MAX_SIM
),
),
"description_colbert": VectorParams(
size=COLBERT_DIM,
distance=Distance.COSINE,
multivector_config=MultiVectorConfig(
comparator=MultiVectorComparator.MAX_SIM
),
),
},
)
print(f" β
Created {NEW_COLLECTION}")
# Create payload indexes for filtering
print(f"\nπ Creating payload indexes...")
from qdrant_client.models import PayloadSchemaType
client.create_payload_index(
collection_name=NEW_COLLECTION,
field_name="type",
field_schema=PayloadSchemaType.KEYWORD,
)
print(f" β
Created index on 'type' field")
client.create_payload_index(
collection_name=NEW_COLLECTION,
field_name="feedback",
field_schema=PayloadSchemaType.KEYWORD,
)
print(f" β
Created index on 'feedback' field")
def migrate_points(client, dry_run: bool = False):
"""Migrate all points from old to new collection."""
from qdrant_client.models import PointStruct
# Get total count
old_info = get_collection_info(client, OLD_COLLECTION)
total_points = old_info.points_count
print(
f"\nπ Migrating {total_points} points from {OLD_COLLECTION} to {NEW_COLLECTION}"
)
if total_points == 0:
print(" β οΈ No points to migrate")
return
if dry_run:
print(f" [DRY RUN] Would migrate {total_points} points")
return
# Scroll through all points in batches
offset = None
migrated = 0
while True:
# Scroll to get points
results, next_offset = client.scroll(
collection_name=OLD_COLLECTION,
limit=BATCH_SIZE,
offset=offset,
with_payload=True,
with_vectors=True,
)
if not results:
break
# Convert points for new collection
new_points = []
for point in results:
# Get existing colbert vectors
colbert_vectors = point.vector
if isinstance(colbert_vectors, dict):
colbert_vectors = colbert_vectors.get("colbert", [])
# Create placeholder for description_colbert
# Use a single zero vector (will be overwritten when patterns are stored)
placeholder_vector = [[0.0] * COLBERT_DIM]
new_point = PointStruct(
id=point.id,
vector={
"colbert": colbert_vectors,
"description_colbert": placeholder_vector,
},
payload=point.payload,
)
new_points.append(new_point)
# Upsert batch
client.upsert(
collection_name=NEW_COLLECTION,
points=new_points,
)
migrated += len(new_points)
print(f" β
Migrated {migrated}/{total_points} points")
if next_offset is None:
break
offset = next_offset
print(f"\n β
Migration complete: {migrated} points")
def swap_collections(client, dry_run: bool = False):
"""Rename collections: old -> backup, new -> primary."""
print(f"\nπ Swapping collections:")
print(f" {OLD_COLLECTION} -> {BACKUP_COLLECTION}")
print(f" {NEW_COLLECTION} -> {OLD_COLLECTION}")
if dry_run:
print(" [DRY RUN] Would swap collections")
return
# Delete backup if it exists
if check_collection_exists(client, BACKUP_COLLECTION):
print(f" β οΈ Deleting existing backup: {BACKUP_COLLECTION}")
client.delete_collection(BACKUP_COLLECTION)
# Rename old -> backup
# Note: Qdrant doesn't have rename, so we need to recreate
# For now, just delete old and rename new
# Actually, Qdrant Cloud supports collection aliases which is the better approach
# Let's check if aliases are available
try:
# Try to use aliases (more elegant)
from qdrant_client.models import (
AliasOperations,
CreateAliasOperation,
DeleteAliasOperation,
)
# Update alias to point to new collection
client.update_collection_aliases(
change_aliases_operations=[
(
DeleteAliasOperation(alias_name=OLD_COLLECTION)
if check_alias_exists(client, OLD_COLLECTION)
else None
),
CreateAliasOperation(
alias_name=OLD_COLLECTION,
collection_name=NEW_COLLECTION,
),
]
)
print(f" β
Created alias {OLD_COLLECTION} -> {NEW_COLLECTION}")
except Exception as e:
# Aliases not available or failed, use delete/rename approach
print(f" βΉοΈ Aliases not available ({e}), using delete/create approach")
# Delete old collection
print(f" ποΈ Deleting old collection: {OLD_COLLECTION}")
client.delete_collection(OLD_COLLECTION)
# Rename new collection (by recreating with same name)
# This is destructive - we'll lose the new collection name
# Better approach: just update the code to use NEW_COLLECTION
print(f" βΉοΈ New collection available as: {NEW_COLLECTION}")
print(f" β οΈ Update code to use: {NEW_COLLECTION}")
def check_alias_exists(client, alias_name: str) -> bool:
"""Check if an alias exists."""
try:
aliases = client.get_collection_aliases(alias_name)
return bool(aliases.aliases)
except Exception:
return False
def verify_migration(client):
"""Verify the migration was successful."""
print(f"\nπ Verifying migration:")
# Check new collection
if not check_collection_exists(client, NEW_COLLECTION):
print(f" β New collection {NEW_COLLECTION} does not exist!")
return False
new_info = get_collection_info(client, NEW_COLLECTION)
print(f" New collection points: {new_info.points_count}")
# Check vectors config
vectors_config = new_info.config.params.vectors
if isinstance(vectors_config, dict):
print(f" Named vectors: {list(vectors_config.keys())}")
if "colbert" in vectors_config and "description_colbert" in vectors_config:
print(" β
Both named vectors present")
else:
print(" β Missing expected named vectors")
return False
# Sample a point to verify structure
results, _ = client.scroll(
collection_name=NEW_COLLECTION,
limit=1,
with_vectors=True,
)
if results:
point = results[0]
if isinstance(point.vector, dict):
print(f" Sample point vectors: {list(point.vector.keys())}")
print(" β
Point structure correct")
else:
print(" β Point vector structure incorrect")
return False
return True
def main():
"""Run the migration."""
import argparse
parser = argparse.ArgumentParser(
description="Migrate ColBERT collection for feedback loop"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without making changes",
)
parser.add_argument(
"--skip-swap",
action="store_true",
help="Don't swap collections, keep new collection with _v2 suffix",
)
args = parser.parse_args()
print("=" * 60)
print("Migration: Add description_colbert to ColBERT Collection")
print("=" * 60)
if args.dry_run:
print("\nπΆ DRY RUN MODE - No changes will be made")
client = get_qdrant_client()
# Check old collection exists
if not check_collection_exists(client, OLD_COLLECTION):
print(f"\nβ Source collection {OLD_COLLECTION} does not exist!")
return 1
old_info = get_collection_info(client, OLD_COLLECTION)
print(f"\nπ Source collection: {OLD_COLLECTION}")
print(f" Points: {old_info.points_count}")
vectors_config = old_info.config.params.vectors
if isinstance(vectors_config, dict):
print(f" Named vectors: {list(vectors_config.keys())}")
# Step 1: Create new collection
create_new_collection_with_both_vectors(client, args.dry_run)
# Step 2: Migrate points
if not args.dry_run:
migrate_points(client, args.dry_run)
# Step 3: Verify migration
if not args.dry_run:
if not verify_migration(client):
print("\nβ Migration verification failed!")
return 1
# Step 4: Swap collections (optional)
if not args.skip_swap:
swap_collections(client, args.dry_run)
else:
print(f"\nβοΈ Skipping collection swap")
print(f" New collection available as: {NEW_COLLECTION}")
print(f" Update COLLECTION_NAME in code to use new collection")
print("\n" + "=" * 60)
print("β
Migration complete!")
print("=" * 60)
if args.skip_swap or args.dry_run:
print(f"\nNext steps:")
print(
f"1. Update COLLECTION_NAME in gchat/feedback_loop.py to: {NEW_COLLECTION}"
)
print(
f"2. Update COLLECTION_NAME in gchat/smart_card_builder.py to: {NEW_COLLECTION}"
)
print(f"3. Run tests to verify functionality")
return 0
if __name__ == "__main__":
sys.exit(main())