
BigQuery FinOps MCP Server

by urfanazad
mcp_server.py (8.44 kB)
#!/usr/bin/env python3
"""
FinOps MCP Server

Provides cost analytics and query optimization insights from various data sources.
"""
import asyncio
import json
import logging
import os
from typing import Any, Optional

import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from pydantic import AnyUrl

from data_sources.azuresql import AzureSQLDataSource
from data_sources.base import BaseDataSource
from data_sources.bigquery import BigQueryDataSource
from utils import handle_errors

# ==============================================================================
# Configuration
# ==============================================================================

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize the MCP server
server = Server("azure-bq-mcp-optimiser")

# Global variable for the data source
DATA_SOURCE: Optional[BaseDataSource] = None

# ==============================================================================
# MCP Server Implementation
# ==============================================================================

@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    """List available FinOps resources."""
    return [
        types.Resource(
            uri=AnyUrl("finops://cost-summary"),
            name="Cost Summary",
            description="Overall cost metrics and trends",
            mimeType="application/json",
        ),
        types.Resource(
            uri=AnyUrl("finops://expensive-queries"),
            name="Expensive Queries",
            description="Top cost queries with optimization recommendations",
            mimeType="application/json",
        ),
        types.Resource(
            uri=AnyUrl("finops://project-costs"),
            name="Project Costs",
            description="Cost breakdown by project",
            mimeType="application/json",
        ),
        types.Resource(
            uri=AnyUrl("finops://cost-trends"),
            name="Cost Trends",
            description="Historical cost and query volume trends",
            mimeType="application/json",
        ),
    ]


@server.read_resource()
@handle_errors
async def handle_read_resource(uri: AnyUrl) -> str:
    """Read FinOps data from the configured data source."""
    if not DATA_SOURCE:
        raise ConnectionError("Data source is not initialized.")

    if uri.scheme != "finops":
        raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

    resource_type = str(uri).replace("finops://", "")
    data: dict[str, Any] = {}

    if resource_type == "cost-summary":
        data = await DATA_SOURCE.get_cost_summary()
    elif resource_type == "expensive-queries":
        data = await DATA_SOURCE.get_expensive_queries()
    elif resource_type == "project-costs":
        data = await DATA_SOURCE.get_project_costs()
    elif resource_type == "cost-trends":
        data = await DATA_SOURCE.get_cost_trends()
    else:
        raise ValueError(f"Unknown resource: {resource_type}")

    return json.dumps(data, indent=2)


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """List available FinOps tools."""
    return [
        types.Tool(
            name="analyze_query_cost",
            description="Analyze the cost of a SQL query",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "The SQL query to analyze"},
                    "dry_run": {"type": "boolean", "description": "Perform a dry run", "default": True},
                },
                "required": ["query"],
            },
        ),
        types.Tool(
            name="get_optimization_recommendations",
            description="Get optimization recommendations for expensive queries",
            inputSchema={
                "type": "object",
                "properties": {
                    "days": {"type": "integer", "description": "Number of days to analyze", "default": 7},
                    "min_cost": {"type": "number", "description": "Minimum cost threshold in USD", "default": 1.0},
                },
            },
        ),
        types.Tool(
            name="get_cost_by_user",
            description="Get cost breakdown by user",
            inputSchema={
                "type": "object",
                "properties": {
                    "days": {"type": "integer", "description": "Number of days to analyze", "default": 30}
                },
            },
        ),
        types.Tool(
            name="natural_language_to_sql",
            description="Translate a natural language question into a SQL query",
            inputSchema={
                "type": "object",
                "properties": {
                    "question": {"type": "string", "description": "The question to translate"},
                },
                "required": ["question"],
            },
        ),
    ]


@server.call_tool()
@handle_errors
async def handle_call_tool(name: str, arguments: dict | None) -> list[types.TextContent]:
    """Handle tool execution."""
    if not DATA_SOURCE:
        raise ConnectionError("Data source is not initialized.")

    if arguments is None:
        arguments = {}

    result: dict[str, Any] = {}

    if name == "analyze_query_cost":
        result = await DATA_SOURCE.analyze_query_cost(
            arguments.get("query", ""), arguments.get("dry_run", True)
        )
    elif name == "get_optimization_recommendations":
        # This is a meta-tool that orchestrates calls to the data source
        expensive_queries = await DATA_SOURCE.get_expensive_queries()
        filtered = [
            q for q in expensive_queries.get("queries", [])
            if q.get("cost", 0) >= arguments.get("min_cost", 1.0)
        ]
        result = {
            "total_potential_savings": sum(q.get("potential_savings", 0) for q in filtered),
            "queries_analyzed": len(filtered),
            "top_recommendations": [
                {
                    "query_id": q.get("id"),
                    "current_cost": q.get("cost"),
                    "optimization": q.get("optimization"),
                    "estimated_savings": q.get("potential_savings"),
                    "severity": q.get("severity"),
                }
                for q in sorted(filtered, key=lambda x: x.get("potential_savings", 0), reverse=True)[:10]
            ],
        }
    elif name == "get_cost_by_user":
        result = await DATA_SOURCE.get_cost_by_user(arguments.get("days", 30))
    elif name == "natural_language_to_sql":
        result = await DATA_SOURCE.natural_language_to_sql(arguments.get("question", ""))
    else:
        raise ValueError(f"Unknown tool: {name}")

    return [types.TextContent(type="text", text=json.dumps(result, indent=2))]


# ==============================================================================
# Main Entry Point
# ==============================================================================

async def main():
    """Main entry point for the FinOps MCP server."""
    global DATA_SOURCE

    # Determine which data source to use based on the environment variable
    data_source_type = os.getenv("DATA_SOURCE_TYPE", "bigquery").lower()

    if data_source_type == "bigquery":
        project_id = os.getenv("GCP_PROJECT_ID")
        region = os.getenv("GCP_REGION", "us")
        DATA_SOURCE = BigQueryDataSource(project_id=project_id, region=region)
    elif data_source_type == "azuresql":
        DATA_SOURCE = AzureSQLDataSource()
    else:
        raise ValueError(f"Unsupported DATA_SOURCE_TYPE: {data_source_type}")

    await DATA_SOURCE.connect()

    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="azure-bq-mcp-optimiser",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/urfanazad/BQ_MCP_OPTIMISER'
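The same lookup from Python, as a minimal standard-library sketch (the response body is printed as raw text; its exact JSON schema is whatever the Glama API returns and is not assumed here):

# Fetch this server's directory entry using only the Python standard library.
import urllib.request

url = "https://glama.ai/api/mcp/v1/servers/urfanazad/BQ_MCP_OPTIMISER"
with urllib.request.urlopen(url) as response:
    print(response.read().decode("utf-8"))  # raw JSON describing the server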

If you have feedback or need assistance with the MCP directory API, please join our Discord server.