omega_rag_query
Retrieve semantically similar text fragments from a provenance-tracked knowledge store. Use natural language queries for meaning-based search instead of exact keyword matching.
Instructions
Searches the provenance RAG store using semantic similarity and returns ranked text fragments. Use this for meaning-based search; use omega_vault_search instead for exact keyword matching. Returns JSON array of {fragment, similarity_score, quality_score, source, tier}.
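To make "ranked by semantic similarity" concrete, here is a minimal sketch of cosine-similarity ranking over toy vectors. The `cosine` and `rank` helpers and the three-dimensional vectors are illustrative assumptions; the store's actual embedding function is not shown in this reference and may behave differently.

```python
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is empty or zero."""
    if not a or not b or len(a) != len(b):
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def rank(query_vec, fragments, top_k=5):
    """Return the top_k (text, score) pairs, highest similarity first."""
    scored = [(text, round(cosine(query_vec, vec), 4)) for text, vec in fragments]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]

# Toy 3-dimensional "embeddings" (hypothetical values, for illustration only).
fragments = [
    ("auth uses OAuth2", [1.0, 0.0, 0.0]),
    ("deploy notes", [0.0, 1.0, 0.0]),
    ("login token design", [0.8, 0.6, 0.0]),
]
ranked = rank([1.0, 0.0, 0.0], fragments, top_k=2)
print(ranked)
```

A fragment can rank highly without sharing any keyword with the query, which is the case where this tool is preferable to omega_vault_search.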
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | Natural-language search query, e.g. 'How was the authentication module designed?'. | |
| top_k | No | Maximum number of results to return, between 1 and 50. | 5 |
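As a sketch of how a client might build a call that respects this schema, the helper below assembles an MCP `tools/call` JSON-RPC request and clamps `top_k` into the documented 1 to 50 range. `build_rag_query_request` is a hypothetical helper, not part of Omega Brain, and the clamping is a client-side assumption: the handler excerpt in the Implementation Reference passes `top_k` through `int()` without a range check.

```python
import json

def build_rag_query_request(query, top_k=5, request_id=1):
    """Build a JSON-RPC 2.0 'tools/call' request for omega_rag_query."""
    if not isinstance(query, str) or not query.strip():
        raise ValueError("query must be a non-empty string")
    # Clamp to the documented 1..50 range (client-side assumption).
    top_k = max(1, min(50, int(top_k)))
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "omega_rag_query",
            "arguments": {"query": query, "top_k": top_k},
        },
    }

request = build_rag_query_request(
    "How was the authentication module designed?", top_k=200
)
print(json.dumps(request, indent=2))
```

Omitting `top_k` falls back to 5, matching the schema default.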
Implementation Reference
- omega_brain_mcp_standalone.py:1207-1209 (handler) The call_tool handler for 'omega_rag_query': delegates to _rag_search with the query and top_k arguments, and returns the result as JSON.

      elif name == "omega_rag_query":
          result = _rag_search(arguments["query"], int(arguments.get("top_k", 5)))
          return [TextContent(type="text", text=json.dumps(result, indent=2))]

- MCP Tool definition (schema) for omega_rag_query: describes the tool and defines inputSchema with 'query' (required string) and 'top_k' (optional integer, default 5).

      Tool(name="omega_rag_query", description=(
          "Searches the provenance RAG store using semantic similarity and returns ranked text fragments. "
          "Use this for meaning-based search; use omega_vault_search instead for exact keyword matching. "
          "Returns JSON array of {fragment, similarity_score, quality_score, source, tier}."
      ), inputSchema={"type": "object", "properties": {
          "query": {
              "type": "string",
              "description": "Natural-language search query, e.g. 'How was the authentication module designed?'."
          },
          "top_k": {
              "type": "integer",
              "default": 5,
              "description": "Maximum number of results to return, between 1 and 50."
          }
      }, "required": ["query"]}),

- omega_brain_mcp_standalone.py:968-1195 (registration) The list_tools handler registers 'omega_rag_query' as an MCP tool via Tool(name='omega_rag_query', ...) in the list returned by @app.list_tools().

      @app.list_tools()
      async def list_tools():
          return [
              Tool(name="omega_preload_context", description=(
                  "Loads episodic context for a new task by querying the RAG store, vault history, and any sealed handoff. "
                  "Call this once at the start of every new task before doing any work. "
                  "Returns JSON with fields: rag_matches, vault_history, handoff, continuity_type (CONTINUATION | CONTEXT_SWITCH | FRESH_START)."
              ), inputSchema={"type": "object", "properties": {
                  "task": {
                      "type": "string",
                      "description": "Natural-language description of the task to load context for, e.g. 'Fix authentication bug in login module'."
                  }
              }, "required": ["task"]}),
              Tool(name="omega_rag_query", description=(
                  "Searches the provenance RAG store using semantic similarity and returns ranked text fragments. "
                  "Use this for meaning-based search; use omega_vault_search instead for exact keyword matching. "
                  "Returns JSON array of {fragment, similarity_score, quality_score, source, tier}."
              ), inputSchema={"type": "object", "properties": {
                  "query": {
                      "type": "string",
                      "description": "Natural-language search query, e.g. 'How was the authentication module designed?'."
                  },
                  "top_k": {
                      "type": "integer",
                      "default": 5,
                      "description": "Maximum number of results to return, between 1 and 50."
                  }
              }, "required": ["query"]}),
              Tool(name="omega_ingest", description=(
                  "Stores a new knowledge fragment in the provenance RAG store with source and evidence tier metadata. "
                  "Use this to persist decisions, patterns, or findings for future retrieval via omega_rag_query. "
                  "Returns JSON with fields: fragment_id, stored (boolean), timestamp."
              ), inputSchema={"type": "object", "properties": {
                  "content": {
                      "type": "string",
                      "description": "Text content to store, e.g. 'Switched from Poetry to setuptools for pyproject.toml compatibility'."
                  },
                  "source": {
                      "type": "string",
                      "description": "Origin identifier for provenance tracking, e.g. 'code-review', 'user-session', 'documentation'."
                  },
                  "tier": {
                      "type": "string",
                      "description": "Evidence confidence tier: A (verified/reproducible), B (reliable), C (single source), D (unverified).",
                      "default": "B",
                      "enum": ["A", "B", "C", "D"]
                  }
              }, "required": ["content"]}),
              Tool(name="omega_vault_search", description=(
                  "Searches the vault database using exact keyword matching via SQLite FTS5. "
                  "Use this for precise keyword lookups; use omega_rag_query instead for semantic/meaning-based search. "
                  "Returns JSON array of matching vault entries with timestamps and session context."
              ), inputSchema={"type": "object", "properties": {
                  "query": {
                      "type": "string",
                      "description": "FTS5 keyword query supporting AND, OR, NOT, and quoted phrases, e.g. '\"deploy production\" NOT staging'."
                  }
              }, "required": ["query"]}),
              Tool(name="omega_cortex_check", description=(
                  "Read-only alignment gate that measures semantic similarity between a proposed action and the task baseline. "
                  "Use this to check alignment before high-impact operations without modifying any arguments; "
                  "use omega_cortex_steer instead if you want automatic argument correction. "
                  "Returns JSON with fields: approved (boolean), similarity (float 0-1), verdict (APPROVED | BLOCKED)."
              ), inputSchema={"type": "object", "properties": {
                  "tool": {
                      "type": "string",
                      "description": "Name of the tool to check alignment for, e.g. 'omega_ingest'."
                  },
                  "args": {
                      "type": "object",
                      "description": "The proposed arguments for the tool call, serialized as a JSON object."
                  },
                  "baseline_prompt": {
                      "type": "string",
                      "description": "Task baseline describing the intended operation, e.g. 'Refactoring the auth module for OAuth2 support'."
                  }
              }, "required": ["tool", "args", "baseline_prompt"]}),
              Tool(name="omega_cortex_steer", description=(
                  "Alignment gate with automatic argument correction for drifting tool calls. "
                  "Use this instead of omega_cortex_check when you want arguments auto-corrected toward the baseline; "
                  "blocks hard if similarity < 0.45, steers if 0.45-0.65, passes unchanged if > 0.65. "
                  "Returns JSON with fields: similarity (float), steered_args (object), corrections (array), verdict (PASSED | STEERED | BLOCKED)."
              ), inputSchema={"type": "object", "properties": {
                  "tool": {
                      "type": "string",
                      "description": "Name of the tool whose arguments may need correction, e.g. 'omega_seal_run'."
                  },
                  "args": {
                      "type": "object",
                      "description": "The original arguments that may be drifting from baseline. Will be corrected if in the steering range."
                  },
                  "baseline_prompt": {
                      "type": "string",
                      "description": "Task baseline to steer toward, e.g. 'Deploying hotfix to staging environment'."
                  }
              }, "required": ["tool", "args", "baseline_prompt"]}),
              Tool(name="omega_seal_run", description=(
                  "Appends a tamper-proof entry to the SEAL (Secure Evidence Audit Ledger) SHA-256 hash chain. "
                  "Use this to create an immutable audit record of significant events, decisions, or state changes. "
                  "Returns JSON with fields: seal_hash (hex string), chain_position (integer), timestamp (ISO 8601)."
              ), inputSchema={"type": "object", "properties": {
                  "context": {
                      "type": "object",
                      "description": "Structured event metadata, e.g. {\"action\": \"deploy\", \"target\": \"production\", \"version\": \"2.1.0\"}."
                  },
                  "response": {
                      "type": "string",
                      "description": "Outcome text to seal into the immutable ledger, e.g. 'Deployment succeeded with zero errors'."
                  }
              }, "required": ["context", "response"]}),
              Tool(name="omega_log_session", description=(
                  "Writes a complete session record to the vault for cross-session persistence. "
                  "Use this at the end of a work session to record what was done; data is retrievable via omega_vault_search. "
                  "Returns JSON with fields: session_id, stored (boolean), entry_count (integer)."
              ), inputSchema={"type": "object", "properties": {
                  "session_id": {
                      "type": "string",
                      "description": "Unique session identifier. Auto-generated if omitted."
                  },
                  "task": {
                      "type": "string",
                      "description": "Description of the task completed, e.g. 'Migrated database schema to v3'."
                  },
                  "decisions": {
                      "type": "array",
                      "items": {"type": "string"},
                      "description": "Key decisions made, e.g. ['Used Alembic for migrations', 'Kept backward compatibility']."
                  },
                  "files_modified": {
                      "type": "array",
                      "items": {"type": "string"},
                      "description": "File paths changed, e.g. ['src/models.py', 'alembic/versions/001.py']."
                  }
              }, "required": ["task"]}),
              Tool(name="omega_write_handoff", description=(
                  "Creates a SHA-256 sealed handoff document that auto-loads on the next server restart via omega://session/preload. "
                  "Use this at the end of a session to ensure seamless context continuity for the next session. "
                  "Returns JSON with fields: handoff_hash (hex string), file_path (string)."
              ), inputSchema={"type": "object", "properties": {
                  "task": {
                      "type": "string",
                      "description": "Task title for the handoff, e.g. 'OAuth2 migration phase 2'."
                  },
                  "summary": {
                      "type": "string",
                      "description": "Concise summary of progress and current state for the next session."
                  },
                  "decisions": {
                      "type": "array",
                      "items": {"type": "string"},
                      "description": "Key decisions the next session should know about."
                  },
                  "files_modified": {
                      "type": "array",
                      "items": {"type": "string"},
                      "description": "Files changed during this session."
                  },
                  "next_steps": {
                      "type": "array",
                      "items": {"type": "string"},
                      "description": "Ordered list of recommended next actions."
                  },
                  "conversation_id": {
                      "type": "string",
                      "description": "Optional external conversation tracking ID."
                  }
              }, "required": ["task", "summary"]}),
              Tool(name="omega_execute", description=(
                  "Cortex-governed execution wrapper that checks alignment, steers if needed, executes, and auto-logs to the SEAL chain. "
                  "Use this as the default way to invoke any Omega Brain tool with full governance; "
                  "only wraps Omega Brain tools — external tools are returned with steered_args for manual invocation. "
                  "Returns JSON with fields: result (object), cortex_verdict (string), seal_hash (hex string)."
              ), inputSchema={"type": "object", "properties": {
                  "tool": {
                      "type": "string",
                      "description": "Omega Brain tool name to execute, e.g. 'omega_ingest', 'omega_rag_query', 'omega_seal_run'."
                  },
                  "args": {
                      "type": "object",
                      "description": "Arguments for the target tool. May be steered by the Cortex before execution."
                  },
                  "baseline": {
                      "type": "string",
                      "description": "Task baseline for the Cortex alignment check, e.g. 'Ingesting code review findings'."
                  }
              }, "required": ["tool", "args", "baseline"]}),
              Tool(name="omega_brain_report", description=(
                  "Generates a human-readable audit report showing SEAL chain entries, Cortex verdicts, and vault statistics. "
                  "Use this to inspect the trust and governance layer; use omega_brain_status for a quick health summary instead. "
                  "Returns formatted text report with sections: seal_tail, cortex_verdicts, vault_stats, session_health."
              ), inputSchema={"type": "object", "properties": {
                  "lines": {
                      "type": "integer",
                      "description": "Number of recent SEAL ledger entries to include, between 1 and 100.",
                      "default": 10
                  }
              }}),
              Tool(name="omega_brain_status", description=(
                  "Returns a quick health summary of all Omega Brain subsystems as structured JSON. "
                  "Use this for a fast status check; use omega_brain_report for a detailed audit report instead. "
                  "Returns JSON with fields: vault_sessions (int), vault_entries (int), rag_fragments (int), "
                  "seal_entries (int), session_id (string), uptime_seconds (float), call_count (int)."
              ), inputSchema={"type": "object", "properties": {}}),
          ] + (_veritas_build_tools() if HAS_BUILD_GATES else [])

- _rag_search(): the core semantic search function over RAG fragments. Called by the omega_rag_query handler.
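It embeds the query, scores every stored fragment by cosine similarity, keeps the top_k highest-scoring fragments, and computes an aggregate VERITAS score over them. Note that it returns a dict with the keys query, fragments, veritas_score, fragment_count, total_indexed, and session_id, where each fragment carries id, content (truncated to 500 characters), source, tier, and score; these names differ from the {fragment, similarity_score, quality_score, source, tier} array advertised in the tool description.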

      def _rag_search(query: str, top_k: int = 5) -> dict:
          """Semantic search over stored RAG fragments. Returns top-k with VERITAS score."""
          q_vec = _embed(query)
          conn = _db()
          rows = conn.execute("SELECT id, content, source, tier, embedding FROM fragments").fetchall()
          conn.close()
          results = []
          for row in rows:
              try:
                  fv = json.loads(row["embedding"] or "[]")
                  sim = _cosine(q_vec, fv)
                  results.append({
                      "id": row["id"],
                      "content": row["content"][:500],
                      "source": row["source"],
                      "tier": row["tier"],
                      "score": round(sim, 4),
                  })
              except Exception:
                  pass
          results.sort(key=lambda x: x["score"], reverse=True)
          top = results[:top_k]
          # VERITAS scoring
          tier_map = {"A": 1.0, "B": 0.85, "C": 0.70, "D": 0.55}
          quality = (sum(tier_map.get(r["tier"], 0.5) for r in top) / len(top)) if top else 0.0
          sources = set(r["source"] for r in top)
          indep = 1.0 if len(sources) >= 2 else 0.7
          scores = [r["score"] for r in top if r["score"] > 0]
          spread = (max(scores) - min(scores)) if len(scores) >= 2 else 0.0
          agreement = max(0.0, 1.0 - spread)
          veritas_score = round(min(1.0, max(0.0, agreement * quality * indep)), 4)
          return {
              "query": query,
              "fragments": top,
              "veritas_score": veritas_score,
              "fragment_count": len(top),
              "total_indexed": len(rows),
              "session_id": _SESSION_ID,
          }
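
The VERITAS arithmetic above can be checked by hand. The sketch below lifts the scoring step into a standalone function so the three factors are visible: agreement (1 minus the spread of positive scores), quality (mean tier weight), and source independence. `veritas_score` as a function name and the sample fragments are introduced here for illustration only; the tier weights and formula are copied from the excerpt.

```python
# Tier weights as they appear in the _rag_search excerpt.
TIER_WEIGHTS = {"A": 1.0, "B": 0.85, "C": 0.70, "D": 0.55}

def veritas_score(top):
    """Aggregate score for a list of fragments with 'tier', 'source', 'score' keys."""
    if not top:
        return 0.0
    # Mean tier weight; unknown tiers fall back to 0.5.
    quality = sum(TIER_WEIGHTS.get(r["tier"], 0.5) for r in top) / len(top)
    # Penalize result sets drawn from a single source.
    indep = 1.0 if len({r["source"] for r in top}) >= 2 else 0.7
    # Agreement shrinks as the positive similarity scores spread apart.
    scores = [r["score"] for r in top if r["score"] > 0]
    spread = (max(scores) - min(scores)) if len(scores) >= 2 else 0.0
    agreement = max(0.0, 1.0 - spread)
    return round(min(1.0, max(0.0, agreement * quality * indep)), 4)

# Made-up fragments: two sources, tiers A and B, scores 0.9 and 0.7.
two_sources = [
    {"tier": "A", "source": "code-review", "score": 0.9},
    {"tier": "B", "source": "documentation", "score": 0.7},
]
# agreement = 1 - 0.2 = 0.8, quality = (1.0 + 0.85) / 2 = 0.925, indep = 1.0
print(veritas_score(two_sources))

# Same fragments from one source: the independence factor drops to 0.7.
one_source = [dict(r, source="code-review") for r in two_sources]
print(veritas_score(one_source))
```

One consequence of this formula is that a single highly relevant fragment alongside weaker ones lowers the score, because a wide spread reduces agreement even when the best match is strong.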