# rag_executeWorkflow

Process user questions by executing the complete RAG workflow: configure settings, generate query embeddings, perform semantic search, and retrieve the relevant context chunks for answer generation.
## Instructions
Execute complete RAG workflow to answer user questions based on document context.
This function handles the entire RAG pipeline:

1. Configuration setup (using configurable values from rag_config.yml)
2. Store the user query (with `/rag ` prefix stripping)
3. Generate query embeddings (tokenization + embedding)
4. Perform semantic search against the chunk embeddings
5. Return the retrieved context chunks for answer generation
The function uses configuration values from rag_config.yml with fallback defaults.
Arguments:

- `question`: the user question to process
- `k`: the number of top-k results to return (optional; uses the config default if not provided)

Returns: the top-k most relevant chunks, with metadata, for context-grounded answer generation.
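The configuration keys consumed by the implementation (see Implementation Reference below) follow the shape sketched here. All values are hypothetical placeholders; only the key structure is taken from the code.

```yaml
# Hypothetical rag_config.yml sketch: the key structure mirrors the handlers
# below, but every value here is a placeholder, not a shipped default.
version: ivsm                     # 'ivsm' or 'byom'
databases:
  query_db: rag_demo
  model_db: rag_demo
  vector_db: rag_demo
tables:
  query_table: user_queries
  query_embedding_store: query_embeddings
  model_table: embedding_models
  tokenizer_table: embedding_tokenizers
  vector_table: chunk_embeddings
model:
  model_id: bge-small-en-v1.5     # placeholder model name
embedding:
  vector_length: 384              # placeholder dimension
  vector_column_prefix: emb_
  feature_columns: "[emb_0:emb_383]"
retrieval:
  default_k: 10
  max_k: 50                       # the code falls back to 50 if absent
vector_store_schema:
  metadata_fields_in_vector_store:
    - txt
    - doc_name
    - page_no
```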
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| k | No | Number of top-k results to return. | `retrieval.default_k` from rag_config.yml |
| question | Yes | The user question to process. | |
## Implementation Reference
- Main handler function for the `rag_executeWorkflow` tool. Dispatches to the BYOM or IVSM RAG workflow implementation based on configuration.

```python
def handle_rag_Execute_Workflow(
    conn: TeradataConnection,
    question: str,
    k: int | None = None,
    *args,
    **kwargs,
):
    """
    Execute complete RAG workflow to answer user questions based on document context.

    This tool handles the entire RAG pipeline in a single step when a user
    query is tagged with /rag.

    WORKFLOW STEPS (executed automatically):
    1. Configuration setup using configurable values from rag_config.yml
    2. Store user query with '/rag ' prefix stripping
    3. Generate query embeddings using either BYOM (ONNXEmbeddings) or IVSM
       functions based on config
    4. Perform semantic search against precomputed chunk embeddings
    5. Return context chunks for answer generation

    CONFIGURATION VALUES (from rag_config.yml):
    - version: 'ivsm' or 'byom' to select embedding approach
    - All database names, table names, and model settings are configurable
    - Vector store metadata fields are dynamically detected
    - Embedding parameters are configurable
    - Default chunk retrieval count is configurable
    - Default values are provided as fallback

    TECHNICAL DETAILS:
    - Strips the '/rag ' prefix if present from user questions
    - Creates query table if it does not exist (columns: id, txt, created_ts)
    - BYOM approach: Uses mldb.ONNXEmbeddings UDF for tokenization and embedding
    - IVSM approach: Uses ivsm.tokenizer_encode and ivsm.IVSM_score functions
    - Both approaches store embeddings in configured output table
    - Uses cosine similarity via TD_VECTORDISTANCE for semantic search
    - Returns the top-k matching chunks from the configured vector store
    - Each result includes chunk text, similarity score, and metadata fields

    CRITICAL ANSWERING RULES:
    - Answer ONLY using retrieved chunks - no external knowledge, speculation,
      or inference
    - Quote source content directly without paraphrasing, summarizing, or rewriting
    - If no relevant context found: "Not enough information found in the
      provided context"
    - If partial context: "The available context does not fully answer the question"
    - Include document/page references when available
      (e.g., "On page 2 of 'demo_policy.pdf'...")
    - Execute entire workflow silently without showing function calls to user -
      only show final answer

    LANGUAGE RESTRICTIONS:
    - Do not say "According to the context" or "The context says" - just quote directly
    - Do not say "It can be inferred that..." - no inference allowed
    - Use exact or near-verbatim quotes only
    - No paraphrasing, summarizing, or adding transitions between quotes
    - Copy content precisely - no expansion or interpretation

    BEHAVIORAL REQUIREMENTS:
    - Execute all RAG workflow steps silently without showing tool calls to users
    - Only display final answers based strictly on retrieved context chunks
    - Do not use external knowledge or speculation - only quote retrieved
      content directly
    - RAG mode only triggers with explicit /rag prefix - do not enter RAG mode
      implicitly
    - Hide all tool execution steps from user
    - Provide clean, direct response with only the answer based on retrieved chunks
    - This is a complete end-to-end RAG workflow - no additional tools needed

    EXECUTION: Run completely silently - user only sees their question and the
    final context-grounded answer.
    """
    # Use configuration from loaded config
    config = RAG_CONFIG

    # Check version and delegate to appropriate implementation
    version = config.get('version', 'ivsm').lower()

    if version == 'ivsm':
        return _execute_rag_workflow_ivsm(conn, question, k, config, *args, **kwargs)
    elif version == 'byom':
        return _execute_rag_workflow_byom(conn, question, k, config, *args, **kwargs)
    else:
        raise ValueError(
            f"Unsupported RAG version: {version}. Supported versions: 'byom', 'ivsm'"
        )
```
- Helper function implementing the BYOM (ONNXEmbeddings) variant of the RAG workflow, including query storage, embedding generation, and semantic search.

```python
def _execute_rag_workflow_byom(conn: TeradataConnection, question: str,
                               k: int | None, config: dict, *args, **kwargs):
    """Execute RAG workflow using BYOM (ONNXEmbeddings)"""
    # Use config default if k not provided
    if k is None:
        k = config['retrieval']['default_k']

    # Optional: Enforce max limit
    max_k = config['retrieval'].get('max_k', 50)
    if k > max_k:
        logger.warning(f"Requested k={k} exceeds max_k={max_k}, using max_k")
        k = max_k

    logger.debug(f"handle_rag_executeWorkflow (BYOM): question={question[:60]}..., k={k}")

    # Extract config values
    database_name = config['databases']['query_db']
    table_name = config['tables']['query_table']
    dst_table = config['tables']['query_embedding_store']
    model_id = config['model']['model_id']
    model_db = config['databases']['model_db']
    model_table = config['tables']['model_table']
    tokenizer_table = config['tables']['tokenizer_table']
    vector_db = config['databases']['vector_db']
    chunk_embed_table = config['tables']['vector_table']

    with conn.cursor() as cur:
        # Store user query
        logger.debug(f"Step 2: Storing user query in {database_name}.{table_name}")

        # Create table if it doesn't exist
        ddl = f"""
        CREATE TABLE {database_name}.{table_name} (
            id INTEGER GENERATED ALWAYS AS IDENTITY
                (START WITH 1 INCREMENT BY 1) NOT NULL,
            txt VARCHAR(5000),
            created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
        """
        try:
            cur.execute(ddl)
            logger.debug(f"Table {database_name}.{table_name} created")
        except Exception as e:
            error_msg = str(e).lower()
            if "already exists" in error_msg or "3803" in error_msg:
                logger.debug(f"Table {database_name}.{table_name} already exists, skipping creation")
            else:
                logger.error(f"Error creating table: {e}")
                raise

        # Insert cleaned question
        insert_sql = f"""
        INSERT INTO {database_name}.{table_name} (txt)
        SELECT CASE
            WHEN TRIM(?) LIKE '/rag %' THEN SUBSTRING(TRIM(?) FROM 6)
            ELSE TRIM(?)
        END
        """
        cur.execute(insert_sql, [question, question, question])

        # Get inserted ID and cleaned text
        cur.execute(f"SELECT MAX(id) AS id FROM {database_name}.{table_name}")
        new_id = cur.fetchone()[0]
        cur.execute(f"SELECT txt FROM {database_name}.{table_name} WHERE id = ?", [new_id])
        cleaned_txt = cur.fetchone()[0]
        logger.debug(f"Stored query with ID {new_id}: {cleaned_txt[:60]}...")

        # Generate query embeddings
        logger.debug(f"Step 3: Generating embeddings in {database_name}.{dst_table}")

        # Drop existing embeddings table
        drop_sql = f"DROP TABLE {database_name}.{dst_table}"
        try:
            cur.execute(drop_sql)
            logger.debug(f"Dropped existing table {database_name}.{dst_table}")
        except Exception as e:
            logger.debug(f"DROP failed or table not found: {e}")

        # Create embeddings table
        create_sql = f"""
        CREATE TABLE {database_name}.{dst_table} AS (
            SELECT *
            FROM mldb.ONNXEmbeddings(
                ON (SELECT id, txt FROM {database_name}.{table_name})
                ON (SELECT model_id, model FROM {model_db}.{model_table}
                    WHERE model_id = '{model_id}') DIMENSION
                ON (SELECT model AS tokenizer FROM {model_db}.{tokenizer_table}
                    WHERE model_id = '{model_id}') DIMENSION
                USING
                    Accumulate('id', 'txt')
                    ModelOutputTensor('sentence_embedding')
                    OutputFormat('FLOAT32({config["embedding"]["vector_length"]})')
            ) AS a
        ) WITH DATA
        """
        cur.execute(create_sql)
        logger.debug(f"Created embeddings table {database_name}.{dst_table}")

        # Perform semantic search
        logger.debug(f"Step 4: Performing semantic search with k={k}")
        search_sql = build_search_query(vector_db, dst_table, chunk_embed_table, k, config)
        rows = cur.execute(search_sql)
        data = rows_to_json(cur.description, rows.fetchall())
        logger.debug(f"Retrieved {len(data)} chunks for semantic search")

        # Return metadata
        metadata = {
            "tool_name": "rag_executeWorkflow",
            "workflow_type": "BYOM",
            "workflow_steps": [
                "config_set",
                "query_stored",
                "embeddings_generated",
                "semantic_search_completed",
            ],
            "query_id": new_id,
            "cleaned_question": cleaned_txt,
            "database": database_name,
            "query_table": table_name,
            "embedding_table": dst_table,
            "vector_table": chunk_embed_table,
            "model_id": model_id,
            "chunks_retrieved": len(data),
            "topk_requested": k,
            "topk_configured_default": config['retrieval']['default_k'],
            "metadata_fields": config['vector_store_schema']['metadata_fields_in_vector_store'],
            "description": "Complete RAG workflow executed using BYOM: config - store query - generate embeddings - semantic search"
        }
        logger.debug(f"Tool: handle_rag_executeWorkflow (BYOM): metadata: {metadata}")
        return create_response(data, metadata)
```
- Helper function implementing the IVSM variant of the RAG workflow, including query storage, tokenization, embedding generation, and semantic search.

```python
def _execute_rag_workflow_ivsm(conn: TeradataConnection, question: str,
                               k: int | None, config: dict, *args, **kwargs):
    """Execute RAG workflow using IVSM functions"""
    # Use config default if k not provided
    if k is None:
        k = config['retrieval']['default_k']

    # Optional: Enforce max limit
    max_k = config['retrieval'].get('max_k', 50)
    if k > max_k:
        logger.warning(f"Requested k={k} exceeds max_k={max_k}, using max_k")
        k = max_k

    logger.debug(f"handle_rag_executeWorkflow (IVSM): question={question[:60]}..., k={k}")

    # Extract config values
    database_name = config['databases']['query_db']
    table_name = config['tables']['query_table']
    dst_table = config['tables']['query_embedding_store']
    model_id = config['model']['model_id']
    model_db = config['databases']['model_db']
    model_table = config['tables']['model_table']
    tokenizer_table = config['tables']['tokenizer_table']
    vector_db = config['databases']['vector_db']
    chunk_embed_table = config['tables']['vector_table']

    with conn.cursor() as cur:
        # Store user query
        logger.debug(f"Step 2: Storing user query in {database_name}.{table_name}")

        # Create table if it doesn't exist
        ddl = f"""
        CREATE TABLE {database_name}.{table_name} (
            id INTEGER GENERATED ALWAYS AS IDENTITY
                (START WITH 1 INCREMENT BY 1) NOT NULL,
            txt VARCHAR(5000),
            created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
        """
        try:
            cur.execute(ddl)
            logger.debug(f"Table {database_name}.{table_name} created")
        except Exception as e:
            error_msg = str(e).lower()
            if "already exists" in error_msg or "3803" in error_msg:
                logger.debug(f"Table {database_name}.{table_name} already exists, skipping creation")
            else:
                logger.error(f"Error creating table: {e}")
                raise

        # Insert cleaned question
        insert_sql = f"""
        INSERT INTO {database_name}.{table_name} (txt)
        SELECT CASE
            WHEN TRIM(?) LIKE '/rag %' THEN SUBSTRING(TRIM(?) FROM 6)
            ELSE TRIM(?)
        END
        """
        cur.execute(insert_sql, [question, question, question])

        # Get inserted ID and cleaned text
        cur.execute(f"SELECT MAX(id) AS id FROM {database_name}.{table_name}")
        new_id = cur.fetchone()[0]
        cur.execute(f"SELECT txt FROM {database_name}.{table_name} WHERE id = ?", [new_id])
        cleaned_txt = cur.fetchone()[0]
        logger.debug(f"Stored query with ID {new_id}: {cleaned_txt[:60]}...")

        # Tokenize query
        logger.debug("Step 3: Tokenizing query using ivsm.tokenizer_encode")
        cur.execute(f"""
            REPLACE VIEW v_topics_tokenized AS (
                SELECT id, txt, IDS AS input_ids, attention_mask
                FROM ivsm.tokenizer_encode(
                    ON (
                        SELECT * FROM {database_name}.{table_name}
                        QUALIFY ROW_NUMBER() OVER (ORDER BY created_ts DESC) = 1
                    )
                    ON (
                        SELECT model AS tokenizer FROM {model_db}.{tokenizer_table}
                        WHERE model_id = '{model_id}'
                    ) DIMENSION
                    USING
                        ColumnsToPreserve('id','txt')
                        OutputFields('IDS','ATTENTION_MASK')
                        MaxLength(1024)
                        PadToMaxLength('True')
                        TokenDataType('INT64')
                ) AS t
            );
        """)
        logger.debug("Tokenized view v_topics_tokenized created")

        # Create embedding view
        logger.debug("Step 4: Creating embedding view using ivsm.IVSM_score")
        cur.execute(f"""
            REPLACE VIEW v_topics_embeddings AS (
                SELECT *
                FROM ivsm.IVSM_score(
                    ON v_topics_tokenized
                    ON (
                        SELECT * FROM {model_db}.{model_table}
                        WHERE model_id = '{model_id}'
                    ) DIMENSION
                    USING
                        ColumnsToPreserve('id','txt')
                        ModelType('ONNX')
                        BinaryInputFields('input_ids','attention_mask')
                        BinaryOutputFields('sentence_embedding')
                        Caching('inquery')
                ) AS s
            );
        """)
        logger.debug("Embedding view v_topics_embeddings created")

        # Create query embedding table
        logger.debug("Step 5: Creating query embedding table using ivsm.vector_to_columns")

        # Drop existing embeddings table
        drop_sql = f"DROP TABLE {database_name}.{dst_table}"
        try:
            cur.execute(drop_sql)
            logger.debug(f"Dropped existing table {database_name}.{dst_table}")
        except Exception as e:
            logger.debug(f"DROP failed or table not found: {e}")

        # Create embeddings table
        create_sql = f"""
        CREATE TABLE {database_name}.{dst_table} AS (
            SELECT *
            FROM ivsm.vector_to_columns(
                ON v_topics_embeddings
                USING
                    ColumnsToPreserve('id', 'txt')
                    VectorDataType('FLOAT32')
                    VectorLength({config['embedding']['vector_length']})
                    OutputColumnPrefix('{config['embedding']['vector_column_prefix']}')
                    InputColumnName('sentence_embedding')
            ) a
        ) WITH DATA
        """
        cur.execute(create_sql)
        logger.debug(f"Created embeddings table {database_name}.{dst_table}")

        # Perform semantic search
        logger.debug(f"Step 6: Performing semantic search with k={k}")
        search_sql = build_search_query(vector_db, dst_table, chunk_embed_table, k, config)
        rows = cur.execute(search_sql)
        data = rows_to_json(cur.description, rows.fetchall())
        logger.debug(f"Retrieved {len(data)} chunks for semantic search")

        # Return metadata
        metadata = {
            "tool_name": "rag_executeWorkflow",
            "workflow_type": "IVSM",
            "workflow_steps": [
                "config_set",
                "query_stored",
                "query_tokenized",
                "embedding_view_created",
                "embedding_table_created",
                "semantic_search_completed",
            ],
            "query_id": new_id,
            "cleaned_question": cleaned_txt,
            "database": database_name,
            "query_table": table_name,
            "embedding_table": dst_table,
            "vector_table": chunk_embed_table,
            "model_id": model_id,
            "chunks_retrieved": len(data),
            "topk_requested": k,
            "topk_configured_default": config['retrieval']['default_k'],
            "views_created": ["v_topics_tokenized", "v_topics_embeddings"],
            "metadata_fields": config['vector_store_schema']['metadata_fields_in_vector_store'],
            "description": (
                "Complete RAG workflow executed using IVSM functions: config - "
                "store query - tokenize - create embedding view - "
                "create embedding table - semantic search"
            ),
        }
        logger.debug(f"Tool: handle_rag_executeWorkflow (IVSM): metadata: {metadata}")
        return create_response(data, metadata)
```
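Both workflow variants rely on shared helpers `rows_to_json` and `create_response` that are not reproduced in this reference. A minimal sketch of what a `rows_to_json`-style helper could look like (hypothetical; the actual implementation may differ):

```python
def rows_to_json(cursor_description, rows):
    """Hypothetical sketch: map fetched rows to dicts keyed by column name."""
    # DB-API cursor.description is a sequence of 7-tuples whose first
    # element is the column name.
    columns = [col[0] for col in cursor_description]
    return [dict(zip(columns, row)) for row in rows]
```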
- Helper function to build the semantic search SQL query using TD_VECTORDISTANCE, dynamically including metadata fields from config.

```python
def build_search_query(vector_db, dst_table, chunk_embed_table, k, config):
    """Build dynamic search query based on available metadata fields in vector store"""
    # Get metadata fields from config
    metadata_fields = config['vector_store_schema']['metadata_fields_in_vector_store'] or []
    feature_columns = config['embedding']['feature_columns']

    # Build SELECT clause dynamically - txt is always required
    select_fields = ["e_ref.txt AS reference_txt"]

    # Add all metadata fields from vector store
    for field in metadata_fields:
        if field != 'txt':
            select_fields.append(f"e_ref.{field} AS {field}")

    # Add similarity
    select_fields.append("(1.0 - dt.distance) AS similarity")

    select_clause = ",\n        ".join(select_fields)

    return f"""
    SELECT
        {select_clause}
    FROM TD_VECTORDISTANCE (
        ON {vector_db}.{dst_table} AS TargetTable
        ON {vector_db}.{chunk_embed_table} AS ReferenceTable DIMENSION
        USING
            TargetIDColumn('id')
            TargetFeatureColumns('{feature_columns}')
            RefIDColumn('id')
            RefFeatureColumns('{feature_columns}')
            DistanceMeasure('cosine')
            TopK({k})
    ) AS dt
    JOIN {vector_db}.{chunk_embed_table} e_ref
        ON e_ref.id = dt.reference_id
    ORDER BY similarity DESC;
    """
```
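For the hypothetical config above (metadata fields `doc_name` and `page_no`, feature columns `[emb_0:emb_383]`) and `k = 5`, the builder emits SQL along these lines:

```sql
SELECT
    e_ref.txt AS reference_txt,
    e_ref.doc_name AS doc_name,
    e_ref.page_no AS page_no,
    (1.0 - dt.distance) AS similarity
FROM TD_VECTORDISTANCE (
    ON rag_demo.query_embeddings AS TargetTable
    ON rag_demo.chunk_embeddings AS ReferenceTable DIMENSION
    USING
        TargetIDColumn('id')
        TargetFeatureColumns('[emb_0:emb_383]')
        RefIDColumn('id')
        RefFeatureColumns('[emb_0:emb_383]')
        DistanceMeasure('cosine')
        TopK(5)
) AS dt
JOIN rag_demo.chunk_embeddings e_ref
    ON e_ref.id = dt.reference_id
ORDER BY similarity DESC;
```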
- Helper function to load RAG configuration from rag_config.yml or provide defaults.

```python
def load_rag_config():
    """Load RAG configuration from rag_config.yml"""
    try:
        # Get the directory path
        current_dir = Path(__file__).parent
        # Go to config/
        config_path = current_dir.parent.parent / 'config' / 'rag_config.yml'
        with open(config_path, 'r') as file:
            logger.info(f"Loading RAG config from: {config_path}")
            return yaml.safe_load(file)
    except FileNotFoundError:
        logger.warning(f"RAG config file not found: {config_path}, using defaults")
        return get_default_rag_config()
    except Exception as e:
        logger.error(f"Error loading RAG config: {e}")
        return get_default_rag_config()
```
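The module-level glue implied by the code above would look roughly like this; the import list and logger name are assumptions, and `get_default_rag_config()` is referenced but not reproduced in this section:

```python
import logging
from pathlib import Path

import yaml

logger = logging.getLogger(__name__)

# Loaded once at import time; handle_rag_Execute_Workflow reads this
# module-level configuration.
RAG_CONFIG = load_rag_config()
```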