Skip to main content
Glama
service.py10.5 kB
"""Search service implementation.""" import asyncio from typing import List, Dict, Any, Optional, Set, Tuple from cmcp.kb.path import PathComponents from cmcp.utils.logging import get_logger logger = get_logger(__name__) class SearchService: """Orchestrates search operations, acting as the async business logic layer.""" def __init__(self, search_index_manager, search_recovery, document_store, search_relation_predicates, search_graph_neighbor_limit): """Initializes the SearchService. Args: search_index_manager: The synchronous index manager. search_recovery: The index recovery utility. document_store: The document storage utility. search_relation_predicates: Predicates to follow during graph expansion. search_graph_neighbor_limit: Max number of neighbors for graph search. """ self.search_index_manager = search_index_manager self.search_recovery = search_recovery self.document_store = document_store self.reranker = self.search_index_manager.reranker self.search_relation_predicates = search_relation_predicates self.search_graph_neighbor_limit = search_graph_neighbor_limit async def search(self, query: Optional[str] = None, graph_seed_urns: Optional[List[str]] = None, graph_expand_hops: int = 0, relation_predicates: Optional[List[str]] = None, top_k_sparse: int = 50, top_k_rerank: int = 10, filter_urns: Optional[List[str]] = None, include_content: bool = False, include_index: bool = False, use_reranker: bool = True, fuzzy_distance: int = 0) -> List[Dict[str, Any]]: """Search the knowledge base using text query and/or graph expansion.""" if not query and not filter_urns: raise ValueError("Search requires either a query or filter_urns.") if relation_predicates is None: relation_predicates = self.search_relation_predicates if not include_content and use_reranker: logger.warning("Disabling reranking because include_content=False.") use_reranker = False logger.debug(f"Search request: query='{query}', graph_seed_urns={graph_seed_urns}, hops={graph_expand_hops}") candidate_urns, sparse_scores = await self._get_candidate_urns( query, graph_seed_urns, graph_expand_hops, relation_predicates, top_k_sparse, filter_urns, fuzzy_distance ) if not candidate_urns: return [] docs_with_data, error_docs = await self._fetch_content_for_candidates( candidate_urns, sparse_scores, include_content, include_index, use_reranker, query ) final_results = await self._prepare_final_results( query, docs_with_data, error_docs, sparse_scores, use_reranker, top_k_sparse, top_k_rerank ) return final_results async def update_document_in_indices(self, urn: str, content: str): """Coordinates the update of a document in all relevant search indices.""" # This will be expanded to include graph updates if necessary. await asyncio.to_thread( self.search_index_manager.update_sparse_index, urn, content ) async def delete_document_from_indices(self, urn: str): """Coordinates the deletion of a document from all search indices.""" await asyncio.to_thread(self.search_index_manager.delete_sparse_index, urn) await asyncio.to_thread(self.search_index_manager.delete_document_from_graph, urn) async def move_document_in_indices(self, old_urn: str, new_urn: str, content: str): """Coordinates moving a document in all relevant search indices.""" await asyncio.to_thread( self.search_index_manager.update_moved_document_sparse, old_urn, new_urn, content ) await asyncio.to_thread( self.search_index_manager.update_moved_document_graph, old_urn, new_urn ) async def add_triple_to_indices(self, subject: str, predicate: str, object_urn: str, triple_type: str): """Adds a triple to the graph index.""" await asyncio.to_thread( self.search_index_manager.add_triple, subject, predicate, object_urn, triple_type ) async def delete_triple_from_indices(self, subject: str, predicate: str, object_urn: str, triple_type: str): """Deletes a triple from the graph index.""" await asyncio.to_thread( self.search_index_manager.delete_triple, subject, predicate, object_urn, triple_type ) async def recover_indices(self, rebuild_all: bool = False) -> Dict[str, Any]: """Delegates index recovery to the SearchIndexRecovery utility.""" return await self.search_recovery.recover_indices(rebuild_all=rebuild_all) async def _get_candidate_urns(self, query: Optional[str], graph_seed_urns: Optional[List[str]], graph_expand_hops: int, relation_predicates: List[str], top_k_sparse: int, filter_urns: Optional[List[str]], fuzzy_distance: int = 0) -> Tuple[Set[str], Dict[str, float]]: """Get candidate URNs from sparse search and graph expansion.""" candidate_urns = set(graph_seed_urns or []) sparse_scores = {} # urn -> score if query: sparse_results = await asyncio.to_thread( self.search_index_manager.search_sparse, query, top_k_sparse, fuzzy_distance, filter_urns ) for urn, score in sparse_results: sparse_scores[urn] = score if graph_seed_urns is None or urn in graph_seed_urns: candidate_urns.add(urn) if graph_expand_hops > 0 and candidate_urns: current_urns = set(candidate_urns) all_expanded_urns = set(candidate_urns) for _ in range(graph_expand_hops): if not current_urns: break neighbors = await asyncio.to_thread( self.search_index_manager.find_neighbors, list(current_urns), relation_predicates, self.search_graph_neighbor_limit, filter_urns ) new_urns = neighbors - all_expanded_urns all_expanded_urns.update(new_urns) current_urns = new_urns candidate_urns = all_expanded_urns return candidate_urns, sparse_scores async def _fetch_content_for_candidates(self, candidate_urns: Set[str], sparse_scores: Dict[str, float], include_content: bool, include_index: bool, use_reranker: bool, query: Optional[str] = None) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: """Fetch content and/or index for candidate URNs.""" need_content_for_reranking = use_reranker and query and self.reranker fetch_content = include_content or need_content_for_reranking docs_with_data, error_docs = [], [] for urn in candidate_urns: try: components = PathComponents.parse_path(urn) doc_data = {'urn': urn, 'sparse_score': sparse_scores.get(urn)} content_fetched = False if fetch_content: content = await self.document_store.read_content(components) if content: doc_data['content'] = content content_fetched = True if include_index: index = await self.document_store.read_index(components) doc_data['index'] = index.model_dump() if need_content_for_reranking: if content_fetched: docs_with_data.append(doc_data) else: error_docs.append({'urn': urn, 'error': 'No content for reranking'}) else: docs_with_data.append(doc_data) except Exception as e: logger.warning(f"Failed to process {urn}: {e}") error_docs.append({'urn': urn, 'error': str(e)}) return docs_with_data, error_docs async def _prepare_final_results(self, query: Optional[str], docs_with_data: List[Dict[str, Any]], error_docs: List[Dict[str, Any]], sparse_scores: Dict[str, float], use_reranker: bool, top_k_sparse: int, top_k_rerank: int) -> List[Dict[str, Any]]: """Prepare final results by reranking or sorting.""" final_results = list(error_docs) if use_reranker and query and docs_with_data and self.reranker: reranked_docs = await asyncio.to_thread( self.search_index_manager.rerank_docs, query, docs_with_data ) final_results.extend(reranked_docs) final_results.sort(key=lambda x: x.get('rerank_score', float('-inf')), reverse=True) return final_results[:top_k_rerank] if sparse_scores: combined = docs_with_data + [res for res in error_docs if res['urn'] not in {d['urn'] for d in docs_with_data}] combined.sort(key=lambda x: x.get('sparse_score', float('-inf')), reverse=True) return combined[:top_k_sparse] if docs_with_data: for doc in docs_with_data: if 'sparse_score' not in doc: doc['sparse_score'] = 1.0 docs_with_data.sort(key=lambda x: x['urn']) return docs_with_data final_results.sort(key=lambda x: x['urn']) return final_results[:top_k_rerank]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/54rt1n/container-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server