Skip to main content
Glama

Teradata MCP Server

Official
by Teradata
rag_tools.py21.5 kB
import json import logging from datetime import date, datetime from decimal import Decimal from typing import Any from pathlib import Path import yaml from teradatasql import TeradataConnection logger = logging.getLogger("teradata_mcp_server") # Load RAG configuration def load_rag_config(): """Load RAG configuration from rag_config.yml""" try: # Get the directory path current_dir = Path(__file__).parent # Go to config/ config_path = current_dir.parent.parent / 'config' / 'rag_config.yml' with open(config_path, 'r') as file: logger.info(f"Loading RAG config from: {config_path}") return yaml.safe_load(file) except FileNotFoundError: logger.warning(f"RAG config file not found: {config_path}, using defaults") return get_default_rag_config() except Exception as e: logger.error(f"Error loading RAG config: {e}") return get_default_rag_config() def get_default_rag_config(): """Default RAG configuration as fallback""" return { 'version': 'ivsm', 'databases': { 'query_db': 'demo_db', 'model_db': 'demo_db', 'vector_db': 'demo_db' }, 'tables': { 'query_table': 'user_query', 'query_embedding_store': 'user_query_embeddings', 'vector_table': 'icici_fr_embeddings_store', 'model_table': 'embeddings_models', 'tokenizer_table': 'embeddings_tokenizers' }, 'model': { 'model_id': 'bge-small-en-v1.5' }, 'retrieval': { 'default_k': 10, 'max_k': 50 }, 'vector_store_schema': { 'required_fields': ['txt'], 'metadata_fields_in_vector_store': ['chunk_num', 'section_title', 'doc_name'] }, 'embedding': { 'vector_length': 384, 'vector_column_prefix': 'emb_', 'distance_measure': 'cosine', 'feature_columns': '[emb_0:emb_383]' } } # Load config at module level RAG_CONFIG = load_rag_config() def build_search_query(vector_db, dst_table, chunk_embed_table, k, config): """Build dynamic search query based on available metadata fields in vector store""" # Get metadata fields from config metadata_fields = config['vector_store_schema']['metadata_fields_in_vector_store'] or [] feature_columns = config['embedding']['feature_columns'] # Build SELECT clause dynamically - txt is always required select_fields = ["e_ref.txt AS reference_txt"] # Add all metadata fields from vector store for field in metadata_fields: if field != 'txt': select_fields.append(f"e_ref.{field} AS {field}") # Add similarity select_fields.append("(1.0 - dt.distance) AS similarity") select_clause = ",\n ".join(select_fields) return f""" SELECT {select_clause} FROM TD_VECTORDISTANCE ( ON {vector_db}.{dst_table} AS TargetTable ON {vector_db}.{chunk_embed_table} AS ReferenceTable DIMENSION USING TargetIDColumn('id') TargetFeatureColumns('{feature_columns}') RefIDColumn('id') RefFeatureColumns('{feature_columns}') DistanceMeasure('cosine') TopK({k}) ) AS dt JOIN {vector_db}.{chunk_embed_table} e_ref ON e_ref.id = dt.reference_id ORDER BY similarity DESC; """ def serialize_teradata_types(obj: Any) -> Any: """Convert Teradata-specific types to JSON serializable formats""" if isinstance(obj, date | datetime): return obj.isoformat() if isinstance(obj, Decimal): return float(obj) return str(obj) def rows_to_json(cursor_description: Any, rows: list[Any]) -> list[dict[str, Any]]: """Convert database rows to JSON objects using column names as keys""" if not cursor_description or not rows: return [] columns = [col[0] for col in cursor_description] return [ { col: serialize_teradata_types(value) for col, value in zip(columns, row) } for row in rows ] def create_response(data: Any, metadata: dict[str, Any] | None = None) -> str: """Create a standardized JSON response structure""" if metadata: response = { "status": "success", "metadata": metadata, "results": data } else: response = { "status": "success", "results": data } return json.dumps(response, default=serialize_teradata_types) def handle_rag_Execute_Workflow( conn: TeradataConnection, question: str, k: int | None = None, *args, **kwargs, ): """ Execute complete RAG workflow to answer user questions based on document context. This tool handles the entire RAG pipeline in a single step when a user query is tagged with /rag. WORKFLOW STEPS (executed automatically): 1. Configuration setup using configurable values from rag_config.yml 2. Store user query with '/rag ' prefix stripping 3. Generate query embeddings using either BYOM (ONNXEmbeddings) or IVSM functions based on config 4. Perform semantic search against precomputed chunk embeddings 5. Return context chunks for answer generation CONFIGURATION VALUES (from rag_config.yml): - version: 'ivsm' or 'byom' to select embedding approach - All database names, table names, and model settings are configurable - Vector store metadata fields are dynamically detected - Embedding parameters are configurable - Default chunk retrieval count is configurable - Default values are provided as fallback TECHNICAL DETAILS: - Strips the '/rag ' prefix if present from user questions - Creates query table if it does not exist (columns: id, txt, created_ts) - BYOM approach: Uses mldb.ONNXEmbeddings UDF for tokenization and embedding - IVSM approach: Uses ivsm.tokenizer_encode and ivsm.IVSM_score functions - Both approaches store embeddings in configured output table - Uses cosine similarity via TD_VECTORDISTANCE for semantic search - Returns the top-k matching chunks from the configured vector store - Each result includes chunk text, similarity score, and metadata fields CRITICAL ANSWERING RULES: - Answer ONLY using retrieved chunks - no external knowledge, speculation, or inference - Quote source content directly without paraphrasing, summarizing, or rewriting - If no relevant context found: "Not enough information found in the provided context" - If partial context: "The available context does not fully answer the question" - Include document/page references when available (e.g., "On page 2 of 'demo_policy.pdf'...") - Execute entire workflow silently without showing function calls to user - only show final answer LANGUAGE RESTRICTIONS: - Do not say "According to the context" or "The context says" - just quote directly - Do not say "It can be inferred that..." - no inference allowed - Use exact or near-verbatim quotes only - No paraphrasing, summarizing, or adding transitions between quotes - Copy content precisely - no expansion or interpretation BEHAVIORAL REQUIREMENTS: - Execute all RAG workflow steps silently without showing tool calls to users - Only display final answers based strictly on retrieved context chunks - Do not use external knowledge or speculation - only quote retrieved content directly - RAG mode only triggers with explicit /rag prefix - do not enter RAG mode implicitly - Hide all tool execution steps from user - Provide clean, direct response with only the answer based on retrieved chunks - This is a complete end-to-end RAG workflow - no additional tools needed EXECUTION: Run completely silently - user only sees their question and the final context-grounded answer. """ # Use configuration from loaded config config = RAG_CONFIG # Check version and delegate to appropriate implementation version = config.get('version', 'ivsm').lower() if version == 'ivsm': return _execute_rag_workflow_ivsm(conn, question, k, config, *args, **kwargs) elif version == 'byom': return _execute_rag_workflow_byom(conn, question, k, config, *args, **kwargs) else: raise ValueError(f"Unsupported RAG version: {version}. Supported versions: 'byom', 'ivsm'") def _execute_rag_workflow_byom(conn: TeradataConnection, question: str, k: int | None, config: dict, *args, **kwargs): """Execute RAG workflow using BYOM (ONNXEmbeddings)""" # Use config default if k not provided if k is None: k = config['retrieval']['default_k'] # Optional: Enforce max limit max_k = config['retrieval'].get('max_k', 50) if k > max_k: logger.warning(f"Requested k={k} exceeds max_k={max_k}, using max_k") k = max_k logger.debug(f"handle_rag_executeWorkflow (BYOM): question={question[:60]}..., k={k}") # Extract config values database_name = config['databases']['query_db'] table_name = config['tables']['query_table'] dst_table = config['tables']['query_embedding_store'] model_id = config['model']['model_id'] model_db = config['databases']['model_db'] model_table = config['tables']['model_table'] tokenizer_table = config['tables']['tokenizer_table'] vector_db = config['databases']['vector_db'] chunk_embed_table = config['tables']['vector_table'] with conn.cursor() as cur: # Store user query logger.debug(f"Step 2: Storing user query in {database_name}.{table_name}") # Create table if it doesn't exist ddl = f""" CREATE TABLE {database_name}.{table_name} ( id INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1) NOT NULL, txt VARCHAR(5000), created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ) """ try: cur.execute(ddl) logger.debug(f"Table {database_name}.{table_name} created") except Exception as e: error_msg = str(e).lower() if "already exists" in error_msg or "3803" in error_msg: logger.debug(f"Table {database_name}.{table_name} already exists, skipping creation") else: logger.error(f"Error creating table: {e}") raise # Insert cleaned question insert_sql = f""" INSERT INTO {database_name}.{table_name} (txt) SELECT CASE WHEN TRIM(?) LIKE '/rag %' THEN SUBSTRING(TRIM(?) FROM 6) ELSE TRIM(?) END """ cur.execute(insert_sql, [question, question, question]) # Get inserted ID and cleaned text cur.execute(f"SELECT MAX(id) AS id FROM {database_name}.{table_name}") new_id = cur.fetchone()[0] cur.execute(f"SELECT txt FROM {database_name}.{table_name} WHERE id = ?", [new_id]) cleaned_txt = cur.fetchone()[0] logger.debug(f"Stored query with ID {new_id}: {cleaned_txt[:60]}...") # Generate query embeddings logger.debug(f"Step 3: Generating embeddings in {database_name}.{dst_table}") # Drop existing embeddings table drop_sql = f"DROP TABLE {database_name}.{dst_table}" try: cur.execute(drop_sql) logger.debug(f"Dropped existing table {database_name}.{dst_table}") except Exception as e: logger.debug(f"DROP failed or table not found: {e}") # Create embeddings table create_sql = f""" CREATE TABLE {database_name}.{dst_table} AS ( SELECT * FROM mldb.ONNXEmbeddings( ON (SELECT id, txt FROM {database_name}.{table_name}) ON (SELECT model_id, model FROM {model_db}.{model_table} WHERE model_id = '{model_id}') DIMENSION ON (SELECT model AS tokenizer FROM {model_db}.{tokenizer_table} WHERE model_id = '{model_id}') DIMENSION USING Accumulate('id', 'txt') ModelOutputTensor('sentence_embedding') OutputFormat('FLOAT32({config["embedding"]["vector_length"]})') ) AS a ) WITH DATA """ cur.execute(create_sql) logger.debug(f"Created embeddings table {database_name}.{dst_table}") # Perform semantic search logger.debug(f"Step 4: Performing semantic search with k={k}") search_sql = build_search_query(vector_db, dst_table, chunk_embed_table, k, config) rows = cur.execute(search_sql) data = rows_to_json(cur.description, rows.fetchall()) logger.debug(f"Retrieved {len(data)} chunks for semantic search") # Return metadata metadata = { "tool_name": "rag_executeWorkflow", "workflow_type": "BYOM", "workflow_steps": ["config_set", "query_stored", "embeddings_generated", "semantic_search_completed"], "query_id": new_id, "cleaned_question": cleaned_txt, "database": database_name, "query_table": table_name, "embedding_table": dst_table, "vector_table": chunk_embed_table, "model_id": model_id, "chunks_retrieved": len(data), "topk_requested": k, "topk_configured_default": config['retrieval']['default_k'], "metadata_fields": config['vector_store_schema']['metadata_fields_in_vector_store'], "description": "Complete RAG workflow executed using BYOM: config - store query - generate embeddings - semantic search" } logger.debug(f"Tool: handle_rag_executeWorkflow (BYOM): metadata: {metadata}") return create_response(data, metadata) def _execute_rag_workflow_ivsm(conn: TeradataConnection, question: str, k: int | None, config: dict, *args, **kwargs): """Execute RAG workflow using IVSM functions""" # Use config default if k not provided if k is None: k = config['retrieval']['default_k'] # Optional: Enforce max limit max_k = config['retrieval'].get('max_k', 50) if k > max_k: logger.warning(f"Requested k={k} exceeds max_k={max_k}, using max_k") k = max_k logger.debug(f"handle_rag_executeWorkflow (IVSM): question={question[:60]}..., k={k}") # Extract config values database_name = config['databases']['query_db'] table_name = config['tables']['query_table'] dst_table = config['tables']['query_embedding_store'] model_id = config['model']['model_id'] model_db = config['databases']['model_db'] model_table = config['tables']['model_table'] tokenizer_table = config['tables']['tokenizer_table'] vector_db = config['databases']['vector_db'] chunk_embed_table = config['tables']['vector_table'] with conn.cursor() as cur: # Store user query logger.debug(f"Step 2: Storing user query in {database_name}.{table_name}") # Create table if it doesn't exist ddl = f""" CREATE TABLE {database_name}.{table_name} ( id INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1) NOT NULL, txt VARCHAR(5000), created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ) """ try: cur.execute(ddl) logger.debug(f"Table {database_name}.{table_name} created") except Exception as e: error_msg = str(e).lower() if "already exists" in error_msg or "3803" in error_msg: logger.debug(f"Table {database_name}.{table_name} already exists, skipping creation") else: logger.error(f"Error creating table: {e}") raise # Insert cleaned question insert_sql = f""" INSERT INTO {database_name}.{table_name} (txt) SELECT CASE WHEN TRIM(?) LIKE '/rag %' THEN SUBSTRING(TRIM(?) FROM 6) ELSE TRIM(?) END """ cur.execute(insert_sql, [question, question, question]) # Get inserted ID and cleaned text cur.execute(f"SELECT MAX(id) AS id FROM {database_name}.{table_name}") new_id = cur.fetchone()[0] cur.execute(f"SELECT txt FROM {database_name}.{table_name} WHERE id = ?", [new_id]) cleaned_txt = cur.fetchone()[0] logger.debug(f"Stored query with ID {new_id}: {cleaned_txt[:60]}...") # Tokenize query logger.debug("Step 3: Tokenizing query using ivsm.tokenizer_encode") cur.execute(f""" REPLACE VIEW v_topics_tokenized AS ( SELECT id, txt, IDS AS input_ids, attention_mask FROM ivsm.tokenizer_encode( ON ( SELECT * FROM {database_name}.{table_name} QUALIFY ROW_NUMBER() OVER (ORDER BY created_ts DESC) = 1 ) ON ( SELECT model AS tokenizer FROM {model_db}.{tokenizer_table} WHERE model_id = '{model_id}' ) DIMENSION USING ColumnsToPreserve('id','txt') OutputFields('IDS','ATTENTION_MASK') MaxLength(1024) PadToMaxLength('True') TokenDataType('INT64') ) AS t ); """) logger.debug("Tokenized view v_topics_tokenized created") # Create embedding view logger.debug("Step 4: Creating embedding view using ivsm.IVSM_score") cur.execute(f""" REPLACE VIEW v_topics_embeddings AS ( SELECT * FROM ivsm.IVSM_score( ON v_topics_tokenized ON ( SELECT * FROM {model_db}.{model_table} WHERE model_id = '{model_id}' ) DIMENSION USING ColumnsToPreserve('id','txt') ModelType('ONNX') BinaryInputFields('input_ids','attention_mask') BinaryOutputFields('sentence_embedding') Caching('inquery') ) AS s ); """) logger.debug("Embedding view v_topics_embeddings created") # Create query embedding table logger.debug("Step 5: Creating query embedding table using ivsm.vector_to_columns") # Drop existing embeddings table drop_sql = f"DROP TABLE {database_name}.{dst_table}" try: cur.execute(drop_sql) logger.debug(f"Dropped existing table {database_name}.{dst_table}") except Exception as e: logger.debug(f"DROP failed or table not found: {e}") # Create embeddings table create_sql = f""" CREATE TABLE {database_name}.{dst_table} AS ( SELECT * FROM ivsm.vector_to_columns( ON v_topics_embeddings USING ColumnsToPreserve('id', 'txt') VectorDataType('FLOAT32') VectorLength({config['embedding']['vector_length']}) OutputColumnPrefix('{config['embedding']['vector_column_prefix']}') InputColumnName('sentence_embedding') ) a ) WITH DATA """ cur.execute(create_sql) logger.debug(f"Created embeddings table {database_name}.{dst_table}") # Perform semantic search logger.debug(f"Step 6: Performing semantic search with k={k}") search_sql = build_search_query(vector_db, dst_table, chunk_embed_table, k, config) rows = cur.execute(search_sql) data = rows_to_json(cur.description, rows.fetchall()) logger.debug(f"Retrieved {len(data)} chunks for semantic search") # Return metadata metadata = { "tool_name": "rag_executeWorkflow", "workflow_type": "IVSM", "workflow_steps": ["config_set", "query_stored", "query_tokenized", "embedding_view_created", "embedding_table_created", "semantic_search_completed"], "query_id": new_id, "cleaned_question": cleaned_txt, "database": database_name, "query_table": table_name, "embedding_table": dst_table, "vector_table": chunk_embed_table, "model_id": model_id, "chunks_retrieved": len(data), "topk_requested": k, "topk_configured_default": config['retrieval']['default_k'], "views_created": ["v_topics_tokenized", "v_topics_embeddings"], "metadata_fields": config['vector_store_schema']['metadata_fields_in_vector_store'], "description": "Complete RAG workflow executed using IVSM functions: config - store query - tokenize - create embedding view - create embedding table - semantic search" } logger.debug(f"Tool: handle_rag_executeWorkflow (IVSM): metadata: {metadata}") return create_response(data, metadata)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Teradata/teradata-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server