Skip to main content
Glama

search_knowledge

Search knowledge base using hybrid semantic and keyword search with cross-encoder reranking to find relevant information across categorized content.

Instructions

Hybrid search combining semantic search + BM25 keyword search with cross-encoder reranking.

Args:
    query: Search query text
    max_results: Maximum number of results (default: 5, max: 20)
    category: Optional category filter (security, ctf, logscale, development, general, redteam, blueteam)
    hybrid_alpha: Balance between semantic and keyword search (0.0 = keyword only, 1.0 = semantic only, default: 0.3)

Returns:
    JSON string with search results including content, source, relevance score, and search method used

Input Schema

JSON Schema (table view)

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| query | Yes | Search query text | — |
| max_results | No | Maximum number of results (max: 20) | 5 |
| category | No | Optional category filter | — |
| hybrid_alpha | No | Balance between semantic and keyword search | 0.3 |

Implementation Reference

  • Tool registration and handler function for search_knowledge. It uses an internal KnowledgeOrchestrator to perform the hybrid search.
    @mcp.tool()
    def search_knowledge(
        query: str,
        max_results: int = 5,
        category: str = None,
        hybrid_alpha: float = 0.3
    ) -> str:
        """
        Hybrid search combining semantic search + BM25 keyword search with cross-encoder reranking.
    
        Args:
            query: Search query text
            max_results: Maximum number of results (default: 5, max: 20)
            category: Optional category filter (security, ctf, logscale, development, general, redteam, blueteam)
            hybrid_alpha: Balance between semantic and keyword search (0.0 = keyword only, 1.0 = semantic only, default: 0.3)
    
        Returns:
            JSON string with search results including content, source, relevance score, and search method used
        """
        # Guard: reject blank / whitespace-only queries before doing any work.
        if not query or not query.strip():
            return json.dumps({"status": "error", "message": "Query cannot be empty"})

        # Clamp both tuning knobs into their valid ranges; None/0 fall back to defaults.
        requested = max_results or 5
        max_results = max(1, min(requested, config.max_results))
        alpha = 0.3 if hybrid_alpha is None else hybrid_alpha
        hybrid_alpha = max(0.0, min(alpha, 1.0))

        # Category must be one of the configured keyword routes, or "general".
        valid_categories = [*config.keyword_routes.keys(), "general"]
        if category and category not in valid_categories:
            return json.dumps({
                "status": "error",
                "message": f"Invalid category '{category}'. Valid: {', '.join(valid_categories)}"
            })

        # Delegate the actual hybrid retrieval to the orchestrator.
        engine = get_orchestrator()
        hits = engine.query(
            query.strip(),
            max_results=max_results,
            category_filter=category,
            hybrid_alpha=hybrid_alpha
        )

        if not hits:
            return json.dumps({"status": "no_results", "query": query, "message": "No relevant documents found."})

        # Successful response: results plus a little telemetry (count, cache hit rate).
        payload = {
            "status": "success",
            "query": query,
            "hybrid_alpha": hybrid_alpha,
            "result_count": len(hits),
            "cache_hit_rate": engine.query_cache.stats()["hit_rate"],
            "results": hits
        }
        return json.dumps(payload, indent=2, ensure_ascii=False)
  • The core business logic of KnowledgeOrchestrator.query which implements the RRF fusion and reranking pipeline.
    def query(
        self,
        query_text: str,
        max_results: Optional[int] = None,
        category_filter: Optional[str] = None,
        hybrid_alpha: float = 0.5
    ) -> List[Dict[str, Any]]:
        """
        Hybrid search with RRF fusion + cross-encoder reranking.
    
        Pipeline: Semantic + BM25 -> RRF fusion -> Reranker -> Results

        Args:
            query_text: Raw query string.
            max_results: Result cap; falls back to config.default_results when None/0.
            category_filter: Explicit category restriction; takes priority over
                keyword-based routing.
            hybrid_alpha: Weight of the semantic rank list in RRF fusion
                (0.0 = keyword only, 1.0 = semantic only).

        Returns:
            List of formatted result dicts (content, source, scores, ranks,
            search method). Empty list when nothing matched.
        """
        max_results = max_results or config.default_results
    
        # Serve an identical earlier query straight from the cache.
        cached = self.query_cache.get(query_text, max_results, category_filter, hybrid_alpha)
        if cached is not None:
            return cached
    
        self._ensure_bm25_index()
    
        # Keyword routing: an explicit category filter wins over inferred routing.
        routed_category = self._route_by_keywords(query_text)
        where_filter = None
        if category_filter:
            where_filter = {"category": category_filter}
        elif routed_category:
            where_filter = {"category": routed_category}
    
        # Semantic search (ChromaDB) — skipped entirely when alpha says keyword-only.
        semantic_results = {}
        if hybrid_alpha > 0:
            try:
                # Over-fetch candidates so fusion/reranking has room to reorder.
                n_candidates = min(max_results * 3, config.max_results)
                results = self.collection.query(
                    query_texts=[query_text],
                    n_results=n_candidates,
                    where=where_filter,
                    include=["documents", "metadatas", "distances"]
                )
    
                if results["ids"] and results["ids"][0]:
                    for i, chunk_id in enumerate(results["ids"][0]):
                        semantic_results[chunk_id] = {
                            "rank": i + 1,
                            "distance": results["distances"][0][i] if results["distances"] else 0,
                            "document": results["documents"][0][i] if results["documents"] else "",
                            "metadata": results["metadatas"][0][i] if results["metadatas"] else {}
                        }
            except Exception as e:
                # Best-effort: fall through to keyword-only results.
                print(f"[WARN] Semantic search failed: {e}")
    
        # BM25 keyword search — skipped when alpha says semantic-only.
        bm25_results = {}
        if hybrid_alpha < 1.0:
            try:
                bm25_hits = self.bm25_index.search(query_text, top_k=max_results * 3)
                for rank, (chunk_id, bm25_score) in enumerate(bm25_hits):
                    bm25_results[chunk_id] = {
                        "rank": rank + 1,
                        "bm25_score": bm25_score
                    }
            except Exception as e:
                print(f"[WARN] BM25 search failed: {e}")
    
        # Reciprocal Rank Fusion over the union of both candidate sets.
        RRF_K = 60  # standard RRF damping constant
        combined_scores: Dict[str, Dict] = {}
        all_chunk_ids = set(semantic_results.keys()) | set(bm25_results.keys())
    
        for chunk_id in all_chunk_ids:
            # A chunk missing from one list gets a huge rank -> near-zero contribution.
            semantic_rank = semantic_results.get(chunk_id, {}).get("rank", 1000)
            bm25_rank = bm25_results.get(chunk_id, {}).get("rank", 1000)
    
            semantic_rrf = hybrid_alpha * (1 / (RRF_K + semantic_rank))
            bm25_rrf = (1 - hybrid_alpha) * (1 / (RRF_K + bm25_rank))
            combined_rrf = semantic_rrf + bm25_rrf
    
            if chunk_id in semantic_results:
                data = semantic_results[chunk_id]
            else:
                # BM25-only hit: hydrate its text/metadata from the vector store.
                try:
                    fetched = self.collection.get(ids=[chunk_id], include=["documents", "metadatas"])
                    data = {
                        "document": fetched["documents"][0] if fetched["documents"] else "",
                        "metadata": fetched["metadatas"][0] if fetched["metadatas"] else {},
                        "distance": 0
                    }
                except Exception:
                    continue  # drop chunks we cannot hydrate
    
            combined_scores[chunk_id] = {
                "rrf_score": combined_rrf,
                "semantic_rank": semantic_rank if chunk_id in semantic_results else None,
                "bm25_rank": bm25_rank if chunk_id in bm25_results else None,
                "document": data.get("document", ""),
                "metadata": data.get("metadata", {}),
                "distance": data.get("distance", 0)
            }
    
        # Sort by RRF score — take extra candidates for the reranker to reorder.
        reranker_k = max_results * config.reranker_top_k_multiplier if config.reranker_enabled else max_results
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1]["rrf_score"],
            reverse=True
        )[:reranker_k]
    
        # Cross-encoder reranking of the fused candidate list.
        if config.reranker_enabled and sorted_results:
            rerank_input = []
            for chunk_id, data in sorted_results:
                rerank_input.append({
                    "chunk_id": chunk_id,
                    "document": data["document"],
                    "metadata": data["metadata"],
                    "rrf_score": data["rrf_score"],
                    "semantic_rank": data["semantic_rank"],
                    "bm25_rank": data["bm25_rank"],
                    "distance": data["distance"],
                })
            reranked = self.reranker.rerank(query_text, rerank_input, top_k=max_results)
            sorted_results = [(d["chunk_id"], d) for d in reranked]
    
        # Min-max normalize whichever score survived (reranker score when present,
        # RRF score otherwise) so callers see comparable 0..1 values.
        if sorted_results:
            raw_scores = [data.get("reranker_score", data.get("rrf_score", 0)) for _, data in sorted_results]
            max_score = max(raw_scores) if raw_scores else 1
            min_score = min(raw_scores) if raw_scores else 0
            score_range = max_score - min_score
        else:
            # Defaults keep min_score defined even though the loop below won't run.
            min_score = 0
            score_range = 0
    
        formatted = []
        for chunk_id, data in sorted_results[:max_results]:
            metadata = data.get("metadata", {})
            s_rank = data.get("semantic_rank")
            b_rank = data.get("bm25_rank")
    
            if s_rank and b_rank:
                search_method = "hybrid"
            elif s_rank:
                search_method = "semantic"
            else:
                search_method = "keyword"
    
            raw = data.get("reranker_score", data.get("rrf_score", 0))
            normalized_score = (raw - min_score) / score_range if score_range > 0 else 1.0
    
            formatted.append({
                "content": data.get("document", ""),
                "source": metadata.get("source", ""),
                "filename": metadata.get("filename", ""),
                "category": metadata.get("category", ""),
                "chunk_index": metadata.get("chunk_index", 0),
                "score": round(normalized_score, 4),
                "raw_rrf_score": round(data.get("rrf_score", 0), 6),
                "reranker_score": round(data.get("reranker_score", 0), 6) if "reranker_score" in data else None,
                "semantic_rank": s_rank,
                "bm25_rank": b_rank,
                "search_method": search_method,
                # Filter empties so a missing "keywords" field yields [] rather than [""].
                "keywords": [k for k in metadata.get("keywords", "").split(",") if k],
                "routed_by": routed_category if routed_category else "none"
            })
    
        self.query_cache.put(query_text, max_results, category_filter, hybrid_alpha, formatted)
        return formatted

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lyonzin/knowledge-rag'

If you have feedback or need assistance with the MCP directory API, please join our Discord server