BigQuery FinOps MCP Server

by urfanazad
azuresql.py (11.8 kB)
import logging
import os
from datetime import timedelta
from typing import Any, Dict

from azure.core.exceptions import ClientAuthenticationError
from azure.identity import DefaultAzureCredential
from azure.monitor.query import MetricsQueryClient
import openai
import sqlparse

from .base import BaseDataSource

try:
    import pyodbc

    AZURESQL_AVAILABLE = True
except ImportError:
    AZURESQL_AVAILABLE = False


class AzureSQLDataSource(BaseDataSource):
    """FinOps data source backed by Azure SQL Database and Azure Monitor."""

    def __init__(self):
        self.conn_str = None
        self.connection = None
        self.monitor_client = None
        self.resource_uri = ""

    async def connect(self) -> None:
        # Connect to Azure SQL DB
        if AZURESQL_AVAILABLE:
            try:
                server = os.getenv("AZURE_SQL_SERVER")
                database = os.getenv("AZURE_SQL_DATABASE")
                username = os.getenv("AZURE_SQL_USERNAME")
                password = os.getenv("AZURE_SQL_PASSWORD")
                if all([server, database, username, password]):
                    self.conn_str = (
                        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
                        f"SERVER={server};"
                        f"DATABASE={database};"
                        f"UID={username};"
                        f"PWD={password}"
                    )
                    self.connection = pyodbc.connect(self.conn_str, timeout=5)
                    logging.info("Connected to Azure SQL.")

                    # Prepare the resource URI used for Azure Monitor metric queries
                    subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID")
                    resource_group = os.getenv("AZURE_RESOURCE_GROUP")
                    self.resource_uri = (
                        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.Sql/"
                        f"servers/{server.split('.')[0]}/databases/{database}"
                    )
                else:
                    logging.warning("Azure SQL environment variables not set. Using mock data for queries.")
            except Exception as e:
                logging.error(f"Could not connect to Azure SQL: {e}")
                self.connection = None
        else:
            logging.warning("pyodbc not installed. Azure SQL data source is not available.")

        # Connect to Azure Monitor
        try:
            credential = DefaultAzureCredential()
            self.monitor_client = MetricsQueryClient(credential)
            logging.info("Connected to Azure Monitor.")
        except (ClientAuthenticationError, ImportError) as e:
            logging.error(f"Could not connect to Azure Monitor: {e}")
            self.monitor_client = None

    async def get_cost_summary(self) -> Dict[str, Any]:
        """Summarize resource consumption for the configured database."""
        if not self.monitor_client:
            logging.warning("No Azure Monitor connection. Returning mock data.")
            return {
                "service_tier": "General Purpose",
                "vCores": 4,
                "storage_gb": 512,
                "estimated_monthly_cost": 750.00,
                "description": "Metrics are based on provisioned resources, not per-query cost."
            }
        try:
            response = self.monitor_client.query_resource(
                self.resource_uri,
                metric_names=["cpu_percent", "dtu_used", "storage_percent"],
                timespan=timedelta(hours=1),
                granularity=timedelta(minutes=5),
                aggregations=["average"]
            )
            # Keep the most recent data point of each metric's first time series.
            metrics = {
                metric.name: metric.timeseries[0].data[-1].average
                for metric in response.metrics
                if metric.timeseries
            }
            return {
                "cpu_percent_avg": metrics.get("cpu_percent"),
                "dtu_used_avg": metrics.get("dtu_used"),
                "storage_percent_avg": metrics.get("storage_percent"),
                "description": "Live metrics from Azure Monitor for the last hour."
            }
        except Exception as e:
            logging.error(f"Error querying Azure Monitor: {e}")
            return {"error": str(e)}

    async def get_expensive_queries(self) -> Dict[str, Any]:
        """Return the top CPU-consuming queries recorded by Query Store."""
        if not self.connection:
            logging.warning("No Azure SQL connection available. Returning mock data.")
            return {
                "queries": [
                    {
                        "id": "query_hash_1",
                        "query": "SELECT * FROM sales.orders WHERE order_date > '2024-01-01'",
                        "avg_cpu_time_ms": 1200,
                        "avg_duration_ms": 2500,
                        "execution_count": 50,
                        "optimization": "Consider adding an index on order_date.",
                        "severity": "high"
                    }
                ]
            }

        # Runtime stats are keyed by plan, so join through sys.query_store_plan.
        query = """
            SELECT TOP 20
                qt.query_sql_text,
                q.query_id,
                rs.avg_cpu_time,
                rs.avg_duration,
                rs.count_executions
            FROM sys.query_store_query_text AS qt
            JOIN sys.query_store_query AS q ON qt.query_text_id = q.query_text_id
            JOIN sys.query_store_plan AS p ON q.query_id = p.query_id
            JOIN sys.query_store_runtime_stats AS rs ON p.plan_id = rs.plan_id
            ORDER BY rs.avg_cpu_time DESC;
        """
        try:
            cursor = self.connection.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            queries = [
                {
                    "id": row.query_id,
                    "query": row.query_sql_text,
                    # Query Store reports times in microseconds; convert to milliseconds.
                    "avg_cpu_time_ms": row.avg_cpu_time / 1000,
                    "avg_duration_ms": row.avg_duration / 1000,
                    "execution_count": row.count_executions,
                    "optimization": "Analyze query plan for optimization opportunities.",
                    "severity": "medium"
                }
                for row in rows
            ]
            return {"queries": queries}
        except Exception as e:
            logging.error(f"Error querying Query Store: {e}")
            return {"error": str(e)}

    async def get_project_costs(self) -> Dict[str, Any]:
        """Report size and service tier for the connected database."""
        if not self.connection:
            logging.warning("No Azure SQL connection available. Returning mock data.")
            return {
                "databases": [
                    {"name": "SalesDB", "size_gb": 250, "service_tier": "General Purpose"},
                    {"name": "ReportingDB", "size_gb": 150, "service_tier": "General Purpose"},
                ]
            }

        # sys.database_files reports size in 8 KB pages for the current database.
        query = """
            SELECT
                DB_NAME() AS name,
                CAST(DATABASEPROPERTYEX(DB_NAME(), 'ServiceObjective') AS nvarchar(64)) AS service_objective,
                SUM(CAST(size AS bigint)) * 8 / 1024.0 / 1024.0 AS size_gb
            FROM sys.database_files;
        """
        try:
            cursor = self.connection.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            databases = [
                {
                    "name": row.name,
                    "service_tier": row.service_objective,
                    "size_gb": round(row.size_gb, 2)
                }
                for row in rows
            ]
            return {"databases": databases}
        except Exception as e:
            logging.error(f"Error querying database sizes: {e}")
            return {"error": str(e)}

    async def get_cost_trends(self) -> Dict[str, Any]:
        """Return a 30-day CPU usage trend from Azure Monitor."""
        if not self.monitor_client:
            logging.warning("No Azure Monitor connection. Returning mock data.")
            return {
                "trends": [
                    {"date": "2024/01", "cpu_usage_percent": 60},
                    {"date": "2024/02", "cpu_usage_percent": 65},
                    {"date": "2024/03", "cpu_usage_percent": 70},
                ]
            }
        try:
            response = self.monitor_client.query_resource(
                self.resource_uri,
                metric_names=["cpu_percent"],
                timespan=timedelta(days=30),
                granularity=timedelta(days=1),
                aggregations=["average"]
            )
            trends = [
                {
                    "date": d.timestamp.strftime("%Y/%m/%d"),
                    "cpu_usage_percent": round(d.average, 2)
                }
                for d in response.metrics[0].timeseries[0].data
                if d.average is not None  # skip buckets that have no data yet
            ]
            return {"trends": trends}
        except Exception as e:
            logging.error(f"Error querying Azure Monitor for cost trends: {e}")
            return {"error": str(e)}

    async def analyze_query_cost(self, query: str, dry_run: bool) -> Dict[str, Any]:
        """Return the estimated execution plan for a single SELECT statement."""
        if not self.connection:
            logging.warning("No Azure SQL connection available. Returning mock data.")
            return {
                "estimated_impact": "Medium",
                "optimization_suggestion": "Review the query execution plan to identify bottlenecks.",
                "description": "SQL Server cost analysis is based on resource consumption (CPU, I/O), not data scanned."
            }

        # Security measure: use sqlparse to validate that exactly one SELECT statement was supplied.
        try:
            parsed = sqlparse.parse(query)
            if len(parsed) != 1 or parsed[0].get_type() != 'SELECT':
                return {"error": "Invalid query provided. Only single SELECT statements are allowed."}
            validated_query = str(parsed[0])

            cursor = self.connection.cursor()
            # SHOWPLAN_XML returns the estimated plan instead of executing the statement.
            cursor.execute("SET SHOWPLAN_XML ON;")
            cursor.execute(validated_query)
            plan = cursor.fetchone()[0]
            cursor.execute("SET SHOWPLAN_XML OFF;")
            return {"execution_plan": plan}
        except Exception as e:
            logging.error(f"Error getting execution plan: {e}")
            return {"error": str(e)}

    async def get_cost_by_user(self, days: int) -> Dict[str, Any]:
        """Return per-login activity from the session DMV."""
        if not self.connection:
            logging.warning("No Azure SQL connection available. Returning mock data.")
            return {
                "users": [
                    {"name": "sales_app_user", "total_executions": 1200},
                    {"name": "reporting_user", "total_executions": 800},
                ]
            }

        query = "SELECT login_name, COUNT(*) AS session_count FROM sys.dm_exec_sessions GROUP BY login_name"
        try:
            cursor = self.connection.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            users = [
                {
                    "name": row.login_name,
                    "session_count": row.session_count
                }
                for row in rows
            ]
            return {"users": users}
        except Exception as e:
            logging.error(f"Error querying sessions by user: {e}")
            return {"error": str(e)}

    async def natural_language_to_sql(self, question: str) -> Dict[str, Any]:
        """Translate a natural language question into SQL via Azure OpenAI."""
        try:
            client = openai.AzureOpenAI(
                api_key=os.getenv("AZURE_OPENAI_API_KEY"),
                api_version="2023-12-01-preview",
                azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
            )
            # A simple prompt to get started. In a real application, you would want to provide
            # more context, such as the database schema.
            prompt = f"Translate the following natural language question into a SQL query for Azure SQL Server:\n\nQuestion: {question}\n\nSQL Query:"
            response = client.completions.create(
                model=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
                prompt=prompt,
                max_tokens=150,
                temperature=0
            )
            return {"sql_query": response.choices[0].text.strip()}
        except Exception as e:
            logging.error(f"Error calling Azure OpenAI: {e}")
            return {"error": str(e)}
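
A minimal sketch of how this data source might be exercised end to end. It assumes the module is importable within the server package (the finops_mcp.datasources.azuresql path below is a placeholder for wherever azuresql.py actually lives) and that the AZURE_SQL_*, AZURE_SUBSCRIPTION_ID, and AZURE_RESOURCE_GROUP variables are set; without them, every method falls back to the mock payloads shown above.

import asyncio
from pprint import pprint

# Placeholder import path; adjust to the package that contains azuresql.py.
from finops_mcp.datasources.azuresql import AzureSQLDataSource


async def main() -> None:
    source = AzureSQLDataSource()
    await source.connect()  # logs a warning and uses mock data if credentials are missing
    pprint(await source.get_cost_summary())
    pprint(await source.get_expensive_queries())
    pprint(await source.analyze_query_cost("SELECT TOP 10 * FROM sales.orders", dry_run=True))


if __name__ == "__main__":
    asyncio.run(main())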

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/urfanazad/BQ_MCP_OPTIMISER'
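
The same endpoint can also be called from code. A minimal sketch, assuming the response body is JSON and the requests package is installed:

import requests

url = "https://glama.ai/api/mcp/v1/servers/urfanazad/BQ_MCP_OPTIMISER"
response = requests.get(url, timeout=10)
response.raise_for_status()
print(response.json())  # server metadata as a dict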

If you have feedback or need assistance with the MCP directory API, please join our Discord server.