
Teradata MCP Server

Official
by Teradata

rag_Execute_Workflow

Execute a complete Retrieval-Augmented Generation workflow to answer questions using document context. This tool automatically processes queries, generates embeddings, performs semantic search, and returns answers based strictly on retrieved content.

Instructions

Execute complete RAG workflow to answer user questions based on document context. This tool handles the entire RAG pipeline in a single step when a user query is tagged with /rag.

WORKFLOW STEPS (executed automatically):

  1. Configuration setup using configurable values from rag_config.yml

  2. Store user query with '/rag ' prefix stripping

  3. Generate query embeddings using either BYOM (ONNXEmbeddings) or IVSM functions based on config

  4. Perform semantic search against precomputed chunk embeddings

  5. Return context chunks for answer generation

CONFIGURATION VALUES (from rag_config.yml):

  • version: 'ivsm' or 'byom' to select embedding approach

  • All database names, table names, and model settings are configurable

  • Vector store metadata fields are dynamically detected

  • Embedding parameters are configurable

  • Default chunk retrieval count is configurable

  • Default values are provided as fallback
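The configuration keys referenced by the implementation below suggest a layout along these lines. This is a hypothetical sketch of rag_config.yml: the key names follow the handler code, but every value shown is an illustrative placeholder, not a shipped default.

```yaml
# Hypothetical rag_config.yml sketch; key names match the handler code,
# values are placeholders.
version: ivsm                        # or 'byom'
databases:
  query_db: demo_user
  model_db: demo_models
  vector_db: demo_vectors
tables:
  query_table: user_query
  query_embedding_store: user_query_embeddings
  model_table: embeddings_models
  tokenizer_table: embeddings_tokenizers
  vector_table: chunk_embeddings
model:
  model_id: bge-small-en-v1.5
embedding:
  vector_length: 384
  vector_column_prefix: emb_
  feature_columns: emb_0:emb_383
retrieval:
  default_k: 10
  max_k: 50
vector_store_schema:
  metadata_fields_in_vector_store: [doc_name, page_no, chunk_num]
```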

TECHNICAL DETAILS:

  • Strips the '/rag ' prefix if present from user questions

  • Creates query table if it does not exist (columns: id, txt, created_ts)

  • BYOM approach: Uses mldb.ONNXEmbeddings UDF for tokenization and embedding

  • IVSM approach: Uses ivsm.tokenizer_encode and ivsm.IVSM_score functions

  • Both approaches store embeddings in configured output table

  • Uses cosine similarity via TD_VECTORDISTANCE for semantic search

  • Returns the top-k matching chunks from the configured vector store

  • Each result includes chunk text, similarity score, and metadata fields
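The prefix-stripping rule above can be illustrated in plain Python. This is a sketch of the behavior only, not code from the server; the actual implementation does the equivalent in SQL with TRIM and SUBSTRING.

```python
def strip_rag_prefix(question: str) -> str:
    """Mirror the server's SQL rule: trim, then drop a leading '/rag ' if present."""
    q = question.strip()
    return q[len("/rag "):] if q.startswith("/rag ") else q

print(strip_rag_prefix("/rag What is the refund policy?"))  # What is the refund policy?
print(strip_rag_prefix("What is the refund policy?"))       # unchanged
```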

CRITICAL ANSWERING RULES:

  • Answer ONLY using retrieved chunks - no external knowledge, speculation, or inference

  • Quote source content directly without paraphrasing, summarizing, or rewriting

  • If no relevant context found: "Not enough information found in the provided context"

  • If partial context: "The available context does not fully answer the question"

  • Include document/page references when available (e.g., "On page 2 of 'demo_policy.pdf'...")

  • Execute entire workflow silently without showing function calls to user - only show final answer

LANGUAGE RESTRICTIONS:

  • Do not say "According to the context" or "The context says" - just quote directly

  • Do not say "It can be inferred that..." - no inference allowed

  • Use exact or near-verbatim quotes only

  • No paraphrasing, summarizing, or adding transitions between quotes

  • Copy content precisely - no expansion or interpretation

BEHAVIORAL REQUIREMENTS:

  • Execute all RAG workflow steps silently without showing tool calls to users

  • Only display final answers based strictly on retrieved context chunks

  • Do not use external knowledge or speculation - only quote retrieved content directly

  • RAG mode only triggers with explicit /rag prefix - do not enter RAG mode implicitly

  • Hide all tool execution steps from user

  • Provide clean, direct response with only the answer based on retrieved chunks

  • This is a complete end-to-end RAG workflow - no additional tools needed

EXECUTION: Run completely silently - user only sees their question and the final context-grounded answer.

Input Schema

Name        Required   Description                                                Default
question    Yes        The user question; a leading '/rag ' prefix is stripped   (none)
k           No         Number of context chunks to retrieve (capped at max_k)    retrieval.default_k from rag_config.yml
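From a client's perspective this is invoked like any other MCP tool. A minimal sketch of the request an MCP client would send, using the tool and argument names from this page; the request shape follows the MCP `tools/call` convention, and the id and question text are illustrative.

```python
import json

# Illustrative MCP tools/call request for rag_Execute_Workflow.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "rag_Execute_Workflow",
        "arguments": {
            "question": "/rag What does the travel policy say about per diems?",
            "k": 5,  # optional; falls back to retrieval.default_k from rag_config.yml
        },
    },
}
print(json.dumps(request, indent=2))
```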

Implementation Reference

  • Main handler for the 'rag_Execute_Workflow' tool. Orchestrates the full RAG pipeline by loading config, selecting embedding method (IVSM or BYOM), and executing semantic search to retrieve relevant document chunks.
    def handle_rag_Execute_Workflow(
        conn: TeradataConnection,
        question: str,
        k: int | None = None,
        *args,
        **kwargs,
    ):
        """
        Execute complete RAG workflow to answer user questions based on document context.
        This tool handles the entire RAG pipeline in a single step when a user query is tagged with /rag.
    
        WORKFLOW STEPS (executed automatically):
        1. Configuration setup using configurable values from rag_config.yml
        2. Store user query with '/rag ' prefix stripping  
        3. Generate query embeddings using either BYOM (ONNXEmbeddings) or IVSM functions based on config
        4. Perform semantic search against precomputed chunk embeddings
        5. Return context chunks for answer generation
    
        CONFIGURATION VALUES (from rag_config.yml):
        - version: 'ivsm' or 'byom' to select embedding approach
        - All database names, table names, and model settings are configurable
        - Vector store metadata fields are dynamically detected
        - Embedding parameters are configurable
        - Default chunk retrieval count is configurable
        - Default values are provided as fallback
    
        TECHNICAL DETAILS:
        - Strips the '/rag ' prefix if present from user questions
        - Creates query table if it does not exist (columns: id, txt, created_ts)
        - BYOM approach: Uses mldb.ONNXEmbeddings UDF for tokenization and embedding
        - IVSM approach: Uses ivsm.tokenizer_encode and ivsm.IVSM_score functions
        - Both approaches store embeddings in configured output table
        - Uses cosine similarity via TD_VECTORDISTANCE for semantic search
        - Returns the top-k matching chunks from the configured vector store
        - Each result includes chunk text, similarity score, and metadata fields
    
        CRITICAL ANSWERING RULES:
        - Answer ONLY using retrieved chunks - no external knowledge, speculation, or inference
        - Quote source content directly without paraphrasing, summarizing, or rewriting
        - If no relevant context found: "Not enough information found in the provided context"
        - If partial context: "The available context does not fully answer the question"
        - Include document/page references when available (e.g., "On page 2 of 'demo_policy.pdf'...")
        - Execute entire workflow silently without showing function calls to user - only show final answer
    
        LANGUAGE RESTRICTIONS:
        - Do not say "According to the context" or "The context says" - just quote directly
        - Do not say "It can be inferred that..." - no inference allowed
        - Use exact or near-verbatim quotes only
        - No paraphrasing, summarizing, or adding transitions between quotes
        - Copy content precisely - no expansion or interpretation
    
        BEHAVIORAL REQUIREMENTS:
        - Execute all RAG workflow steps silently without showing tool calls to users
        - Only display final answers based strictly on retrieved context chunks
        - Do not use external knowledge or speculation - only quote retrieved content directly
        - RAG mode only triggers with explicit /rag prefix - do not enter RAG mode implicitly
        - Hide all tool execution steps from user
        - Provide clean, direct response with only the answer based on retrieved chunks
        - This is a complete end-to-end RAG workflow - no additional tools needed
    
        EXECUTION: Run completely silently - user only sees their question and the final context-grounded answer.
        """
    
        # Use configuration from loaded config
        config = RAG_CONFIG
    
        # Check version and delegate to appropriate implementation
        version = config.get('version', 'ivsm').lower()
        
        if version == 'ivsm':
            return _execute_rag_workflow_ivsm(conn, question, k, config, *args, **kwargs)
        elif version == 'byom':
            return _execute_rag_workflow_byom(conn, question, k, config, *args, **kwargs)
        else:
            raise ValueError(f"Unsupported RAG version: {version}. Supported versions: 'byom', 'ivsm'")
  • Helper function implementing the IVSM-based (default) RAG workflow: query storage, tokenization, embedding generation using IVSM UDFs, vector table creation, and cosine similarity search returning top-k chunks with metadata.
    def _execute_rag_workflow_ivsm(conn: TeradataConnection, question: str, k: int | None, config: dict, *args, **kwargs):
        """Execute RAG workflow using IVSM functions"""
        
        # Use config default if k not provided
        if k is None:
            k = config['retrieval']['default_k']
    
        # Optional: Enforce max limit
        max_k = config['retrieval'].get('max_k', 50)
        if k > max_k:
            logger.warning(f"Requested k={k} exceeds max_k={max_k}, using max_k")
            k = max_k
    
        logger.debug(f"handle_rag_executeWorkflow (IVSM): question={question[:60]}..., k={k}")
    
        # Extract config values
        database_name = config['databases']['query_db']
        table_name = config['tables']['query_table']
        dst_table = config['tables']['query_embedding_store']
        model_id = config['model']['model_id']
        model_db = config['databases']['model_db']
        model_table = config['tables']['model_table']
        tokenizer_table = config['tables']['tokenizer_table']
        vector_db = config['databases']['vector_db']
        chunk_embed_table = config['tables']['vector_table']
    
        with conn.cursor() as cur:
            # Store user query
            logger.debug(f"Step 2: Storing user query in {database_name}.{table_name}")
    
            # Create table if it doesn't exist
            ddl = f"""
            CREATE TABLE {database_name}.{table_name} (
                id INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1) NOT NULL,
                txt VARCHAR(5000),
                created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (id)
            )
            """
    
            try:
                cur.execute(ddl)
                logger.debug(f"Table {database_name}.{table_name} created")
            except Exception as e:
                error_msg = str(e).lower()
                if "already exists" in error_msg or "3803" in error_msg:
                    logger.debug(f"Table {database_name}.{table_name} already exists, skipping creation")
                else:
                    logger.error(f"Error creating table: {e}")
                    raise
    
            # Insert cleaned question
            insert_sql = f"""
            INSERT INTO {database_name}.{table_name} (txt)
            SELECT
              CASE
                WHEN TRIM(?) LIKE '/rag %' THEN SUBSTRING(TRIM(?) FROM 6)
                ELSE TRIM(?)
              END
            """
            cur.execute(insert_sql, [question, question, question])
    
            # Get inserted ID and cleaned text
            cur.execute(f"SELECT MAX(id) AS id FROM {database_name}.{table_name}")
            new_id = cur.fetchone()[0]
    
            cur.execute(f"SELECT txt FROM {database_name}.{table_name} WHERE id = ?", [new_id])
            cleaned_txt = cur.fetchone()[0]
    
            logger.debug(f"Stored query with ID {new_id}: {cleaned_txt[:60]}...")
    
            # Tokenize query
            logger.debug("Step 3: Tokenizing query using ivsm.tokenizer_encode")
    
            cur.execute(f"""
                REPLACE VIEW v_topics_tokenized AS
                (
                    SELECT id, txt,
                           IDS AS input_ids,
                           attention_mask
                    FROM ivsm.tokenizer_encode(
                        ON (
                            SELECT *
                            FROM {database_name}.{table_name}
                            QUALIFY ROW_NUMBER() OVER (ORDER BY created_ts DESC) = 1
                        )
                        ON (
                            SELECT model AS tokenizer
                            FROM {model_db}.{tokenizer_table}
                            WHERE model_id = '{model_id}'
                        ) DIMENSION
                        USING
                            ColumnsToPreserve('id','txt')
                            OutputFields('IDS','ATTENTION_MASK')
                            MaxLength(1024)
                            PadToMaxLength('True')
                            TokenDataType('INT64')
                    ) AS t
                );
            """)
    
            logger.debug("Tokenized view v_topics_tokenized created")
    
            # Create embedding view
            logger.debug("Step 4: Creating embedding view using ivsm.IVSM_score")
    
            cur.execute(f"""
                REPLACE VIEW v_topics_embeddings AS
                (
                    SELECT *
                    FROM ivsm.IVSM_score(
                        ON v_topics_tokenized
                        ON (
                            SELECT *
                            FROM {model_db}.{model_table}
                            WHERE model_id = '{model_id}'
                        ) DIMENSION
                        USING
                            ColumnsToPreserve('id','txt')
                            ModelType('ONNX')
                            BinaryInputFields('input_ids','attention_mask')
                            BinaryOutputFields('sentence_embedding')
                            Caching('inquery')
                    ) AS s
                );
            """)
    
            logger.debug("Embedding view v_topics_embeddings created")
    
            # Create query embedding table
            logger.debug("Step 5: Creating query embedding table using ivsm.vector_to_columns")
    
            # Drop existing embeddings table
            drop_sql = f"DROP TABLE {database_name}.{dst_table}"
            try:
                cur.execute(drop_sql)
                logger.debug(f"Dropped existing table {database_name}.{dst_table}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")
    
            # Create embeddings table
            create_sql = f"""
            CREATE TABLE {database_name}.{dst_table} AS (
                SELECT *
                FROM ivsm.vector_to_columns(
                    ON v_topics_embeddings
                    USING
                        ColumnsToPreserve('id', 'txt')
                        VectorDataType('FLOAT32')
                        VectorLength({config['embedding']['vector_length']})
                        OutputColumnPrefix('{config['embedding']['vector_column_prefix']}')
                        InputColumnName('sentence_embedding')
                ) a
            ) WITH DATA
            """
    
            cur.execute(create_sql)
            logger.debug(f"Created embeddings table {database_name}.{dst_table}")
    
            # Perform semantic search
            logger.debug(f"Step 6: Performing semantic search with k={k}")
    
            search_sql = build_search_query(vector_db, dst_table, chunk_embed_table, k, config)
    
            rows = cur.execute(search_sql)
            data = rows_to_json(cur.description, rows.fetchall())
    
            logger.debug(f"Retrieved {len(data)} chunks for semantic search")
    
        # Return metadata
        metadata = {
            "tool_name": "rag_executeWorkflow",
            "workflow_type": "IVSM",
            "workflow_steps": ["config_set", "query_stored", "query_tokenized", "embedding_view_created", "embedding_table_created", "semantic_search_completed"],
            "query_id": new_id,
            "cleaned_question": cleaned_txt,
            "database": database_name,
            "query_table": table_name,
            "embedding_table": dst_table,
            "vector_table": chunk_embed_table,
            "model_id": model_id,
            "chunks_retrieved": len(data),
            "topk_requested": k,
            "topk_configured_default": config['retrieval']['default_k'],
            "views_created": ["v_topics_tokenized", "v_topics_embeddings"],
            "metadata_fields": config['vector_store_schema']['metadata_fields_in_vector_store'],
            "description": "Complete RAG workflow executed using IVSM functions: config - store query - tokenize - create embedding view - create embedding table - semantic search"
        }
        logger.debug(f"Tool: handle_rag_executeWorkflow (IVSM): metadata: {metadata}")
        return create_response(data, metadata)
  • Helper function for BYOM/ONNXEmbeddings-based RAG workflow: query storage, embedding generation via ONNXEmbeddings UDF, and semantic search.
    def _execute_rag_workflow_byom(conn: TeradataConnection, question: str, k: int | None, config: dict, *args, **kwargs):
        """Execute RAG workflow using BYOM (ONNXEmbeddings)"""
        
        # Use config default if k not provided
        if k is None:
            k = config['retrieval']['default_k']
    
        # Optional: Enforce max limit
        max_k = config['retrieval'].get('max_k', 50)
        if k > max_k:
            logger.warning(f"Requested k={k} exceeds max_k={max_k}, using max_k")
            k = max_k
    
        logger.debug(f"handle_rag_executeWorkflow (BYOM): question={question[:60]}..., k={k}")
    
        # Extract config values
        database_name = config['databases']['query_db']
        table_name = config['tables']['query_table']
        dst_table = config['tables']['query_embedding_store']
        model_id = config['model']['model_id']
        model_db = config['databases']['model_db']
        model_table = config['tables']['model_table']
        tokenizer_table = config['tables']['tokenizer_table']
        vector_db = config['databases']['vector_db']
        chunk_embed_table = config['tables']['vector_table']
    
        with conn.cursor() as cur:
            # Store user query
            logger.debug(f"Step 2: Storing user query in {database_name}.{table_name}")
    
            # Create table if it doesn't exist
            ddl = f"""
            CREATE TABLE {database_name}.{table_name} (
                id INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1) NOT NULL,
                txt VARCHAR(5000),
                created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (id)
            )
            """
    
            try:
                cur.execute(ddl)
                logger.debug(f"Table {database_name}.{table_name} created")
            except Exception as e:
                error_msg = str(e).lower()
                if "already exists" in error_msg or "3803" in error_msg:
                    logger.debug(f"Table {database_name}.{table_name} already exists, skipping creation")
                else:
                    logger.error(f"Error creating table: {e}")
                    raise
    
            # Insert cleaned question
            insert_sql = f"""
            INSERT INTO {database_name}.{table_name} (txt)
            SELECT
              CASE
                WHEN TRIM(?) LIKE '/rag %' THEN SUBSTRING(TRIM(?) FROM 6)
                ELSE TRIM(?)
              END
            """
            cur.execute(insert_sql, [question, question, question])
    
            # Get inserted ID and cleaned text
            cur.execute(f"SELECT MAX(id) AS id FROM {database_name}.{table_name}")
            new_id = cur.fetchone()[0]
    
            cur.execute(f"SELECT txt FROM {database_name}.{table_name} WHERE id = ?", [new_id])
            cleaned_txt = cur.fetchone()[0]
    
            logger.debug(f"Stored query with ID {new_id}: {cleaned_txt[:60]}...")
    
            # Generate query embeddings
            logger.debug(f"Step 3: Generating embeddings in {database_name}.{dst_table}")
    
            # Drop existing embeddings table
            drop_sql = f"DROP TABLE {database_name}.{dst_table}"
            try:
                cur.execute(drop_sql)
                logger.debug(f"Dropped existing table {database_name}.{dst_table}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")
    
            # Create embeddings table
            create_sql = f"""
            CREATE TABLE {database_name}.{dst_table} AS (
                SELECT *
                FROM mldb.ONNXEmbeddings(
                    ON (SELECT id, txt FROM {database_name}.{table_name})
                    ON (SELECT model_id, model FROM {model_db}.{model_table} WHERE model_id = '{model_id}') DIMENSION
                    ON (SELECT model AS tokenizer FROM {model_db}.{tokenizer_table} WHERE model_id = '{model_id}') DIMENSION
                    USING
                        Accumulate('id', 'txt')
                        ModelOutputTensor('sentence_embedding')
                        OutputFormat('FLOAT32({config["embedding"]["vector_length"]})')
                ) AS a
            ) WITH DATA
            """
    
            cur.execute(create_sql)
            logger.debug(f"Created embeddings table {database_name}.{dst_table}")
    
            # Perform semantic search
            logger.debug(f"Step 4: Performing semantic search with k={k}")
    
            search_sql = build_search_query(vector_db, dst_table, chunk_embed_table, k, config)
    
            rows = cur.execute(search_sql)
            data = rows_to_json(cur.description, rows.fetchall())
    
            logger.debug(f"Retrieved {len(data)} chunks for semantic search")
    
        # Return metadata
        metadata = {
            "tool_name": "rag_executeWorkflow",
            "workflow_type": "BYOM",
            "workflow_steps": ["config_set", "query_stored", "embeddings_generated", "semantic_search_completed"],
            "query_id": new_id,
            "cleaned_question": cleaned_txt,
            "database": database_name,
            "query_table": table_name,
            "embedding_table": dst_table,
            "vector_table": chunk_embed_table,
            "model_id": model_id,
            "chunks_retrieved": len(data),
            "topk_requested": k,
            "topk_configured_default": config['retrieval']['default_k'],
            "metadata_fields": config['vector_store_schema']['metadata_fields_in_vector_store'],
            "description": "Complete RAG workflow executed using BYOM: config - store query - generate embeddings - semantic search"
        }
        logger.debug(f"Tool: handle_rag_executeWorkflow (BYOM): metadata: {metadata}")
        return create_response(data, metadata)
  • Utility to construct the SQL query for semantic similarity search using TD_VECTORDISTANCE, dynamically including vector store metadata fields.
    def build_search_query(vector_db, dst_table, chunk_embed_table, k, config):
        """Build dynamic search query based on available metadata fields in vector store"""
        # Get metadata fields from config
        metadata_fields = config['vector_store_schema']['metadata_fields_in_vector_store'] or []
        feature_columns = config['embedding']['feature_columns']
    
        # Build SELECT clause dynamically - txt is always required
        select_fields = ["e_ref.txt AS reference_txt"]
    
        # Add all metadata fields from vector store
        for field in metadata_fields:
            if field != 'txt':
                select_fields.append(f"e_ref.{field} AS {field}")
    
        # Add similarity
        select_fields.append("(1.0 - dt.distance) AS similarity")
    
        select_clause = ",\n            ".join(select_fields)
    
        return f"""
            SELECT
                {select_clause}
            FROM TD_VECTORDISTANCE (
                    ON {vector_db}.{dst_table}      AS TargetTable
                    ON {vector_db}.{chunk_embed_table}      AS ReferenceTable DIMENSION
                    USING
                        TargetIDColumn('id')
                        TargetFeatureColumns('{feature_columns}')
                        RefIDColumn('id')
                        RefFeatureColumns('{feature_columns}')
                        DistanceMeasure('cosine')
                        TopK({k})
                ) AS dt
            JOIN {vector_db}.{chunk_embed_table} e_ref
              ON e_ref.id = dt.reference_id
            ORDER BY similarity DESC;
            """
  • MODULE_MAP in ModuleLoader registers the 'rag' module for dynamic loading, making rag_Execute_Workflow handler available via lazy loading based on profile config.
    MODULE_MAP = {
        'bar': 'teradata_mcp_server.tools.bar',
        'base': 'teradata_mcp_server.tools.base',
        'chat': 'teradata_mcp_server.tools.chat',
        'dba': 'teradata_mcp_server.tools.dba',
        'fs': 'teradata_mcp_server.tools.fs',
        'qlty': 'teradata_mcp_server.tools.qlty',
        'rag': 'teradata_mcp_server.tools.rag',
        'sql_opt': 'teradata_mcp_server.tools.sql_opt',
        'sec': 'teradata_mcp_server.tools.sec',
        'tmpl': 'teradata_mcp_server.tools.tmpl',
        'plot': 'teradata_mcp_server.tools.plot',
        'tdvs': 'teradata_mcp_server.tools.tdvs'
    }
