Skip to main content
Glama
i-dot-ai
by i-dot-ai
qdrant_query_handler.py (20.9 kB)
"""Qdrant-backed query handlers for Parliament MCP.

Provides hybrid (dense + sparse) vector search and filtered scroll queries over
Hansard contributions and Parliamentary Questions collections.
"""

from collections import defaultdict
from datetime import datetime
from typing import Any, Literal

from fastembed import SparseTextEmbedding
from openai import AsyncAzureOpenAI
from qdrant_client import AsyncQdrantClient, models
from qdrant_client.models import DatetimeRange, FieldCondition, Filter, MatchValue

from parliament_mcp.openai_helpers import embed_single
from parliament_mcp.settings import ParliamentMCPSettings

# A debate must have at least this many matching contributions before it is
# considered "substantial" enough to return from title search.
MINIMUM_DEBATE_HITS = 2


def parse_date(date_str: str | None) -> str | None:
    """Parse an ISO-format datetime string and return the date part as ISO text.

    Returns None for empty input or anything `datetime.fromisoformat` rejects.
    """
    if not date_str:
        return None
    try:
        return datetime.fromisoformat(date_str).date().isoformat()
    except (ValueError, TypeError):
        return None


def build_date_range_filter(
    date_from: str | None,
    date_to: str | None,
    field: str = "SittingDate",
) -> FieldCondition | None:
    """Build a date range filter for Qdrant queries using DatetimeRange.

    Args:
        date_from: Inclusive lower bound, ISO 'YYYY-MM-DD' (optional).
        date_to: Inclusive upper bound, ISO 'YYYY-MM-DD' (optional).
        field: Payload field to filter on (default "SittingDate").

    Returns:
        A FieldCondition, or None when neither bound is provided.

    Raises:
        ValueError: If a provided date string is not valid ISO format.
    """
    if not date_from and not date_to:
        return None
    return FieldCondition(
        key=field,
        range=DatetimeRange(
            gte=datetime.fromisoformat(date_from).date() if date_from else None,
            lte=datetime.fromisoformat(date_to).date() if date_to else None,
        ),
    )


def build_match_filter(field: str, value: Any) -> FieldCondition | None:
    """Build an exact-match filter for Qdrant queries, or None when value is None."""
    if value is None:
        return None
    return FieldCondition(key=field, match=MatchValue(value=value))


def build_filters(conditions: list[FieldCondition | None]) -> Filter | None:
    """Combine conditions into a single must-Filter, dropping Nones.

    Returns None when no valid conditions remain, so callers can pass the
    result straight to Qdrant (where None means "no filter").
    """
    valid_conditions = [c for c in conditions if c is not None]
    if not valid_conditions:
        return None
    return Filter(must=valid_conditions)


class DebateCollection:
    """Collection of debates and their contributions.

    Tracks the distinct contributions seen for each debate and exposes the
    "substantial" debates — those with at least MINIMUM_DEBATE_HITS distinct
    contributions.
    """

    def __init__(self) -> None:
        # debate_id -> {"contribution_ids": set of seen contribution ids,
        #               "info": summary dict built from the first contribution}
        self._debates = defaultdict(lambda: {"contribution_ids": set(), "info": None})

    def add_contribution(self, contribution: dict) -> bool:
        """Record a contribution payload against its debate.

        Returns True if this contribution id had not been seen before
        (used by callers to detect pagination progress).
        """
        debate_id = contribution.get("DebateSectionExtId")
        contribution_id = contribution.get("ContributionExtId")
        debate = self._debates[debate_id]
        new_data = contribution_id not in debate["contribution_ids"]
        debate["contribution_ids"].add(contribution_id)
        if debate["info"] is None:
            debate["info"] = {
                "debate_id": debate_id,
                "title": contribution.get("DebateSection"),
                "date": contribution.get("SittingDate"),
                "house": contribution.get("House"),
                "debate_parents": contribution.get("debate_parents", []),
                "debate_url": contribution.get("debate_url"),
            }
        return new_data

    def get_substantial_debates(self) -> list[dict]:
        """Return info dicts for debates with at least MINIMUM_DEBATE_HITS contributions."""
        return [
            debate["info"]
            for debate in self._debates.values()
            if len(debate["contribution_ids"]) >= MINIMUM_DEBATE_HITS
        ]

    def get_substantial_debate_ids(self) -> list:
        """Return ids of debates with at least MINIMUM_DEBATE_HITS contributions."""
        return [
            debate_id
            for debate_id, debate in self._debates.items()
            if len(debate["contribution_ids"]) >= MINIMUM_DEBATE_HITS
        ]


