
sql_Execute_Full_Pipeline

Execute a complete SQL query clustering workflow to identify and analyze high CPU usage queries for optimization opportunities in Teradata databases.

Instructions

COMPLETE SQL QUERY CLUSTERING PIPELINE FOR HIGH-USAGE QUERY OPTIMIZATION

This tool executes the entire SQL query clustering workflow to identify and analyze high CPU usage queries for optimization opportunities. It's designed for database performance analysts and DBAs who need to systematically identify query optimization candidates.

FULL PIPELINE WORKFLOW:

  1. Query Log Extraction: Extracts SQL queries from DBC.DBQLSqlTbl with comprehensive performance metrics

  2. Performance Metrics Calculation: Computes CPU skew, I/O skew, PJI (Physical to Logical I/O ratio), UII (Unit I/O Intensity)

  3. Query Tokenization: Tokenizes SQL text with the configured embedding tokenizer (default: bge-small-en-v1.5) via ivsm.tokenizer_encode

  4. Embedding Generation: Creates semantic embeddings using ivsm.IVSM_score with ONNX models

  5. Vector Store Creation: Converts embeddings to vector columns via ivsm.vector_to_columns

  6. K-Means Clustering: Groups similar queries using TD_KMeans with optimal K from configuration

  7. Silhouette Analysis: Calculates clustering quality scores using TD_Silhouette

  8. Statistics Generation: Creates comprehensive cluster statistics with performance aggregations
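
Each stage materializes its output as a table in the feature database before the next stage reads it. As an orientation aid (not part of the tool itself), the sketch below maps each stage to the Teradata function it relies on and to the table key from the pipeline configuration that receives its output:

    # Stage-by-stage data flow; the right-hand names are keys in the 'tables' section of
    # the clustering configuration, not literal table names.
    PIPELINE_FLOW = [
        ("1-2. extract + metrics", "DBC.DBQLSqlTbl / DBC.DBQLOgTbl", "sql_query_log_main"),
        ("3. tokenize",            "ivsm.tokenizer_encode",          "sql_log_tokenized_for_embeddings"),
        ("4. embed",               "ivsm.IVSM_score (ONNX)",         "sql_log_embeddings"),
        ("5. vectorize",           "ivsm.vector_to_columns",         "sql_log_embeddings_store"),
        ("6. cluster",             "TD_KMeans",                      "sql_query_clusters_temp"),
        ("7. score",               "TD_Silhouette",                  "sql_query_clusters"),
        ("8. aggregate",           "TD_Silhouette + GROUP BY",       "query_cluster_stats"),
    ]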

PERFORMANCE METRICS EXPLAINED:

  • AMPCPUTIME: Total CPU seconds across all AMPs (primary optimization target)

  • CPUSKW/IOSKW: CPU/I/O skew ratios (>2.0 indicates distribution problems)

  • PJI: Physical-to-Logical I/O ratio (higher = more CPU-intensive)

  • UII: Unit I/O Intensity (higher = more I/O-intensive relative to CPU)

  • LogicalIO: Total logical I/O operations (indicates scan intensity)

  • NumSteps: Query plan complexity (higher = more complex plans)
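
As a worked illustration, the snippet below recomputes these metrics with the same formulas the pipeline's SQL uses (its guard clauses for very small CPU values are omitted); the input numbers are invented for a hypothetical 10-AMP system:

    # Invented DBQL values for one query on a hypothetical 10-AMP system.
    num_amps      = 10          # HashAmp() + 1 in the pipeline SQL
    ampcputime    = 480.0       # total CPU seconds across all AMPs
    maxampcputime = 96.0        # CPU seconds on the busiest AMP
    total_io      = 1_200_000   # TotalIOCount (logical I/O)
    maxampio      = 240_000     # logical I/O on the busiest AMP

    cpuskw = maxampcputime / (ampcputime / num_amps)   # 96 / 48     = 2.0 -> at the skew warning threshold
    ioskw  = maxampio / (total_io / num_amps)          # 240k / 120k = 2.0 -> at the skew warning threshold
    pji    = (ampcputime * 1000) / total_io            # 0.4 -> CPU burned per unit of logical I/O
    uii    = total_io / (ampcputime * 1000)            # 2.5 -> logical I/O per unit of CPU

    print(cpuskw, ioskw, round(pji, 2), round(uii, 2))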

CONFIGURATION (from sql_opt_config.yml):

  • Processes the top queries by CPU time, up to the configured max_queries limit

  • Creates the configured default number of clusters (override via the optimal_k parameter)

  • Embedding model: the configured model_id (default: bge-small-en-v1.5)

  • Vector dimensions: the configured vector_length (default: 384)

  • All database and table names are configurable
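
For orientation, the sketch below shows the shape of configuration the pipeline reads. The keys mirror how the handler indexes the config; the values are illustrative placeholders, not the defaults shipped in sql_opt_config.yml:

    # Hypothetical structure of the loaded clustering configuration (values illustrative).
    SQL_CLUSTERING_CONFIG_EXAMPLE = {
        "databases": {"feature_db": "feature_ext_db", "model_db": "feature_ext_db"},
        "model": {"model_id": "bge-small-en-v1.5"},
        "embedding": {"max_length": 512, "pad_to_max_length": "True", "vector_length": 384},
        "clustering": {"optimal_k": 10, "max_queries": 5000, "seed": 42,
                       "stop_threshold": 0.0395, "max_iterations": 50},
        "tables": {
            "sql_query_log_main": "sql_query_log_main",
            "sql_log_tokenized_for_embeddings": "sql_log_tokenized_for_embeddings",
            "sql_log_embeddings": "sql_log_embeddings",
            "sql_log_embeddings_store": "sql_log_embeddings_store",
            "sql_query_clusters_temp": "sql_query_clusters_temp",
            "sql_query_clusters": "sql_query_clusters",
            "query_cluster_stats": "query_cluster_stats",
            "embedding_tokenizers": "embedding_tokenizers",
            "embedding_models": "embedding_models",
        },
    }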

OPTIMIZATION WORKFLOW: After running this tool, use:

  1. sql_Analyze_Cluster_Stats to identify problematic clusters

  2. sql_Retrieve_Cluster_Queries to get actual SQL from target clusters

  3. LLM analysis to identify patterns and propose specific optimizations
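
A rough sketch of that loop is shown below. Here call_tool stands in for whatever MCP client call you use, and the argument names passed to the companion tools are illustrative assumptions, not their documented schemas:

    # Hypothetical follow-up sequence once the cluster tables exist.
    stats   = call_tool("sql_Analyze_Cluster_Stats", {})                    # 1. find problematic clusters
    queries = call_tool("sql_Retrieve_Cluster_Queries", {"cluster_id": 3})  # 2. pull raw SQL for one suspect cluster
    prompt  = "Identify shared anti-patterns and propose rewrites:\n" + str(queries)  # 3. hand off to an LLM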

USE CASES:

  • Identify query families consuming the most system resources

  • Find queries with similar patterns but different performance

  • Discover optimization opportunities through clustering analysis

  • Prioritize DBA effort on highest-impact query improvements

  • Understand workload composition and resource distribution

PREREQUISITES:

  • DBC.DBQLSqlTbl and DBC.DBQLogTbl must be accessible

  • Embedding models and tokenizers must be installed in feature_ext_db

  • Sufficient space in feature_ext_db for intermediate and final tables
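
A minimal pre-flight check along these lines can surface missing grants or models before the pipeline spends time on extraction. It is a sketch, not part of the tool, and assumes a DB-API connection (for example teradatasql) plus the same configuration dict the handler uses:

    def check_prerequisites(conn, config):
        """Probe DBQL access and the presence of the configured tokenizer/model rows."""
        model_db = config["databases"]["model_db"]
        model_id = config["model"]["model_id"]
        tables = config["tables"]
        with conn.cursor() as cur:
            cur.execute("SELECT TOP 1 QueryID FROM DBC.DBQLSqlTbl")   # read access to the SQL text log
            cur.execute("SELECT TOP 1 QueryID FROM DBC.DBQLogTbl")    # read access to the query log
            cur.execute(f"SELECT COUNT(*) FROM {model_db}.{tables['embedding_tokenizers']} "
                        f"WHERE model_id = '{model_id}'")
            has_tokenizer = cur.fetchone()[0] > 0
            cur.execute(f"SELECT COUNT(*) FROM {model_db}.{tables['embedding_models']} "
                        f"WHERE model_id = '{model_id}'")
            has_model = cur.fetchone()[0] > 0
        return has_tokenizer and has_model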

Input Schema

Name          Required    Description                                       Default
max_queries   No          Number of top-CPU queries to extract from DBQL    From sql_opt_config.yml
optimal_k     No          Number of K-Means clusters to create              From sql_opt_config.yml
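
Both parameters are optional; when omitted, the handler falls back to the clustering section of sql_opt_config.yml. An example argument payload (values illustrative only):

    arguments = {"max_queries": 2000, "optimal_k": 8}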

Implementation Reference

  • The core handler function implementing the entire SQL query clustering pipeline: extracts queries from DBQL, tokenizes, generates embeddings, performs K-means clustering, computes silhouette scores, and generates cluster statistics.
    def handle_sql_Execute_Full_Pipeline(
        conn,
        optimal_k: int = None,
        max_queries: int = None,
        *args,
        **kwargs,
    ):
        """
        **COMPLETE SQL QUERY CLUSTERING PIPELINE FOR HIGH-USAGE QUERY OPTIMIZATION**

        This tool executes the entire SQL query clustering workflow to identify and analyze
        high CPU usage queries for optimization opportunities. It's designed for database
        performance analysts and DBAs who need to systematically identify query
        optimization candidates.

        **FULL PIPELINE WORKFLOW:**
        1. **Query Log Extraction**: Extracts SQL queries from DBC.DBQLSqlTbl with comprehensive performance metrics
        2. **Performance Metrics Calculation**: Computes CPU skew, I/O skew, PJI (Physical to Logical I/O ratio), UII (Unit I/O Intensity)
        3. **Query Tokenization**: Tokenizes SQL text using {sql_clustering_config.get('model', {}).get('model_id', 'bge-small-en-v1.5')} tokenizer via ivsm.tokenizer_encode
        4. **Embedding Generation**: Creates semantic embeddings using ivsm.IVSM_score with ONNX models
        5. **Vector Store Creation**: Converts embeddings to vector columns via ivsm.vector_to_columns
        6. **K-Means Clustering**: Groups similar queries using TD_KMeans with optimal K from configuration
        7. **Silhouette Analysis**: Calculates clustering quality scores using TD_Silhouette
        8. **Statistics Generation**: Creates comprehensive cluster statistics with performance aggregations

        **PERFORMANCE METRICS EXPLAINED:**
        - **AMPCPUTIME**: Total CPU seconds across all AMPs (primary optimization target)
        - **CPUSKW/IOSKW**: CPU/I/O skew ratios (>2.0 indicates distribution problems)
        - **PJI**: Physical-to-Logical I/O ratio (higher = more CPU-intensive)
        - **UII**: Unit I/O Intensity (higher = more I/O-intensive relative to CPU)
        - **LogicalIO**: Total logical I/O operations (indicates scan intensity)
        - **NumSteps**: Query plan complexity (higher = more complex plans)

        **CONFIGURATION (from sql_opt_config.yml):**
        - Uses top {default_max_queries} queries by CPU time (configurable)
        - Creates {default_optimal_k} clusters by default (configurable via optimal_k parameter)
        - Embedding model: {sql_clustering_config.get('model', {}).get('model_id', 'bge-small-en-v1.5')}
        - Vector dimensions: {sql_clustering_config.get('embedding', {}).get('vector_length', 384)}
        - All database and table names are configurable

        **OPTIMIZATION WORKFLOW:** After running this tool, use:
        1. sql_Analyze_Cluster_Stats to identify problematic clusters
        2. sql_Retrieve_Cluster_Queries to get actual SQL from target clusters
        3. LLM analysis to identify patterns and propose specific optimizations

        **USE CASES:**
        - Identify query families consuming the most system resources
        - Find queries with similar patterns but different performance
        - Discover optimization opportunities through clustering analysis
        - Prioritize DBA effort on highest-impact query improvements
        - Understand workload composition and resource distribution

        **PREREQUISITES:**
        - DBC.DBQLSqlTbl and DBC.DBQLOgTbl must be accessible
        - Embedding models and tokenizers must be installed in feature_ext_db
        - Sufficient space in feature_ext_db for intermediate and final tables
        """
        config = SQL_CLUSTERING_CONFIG

        # Use config defaults if not provided
        if optimal_k is None:
            optimal_k = config['clustering']['optimal_k']
        if max_queries is None:
            max_queries = config['clustering']['max_queries']

        logger.debug(f"handle_sql_Execute_Full_Pipeline: optimal_k={optimal_k}, max_queries={max_queries}")

        # Extract config values
        feature_db = config['databases']['feature_db']
        model_db = config['databases']['model_db']
        model_id = config['model']['model_id']
        tables = config['tables']
        embedding_config = config['embedding']
        clustering_config = config['clustering']

        with conn.cursor() as cur:
            # Create main SQL query log table
            logger.debug(f"Step 1: Creating main query log table {feature_db}.{tables['sql_query_log_main']}")
            try:
                cur.execute(f"DROP TABLE {feature_db}.{tables['sql_query_log_main']}")
                logger.debug(f"Dropped existing table {feature_db}.{tables['sql_query_log_main']}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")

            main_query_sql = f"""
            CREATE TABLE {feature_db}.{tables['sql_query_log_main']} AS (
                SELECT
                    CAST(a.QueryID AS BIGINT) AS id,
                    a.SQLTextInfo AS txt,
                    b.username,
                    b.appid,
                    b.numsteps,
                    b.ampcputime,
                    b.TotalIOCount AS logicalio,
                    b.wdname,
                    CASE WHEN b.ampcputime < HashAmp()+1 OR (b.ampcputime / (HashAmp()+1)) = 0 THEN 0
                         ELSE b.maxampcputime/(b.ampcputime / (HashAmp()+1)) END (DEC(8,2)) AS CPUSKW,
                    CASE WHEN b.ampcputime < HashAmp()+1 OR (b.TotalIOCount / (HashAmp()+1)) = 0 THEN 0
                         ELSE b.maxampio/(b.TotalIOCount / (HashAmp()+1)) END (DEC(8,2)) AS IOSKW,
                    CASE WHEN b.ampcputime < HashAmp()+1 OR b.TotalIOCount = 0 THEN 0
                         ELSE (b.ampcputime * 1000)/b.TotalIOCount END AS PJI,
                    CASE WHEN b.ampcputime < HashAmp()+1 OR b.ampcputime = 0 THEN 0
                         ELSE b.TotalIOCount/(b.ampcputime * 1000) END AS UII,
                    CAST(EXTRACT(HOUR FROM ((b.FirstRespTime - b.StartTime) HOUR(3) TO SECOND(6))) * 3600
                       + EXTRACT(MINUTE FROM ((b.FirstRespTime - b.StartTime) HOUR(3) TO SECOND(6))) * 60
                       + EXTRACT(SECOND FROM ((b.FirstRespTime - b.StartTime) HOUR(3) TO SECOND(6)))
                       AS DECIMAL(10,2)) AS response_secs,
                    (CAST(EXTRACT(HOUR FROM ((b.FirstRespTime - b.StartTime) HOUR(3) TO SECOND(6))) * 3600
                        + EXTRACT(MINUTE FROM ((b.FirstRespTime - b.StartTime) HOUR(3) TO SECOND(6))) * 60
                        + EXTRACT(SECOND FROM ((b.FirstRespTime - b.StartTime) HOUR(3) TO SECOND(6)))
                        AS DECIMAL(10,2)))/60.0 AS response_mins,
                    CASE WHEN b.delaytime IS NULL THEN 0.0 ELSE b.delaytime END AS delaytime
                FROM DBC.DBQLSqlTbl a
                JOIN (
                    -- OPTIMIZATION: Filter to top queries by CPU BEFORE joining
                    SELECT *
                    FROM DBC.DBQLOgTbl
                    WHERE LOWER(statementtype) IN ('select','create table')
                    QUALIFY ROW_NUMBER() OVER (ORDER BY ampcputime DESC) <= {max_queries}
                ) b
                  ON a.queryid = b.queryid
                 AND a.procid = b.procid
                WHERE a.SQLTextInfo NOT LIKE '%SET QUERY_BAND%'
                  AND a.SQLTextInfo NOT LIKE '%ParamValue%'
                  AND a.SQLTextInfo NOT LIKE '%SELECT CURRENT_TIMESTAMP%'
                  AND LOWER(a.SQLTextInfo) NOT LIKE '%dbc.%'
                  AND a.SqlRowNo = 1
            ) WITH DATA
            """
            cur.execute(main_query_sql)
            logger.debug(f"Created main query log table")

            # Create tokenized table for embeddings
            logger.debug(f"Step 2: Creating tokenized table {feature_db}.{tables['sql_log_tokenized_for_embeddings']}")
            try:
                cur.execute(f"DROP TABLE {feature_db}.{tables['sql_log_tokenized_for_embeddings']}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")

            tokenize_sql = f"""
            CREATE TABLE {feature_db}.{tables['sql_log_tokenized_for_embeddings']} AS (
                SELECT id, txt, IDS AS input_ids, attention_mask
                FROM ivsm.tokenizer_encode(
                    ON (SELECT * FROM {feature_db}.{tables['sql_query_log_main']})
                    ON (SELECT model AS tokenizer
                        FROM {model_db}.{tables['embedding_tokenizers']}
                        WHERE model_id = '{model_id}') DIMENSION
                    USING
                        ColumnsToPreserve('id', 'txt')
                        OutputFields('IDS', 'ATTENTION_MASK')
                        MaxLength({embedding_config['max_length']})
                        PadToMaxLength('{embedding_config['pad_to_max_length']}')
                        TokenDataType('INT64')
                ) AS dt
            ) WITH DATA
            """
            cur.execute(tokenize_sql)
            logger.debug(f"Created tokenized table")

            # Create embeddings table
            logger.debug(f"Step 3: Creating embeddings table {feature_db}.{tables['sql_log_embeddings']}")
            try:
                cur.execute(f"DROP TABLE {feature_db}.{tables['sql_log_embeddings']}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")

            embeddings_sql = f"""
            CREATE TABLE {feature_db}.{tables['sql_log_embeddings']} AS (
                SELECT *
                FROM ivsm.IVSM_score(
                    ON {feature_db}.{tables['sql_log_tokenized_for_embeddings']}
                    ON (SELECT * FROM {model_db}.{tables['embedding_models']}
                        WHERE model_id = '{model_id}') DIMENSION
                    USING
                        ColumnsToPreserve('id', 'txt')
                        ModelType('ONNX')
                        BinaryInputFields('input_ids', 'attention_mask')
                        BinaryOutputFields('sentence_embedding')
                        Caching('inquery')
                ) a
            ) WITH DATA
            """
            cur.execute(embeddings_sql)
            logger.debug(f"Created embeddings table")

            # Create embeddings store table
            logger.debug(f"Step 4: Creating embeddings store table {feature_db}.{tables['sql_log_embeddings_store']}")
            try:
                cur.execute(f"DROP TABLE {feature_db}.{tables['sql_log_embeddings_store']}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")

            embeddings_store_sql = f"""
            CREATE TABLE {feature_db}.{tables['sql_log_embeddings_store']} AS (
                SELECT *
                FROM ivsm.vector_to_columns(
                    ON {feature_db}.{tables['sql_log_embeddings']}
                    USING
                        ColumnsToPreserve('id', 'txt')
                        VectorDataType('FLOAT32')
                        VectorLength({embedding_config['vector_length']})
                        OutputColumnPrefix('emb_')
                        InputColumnName('sentence_embedding')
                ) a
            ) WITH DATA
            """
            cur.execute(embeddings_store_sql)
            logger.debug(f"Created embeddings store table")

            # Perform K-means clustering
            logger.debug(f"Step 5: Performing K-means clustering with k={optimal_k}")
            try:
                cur.execute(f"DROP TABLE {feature_db}.{tables['sql_query_clusters_temp']}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")

            kmeans_sql = f"""
            CREATE TABLE {feature_db}.{tables['sql_query_clusters_temp']} AS (
                SELECT td_clusterid_kmeans, a.*
                FROM TD_KMeans (
                    ON {feature_db}.{tables['sql_log_embeddings_store']} AS InputTable
                    USING
                        IdColumn('id')
                        TargetColumns('[2:385]')
                        NumClusters({optimal_k})
                        Seed({clustering_config['seed']})
                        StopThreshold({clustering_config['stop_threshold']})
                        OutputClusterAssignment('true')
                        MaxIterNum({clustering_config['max_iterations']})
                ) AS dt
                JOIN {feature_db}.{tables['sql_query_log_main']} a ON a.id = dt.id
            ) WITH DATA
            """
            cur.execute(kmeans_sql)
            logger.debug(f"Created temporary clusters table")

            # Create final clusters table with silhouette scores
            logger.debug(f"Step 6: Creating final clusters table with silhouette scores")
            try:
                cur.execute(f"DROP TABLE {feature_db}.{tables['sql_query_clusters']}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")

            final_clusters_sql = f"""
            CREATE TABLE {feature_db}.{tables['sql_query_clusters']} AS (
                SELECT a.*, b.silhouette_score
                FROM {feature_db}.{tables['sql_query_clusters_temp']} a
                JOIN (
                    SELECT * FROM TD_Silhouette(
                        ON (SELECT td_clusterid_kmeans, b.*
                            FROM {feature_db}.{tables['sql_query_clusters_temp']} a
                            JOIN {feature_db}.{tables['sql_log_embeddings_store']} b ON a.id = b.id) AS InputTable
                        USING
                            IdColumn('id')
                            ClusterIdColumn('td_clusterid_kmeans')
                            TargetColumns('[4:]')
                            OutputType('SAMPLE_SCORES')
                    ) AS dt
                ) AS b ON a.id = b.id
            ) WITH DATA PRIMARY INDEX(id)
            """
            cur.execute(final_clusters_sql)
            logger.debug(f"Created final clusters table")

            # Create cluster statistics table
            logger.debug(f"Step 7: Creating cluster statistics table")
            try:
                cur.execute(f"DROP TABLE {feature_db}.{tables['query_cluster_stats']}")
            except Exception as e:
                logger.debug(f"DROP failed or table not found: {e}")

            cluster_stats_sql = f"""
            CREATE TABLE {feature_db}.{tables['query_cluster_stats']} AS (
                SELECT
                    a.td_clusterid_kmeans,
                    AVG(a.numsteps) AS avg_numsteps, VAR_SAMP(a.numsteps) AS var_numsteps,
                    AVG(a.ampcputime) AS avg_cpu, VAR_SAMP(a.ampcputime) AS var_cpu,
                    AVG(a.logicalio) AS avg_io, VAR_SAMP(a.logicalio) AS var_io,
                    AVG(a.cpuskw) AS avg_cpuskw, VAR_SAMP(a.cpuskw) AS var_cpuskw,
                    AVG(a.ioskw) AS avg_ioskw, VAR_SAMP(a.ioskw) AS var_ioskw,
                    AVG(a.pji) AS avg_pji, VAR_SAMP(a.pji) AS var_pji,
                    AVG(a.uii) AS avg_uii, VAR_SAMP(a.uii) AS var_uii,
                    MAX(un.top_username) AS top_username,
                    MAX(top_wdname) AS top_wdname,
                    MAX(top_appid) AS top_appid,
                    MAX(s1.silhouette_score) AS overall_silhouette_score,
                    MAX(s2.silhouette_score) AS cluster_silhouette_score,
                    COUNT(*) AS queries
                FROM {feature_db}.{tables['sql_query_clusters']} a
                JOIN (
                    SELECT td_clusterid_kmeans, username AS top_UserName
                    FROM {feature_db}.{tables['sql_query_clusters']}
                    GROUP BY td_clusterid_kmeans, username
                    QUALIFY ROW_NUMBER() OVER (PARTITION BY td_clusterid_kmeans ORDER BY COUNT(*) DESC) = 1
                ) un ON a.td_clusterid_kmeans = un.td_clusterid_kmeans
                JOIN (
                    SELECT td_clusterid_kmeans, wdname AS top_wdname
                    FROM {feature_db}.{tables['sql_query_clusters']}
                    GROUP BY td_clusterid_kmeans, wdname
                    QUALIFY ROW_NUMBER() OVER (PARTITION BY td_clusterid_kmeans ORDER BY COUNT(*) DESC) = 1
                ) wd ON un.td_clusterid_kmeans = wd.td_clusterid_kmeans
                JOIN (
                    SELECT td_clusterid_kmeans, appid AS top_AppId
                    FROM {feature_db}.{tables['sql_query_clusters']}
                    GROUP BY td_clusterid_kmeans, appid
                    QUALIFY ROW_NUMBER() OVER (PARTITION BY td_clusterid_kmeans ORDER BY COUNT(*) DESC) = 1
                ) ap ON un.td_clusterid_kmeans = ap.td_clusterid_kmeans
                CROSS JOIN (
                    SELECT * FROM TD_Silhouette(
                        ON (SELECT td_clusterid_kmeans, b.*
                            FROM {feature_db}.{tables['sql_query_clusters']} a
                            JOIN {feature_db}.{tables['sql_log_embeddings_store']} b ON a.id = b.id) AS InputTable
                        USING
                            IdColumn('id')
                            ClusterIdColumn('td_clusterid_kmeans')
                            TargetColumns('[4:]')
                            OutputType('SCORE')
                    ) AS dt
                ) AS s1
                JOIN (
                    SELECT * FROM TD_Silhouette(
                        ON (SELECT td_clusterid_kmeans, b.*
                            FROM {feature_db}.{tables['sql_query_clusters']} a
                            JOIN {feature_db}.{tables['sql_log_embeddings_store']} b ON a.id = b.id) AS InputTable
                        USING
                            IdColumn('id')
                            ClusterIdColumn('td_clusterid_kmeans')
                            TargetColumns('[4:]')
                            OutputType('CLUSTER_SCORES')
                    ) AS dt
                ) s2 ON a.td_clusterid_kmeans = s2.td_clusterid_kmeans
                GROUP BY a.td_clusterid_kmeans
            ) WITH DATA PRIMARY INDEX(td_clusterid_kmeans)
            """
            cur.execute(cluster_stats_sql)
            logger.debug(f"Created cluster statistics table")

            # Get final results
            cur.execute(f"SELECT COUNT(*) FROM {feature_db}.{tables['sql_query_clusters']}")
            total_queries = cur.fetchone()[0]
            cur.execute(f"SELECT COUNT(DISTINCT td_clusterid_kmeans) FROM {feature_db}.{tables['sql_query_clusters']}")
            total_clusters = cur.fetchone()[0]
            cur.execute(f"SELECT AVG(silhouette_score) FROM {feature_db}.{tables['sql_query_clusters']}")
            avg_silhouette = cur.fetchone()[0]

        # Return metadata
        metadata = {
            "tool_name": "sql_Execute_Full_Pipeline",
            "workflow_steps": [
                "query_log_extracted",
                "queries_tokenized",
                "embeddings_generated",
                "embeddings_stored",
                "kmeans_clustering_completed",
                "silhouette_scores_calculated",
                "cluster_statistics_generated"
            ],
            "configuration": {
                "optimal_k": optimal_k,
                "max_queries_processed": max_queries,
                "model_id": model_id,
                "clustering_parameters": clustering_config,
                "embedding_parameters": embedding_config
            },
            "results": {
                "total_queries_clustered": total_queries,
                "total_clusters_created": total_clusters,
                "average_silhouette_score": float(avg_silhouette) if avg_silhouette else None
            },
            "tables_created": [
                f"{feature_db}.{tables['sql_query_log_main']}",
                f"{feature_db}.{tables['sql_log_tokenized_for_embeddings']}",
                f"{feature_db}.{tables['sql_log_embeddings']}",
                f"{feature_db}.{tables['sql_log_embeddings_store']}",
                f"{feature_db}.{tables['sql_query_clusters']}",
                f"{feature_db}.{tables['query_cluster_stats']}"
            ],
            "description": "Complete SQL query clustering pipeline executed: extracted SQL logs → tokenized → embedded → clustered → analyzed"
        }
        return create_response({"status": "success", "pipeline_completed": True}, metadata)
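
  • Usage sketch (not from the repository): direct invocation of the handler, assuming the teradatasql DB-API driver and a session that can create tables in the configured feature database. Host, credentials, and parameter values below are placeholders.
    import teradatasql

    with teradatasql.connect(host="tdhost", user="dbadmin", password="***") as conn:
        result = handle_sql_Execute_Full_Pipeline(conn, optimal_k=8, max_queries=2000)
        # result carries the success status plus metadata: tables_created, query and
        # cluster counts, and the average silhouette score.
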
  • Dynamic registration loop that discovers handle_* functions in the loaded modules and registers each as an MCP tool, deriving the tool name by stripping the 'handle_' prefix. This is the loop that registers 'sql_Execute_Full_Pipeline'.
    for name, func in all_functions.items():
        if not (inspect.isfunction(func) and name.startswith("handle_")):
            continue
        tool_name = name[len("handle_"):]
        if not any(re.match(p, tool_name) for p in config.get('tool', [])):
            continue
        wrapped = make_tool_wrapper(func)
        mcp.tool(name=tool_name, description=wrapped.__doc__)(wrapped)
        logger.info(f"Created tool: {tool_name}")
  • Global configuration loader for SQL clustering parameters used throughout the pipeline.
    SQL_CLUSTERING_CONFIG = load_sql_clustering_config()
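
  • Hypothetical sketch of load_sql_clustering_config (the actual implementation is not shown on this page); it assumes the configuration lives in sql_opt_config.yml and is parsed with PyYAML into the nested dict the handler indexes into.
    import pathlib
    import yaml

    def load_sql_clustering_config(path: str = "sql_opt_config.yml") -> dict:
        """Read the SQL clustering configuration from a YAML file (path assumed)."""
        with pathlib.Path(path).open("r", encoding="utf-8") as fh:
            return yaml.safe_load(fh)
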
  • Package init that re-exports sql_opt_resources and sql_opt_tools, making the handler available to the module loader.
    from .sql_opt_resources import *
    from .sql_opt_tools import *
