# query_repository.py
"""
Query Repository Service for storing and retrieving SQL queries.
This service handles:
- Query storage and retrieval
- Query metadata management
- Future integration with external query repositories
"""
from typing import Dict, Optional
class QueryRepository:
    """In-memory catalogue of named SQL queries for the cost-optimization checks.

    Each entry maps a check identifier (for example ``"CO-01-03"``) to the SQL
    text implementing that check. The built-in entries read ``system.*`` catalog
    tables (billing, compute, access, lakeflow, information_schema); further
    entries can be registered at runtime through :meth:`add_query`.
    """

    def __init__(self):
        """Build ``self.queries`` from the built-in check queries.

        A brand-new dict is created for every instance, so queries registered
        on one repository are never visible from another instance.
        """
        # NOTE(review): CO-01-02 keeps only the newest row per cluster
        # (ROW_NUMBER ... ORDER BY change_time DESC), whereas the other queries
        # over system.compute.clusters count its rows directly -- confirm
        # whether they should deduplicate the same way.
        predefined: Dict[str, str] = {
            "CO-01-01-table-formats": """
                SELECT 
                    data_source_format AS tables_format,
                    count(data_source_format) AS no_of_tables
                FROM system.information_schema.tables 
                GROUP BY ALL
                ORDER BY no_of_tables desc;
            """,

            "CO-01-01-managed-tables": """
                SELECT 
                table_type,
                round(count(table_type)/(select count(*) from system.information_schema.tables) * 100) as percent_of_tables
                FROM system.information_schema.tables
                group by ALL
                having percent_of_tables > 0
                order by percent_of_tables desc
            """,

            "CO-01-02": """
                with clusters AS (
                    SELECT
                        *,
                        ROW_NUMBER() OVER(PARTITION BY workspace_id, cluster_id ORDER BY change_time DESC) as rn
                    FROM system.compute.clusters
                    WHERE cluster_source="UI" OR cluster_source="API"
                    QUALIFY rn=1
                ),
                    job_tasks_exploded AS (
                    SELECT
                        workspace_id,
                        job_id,
                        EXPLODE(compute_ids) as cluster_id
                    FROM system.lakeflow.job_task_run_timeline
                    WHERE period_start_time >= CURRENT_DATE() - INTERVAL 30 DAY
                    GROUP BY ALL
                ),
                    all_purpose_cluster_jobs AS (
                    SELECT
                        t1.*,
                        t2.cluster_name,
                        t2.owned_by
                    FROM job_tasks_exploded t1
                        INNER JOIN clusters t2 USING (workspace_id, cluster_id)
                )
                SELECT * FROM all_purpose_cluster_jobs LIMIT 10;
            """,

            "CO-01-03": """
                SELECT billing_origin_product, sum(usage_quantity) as dbu 
                FROM system.billing.usage 
                WHERE billing_origin_product in ('SQL','ALL_PURPOSE') 
                  AND usage_date >= current_date() - interval 30 days 
                GROUP BY billing_origin_product;
            """,

            # The doubled backslashes are unescaped twice (Python literal, then
            # SQL literal) so the regex engine receives ^(\d+\.\d+).
            "CO-01-04": """
                SELECT regexp_extract(dbr_version, '^(\\\\d+\\\\.\\\\d+)',1) as version, count(*) as count
                FROM system.compute.clusters 
                WHERE NOT contains(dbr_version, 'custom') 
                  AND cluster_source NOT IN('PIPELINE','PIPELINE_MAINTENANCE') 
                  AND delete_time IS NULL
                GROUP BY 1
                ORDER BY count DESC;
            """,

            "CO-01-06-serverless": """
                WITH serverless AS (
                    SELECT sum(usage_quantity) as dbu 
                    FROM system.billing.usage u 
                    WHERE contains(u.sku_name, 'SERVERLESS') 
                      AND u.billing_origin_product in ('ALL_PURPOSE','SQL','JOBS', 'DLT','INTERACTIVE') 
                      AND date_diff(day, u.usage_start_time, now()) <28
                ),
                total AS (
                    SELECT sum(usage_quantity) as dbu 
                    FROM system.billing.usage u 
                    WHERE u.billing_origin_product in ('ALL_PURPOSE','SQL','JOBS', 'DLT','INTERACTIVE') 
                      AND date_diff(day, u.usage_start_time, now()) <28
                )
                SELECT serverless.dbu * 100 / total.dbu as serverless_dbu_percent 
                FROM serverless 
                CROSS JOIN total;
            """,

            "CO-01-06-sql": """
                SELECT 
                CASE 
                  WHEN t1.sku_name LIKE '%SERVERLESS_SQL%' THEN 'SQL_SERVERLESS' 
                  WHEN t1.sku_name LIKE '%ENTERPRISE_SQL_COMPUTE%' THEN 'SQL_CLASSIC'
                  WHEN t1.sku_name LIKE '%SQL_PRO%' THEN 'SQL_PRO'
                  ELSE 'Other' 
                END as sql_sku_name,
                SUM(t1.usage_quantity * list_prices.pricing.default) as list_cost
                FROM system.billing.usage t1
                INNER JOIN system.billing.list_prices 
                  ON t1.cloud = list_prices.cloud 
                  AND t1.sku_name = list_prices.sku_name 
                  AND t1.usage_start_time >= list_prices.price_start_time 
                  AND (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
                WHERE t1.sku_name LIKE '%SQL%' 
                  AND t1.usage_date >= current_date() - interval 30 days
                GROUP BY ALL;
            """,

            "CO-01-08": """
                WITH per_cluster_daily AS (
                  SELECT
                    cluster_id,
                    DATE_TRUNC('DAY', start_time) AS day,
                    AVG(cpu_user_percent + cpu_system_percent) AS avg_cpu_usage_percent,
                    AVG(mem_used_percent) AS avg_memory_usage_percent
                  FROM system.compute.node_timeline
                  WHERE start_time >= CURRENT_DATE - INTERVAL 28 DAYS
                  GROUP BY cluster_id, DATE_TRUNC('DAY', start_time)
                )
                SELECT
                  percentile(avg_cpu_usage_percent, 0.75) as cpu_usage_percent_p75,
                  percentile(avg_memory_usage_percent, 0.75) as memory_usage_percent_p75
                FROM per_cluster_daily
                GROUP BY ALL;
            """,

            "CO-02-01": """
                WITH autoscaling_count AS (
                  SELECT count(*) as autoscaling_count 
                  FROM system.compute.clusters 
                  WHERE max_autoscale_workers IS NOT NULL 
                    AND delete_time IS NULL
                ),
                total_clusters_count AS (
                  SELECT count(*) as total_clusters_count 
                  FROM system.compute.clusters 
                  WHERE delete_time IS NULL
                )
                SELECT autoscaling_count.autoscaling_count * 100 / total_clusters_count.total_clusters_count as autoscaling_percent 
                FROM total_clusters_count 
                CROSS JOIN autoscaling_count;
            """,

            "CO-02-02": """
                SELECT percentile(c.auto_termination_minutes, 0.75) as p_75_auto_termination_minutes, 
                       max(c.auto_termination_minutes) as max_auto_termination_minutes, 
                       count_if(c.auto_termination_minutes is null) as count_clusters_without_autoterminations, 
                       count_if(c.auto_termination_minutes is not null) as count_clusters_with_autoterminations, 
                       (count_clusters_without_autoterminations*100)/count(*) as percent_clusters_without_autoterminations 
                FROM system.compute.clusters c 
                WHERE c.cluster_source in ('UI','API') 
                  AND c.delete_time IS NULL;
            """,

            "CO-03-01": """
                SELECT
                  event_date,
                  count(*) as usage_read
                FROM system.access.audit
                WHERE service_name = 'unityCatalog'
                  AND action_name = 'getTable'
                  AND request_params.full_name_arg = 'system.billing.usage'
                  AND user_identity.email != 'System-User'
                  AND (date_diff(day, event_date, current_date()) <= 90)
                GROUP BY event_date
                ORDER BY event_date;
            """,

            "CO-03-02-tagging": """
                SELECT array_size(map_entries(tags)) as number_of_tags, count(*) as count 
                FROM system.compute.clusters 
                WHERE tags.ResourceClass IS NULL 
                  AND delete_time IS NULL
                GROUP BY number_of_tags
                ORDER BY count DESC, number_of_tags DESC;
            """,

            "CO-03-02-popular": """
                WITH tag_counts AS (
                  SELECT explode(map_keys(tags)) as tag, count(*) as count
                  FROM system.compute.clusters
                  GROUP BY 1
                ),
                cluster_count AS (SELECT count(*) as count FROM system.compute.clusters)
                SELECT tag_counts.tag,
                       sum(tag_counts.count) / any_value(cluster_count.count) *100 as percent
                FROM tag_counts 
                CROSS JOIN cluster_count
                GROUP BY tag_counts.tag
                ORDER BY percent DESC;
            """
        }
        self.queries = predefined

    def get_query(self, query_name: str) -> Optional[str]:
        """Look up the SQL text registered under ``query_name``.

        Args:
            query_name: Identifier of the query, e.g. ``"CO-02-01"``.

        Returns:
            The stored SQL string, or ``None`` when no query has that name.
        """
        try:
            return self.queries[query_name]
        except KeyError:
            return None

    def add_query(self, query_name: str, query: str) -> None:
        """Register ``query`` under ``query_name`` on this instance.

        An existing entry with the same name is silently replaced.

        Args:
            query_name: Identifier to store the query under.
            query: SQL text to store.
        """
        registry = self.queries
        registry[query_name] = query

    def list_queries(self) -> Dict[str, str]:
        """Return a snapshot of every registered query.

        Returns:
            A new dict mapping query names to SQL text. Adding or removing
            keys in it does not affect the repository.
        """
        return dict(self.queries)
# Module-wide default repository; stays None until get_query_repository() runs.
_query_repository = None


def get_query_repository() -> QueryRepository:
    """Return the shared default QueryRepository, creating it on first call.

    Later calls return the same instance. Creation is not guarded by a lock,
    so concurrent first calls from several threads could each build one.
    """
    global _query_repository
    repository = _query_repository
    if repository is None:
        repository = QueryRepository()
        _query_repository = repository
    return repository