search_knowledge
Search knowledge base using hybrid semantic and keyword search with cross-encoder reranking to find relevant information across categorized content.
Instructions
Hybrid search combining semantic search + BM25 keyword search with cross-encoder reranking.
Args:
query: Search query text
max_results: Maximum number of results (default: 5, max: 20)
category: Optional category filter (security, ctf, logscale, development, general, redteam, blueteam)
hybrid_alpha: Balance between semantic and keyword search (0.0 = keyword only, 1.0 = semantic only, default: 0.3)
Returns:
JSON string with search results including content, source, relevance score, and search method used

Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | Search query text | |
| max_results | No | Maximum number of results (max: 20) | 5 |
| category | No | Optional category filter (security, ctf, logscale, development, general, redteam, blueteam) | |
| hybrid_alpha | No | Balance between semantic and keyword search (0.0 = keyword only, 1.0 = semantic only) | 0.3 |
Implementation Reference
- mcp_server/server.py:1297-1339 (handler) — Tool registration and handler function for `search_knowledge`. It uses an internal KnowledgeOrchestrator to perform the hybrid search.
@mcp.tool() def search_knowledge( query: str, max_results: int = 5, category: str = None, hybrid_alpha: float = 0.3 ) -> str: """ Hybrid search combining semantic search + BM25 keyword search with cross-encoder reranking. Args: query: Search query text max_results: Maximum number of results (default: 5, max: 20) category: Optional category filter (security, ctf, logscale, development, general, redteam, blueteam) hybrid_alpha: Balance between semantic and keyword search (0.0 = keyword only, 1.0 = semantic only, default: 0.3) Returns: JSON string with search results including content, source, relevance score, and search method used """ if not query or not query.strip(): return json.dumps({"status": "error", "message": "Query cannot be empty"}) max_results = max(1, min(max_results or 5, config.max_results)) hybrid_alpha = max(0.0, min(hybrid_alpha if hybrid_alpha is not None else 0.3, 1.0)) valid_categories = list(config.keyword_routes.keys()) + ["general"] if category and category not in valid_categories: return json.dumps({"status": "error", "message": f"Invalid category '{category}'. Valid: {', '.join(valid_categories)}"}) orchestrator = get_orchestrator() results = orchestrator.query(query.strip(), max_results=max_results, category_filter=category, hybrid_alpha=hybrid_alpha) if not results: return json.dumps({"status": "no_results", "query": query, "message": "No relevant documents found."}) return json.dumps({ "status": "success", "query": query, "hybrid_alpha": hybrid_alpha, "result_count": len(results), "cache_hit_rate": orchestrator.query_cache.stats()["hit_rate"], "results": results }, indent=2, ensure_ascii=False) - mcp_server/server.py:719-885 (handler)The core business logic of KnowledgeOrchestrator.query which implements the RRF fusion and reranking pipeline.
def query(
    self,
    query_text: str,
    max_results: Optional[int] = None,
    category_filter: Optional[str] = None,
    hybrid_alpha: float = 0.5,
) -> List[Dict[str, Any]]:
    """
    Hybrid search with RRF fusion + cross-encoder reranking.

    Pipeline: Semantic + BM25 -> RRF fusion -> Reranker -> Results

    Args:
        query_text: The search query.
        max_results: Number of results to return; defaults to config.default_results.
        category_filter: Explicit category restriction (overrides keyword routing).
        hybrid_alpha: Weight of the semantic side in RRF fusion
            (0.0 = keyword only, 1.0 = semantic only).

    Returns:
        List of formatted result dicts (content, source, score, search_method, ...).
    """
    max_results = max_results or config.default_results

    # Serve from the query cache when an identical request was answered before.
    cached = self.query_cache.get(query_text, max_results, category_filter, hybrid_alpha)
    if cached is not None:
        return cached

    self._ensure_bm25_index()

    # Keyword routing: an explicit category filter wins over the routed one.
    routed_category = self._route_by_keywords(query_text)
    where_filter = None
    if category_filter:
        where_filter = {"category": category_filter}
    elif routed_category:
        where_filter = {"category": routed_category}

    # --- Semantic search (ChromaDB); skipped entirely when alpha == 0 ---
    semantic_results: Dict[str, Dict[str, Any]] = {}
    if hybrid_alpha > 0:
        try:
            # Over-fetch 3x candidates so fusion/reranking has material to work with.
            n_candidates = min(max_results * 3, config.max_results)
            results = self.collection.query(
                query_texts=[query_text],
                n_results=n_candidates,
                where=where_filter,
                include=["documents", "metadatas", "distances"],
            )
            if results["ids"] and results["ids"][0]:
                for i, chunk_id in enumerate(results["ids"][0]):
                    semantic_results[chunk_id] = {
                        "rank": i + 1,
                        "distance": results["distances"][0][i] if results["distances"] else 0,
                        "document": results["documents"][0][i] if results["documents"] else "",
                        "metadata": results["metadatas"][0][i] if results["metadatas"] else {},
                    }
        except Exception as e:
            # Best-effort: a failed semantic leg degrades to keyword-only search.
            print(f"[WARN] Semantic search failed: {e}")

    # --- BM25 keyword search; skipped entirely when alpha == 1.0 ---
    bm25_results: Dict[str, Dict[str, Any]] = {}
    if hybrid_alpha < 1.0:
        try:
            bm25_hits = self.bm25_index.search(query_text, top_k=max_results * 3)
            for rank, (chunk_id, bm25_score) in enumerate(bm25_hits):
                bm25_results[chunk_id] = {"rank": rank + 1, "bm25_score": bm25_score}
        except Exception as e:
            # Best-effort: a failed BM25 leg degrades to semantic-only search.
            print(f"[WARN] BM25 search failed: {e}")

    # --- Reciprocal Rank Fusion ---
    # Missing ranks are penalized with a large sentinel (1000) so chunks found
    # by only one leg still score, but lower than dual hits.
    RRF_K = 60
    combined_scores: Dict[str, Dict] = {}
    all_chunk_ids = set(semantic_results) | set(bm25_results)
    for chunk_id in all_chunk_ids:
        semantic_rank = semantic_results.get(chunk_id, {}).get("rank", 1000)
        bm25_rank = bm25_results.get(chunk_id, {}).get("rank", 1000)
        semantic_rrf = hybrid_alpha * (1 / (RRF_K + semantic_rank))
        bm25_rrf = (1 - hybrid_alpha) * (1 / (RRF_K + bm25_rank))
        combined_rrf = semantic_rrf + bm25_rrf

        if chunk_id in semantic_results:
            data = semantic_results[chunk_id]
        else:
            # BM25-only hit: fetch document/metadata from the collection on demand.
            try:
                fetched = self.collection.get(ids=[chunk_id], include=["documents", "metadatas"])
                data = {
                    "document": fetched["documents"][0] if fetched["documents"] else "",
                    "metadata": fetched["metadatas"][0] if fetched["metadatas"] else {},
                    "distance": 0,
                }
            except Exception:
                continue  # unfetchable chunk: drop it rather than emit an empty result

        combined_scores[chunk_id] = {
            "rrf_score": combined_rrf,
            "semantic_rank": semantic_rank if chunk_id in semantic_results else None,
            "bm25_rank": bm25_rank if chunk_id in bm25_results else None,
            "document": data.get("document", ""),
            "metadata": data.get("metadata", {}),
            "distance": data.get("distance", 0),
        }

    # Sort by RRF score — take extra candidates when the reranker will trim them.
    reranker_k = max_results * config.reranker_top_k_multiplier if config.reranker_enabled else max_results
    sorted_results = sorted(
        combined_scores.items(),
        key=lambda x: x[1]["rrf_score"],
        reverse=True,
    )[:reranker_k]

    # --- Cross-encoder reranking (optional) ---
    if config.reranker_enabled and sorted_results:
        rerank_input = []
        for chunk_id, data in sorted_results:
            rerank_input.append({
                "chunk_id": chunk_id,
                "document": data["document"],
                "metadata": data["metadata"],
                "rrf_score": data["rrf_score"],
                "semantic_rank": data["semantic_rank"],
                "bm25_rank": data["bm25_rank"],
                "distance": data["distance"],
            })
        reranked = self.reranker.rerank(query_text, rerank_input, top_k=max_results)
        sorted_results = [(d["chunk_id"], d) for d in reranked]

    # --- Min-max normalize scores and format the output ---
    # FIX: min_score/score_range are now always bound (previously min_score was
    # only assigned inside the `if sorted_results:` branch — a latent
    # unbound-local hazard if the formatting loop were ever reached without it).
    min_score = 0
    score_range = 0
    if sorted_results:
        raw_scores = [data.get("reranker_score", data.get("rrf_score", 0)) for _, data in sorted_results]
        max_score = max(raw_scores) if raw_scores else 1
        min_score = min(raw_scores) if raw_scores else 0
        score_range = max_score - min_score

    formatted = []
    for chunk_id, data in sorted_results[:max_results]:
        metadata = data.get("metadata", {})
        s_rank = data.get("semantic_rank")
        b_rank = data.get("bm25_rank")
        if s_rank and b_rank:
            search_method = "hybrid"
        elif s_rank:
            search_method = "semantic"
        else:
            search_method = "keyword"

        raw = data.get("reranker_score", data.get("rrf_score", 0))
        # When all scores are equal (range 0), everything normalizes to 1.0.
        normalized_score = (raw - min_score) / score_range if score_range > 0 else 1.0

        formatted.append({
            "content": data.get("document", ""),
            "source": metadata.get("source", ""),
            "filename": metadata.get("filename", ""),
            "category": metadata.get("category", ""),
            "chunk_index": metadata.get("chunk_index", 0),
            "score": round(normalized_score, 4),
            "raw_rrf_score": round(data.get("rrf_score", 0), 6),
            "reranker_score": round(data.get("reranker_score", 0), 6) if "reranker_score" in data else None,
            "semantic_rank": s_rank,
            "bm25_rank": b_rank,
            "search_method": search_method,
            # FIX: filter empty fragments — "".split(",") yields [""], which
            # previously leaked a spurious empty keyword into the output.
            "keywords": [k for k in metadata.get("keywords", "").split(",") if k],
            "routed_by": routed_category if routed_category else "none",
        })

    self.query_cache.put(query_text, max_results, category_filter, hybrid_alpha, formatted)
    return formatted