class QdrantQueryHandler:
    """Query handler for Parliament data stored in Qdrant."""

    def __init__(
        self,
        qdrant_client: AsyncQdrantClient,
        openai_client: AsyncAzureOpenAI,
        settings: ParliamentMCPSettings,
    ):
        self.qdrant_client = qdrant_client
        self.openai_client = openai_client
        self.sparse_text_embedding = SparseTextEmbedding(model_name=settings.SPARSE_TEXT_EMBEDDING_MODEL)
        self.settings = settings

    async def embed_query_dense(self, query: str) -> list[float]:
        """Embed a query using the dense text embedding model."""
        return await embed_single(
            self.openai_client,
            query,
            self.settings.AZURE_OPENAI_EMBEDDING_MODEL,
            self.settings.EMBEDDING_DIMENSIONS,
        )

    def embed_query_sparse(self, query: str) -> models.SparseVector:
        """Embed a query using the sparse text embedding model."""
        embedding = next(self.sparse_text_embedding.embed(query))
        return models.SparseVector(indices=embedding.indices, values=embedding.values)

    async def search_debate_titles(
        self,
        query: str | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
        house: str | None = None,
        max_results: int = 50,
    ) -> list[dict]:
        """
        Search debate titles with optional filters.

        Args:
            query: Text to search for in debate titles (optional if date range is provided)
            date_from: Start date in format 'YYYY-MM-DD' (optional if query is provided)
            date_to: End date in format 'YYYY-MM-DD' (optional if query is provided)
            house: Filter by house (e.g., 'Commons', 'Lords'), optional
            max_results: Maximum number of results to return (default 50)

        Returns:
            List of debate details dictionaries

        Raises:
            ValueError: If neither query nor date range is provided
        """
        # Validate that at least one of query or date range is provided
        if not query and not date_from and not date_to:
            message = "At least one of 'query', 'date_from', or 'date_to' must be provided"
            raise ValueError(message)

        # Build filters
        filter_conditions = [
            build_date_range_filter(date_from, date_to),
            build_match_filter("House", house),
        ]
        if query:
            filter_conditions.append(
                FieldCondition(key="debate_parents[].Title", match=models.MatchText(text=query))
            )
        # At least one condition exists here (validated above), so query_filter
        # is never None and can safely be mutated below.
        query_filter = build_filters(filter_conditions)

        debates = DebateCollection()
        while len(substantial_ids := debates.get_substantial_debate_ids()) < max_results:
            # Exclude debates already confirmed substantial so subsequent
            # scrolls surface new candidates.
            if substantial_ids:
                query_filter.must_not = [
                    FieldCondition(key="DebateSectionExtId", match=models.MatchAny(any=substantial_ids)),
                ]
            contributions, _ = await self.qdrant_client.scroll(
                collection_name=self.settings.HANSARD_CONTRIBUTIONS_COLLECTION,
                scroll_filter=query_filter,
                limit=1000,
                with_payload=True,
                order_by={"key": "SittingDate", "direction": "desc"},
            )
            if not contributions:
                break
            # Track whether any previously-unseen contribution appeared; if
            # not, the scroll is only returning data we already have, so stop
            # to avoid looping forever.
            new_data_available = False
            for result in contributions:
                new_data_available |= debates.add_contribution(result.payload)
            if not new_data_available:
                break

        return debates.get_substantial_debates()[:max_results]

    async def search_hansard_contributions(
        self,
        query: str | None = None,
        member_id: int | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
        debate_id: str | None = None,
        house: Literal["Commons", "Lords"] | None = None,
        max_results: int = 100,
        min_score: float = 0,
    ) -> list[dict]:
        """
        Search Hansard contributions using Qdrant vector search.

        Args:
            query: Text to search for in contributions (optional)
            member_id: Member ID (optional)
            date_from: Start date in format 'YYYY-MM-DD' (optional)
            date_to: End date in format 'YYYY-MM-DD' (optional)
            debate_id: Debate ID (optional)
            house: House (Commons|Lords) (optional)
            max_results: Maximum number of results to return (default 100)
            min_score: Minimum relevance score (default 0)

        Returns:
            List of Hansard contribution details dictionaries. With a query,
            hybrid (dense + sparse) vector search is used and results are
            sorted by relevance; without one, the most recent contributions
            matching the filters are returned sorted by date and order.
        """
        # Build filters
        filter_conditions = [
            build_match_filter("MemberId", member_id),
            build_match_filter("DebateSectionExtId", debate_id),
            build_match_filter("House", house),
            build_date_range_filter(date_from, date_to),
        ]
        query_filter = build_filters(filter_conditions)

        if query:
            # Generate embedding for search query
            dense_query_vector = await self.embed_query_dense(query)
            sparse_query_vector = self.embed_query_sparse(query)

            # Perform hybrid vector search, fusing dense and sparse hits with RRF
            query_response = await self.qdrant_client.query_points(
                collection_name=self.settings.HANSARD_CONTRIBUTIONS_COLLECTION,
                prefetch=[
                    models.Prefetch(
                        query=dense_query_vector,
                        using="text_dense",
                        limit=max_results,
                        filter=query_filter,
                    ),
                    models.Prefetch(
                        query=sparse_query_vector,
                        using="text_sparse",
                        limit=max_results,
                        filter=query_filter,
                    ),
                ],
                query=models.FusionQuery(
                    fusion=models.Fusion.RRF,
                ),
                limit=max_results,
                score_threshold=min_score,
                with_payload=True,
            )
            query_response = query_response.points
        else:
            # If no query, use scroll to get results with filters only
            query_response, _ = await self.qdrant_client.scroll(
                collection_name=self.settings.HANSARD_CONTRIBUTIONS_COLLECTION,
                scroll_filter=query_filter,
                limit=max_results,
                with_payload=True,
                with_vectors=False,
                order_by={
                    "key": "SittingDate",
                    "direction": "desc",
                },
            )

        results = []
        for result in query_response:
            payload = result.payload
            results.append(
                {
                    "text": payload.get("text", ""),
                    "date": payload.get("SittingDate"),
                    "house": payload.get("House"),
                    "member_id": payload.get("MemberId"),
                    "member_name": payload.get("MemberName"),
                    # Scroll records carry no score, so default to 1.0
                    "relevance_score": result.score if hasattr(result, "score") else 1.0,
                    "debate_title": payload.get("DebateSection", ""),
                    "debate_url": payload.get("debate_url", ""),
                    "contribution_url": payload.get("contribution_url", ""),
                    "order_in_debate": payload.get("OrderInDebateSection"),
                    "debate_parents": payload.get("debate_parents", []),
                }
            )

        # Sort by relevance score if we have a query, otherwise by date and order.
        # `or` fallbacks guard against payload keys that are present but None,
        # which would otherwise make the tuple comparison raise TypeError.
        if query:
            results.sort(key=lambda x: x["relevance_score"], reverse=True)
        else:
            results.sort(
                key=lambda x: (
                    x.get("date") or "",
                    x.get("order_in_debate") or 0,
                )
            )

        return results

    async def find_relevant_contributors(
        self,
        query: str,
        num_contributors: int = 10,
        num_contributions: int = 10,
        date_from: str | None = None,
        date_to: str | None = None,
        house: Literal["Commons", "Lords"] | None = None,
    ) -> list[dict]:
        """
        Find the most relevant parliamentary contributors and their contributions.

        Groups Hansard contributions by member ID and returns the top contributors
        with their most relevant contributions for the given search query.

        Args:
            query: Text to search for in contributions
            num_contributors: Number of top contributors to return (default 10)
            num_contributions: Number of top contributions per contributor (default 10)
            date_from: Start date filter in 'YYYY-MM-DD' format (optional)
            date_to: End date filter in 'YYYY-MM-DD' format (optional)
            house: Filter by house - "Commons" or "Lords" (optional)

        Returns:
            List of contributor groups, each containing the member's contributions

        Raises:
            ValueError: If query is empty
        """
        # Fail if no query is provided
        if not query:
            msg = "A query must be provided"
            raise ValueError(msg)

        # Build filters
        query_filter = build_filters(
            [
                build_match_filter("House", house),
                build_date_range_filter(date_from, date_to),
            ]
        )

        # Generate embedding for search query
        dense_query_vector = await self.embed_query_dense(query)
        sparse_query_vector = self.embed_query_sparse(query)

        # Perform hybrid vector search grouped by member
        query_response = await self.qdrant_client.query_points_groups(
            collection_name=self.settings.HANSARD_CONTRIBUTIONS_COLLECTION,
            prefetch=[
                models.Prefetch(
                    query=dense_query_vector,
                    using="text_dense",
                    filter=query_filter,
                ),
                models.Prefetch(
                    query=sparse_query_vector,
                    using="text_sparse",
                    filter=query_filter,
                ),
            ],
            query=models.FusionQuery(
                fusion=models.Fusion.RRF,
            ),
            limit=num_contributors,
            score_threshold=0,
            with_payload=True,
            group_by="MemberId",
            group_size=num_contributions,
        )

        results = []
        for group in query_response.groups:
            group_results = []
            for hit in group.hits:
                payload = hit.payload
                group_results.append(
                    {
                        "text": payload.get("text", ""),
                        "date": payload.get("SittingDate"),
                        "house": payload.get("House"),
                        "member_id": payload.get("MemberId"),
                        "member_name": payload.get("MemberName"),
                        "relevance_score": hit.score if hasattr(hit, "score") else 1.0,
                        "debate_title": payload.get("DebateSection", ""),
                        "debate_url": payload.get("debate_url", ""),
                        "contribution_url": payload.get("contribution_url", ""),
                        "order_in_debate": payload.get("OrderInDebateSection"),
                        "debate_parents": payload.get("debate_parents", []),
                    }
                )
            results.append(group_results)

        return results

    async def search_parliamentary_questions(
        self,
        query: str | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
        party: str | None = None,
        asking_member_id: int | None = None,
        answering_body_name: str | None = None,
        min_score: float = 0,
        max_results: int = 25,
    ) -> list[dict]:
        """
        Search Parliamentary Questions using Qdrant vector search.

        Args:
            query: Text to search for in parliamentary questions
            date_from: Start date in format 'YYYY-MM-DD' (optional)
            date_to: End date in format 'YYYY-MM-DD' (optional)
            party: Filter by party (optional)
            asking_member_id: Filter by member id (optional)
            answering_body_name: Filter by answering body name (optional)
            min_score: Minimum relevance score (default 0)
            max_results: Maximum number of results to return (default 25)

        Returns:
            List of question dictionaries with question/answer text stitched
            together from their chunks, most recently updated first.
        """
        # Build filters
        filter_conditions = [
            build_date_range_filter(date_from, date_to, "dateTabled"),
            build_match_filter("askingMember.party", party),
            build_match_filter("askingMember.id", asking_member_id),
        ]
        if answering_body_name:
            filter_conditions.append(
                FieldCondition(key="answeringBodyName", match=models.MatchText(text=answering_body_name))
            )
        query_filter = build_filters(filter_conditions)

        # First find the ID of any questions with any relevant chunks
        if query:
            # Generate embedding for search query
            dense_query_vector = await self.embed_query_dense(query)
            sparse_query_vector = self.embed_query_sparse(query)

            # Perform hybrid vector search
            query_response = await self.qdrant_client.query_points(
                collection_name=self.settings.PARLIAMENTARY_QUESTIONS_COLLECTION,
                prefetch=[
                    models.Prefetch(
                        query=dense_query_vector,
                        using="text_dense",
                        limit=max_results,
                        filter=query_filter,
                    ),
                    models.Prefetch(
                        query=sparse_query_vector,
                        using="text_sparse",
                        limit=max_results,
                        filter=query_filter,
                    ),
                ],
                query=models.FusionQuery(
                    fusion=models.Fusion.RRF,
                ),
                limit=max_results,
                score_threshold=min_score,
                with_payload=True,
            )
            relevant_questions_ids = [hit.payload["id"] for hit in query_response.points]
        else:
            # If no query, use scroll to get results with filters only
            query_response, _ = await self.qdrant_client.scroll(
                collection_name=self.settings.PARLIAMENTARY_QUESTIONS_COLLECTION,
                scroll_filter=query_filter,
                limit=max_results,
                with_payload=True,
                order_by={
                    "key": "id",
                    "direction": "desc",
                },
            )
            relevant_questions_ids = [record.payload["id"] for record in query_response]

        if len(relevant_questions_ids) == 0:
            return []

        # Then get the full details of the questions
        query_response = await self.qdrant_client.query_points_groups(
            collection_name=self.settings.PARLIAMENTARY_QUESTIONS_COLLECTION,
            query_filter=Filter(
                must=[
                    models.FieldCondition(key="id", match=models.MatchAny(any=relevant_questions_ids)),
                ]
            ),
            limit=max_results,
            with_payload=True,
            with_vectors=False,
            group_by="id",
            group_size=100,
        )

        results = []
        for group in query_response.groups:
            # For PQs, question and answer chunks are stored as separate chunks,
            # so we have to piece them together, ordered by chunk_id.
            payloads = [hit.payload for hit in group.hits]
            answer_chunks = [(hit["chunk_id"], hit["text"]) for hit in payloads if hit["chunk_type"] == "answer"]
            question_chunks = [(hit["chunk_id"], hit["text"]) for hit in payloads if hit["chunk_type"] == "question"]
            answer_text = "\n".join([text for _, text in sorted(answer_chunks)])
            question_text = "\n".join([text for _, text in sorted(question_chunks)])

            # Use the most recently created chunk's payload for the metadata.
            # `or ""` guards against a missing/None created_at, which would
            # otherwise make the comparison raise TypeError.
            payload = max(payloads, key=lambda x: x.get("created_at") or "")
            uin = payload.get("uin")
            tabled_date = parse_date(payload.get("dateTabled"))
            results.append(
                (
                    payload.get("created_at") or "",
                    {
                        "question_text": question_text,
                        "answer_text": answer_text,
                        "chunk_type": payload.get("chunk_type"),
                        "askingMember": payload.get("askingMember"),
                        "answeringMember": payload.get("answeringMember"),
                        "dateTabled": tabled_date,
                        "dateAnswered": parse_date(payload.get("dateAnswered")),
                        "answeringBodyName": payload.get("answeringBodyName"),
                        "question_url": f"https://questions-statements.parliament.uk/written-questions/detail/{tabled_date}/{uin}",
                        "created_at": payload.get("created_at"),
                    },
                )
            )

        # Return the most recently updated questions first
        return [result for _, result in sorted(results, key=lambda x: x[0], reverse=True)]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/i-dot-ai/parliament-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.