es_to_qdrant_etl.py
#!/usr/bin/env python
"""
Transfer Parliamentary Questions and Hansard contributions from Elasticsearch to Qdrant.

Used for a one-off transfer of data from Elasticsearch to Qdrant.

Usage:
    uv sync --group transfer-from-es
    uv run python scripts/es_to_qdrant_etl.py pqs --limit 100 --batch-size 100
    uv run python scripts/es_to_qdrant_etl.py hansard --limit 100 --batch-size 100
"""

import asyncio
import contextlib
import logging
import os

import click
from bloom_filter import BloomFilter
from dotenv import load_dotenv
from elasticsearch import AsyncElasticsearch
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from tenacity import retry, stop_after_attempt, wait_exponential

from parliament_mcp.models import Contribution, ParliamentaryQuestion
from parliament_mcp.qdrant_data_loaders import QdrantDataLoader
from parliament_mcp.qdrant_helpers import get_async_qdrant_client
from parliament_mcp.settings import settings

logger = logging.getLogger(__name__)

load_dotenv()

# Configuration for each document type
CONFIGS = {
    "pqs": {
        "es_index": "lex-parliamentary-questions-prod-may-2025",
        "qdrant_collection": settings.PARLIAMENTARY_QUESTIONS_COLLECTION,
        "model": ParliamentaryQuestion,
        "text_fields": ["questionText", "answerText"],
        "excludes": ["questionText.inference", "answerText.inference", "document_uri"],
        "description": "PQs",
        "expected_documents": 600000,  # 593,281 documents
        "from_date": "1900-01-01",
        "date_field": "dateTabled",
        "to_date": "2020-01-01",
    },
    "hansard": {
        "es_index": "parliament_mcp_hansard_contributions",
        "qdrant_collection": settings.HANSARD_CONTRIBUTIONS_COLLECTION,
        "model": Contribution,
        "text_fields": ["ContributionTextFull"],
        "excludes": ["ContributionTextFull.inference", "document_uri", "contribution_url", "debate_url"],
        "description": "Hansard contributions",
        "expected_documents": 2400000,  # 2,389,496 documents
        "from_date": "1900-01-01",
        "to_date": "2020-01-01",
        "date_field": "SittingDate",
    },
}


def get_es_client():
    return AsyncElasticsearch(
        cloud_id=os.environ["ELASTICSEARCH_CLOUD_ID"],
        api_key=os.environ["ELASTICSEARCH_API_KEY"],
    )


async def populate_bloom_filter_from_qdrant(qdrant, collection_name, expected_items=1000000):
    """Populate a bloom filter with existing chunk IDs from Qdrant."""
    bloom = BloomFilter(max_elements=expected_items, error_rate=0.01)

    # Get collection info to know total points
    collection_info = await qdrant.get_collection(collection_name)
    total_points = collection_info.points_count

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TaskProgressColumn(),
        TextColumn("Elapsed: "),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        expand=True,
    ) as progress:
        task = progress.add_task(
            f"Constructing bloom filter from existing chunks in {collection_name}...", total=total_points
        )

        # Scroll through all points in the collection
        offset = None
        total_loaded = 0
        while True:
            result = await qdrant.scroll(
                collection_name=collection_name,
                limit=1000,  # Process in batches
                offset=offset,
                with_payload=["chunk_id"],  # Only fetch the chunk_id field
                with_vectors=False,  # Don't fetch vectors to save bandwidth
            )
            points, offset = result
            if not points:
                break

            # Add chunk IDs to bloom filter
            for point in points:
                if "chunk_id" in point.payload:
                    bloom.add(point.payload["chunk_id"])

            total_loaded += len(points)
            progress.update(task, advance=len(points))

            if offset is None:
                break

    logger.info("Loaded %d existing chunk IDs into bloom filter", total_loaded)
    return bloom
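
# Note on the error_rate used above: a Bloom filter can return false positives
# but never false negatives. With error_rate=0.01, roughly 1% of chunk IDs that
# are NOT yet in Qdrant will still test as "already present", so when
# skip_existing is enabled a small fraction of genuinely new documents may be
# silently skipped. The reverse cannot happen: chunks already stored in Qdrant
# always test as present.
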
async def es_batch_generator(es, config_data, batch_size=100, limit=None):
    """Async generator that yields transformed document batches from Elasticsearch."""
    processed = 0

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def es_search_with_retry():
        return await es.search(
            index=config_data["es_index"],
            scroll="5m",
            body={
                "query": {
                    "range": {
                        config_data["date_field"]: {
                            "gte": config_data["from_date"],
                            "lte": config_data["to_date"],
                        }
                    }
                },
                "_source": {"excludes": config_data["excludes"]},
                "size": batch_size,
            },
        )

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def es_scroll_with_retry(scroll_id):
        return await es.scroll(scroll_id=scroll_id, scroll="5m")

    # Start scrolling
    resp = await es_search_with_retry()
    scroll_id = resp["_scroll_id"]
    hits = resp["hits"]["hits"]

    try:
        while hits and (not limit or processed < limit):
            # Transform batch
            def flatten_doc(hit):
                doc = hit["_source"]
                for field in config_data["text_fields"]:
                    if field in doc and isinstance(doc[field], dict):
                        doc[field] = doc[field]["text"]
                return config_data["model"].model_validate(doc)

            docs = [flatten_doc(hit) for hit in hits]
            processed += len(docs)
            yield docs

            # Get next batch
            resp = await es_scroll_with_retry(scroll_id)
            hits = resp["hits"]["hits"]
    finally:
        with contextlib.suppress(Exception):
            await es.clear_scroll(scroll_id=scroll_id)


async def worker(progress_task, loader, bloom_filter, queue, progress):
    """Worker that processes batches from the queue."""

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), reraise=True)
    async def store_batch_with_retry(batch):
        # Filter out documents that already exist if bloom filter is available
        if bloom_filter:
            filtered_batch = []
            for doc in batch:
                # Generate chunk IDs to check against bloom filter
                chunks = list(doc.to_chunks(loader.chunker))
                new_chunks_exist = False
                for chunk in chunks:
                    if chunk["chunk_id"] not in bloom_filter:
                        new_chunks_exist = True
                        break
                if new_chunks_exist:
                    filtered_batch.append(doc)

            if not filtered_batch:
                logger.debug("Skipped batch - all documents already exist")
                return

            batch = filtered_batch

        await loader.store_in_qdrant_batch(batch)

    while True:
        batch = await queue.get()
        if batch is None:
            break
        await store_batch_with_retry(batch)
        progress.update(progress_task, advance=len(batch))
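
# Note on shutdown and backpressure: the producer defined in transfer_documents
# below enqueues one None sentinel per worker once the Elasticsearch generator
# is exhausted, so each worker's `if batch is None: break` fires exactly once
# and the TaskGroup can exit cleanly. The queue is bounded at
# concurrent_workers * 2, so scrolling Elasticsearch never runs far ahead of
# the slower Qdrant upsert step.
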
async def transfer_documents(doc_type, limit=None, batch_size=100, concurrent_workers=3, skip_existing=True):
    """Generic document transfer from Elasticsearch to Qdrant with concurrent batch processing."""
    config_data = CONFIGS[doc_type]

    async with get_async_qdrant_client(settings=settings) as qdrant, get_es_client() as es:
        loader = QdrantDataLoader(qdrant, config_data["qdrant_collection"], settings)

        # Load existing chunk IDs if skip_existing is enabled
        bloom_filter = None
        if skip_existing:
            bloom_filter = await populate_bloom_filter_from_qdrant(
                qdrant, config_data["qdrant_collection"], expected_items=config_data["expected_documents"]
            )

        # Get total count
        total = (
            await es.count(
                index=config_data["es_index"],
                body={
                    "query": {
                        "range": {
                            config_data["date_field"]: {
                                "gte": config_data["from_date"],
                                "lte": config_data["to_date"],
                            }
                        }
                    }
                },
            )
        )["count"]
        if limit:
            total = min(total, limit)

        queue = asyncio.Queue(maxsize=concurrent_workers * 2)

        async def producer():
            """Produces batches from ES and puts them in the queue."""
            gen = es_batch_generator(es, config_data, batch_size, limit)
            async for batch in gen:
                await queue.put(batch)
            # Signal completion
            for _ in range(concurrent_workers):
                await queue.put(None)

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TaskProgressColumn(),
            TextColumn("Elapsed: "),
            TimeElapsedColumn(),
            TextColumn("Remaining: "),
            TimeRemainingColumn(),
            expand=True,
        ) as progress:
            task = progress.add_task(f"Transferring {config_data['description']}...", total=total)

            # Run producer and workers concurrently
            async with asyncio.TaskGroup() as tg:
                tg.create_task(producer())
                for _ in range(concurrent_workers):
                    tg.create_task(worker(task, loader, bloom_filter, queue, progress))

        await es.close()

    logger.info("✓ Transferred %s", config_data["description"])


@click.group()
def cli():
    """ES to Qdrant ETL for Parliament data."""


@cli.command()
@click.option("--limit", type=int, help="Limit docs to transfer")
@click.option("--batch-size", type=int, default=100)
@click.option("--concurrent-workers", type=int, default=3, help="Number of concurrent workers")
@click.option("--skip-existing/--no-skip-existing", default=True, help="Skip documents that already exist in Qdrant")
def pqs(limit, batch_size, concurrent_workers, skip_existing):
    """Transfer Parliamentary Questions."""
    asyncio.run(transfer_documents("pqs", limit, batch_size, concurrent_workers, skip_existing))


@cli.command()
@click.option("--limit", type=int, help="Limit docs to transfer")
@click.option("--batch-size", type=int, default=100)
@click.option("--concurrent-workers", type=int, default=3, help="Number of concurrent workers")
@click.option("--skip-existing/--no-skip-existing", default=True, help="Skip documents that already exist in Qdrant")
def hansard(limit, batch_size, concurrent_workers, skip_existing):
    """Transfer Hansard contributions."""
    asyncio.run(transfer_documents("hansard", limit, batch_size, concurrent_workers, skip_existing))


if __name__ == "__main__":
    cli()
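
The script reads its Elasticsearch credentials from the environment: get_es_client() requires ELASTICSEARCH_CLOUD_ID and ELASTICSEARCH_API_KEY, and load_dotenv() will pick both up from a local .env file. A minimal .env could look like the sketch below (the values are placeholders; the Qdrant connection details are assumed to come from parliament_mcp.settings rather than this file):

    ELASTICSEARCH_CLOUD_ID=my-deployment:ZXUtd2VzdC0xLmF3cy5leGFtcGxlLmNvbQ==
    ELASTICSEARCH_API_KEY=your-base64-encoded-api-key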

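A quick way to sanity-check a finished transfer is to compare Elasticsearch's hit count against Qdrant's point count. The sketch below is not part of the script; it reuses the same helpers the script imports, and note that points_count counts chunks rather than source documents, since each document is split via to_chunks() before upserting:

    import asyncio

    from parliament_mcp.qdrant_helpers import get_async_qdrant_client
    from parliament_mcp.settings import settings


    async def report_point_counts():
        """Print the current point count for both target collections."""
        async with get_async_qdrant_client(settings=settings) as qdrant:
            for name in (
                settings.PARLIAMENTARY_QUESTIONS_COLLECTION,
                settings.HANSARD_CONTRIBUTIONS_COLLECTION,
            ):
                info = await qdrant.get_collection(name)
                print(f"{name}: {info.points_count} points")


    asyncio.run(report_point_counts())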