#!/usr/bin/env python3
"""
Azure Pricing MCP Server
A Model Context Protocol server that provides tools for querying Azure retail pricing.
"""
import asyncio
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
import aiohttp
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool
# Configure logging
# NOTE: basicConfig runs at import time and configures the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Azure Retail Prices API configuration
# Endpoint that search_azure_prices and get_ri_pricing pass to _make_request.
AZURE_PRICING_BASE_URL = "https://prices.azure.com/api/retail/prices"
# Sent as the "api-version" query parameter on every pricing request.
DEFAULT_API_VERSION = "2023-01-01-preview"
# search_azure_prices only sends "$top" when the requested limit is below this value.
MAX_RESULTS_PER_REQUEST = 1000
# Retry and rate limiting configuration
# Default for _make_request's max_retries; total attempts are MAX_RETRIES + 1.
MAX_RETRIES = 3
# Base delay after an HTTP 429; the wait is this value times (attempt + 1).
RATE_LIMIT_RETRY_BASE_WAIT = 5 # seconds
# NOTE(review): not referenced in the visible part of the file; presumably the
# default used by get_customer_discount - confirm.
DEFAULT_CUSTOMER_DISCOUNT = 10.0 # percent
# VM Retirement status configuration
# Markdown tables downloaded by _fetch_retirement_data.
RETIRED_SIZES_URL = "https://raw.githubusercontent.com/MicrosoftDocs/azure-compute-docs/main/articles/virtual-machines/sizes/retirement/retired-sizes-list.md"
PREVIOUS_GEN_URL = "https://raw.githubusercontent.com/MicrosoftDocs/azure-compute-docs/main/articles/virtual-machines/sizes/previous-gen-sizes-list.md"
# How long _get_retirement_data reuses a fetched result before fetching again.
RETIREMENT_CACHE_TTL = timedelta(hours=24)
class RetirementStatus(Enum):
    """Lifecycle stage of an Azure VM series, from fully supported to retired."""

    # Fully supported, not deprecated.
    CURRENT = "current"
    # A newer version is available; the series is still supported.
    PREVIOUS_GEN = "previous_gen"
    # A planned retirement date has been published.
    RETIREMENT_ANNOUNCED = "retirement_announced"
    # No longer available.
    RETIRED = "retired"
@dataclass
class VMSeriesRetirementInfo:
    """Retirement details for a single VM series.

    Only ``series_name`` and ``status`` are required; the remaining fields
    default to ``None`` when the information is not available.
    """

    # Display name of the series, e.g. "Lsv2-series".
    series_name: str
    # Where the series currently sits in its lifecycle.
    status: RetirementStatus
    # Planned retirement date as display text, e.g. "November 15, 2028".
    retirement_date: str | None = None
    # Recommended successor series, e.g. "Lsv3, Lasv3, or Lsv4".
    replacement: str | None = None
    # Link to the migration guide for this series, when one is known.
    migration_guide_url: str | None = None
# Fallback retirement data when GitHub fetch fails
# Based on Microsoft docs as of January 2026
# Every fallback entry with an announced retirement points at this same guide.
_FALLBACK_MIGRATION_GUIDE_URL = "https://learn.microsoft.com/en-us/azure/virtual-machines/migration/sizes/d-ds-dv2-dsv2-ls-series-migration-guide"

# Series with an announced retirement:
# (lookup key, display name, planned retirement date, replacement).
_FALLBACK_ANNOUNCED_SERIES = (
    # Storage optimized - retiring
    ("Lsv2", "Lsv2-series", "November 15, 2028", "Lsv3, Lasv3, Lsv4, or Lasv4 series"),
    ("Ls", "Ls-series", "May 1, 2028", "Lsv3, Lasv3, Lsv4, or Lasv4 series"),
    # General purpose - retiring
    ("Dv2", "Dv2-series", "May 1, 2028", "Dv5, Dasv5, or Ddsv5 series"),
    ("Dsv2", "Dsv2-series", "May 1, 2028", "Ddsv5 or Ddsv6 series"),
    ("Av2", "Av2-series", "November 15, 2028", "Dasv5 or Dadsv5 series"),
    ("Bv1", "B-series (v1)", "November 15, 2028", "Bsv2, Basv2, or Bpsv2 series"),
    # Compute optimized - retiring
    ("Fsv2", "Fsv2-series", "November 15, 2028", "Fasv6 or Falsv6 series"),
    ("Fs", "Fs-series", "November 15, 2028", "Fasv6 or Falsv6 series"),
    # Memory optimized - retiring
    ("G", "G-series", "November 15, 2028", "Ev5 or Edsv5 series"),
    ("Gs", "Gs-series", "November 15, 2028", "Edsv5 or Edsv6 series"),
)

# Previous generation - not retiring but newer available:
# (lookup key, replacement). The display name is always "<key>-series".
_FALLBACK_PREVIOUS_GEN_SERIES = (
    ("Edsv4", "Edsv5 or Edsv6 series"),
    ("Esv4", "Esv5 or Esv6 series"),
    ("Ev4", "Ev5 or Ev6 series"),
    ("Ddsv4", "Ddsv5 or Ddsv6 series"),
    ("Dsv4", "Dsv5 or Dsv6 series"),
    ("Dv4", "Dv5 or Dv6 series"),
    ("Easv4", "Easv5 or Easv6 series"),
    ("Eav4", "Eav5 or Eav6 series"),
    ("Esv3", "Esv5 or Esv6 series"),
    ("Ev3", "Ev5 or Ev6 series"),
)

FALLBACK_RETIREMENT_DATA: dict[str, VMSeriesRetirementInfo] = {
    **{
        key: VMSeriesRetirementInfo(
            series_name=display_name,
            status=RetirementStatus.RETIREMENT_ANNOUNCED,
            retirement_date=planned_date,
            replacement=replacement,
            migration_guide_url=_FALLBACK_MIGRATION_GUIDE_URL,
        )
        for key, display_name, planned_date, replacement in _FALLBACK_ANNOUNCED_SERIES
    },
    **{
        key: VMSeriesRetirementInfo(
            series_name=f"{key}-series",
            status=RetirementStatus.PREVIOUS_GEN,
            replacement=replacement,
        )
        for key, replacement in _FALLBACK_PREVIOUS_GEN_SERIES
    },
}
# Common service name mappings for fuzzy search
# Maps user-friendly terms to official Azure service names
# Written as (official name, aliases) groups and flattened into alias -> official name.
SERVICE_NAME_MAPPINGS: dict[str, str] = {
    alias: official_name
    for official_name, aliases in (
        (
            "Azure App Service",
            ("app service", "web app", "web apps", "app services", "websites", "web service"),
        ),
        ("Virtual Machines", ("virtual machine", "vm", "vms", "compute")),
        ("Storage", ("storage", "blob", "blob storage", "file storage", "disk")),
        ("Azure SQL Database", ("sql", "sql database", "database", "sql server")),
        ("Azure Cosmos DB", ("cosmos", "cosmosdb", "cosmos db", "document db")),
        ("Azure Kubernetes Service", ("kubernetes", "aks", "k8s", "container service")),
        ("Azure Functions", ("functions", "function app", "serverless")),
        ("Azure Cache for Redis", ("redis", "cache")),
        ("Azure AI services", ("ai", "cognitive", "cognitive services")),
        ("Azure OpenAI", ("openai",)),
        ("Virtual Network", ("networking", "network", "vnet")),
        ("Load Balancer", ("load balancer", "lb")),
        ("Application Gateway", ("application gateway", "app gateway")),
    )
    for alias in aliases
}
def normalize_sku_name(sku_name: str) -> tuple[list[str], str]:
    """
    Normalize a SKU name and generate the search variants to try for it.

    Azure API uses different formats across VM generations:
    - v3, v4: "D4s v3", "D4s v4" (space format)
    - v5, v6: "Standard_D4s_v5" (ARM format with prefix)

    A leading "Standard_" or "Basic_" prefix (any casing) is removed, then the
    underscore form, the space form and the prefix-less input are returned in
    that order with duplicates dropped:
    - "D4s v5" -> ["D4s_v5", "D4s v5"]
    - "Standard_D4s_v5" -> ["D4s_v5", "D4s v5"]
    - "D4s_v5" -> ["D4s_v5", "D4s v5"]
    - "D4s" -> ["D4s"]

    Returns:
        Tuple of (search_terms, display_name):
        - search_terms: List of search terms to try (in order of priority);
          empty when the input is empty, blank, or only a prefix
        - display_name: Human-readable format (e.g., "D4s v5")
    """
    if not sku_name:
        return ([], "")

    normalized = sku_name.strip()

    # Remove the ARM tier prefix regardless of how the caller cased it.
    lowered = normalized.lower()
    for prefix in ("standard_", "basic_"):
        if lowered.startswith(prefix):
            normalized = normalized[len(prefix) :]
            break

    # Blank or prefix-only input leaves nothing to search for; returning an
    # empty term here would make callers search for "".
    if not normalized:
        return ([], "")

    underscore_variant = normalized.replace(" ", "_")  # v5/v6 style: "D4s_v5"
    space_variant = normalized.replace("_", " ")  # v3/v4 style: "D4s v3"

    # dict.fromkeys de-duplicates while keeping the priority order.
    search_terms = list(dict.fromkeys((underscore_variant, space_variant, normalized)))
    return (search_terms, space_variant)
class AzurePricingServer:
"""Azure Pricing MCP Server implementation."""
def __init__(self):
self.session: aiohttp.ClientSession | None = None
# Retirement status cache
self._retirement_cache: dict[str, VMSeriesRetirementInfo] | None = None
self._retirement_cache_time: datetime | None = None
async def __aenter__(self):
    """Create the aiohttp session for this server and return the server itself."""
    new_session = aiohttp.ClientSession()
    self.session = new_session
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the HTTP session, if one was opened, when leaving the context."""
    session = self.session
    if session:
        await session.close()
async def _make_request(
    self, url: str, params: dict[str, Any] | None = None, max_retries: int = MAX_RETRIES
) -> dict[str, Any]:
    """GET ``url`` and return the decoded JSON body, backing off on HTTP 429.

    Up to ``max_retries + 1`` attempts are made. After a 429 the coroutine
    sleeps ``RATE_LIMIT_RETRY_BASE_WAIT * (attempt + 1)`` seconds and tries
    again; any other failure is logged and re-raised immediately.

    Raises:
        RuntimeError: if no HTTP session has been created, or if the loop
            ends without a response or a recorded error.
        aiohttp.ClientError: for HTTP or transport errors, including a 429 on
            the final attempt.
    """
    if not self.session:
        raise RuntimeError("HTTP session not initialized")

    async def wait_before_retry(attempt: int) -> None:
        # Linear back-off: 5, 10, 15 seconds with the default settings.
        wait_time = RATE_LIMIT_RETRY_BASE_WAIT * (attempt + 1)
        logger.warning(
            f"Rate limited (429). Retrying in {wait_time} seconds... (attempt {attempt + 1}/{max_retries + 1})"
        )
        await asyncio.sleep(wait_time)

    last_exception = None
    for attempt in range(max_retries + 1):
        can_retry = attempt < max_retries
        try:
            async with self.session.get(url, params=params) as response:
                if response.status == 429 and can_retry:
                    await wait_before_retry(attempt)
                    continue
                # Raises for any error status, including a 429 on the last attempt.
                response.raise_for_status()
                json_data: dict[str, Any] = await response.json()
                return json_data
        except aiohttp.ClientResponseError as e:
            if e.status != 429 or not can_retry:
                logger.error(f"HTTP request failed: {e}")
                raise
            await wait_before_retry(attempt)
            last_exception = e
        except aiohttp.ClientError as e:
            logger.error(f"HTTP request failed: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error during request: {e}")
            raise

    # Every attempt was rate limited (or no attempt was made at all).
    if last_exception:
        raise last_exception
    raise RuntimeError("Request failed without exception")
async def _fetch_retirement_data(self) -> dict[str, VMSeriesRetirementInfo]:
    """Download and parse the VM retirement tables from Microsoft docs on GitHub.

    Both markdown files are requested concurrently. Entries from the
    retired-sizes table win over entries from the previous-gen table. A copy
    of FALLBACK_RETIREMENT_DATA is returned when there is no session, when
    nothing could be parsed, or when an unexpected error occurs.
    """
    session = self.session
    if not session:
        logger.warning("HTTP session not initialized, using fallback retirement data")
        return FALLBACK_RETIREMENT_DATA.copy()

    async def download(url: str) -> str:
        # Any non-200 response is treated as "no content".
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return await response.text() if response.status == 200 else ""

    def markdown_or_empty(outcome: Any, label: str) -> str:
        # gather(return_exceptions=True) hands back the exception object itself.
        if isinstance(outcome, Exception):
            logger.warning(f"Failed to fetch {label}: {outcome}")
            return ""
        return outcome if isinstance(outcome, str) else ""

    try:
        retired_outcome, previous_gen_outcome = await asyncio.gather(
            download(RETIRED_SIZES_URL),
            download(PREVIOUS_GEN_URL),
            return_exceptions=True,
        )
        retired_md = markdown_or_empty(retired_outcome, "retired sizes")
        previous_gen_md = markdown_or_empty(previous_gen_outcome, "previous-gen sizes")

        merged: dict[str, VMSeriesRetirementInfo] = {}
        if retired_md:
            merged.update(self._parse_retired_sizes_md(retired_md))
        if previous_gen_md:
            # setdefault keeps the retired/announced entry when a key is in both tables.
            for key, info in self._parse_previous_gen_md(previous_gen_md).items():
                merged.setdefault(key, info)

        if not merged:
            logger.warning("No retirement data parsed, using fallback")
            return FALLBACK_RETIREMENT_DATA.copy()

        logger.info(f"Fetched retirement data for {len(merged)} VM series")
        return merged
    except Exception as e:
        logger.warning(f"Failed to fetch retirement data: {e}, using fallback")
        return FALLBACK_RETIREMENT_DATA.copy()
def _parse_retired_sizes_md(self, md_content: str) -> dict[str, VMSeriesRetirementInfo]:
    """Parse the retired-sizes-list.md markdown content.

    Expected table layout:
    | Series name | Retirement Status | Retirement Announcement | Planned Retirement Date | Migration Guide |
    Example row:
    | Lsv2-series | **Announced** | [10/15/25](...) | 11/15/28 | [Migration Guide](...) |

    Cells are read by position, so an empty cell (for example a missing
    announcement link) does not shift the retirement date into the wrong
    column. Rows whose status is neither "retired" nor "announced" are skipped.

    Returns:
        Mapping of series key (e.g. "Lsv2") to its retirement information.
    """
    result: dict[str, VMSeriesRetirementInfo] = {}

    for raw_line in md_content.split("\n"):
        line = raw_line.strip()
        # Only table rows are of interest; "---" marks the header separator.
        if not line.startswith("|") or "---" in line:
            continue

        # Drop only the outer pipes so that empty cells keep their position.
        inner = line[1:-1] if len(line) > 1 and line.endswith("|") else line[1:]
        cells = [cell.strip() for cell in inner.split("|")]
        if len(cells) < 4:
            continue

        series_name = cells[0]
        # Skip header rows and rows without a series name.
        if series_name.lower() in ("series name", "series", ""):
            continue

        # Status may be wrapped in markdown bold, e.g. "**Announced**".
        status_clean = cells[1].lower().replace("*", "").strip()
        if "announced" in status_clean:
            status = RetirementStatus.RETIREMENT_ANNOUNCED
        elif "retired" in status_clean:
            status = RetirementStatus.RETIRED
        else:
            continue  # Skip rows we can't parse

        # Extract series key (e.g., "Lsv2-series" -> "Lsv2")
        series_key = self._extract_series_key(series_name)
        if not series_key:
            continue

        # Planned retirement date is the fourth column; "-" means "none".
        retirement_date = cells[3]

        result[series_key] = VMSeriesRetirementInfo(
            series_name=series_name if "-series" in series_name else f"{series_name}-series",
            status=status,
            retirement_date=retirement_date if retirement_date and retirement_date != "-" else None,
            replacement=self._get_replacement_for_series(series_key),
            migration_guide_url="https://learn.microsoft.com/en-us/azure/virtual-machines/migration/sizes/d-ds-dv2-dsv2-ls-series-migration-guide",
        )

    return result
def _parse_previous_gen_md(self, md_content: str) -> dict[str, VMSeriesRetirementInfo]:
    """Parse the previous-gen-sizes-list.md markdown content.

    A table row is kept when its status (the link text, if the cell is a
    markdown link) mentions "next-gen available" or "capacity limited".
    Rows marked "retirement announced" are left to the retired-sizes table.
    A row naming several series ("X and Y-series") yields one entry per series.
    """
    header_labels = ("series name", "series", "", "replacement series")
    previous_gen_markers = ("next-gen available", "capacity limited")
    parsed: dict[str, VMSeriesRetirementInfo] = {}

    for raw_line in md_content.split("\n"):
        row = raw_line.strip()
        if "---" in row or not row.startswith("|"):
            continue

        # Non-empty, trimmed cells of the row.
        cells = [cell for cell in (piece.strip() for piece in row.split("|")) if cell]
        if len(cells) < 2:
            continue

        series_name = cells[0]
        status_text = cells[1].lower()
        if series_name.lower() in header_labels:
            continue

        # "[Capacity limited](url)" -> "capacity limited"
        bracketed = re.search(r"\[([^\]]+)\]", status_text)
        status_check = bracketed.group(1).lower() if bracketed else status_text

        if "retirement announced" in status_check:
            continue
        if not any(marker in status_check for marker in previous_gen_markers):
            continue

        for series_key in self._extract_all_series_keys(series_name):
            # The first row seen for a series wins.
            if series_key in parsed:
                continue
            parsed[series_key] = VMSeriesRetirementInfo(
                series_name=f"{series_key}-series",
                status=RetirementStatus.PREVIOUS_GEN,
                replacement=self._get_replacement_for_series(series_key),
            )

    return parsed
def _extract_all_series_keys(self, series_name: str) -> list[str]:
"""Extract all series keys from a series name that may contain multiple series.
Examples:
"Edv4 and Edsv4-series" -> ["Edv4", "Edsv4"]
"Ev3 and Esv3-series" -> ["Ev3", "Esv3"]
"Dv2-series" -> ["Dv2"]
"""
results = []
# Remove -series suffix
name = series_name.replace("-series", "").replace("-Series", "").strip()
# Split by " and " or "/"
if " and " in name:
parts = name.split(" and ")
elif "/" in name:
parts = name.split("/")
else:
parts = [name]
for part in parts:
key = self._extract_series_key(part.strip())
if key:
results.append(key)
return results
def _extract_series_key(self, series_name: str) -> str | None:
"""Extract the series key from a series name.
Examples:
"Lsv2-series" -> "Lsv2"
"D-series" -> "D"
"Ev3 and Esv3-series" -> "Ev3" (returns first one)
"Standard_M192idms_v2" -> "M192idms_v2"
"""
# Remove common suffixes
name = series_name.replace("-series", "").replace("-Series", "").strip()
# Handle "X and Y" format - take first one
if " and " in name:
name = name.split(" and ")[0].strip()
# Handle "X/Y" format - take first one
if "/" in name:
name = name.split("/")[0].strip()
# Remove "Standard_" prefix
if name.startswith("Standard_"):
name = name[9:]
# Remove parenthetical notes like "(V1)"
name = re.sub(r"\s*\([^)]*\)", "", name).strip()
return name if name else None
def _get_replacement_for_series(self, series_key: str) -> str | None:
"""Get replacement recommendation for a series."""
# Map of series to recommended replacements
replacements = {
# Storage optimized
"Ls": "Lsv3, Lasv3, Lsv4, or Lasv4 series",
"Lsv2": "Lsv3, Lasv3, Lsv4, or Lasv4 series",
# General purpose
"D": "Dv5, Dasv5, or Ddsv5 series",
"Ds": "Dsv5, Dadsv5, or Ddsv5 series",
"Dv2": "Dv5, Dasv5, or Ddsv5 series",
"Dsv2": "Dsv5, Dadsv5, or Ddsv5 series",
"Dv3": "Dv5 or Dv6 series",
"Dsv3": "Dsv5 or Dsv6 series",
"Dv4": "Dv5 or Dv6 series",
"Dsv4": "Dsv5 or Dsv6 series",
"Ddsv4": "Ddsv5 or Ddsv6 series",
"Dasv4": "Dasv5 or Dasv6 series",
"Dadsv4": "Dadsv5 or Dadsv6 series",
"Av2": "Dasv5 or Dadsv5 series",
"B": "Bsv2, Basv2, or Bpsv2 series",
"Bv1": "Bsv2, Basv2, or Bpsv2 series",
# Compute optimized
"F": "Fasv6 or Falsv6 series",
"Fs": "Fasv6 or Falsv6 series",
"Fsv2": "Fasv6 or Falsv6 series",
# Memory optimized
"E": "Ev5 or Ev6 series",
"Ev3": "Ev5 or Ev6 series",
"Esv3": "Esv5 or Esv6 series",
"Ev4": "Ev5 or Ev6 series",
"Esv4": "Esv5 or Esv6 series",
"Edsv4": "Edsv5 or Edsv6 series",
"Easv4": "Easv5 or Easv6 series",
"Eav4": "Eav5 or Eav6 series",
"G": "Ev5 or Edsv5 series",
"Gs": "Edsv5 or Edsv6 series",
}
return replacements.get(series_key)
def _get_series_from_sku(self, sku_name: str) -> str | None:
"""Extract the VM series identifier from a SKU name.
Examples:
"Standard_L32s_v2" -> "Lsv2"
"L32s v2" -> "Lsv2"
"D4s v3" -> "Dsv3"
"Standard_D4s_v5" -> "Dsv5"
"E32ds v4" -> "Edsv4"
"Standard_E32bds_v5" -> "Ebdsv5"
"""
if not sku_name:
return None
# Normalize: remove Standard_ prefix and convert to consistent format
normalized = sku_name.strip()
for prefix in ["Standard_", "Basic_", "standard_", "basic_"]:
if normalized.startswith(prefix):
normalized = normalized[len(prefix) :]
break
# Replace underscores with spaces for consistent parsing
normalized = normalized.replace("_", " ")
# Pattern to extract series: letter(s) + optional size number + letter suffix(es) + version
# Examples: "L32s v2" -> L + s + v2, "D4s v3" -> D + s + v3, "E32ds v4" -> E + ds + v4
match = re.match(r"^([A-Za-z]+)\d*([a-z]*)\s*v?(\d+)?", normalized, re.IGNORECASE)
if match:
prefix = match.group(1) # e.g., "L", "D", "E"
suffix = match.group(2) or "" # e.g., "s", "ds", "bds"
version = match.group(3) # e.g., "2", "3", "4", "5"
# Build series key
series_key = f"{prefix}{suffix}"
if version:
series_key += f"v{version}"
return series_key
return None
async def _get_retirement_data(self) -> dict[str, VMSeriesRetirementInfo]:
    """Return retirement data from the cache, refreshing it once it is too old.

    The cached value is reused while it is younger than RETIREMENT_CACHE_TTL;
    otherwise fresh data is fetched and stored with the current time.
    """
    now = datetime.now()
    cached = self._retirement_cache
    cached_at = self._retirement_cache_time

    cache_is_fresh = (
        cached is not None and cached_at is not None and (now - cached_at) < RETIREMENT_CACHE_TTL
    )
    if cache_is_fresh:
        return cached

    self._retirement_cache = await self._fetch_retirement_data()
    self._retirement_cache_time = now
    return self._retirement_cache
async def _check_skus_retirement_status(self, items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Build retirement warnings for the VM series found in a list of price items.

    Each series is checked once, using the first SKU seen for it. A warning
    is produced for every series whose status is anything other than CURRENT.

    Returns:
        A list of warning dicts with "series_name", "status" and
        "sku_example", plus "retirement_date", "replacement" and
        "migration_guide_url" when those values are set.
    """
    if not items:
        return []

    retirement_data = await self._get_retirement_data()

    checked_series: set[str] = set()
    warnings: list[dict[str, Any]] = []

    for item in items:
        sku_name = item.get("skuName") or item.get("armSkuName") or ""
        if not sku_name:
            continue

        series_key = self._get_series_from_sku(sku_name)
        if not series_key or series_key in checked_series:
            continue
        checked_series.add(series_key)

        info = self._match_series_to_retirement(series_key, retirement_data)
        if not info or info.status == RetirementStatus.CURRENT:
            continue

        warning = {
            "series_name": info.series_name,
            "status": info.status.value,
            "sku_example": sku_name,
        }
        # Optional details are only included when they have a value.
        optional_details = {
            "retirement_date": info.retirement_date,
            "replacement": info.replacement,
            "migration_guide_url": info.migration_guide_url,
        }
        warning.update({name: value for name, value in optional_details.items() if value})
        warnings.append(warning)

    return warnings
def _match_series_to_retirement(
self, series_key: str, retirement_data: dict[str, VMSeriesRetirementInfo]
) -> VMSeriesRetirementInfo | None:
"""Match a series key to retirement data with smart matching.
Matching rules:
1. Exact match (e.g., "Lsv2" matches "Lsv2")
2. Series key without version matches data key (e.g., "Dsv5" won't match "Dsv2")
3. Avoid false positives like "Lsv4" matching "Ls"
"""
# Exact match first
if series_key in retirement_data:
return retirement_data[series_key]
# Extract the base and version from series_key
# e.g., "Lsv2" -> base="Ls", version="v2"
# e.g., "Edsv4" -> base="Eds", version="v4"
match = re.match(r"^([A-Za-z]+)(v\d+)?$", series_key, re.IGNORECASE)
if not match:
return None
version = match.group(2) # e.g., "v2", "v4", or None
# For versioned series, require exact base + version match in retirement data
# e.g., "Lsv2" should match "Lsv2" but not "Ls" or "Lsv3"
if version:
# Already tried exact match above, so if we have a version, no match
# Unless the retirement data has the unversioned base as retiring
# (which would mean ALL versions are retiring - not typical)
# Check if the base+version is in data
if series_key.lower() in [k.lower() for k in retirement_data]:
for k, v in retirement_data.items():
if k.lower() == series_key.lower():
return v
# Don't match versioned series to unversioned retirement entries
# e.g., "Lsv4" should NOT match "Ls" entry
return None
# For unversioned series (rare in practice), try exact match only
return None
async def search_azure_prices(
    self,
    service_name: str | None = None,
    service_family: str | None = None,
    region: str | None = None,
    sku_name: str | None = None,
    price_type: str | None = None,
    currency_code: str = "USD",
    limit: int = 50,
    discount_percentage: float | None = None,
    validate_sku: bool = True,
) -> dict[str, Any]:
    """Search Azure retail prices with various filters, SKU validation, and discount support.

    Args:
        service_name: Exact service name to filter on.
        service_family: Exact service family to filter on.
        region: Exact ARM region name to filter on.
        sku_name: Substring that the SKU name must contain.
        price_type: Exact price type to filter on.
        currency_code: Currency passed to the API and echoed in the result.
        limit: Maximum number of items returned.
        discount_percentage: When greater than zero, applied to the returned prices.
        validate_sku: When true and ``sku_name`` is given, add SKU suggestions
            if nothing matched, or a clarification hint if more than 10 items matched.

    Returns:
        A dict with "items", "count", "has_more", "currency" and
        "filters_applied", plus "retirement_warnings", "discount_applied",
        "sku_validation" or "clarification" when applicable.
    """

    def odata_literal(value: str) -> str:
        # Filter values come from callers; a single quote inside an OData
        # string literal must be doubled or it terminates the literal early.
        return "'" + str(value).replace("'", "''") + "'"

    # Build filter conditions
    filter_conditions = []
    if service_name:
        filter_conditions.append(f"serviceName eq {odata_literal(service_name)}")
    if service_family:
        filter_conditions.append(f"serviceFamily eq {odata_literal(service_family)}")
    if region:
        filter_conditions.append(f"armRegionName eq {odata_literal(region)}")
    if sku_name:
        filter_conditions.append(f"contains(skuName, {odata_literal(sku_name)})")
    if price_type:
        filter_conditions.append(f"priceType eq {odata_literal(price_type)}")

    # Construct query parameters
    params = {"api-version": DEFAULT_API_VERSION, "currencyCode": currency_code}
    if filter_conditions:
        params["$filter"] = " and ".join(filter_conditions)
    # Only ask the API to cap the page when the limit is below the page size.
    if limit < MAX_RESULTS_PER_REQUEST:
        params["$top"] = str(limit)

    data = await self._make_request(AZURE_PRICING_BASE_URL, params)

    items = data.get("Items", [])
    # If we have more results than requested, truncate
    if len(items) > limit:
        items = items[:limit]

    # SKU validation and clarification
    validation_info: dict[str, Any] = {}
    if validate_sku and sku_name and not items:
        validation_info = await self._validate_and_suggest_skus(service_name, sku_name, currency_code)
    elif validate_sku and sku_name and isinstance(items, list) and len(items) > 10:
        # Too many results - provide clarification
        validation_info["clarification"] = {
            "message": f"Found {len(items)} SKUs matching '{sku_name}'. Consider being more specific.",
            "suggestions": [item.get("skuName") for item in items[:5] if item and item.get("skuName")],
        }

    # Apply discount if provided
    discount_requested = discount_percentage is not None and discount_percentage > 0
    if discount_requested and isinstance(items, list):
        items = self._apply_discount_to_items(items, discount_percentage)

    # Check retirement status for VM SKUs
    retirement_warnings = []
    if items and service_name and "virtual machine" in service_name.lower():
        retirement_warnings = await self._check_skus_retirement_status(items)

    result = {
        "items": items,
        "count": len(items) if isinstance(items, list) else 0,
        "has_more": bool(data.get("NextPageLink")),
        "currency": currency_code,
        "filters_applied": filter_conditions,
    }

    if retirement_warnings:
        result["retirement_warnings"] = retirement_warnings
    if discount_requested:
        result["discount_applied"] = {"percentage": discount_percentage, "note": "Prices shown are after discount"}
    if validation_info:
        result.update(validation_info)

    return result
async def _validate_and_suggest_skus(
    self, service_name: str | None, sku_name: str, currency_code: str = "USD"
) -> dict[str, Any]:
    """Report that a SKU was not found and suggest up to five similar SKUs.

    Suggestions are only looked up when ``service_name`` is given: the first
    100 prices of that service are scanned for SKU names that contain the
    requested name, are contained in it, or share a word with it.

    Returns:
        A dict with a single "sku_validation" entry holding the original SKU,
        ``found=False``, a message and the list of unique suggestions.
    """
    unique_suggestions = []

    if service_name:
        # validate_sku=False keeps this lookup from recursing back into validation.
        broad_search = await self.search_azure_prices(
            service_name=service_name, currency_code=currency_code, limit=100, validate_sku=False
        )

        wanted = sku_name.lower()
        wanted_words = [word for word in wanted.split() if word]
        seen_skus = set()

        for item in broad_search.get("items", []) or []:
            candidate = item.get("skuName")
            if not candidate:
                continue

            candidate_lower = candidate.lower()
            is_similar = (
                wanted in candidate_lower
                or candidate_lower in wanted
                or any(word in candidate_lower for word in wanted_words)
            )
            # Keep only the first hit per SKU name.
            if not is_similar or candidate in seen_skus:
                continue

            seen_skus.add(candidate)
            unique_suggestions.append(
                {
                    "sku_name": candidate,
                    "product_name": item.get("productName", "Unknown"),
                    "price": item.get("retailPrice", 0),
                    "unit": item.get("unitOfMeasure", "Unknown"),
                    "region": item.get("armRegionName", "Unknown"),
                }
            )
            if len(unique_suggestions) >= 5:
                break

    service_note = f" in service '{service_name}'" if service_name else ""
    return {
        "sku_validation": {
            "original_sku": sku_name,
            "found": False,
            "message": f"SKU '{sku_name}' not found" + service_note,
            "suggestions": unique_suggestions,
        }
    }
def _apply_discount_to_items(self, items: list[dict], discount_percentage: float) -> list[dict]:
"""Apply discount percentage to pricing items."""
if not items:
return []
discounted_items = []
for item in items:
discounted_item = item.copy()
# Apply discount to retail price
if "retailPrice" in item and item["retailPrice"]:
original_price = item["retailPrice"]
discounted_price = original_price * (1 - discount_percentage / 100)
discounted_item["retailPrice"] = round(discounted_price, 6)
discounted_item["originalPrice"] = original_price
# Apply discount to savings plans if present
if "savingsPlan" in item and item["savingsPlan"] and isinstance(item["savingsPlan"], list):
discounted_savings = []
for plan in item["savingsPlan"]:
discounted_plan = plan.copy()
if "retailPrice" in plan and plan["retailPrice"]:
original_plan_price = plan["retailPrice"]
discounted_plan_price = original_plan_price * (1 - discount_percentage / 100)
discounted_plan["retailPrice"] = round(discounted_plan_price, 6)
discounted_plan["originalPrice"] = original_plan_price
discounted_savings.append(discounted_plan)
discounted_item["savingsPlan"] = discounted_savings
discounted_items.append(discounted_item)
return discounted_items
async def get_ri_pricing(
    self,
    service_name: str | None = None,
    sku_name: str | None = None,
    region: str | None = None,
    reservation_term: str | None = None,
    currency_code: str = "USD",
    compare_on_demand: bool = True,
    limit: int = 50,
) -> dict[str, Any]:
    """
    Get Reserved Instance pricing and optionally compare with On-Demand.

    Args:
        service_name: Exact service name to filter on.
        sku_name: Substring that the SKU name must contain.
        region: Exact ARM region name to filter on.
        reservation_term: When given, only items whose "reservationTerm"
            equals this value are kept (filtered after the request).
        currency_code: Currency passed to the API and echoed in the result.
        compare_on_demand: When true and reservations were found, also fetch
            Consumption prices and add a "comparison" list.
        limit: "$top" for the reservation query; twice this for the
            on-demand query.

    Returns:
        A dict with "ri_items", "currency" and "count", plus "comparison"
        when the on-demand comparison was made.
    """

    def odata_literal(value: str) -> str:
        # Filter values come from callers; a single quote inside an OData
        # string literal must be doubled or it terminates the literal early.
        return "'" + str(value).replace("'", "''") + "'"

    # Clauses shared by the reservation and the on-demand query.
    shared_filter = []
    if service_name:
        shared_filter.append(f"serviceName eq {odata_literal(service_name)}")
    if region:
        shared_filter.append(f"armRegionName eq {odata_literal(region)}")
    if sku_name:
        shared_filter.append(f"contains(skuName, {odata_literal(sku_name)})")

    # 1. Fetch RI Prices
    ri_params = {
        "api-version": DEFAULT_API_VERSION,
        "currencyCode": currency_code,
        "$filter": " and ".join(["priceType eq 'Reservation'", *shared_filter]),
        "$top": str(limit),
    }
    ri_data = await self._make_request(AZURE_PRICING_BASE_URL, ri_params)
    ri_items = ri_data.get("Items", [])

    # Filter by reservation term if specified (API filtering for this is limited)
    if reservation_term:
        ri_items = [item for item in ri_items if item.get("reservationTerm") == reservation_term]

    result = {
        "ri_items": ri_items,
        "currency": currency_code,
        "count": len(ri_items),
    }

    # 2. Fetch On-Demand Prices if requested
    if compare_on_demand and ri_items:
        od_params = {
            "api-version": DEFAULT_API_VERSION,
            "currencyCode": currency_code,
            "$filter": " and ".join(["priceType eq 'Consumption'", *shared_filter]),
            "$top": str(limit * 2),  # Fetch more to ensure coverage
        }
        od_data = await self._make_request(AZURE_PRICING_BASE_URL, od_params)
        od_items = od_data.get("Items", [])

        # Calculate savings
        result["comparison"] = self._calculate_ri_savings(ri_items, od_items)

    return result
def _calculate_ri_savings(self, ri_items: list[dict], od_items: list[dict]) -> list[dict]:
    """Calculate savings and break-even for RI vs On-Demand.

    Each reservation item is paired with the On-Demand item that has the same
    ``(skuName, armRegionName)``; reservation items without a match, or whose
    match has no positive price, are left out of the result.

    The reservation ``retailPrice`` is treated as the total cost of the term and
    divided by the hours in the term (8760 for "1 Year", 26280 for "3 Year")
    to get an hourly rate. When the term is not recognised the price is used
    as the hourly rate unchanged.

    Args:
        ri_items: Reservation price records.
        od_items: On-Demand (Consumption) price records.

    Returns:
        One dict per matched reservation item with keys ``sku``, ``region``,
        ``term``, ``ri_hourly``, ``od_hourly``, ``savings_percentage``,
        ``break_even_months`` (``None`` when it evaluates to zero) and
        ``annual_savings``.
    """
    # Index On-Demand items by SKU + region; a later duplicate replaces an earlier one.
    od_by_key = {(item.get("skuName"), item.get("armRegionName")): item for item in od_items}

    comparison_results = []
    for ri in ri_items:
        od = od_by_key.get((ri.get("skuName"), ri.get("armRegionName")))
        if not od:
            continue

        # A present-but-null field is treated like a missing one instead of
        # raising TypeError in the comparisons below.
        ri_price = ri.get("retailPrice") or 0
        od_price = od.get("retailPrice") or 0
        term = ri.get("reservationTerm") or ""
        if od_price <= 0:
            continue

        if "1 Year" in term:
            hours_in_term = 8760
        elif "3 Year" in term:
            hours_in_term = 26280
        else:
            hours_in_term = 0

        # The reservation price covers the whole term, so spread it over the
        # term's hours; with an unknown term there is nothing to divide by.
        ri_hourly = ri_price / hours_in_term if hours_in_term > 0 else ri_price
        savings_percent = ((od_price - ri_hourly) / od_price) * 100

        # Break-even: months of On-Demand usage (730 h/month) that cost as much
        # as the reservation price.
        break_even_months = ri_price / (od_price * 730) if ri_price > 0 else 0

        comparison_results.append(
            {
                "sku": ri.get("skuName"),
                "region": ri.get("armRegionName"),
                "term": term,
                "ri_hourly": round(ri_hourly, 5),
                "od_hourly": od_price,
                "savings_percentage": round(savings_percent, 2),
                "break_even_months": round(break_even_months, 1) if break_even_months else None,
                "annual_savings": round((od_price - ri_hourly) * 8760, 2),  # Annualized savings
            }
        )
    return comparison_results
async def get_customer_discount(self, customer_id: str | None = None) -> dict[str, Any]:
    """Return discount details for a customer.

    Every caller currently receives the same standard discount, taken from
    ``DEFAULT_CUSTOMER_DISCOUNT``; ``customer_id`` is only echoed back
    (as "default" when it is empty or missing).
    """
    # No per-customer lookup exists yet, so the answer is built from constants.
    resolved_customer = customer_id if customer_id else "default"
    discount_info: dict[str, Any] = {}
    discount_info["customer_id"] = resolved_customer
    discount_info["discount_percentage"] = DEFAULT_CUSTOMER_DISCOUNT
    discount_info["discount_type"] = "standard"
    discount_info["description"] = "Standard customer discount"
    # The standard discount carries no expiry date.
    discount_info["valid_until"] = None
    # The standard discount is not restricted to particular services.
    discount_info["applicable_services"] = "all"
    discount_info["note"] = (
        "This is a default discount applied to all customers. Contact sales for enterprise discounts."
    )
    return discount_info
async def compare_prices(
self,
service_name: str,
sku_name: str | None = None,
regions: list[str] | None = None,
currency_code: str = "USD",
discount_percentage: float | None = None,
) -> dict[str, Any]:
"""Compare prices across different regions or SKUs."""
comparisons = []
if regions and isinstance(regions, list):
# Compare across regions
for region in regions:
try:
result = await self.search_azure_prices(
service_name=service_name,
sku_name=sku_name,
region=region,
currency_code=currency_code,
limit=10,
)
if result["items"]:
# Get the first item for comparison
item = result["items"][0]
comparisons.append(
{
"region": region,
"sku_name": item.get("skuName"),
"retail_price": item.get("retailPrice"),
"unit_of_measure": item.get("unitOfMeasure"),
"product_name": item.get("productName"),
"meter_name": item.get("meterName"),
}
)
except Exception as e:
logger.warning(f"Failed to get prices for region {region}: {e}")
else:
# Compare different SKUs within the same service
result = await self.search_azure_prices(service_name=service_name, currency_code=currency_code, limit=20)
# Group by SKU
sku_prices = {}
items = result.get("items", [])
for item in items:
sku = item.get("skuName")
if sku and sku not in sku_prices:
sku_prices[sku] = {
"sku_name": sku,
"retail_price": item.get("retailPrice"),
"unit_of_measure": item.get("unitOfMeasure"),
"product_name": item.get("productName"),
"region": item.get("armRegionName"),
"meter_name": item.get("meterName"),
}
comparisons = list(sku_prices.values())
# Apply discount if provided
if discount_percentage is not None and discount_percentage > 0:
for comparison in comparisons:
if "retail_price" in comparison and comparison["retail_price"]:
original_price = comparison["retail_price"]
discounted_price = original_price * (1 - discount_percentage / 100)
comparison["retail_price"] = round(discounted_price, 6)
comparison["original_price"] = original_price
# Sort by price
comparisons.sort(key=lambda x: x.get("retail_price", 0))
result = {
"comparisons": comparisons,
"service_name": service_name,
"currency": currency_code,
"comparison_type": "regions" if regions else "skus",
}
# Add discount info if applied
if discount_percentage is not None and discount_percentage > 0:
result["discount_applied"] = {"percentage": discount_percentage, "note": "Prices shown are after discount"}
return result
async def recommend_regions(
    self,
    service_name: str,
    sku_name: str,
    top_n: int = 10,
    currency_code: str = "USD",
    discount_percentage: float | None = None,
) -> dict[str, Any]:
    """
    Rank Azure regions from cheapest to most expensive for one service and SKU.

    The SKU name is normalised into one or more search variants; the first
    variant that returns pricing is used. Regions and prices are taken straight
    from that single discovery search (no per-region follow-up requests).

    Args:
        service_name: Azure service name (e.g., 'Virtual Machines')
        sku_name: SKU name to price across regions; it is passed through
            ``normalize_sku_name`` before searching
        top_n: Number of top recommendations to return (default: 10)
        currency_code: Currency for pricing (default: USD)
        discount_percentage: Optional discount to apply to all prices

    Returns:
        Dict with ranked region recommendations, summary statistics and, when a
        discount was applied, the discount details. When nothing usable is
        found the dict carries an ``error`` message and an empty
        ``recommendations`` list instead.
    """
    search_terms, display_sku = normalize_sku_name(sku_name)

    # --- Discovery: stop at the first search variant that yields items --------
    discovery_result: dict[str, Any] = {"items": []}
    for candidate in search_terms:
        discovery_result = await self.search_azure_prices(
            service_name=service_name,
            sku_name=candidate,
            currency_code=currency_code,
            limit=500,  # large page so that many regions show up
            validate_sku=False,  # validation is not wanted during discovery
        )
        if discovery_result.get("items"):
            break

    discovered_items = discovery_result["items"]
    if not discovered_items:
        return {
            "error": f"No pricing found for {display_sku} in service {service_name}",
            "service_name": service_name,
            "sku_name": display_sku,
            "sku_input": sku_name,
            "search_terms_tried": search_terms,
            "recommendations": [],
        }

    # --- Cheapest entry per region, On-Demand kept apart from Spot/Low Priority
    on_demand_by_region: dict[str, dict[str, Any]] = {}
    spot_by_region: dict[str, dict[str, Any]] = {}
    for item in discovered_items:
        region = item.get("armRegionName")
        price = item.get("retailPrice", 0)
        location = item.get("location", region)
        item_sku = item.get("skuName", "")
        item_meter = item.get("meterName", "")

        # The pricing flavour is recognised from the SKU or meter name.
        is_spot = "Spot" in item_sku or "Spot" in item_meter
        is_low_priority = "Low Priority" in item_sku or "Low Priority" in item_meter

        # Entries without a region or with a zero/absent price are ignored.
        if not (region and price and price > 0):
            continue

        entry = {
            "region": region,
            "location": location,
            "retail_price": price,
            "sku_name": item.get("skuName"),
            "product_name": item.get("productName"),
            "unit_of_measure": item.get("unitOfMeasure"),
            "meter_name": item.get("meterName"),
        }
        if is_spot or is_low_priority:
            bucket = spot_by_region
            label = "Spot" if is_spot else "Low Priority"
        else:
            bucket = on_demand_by_region
            label = "On-Demand"

        current = bucket.get(region)
        if current is None or price < current["retail_price"]:
            bucket[region] = {**entry, "pricing_type": label}

    # Attach the Spot/Low Priority figure to the matching On-Demand entry.
    for region, on_demand in on_demand_by_region.items():
        spot = spot_by_region.get(region)
        if spot is not None:
            on_demand["spot_price"] = spot["retail_price"]
            on_demand["spot_sku_name"] = spot["sku_name"]

    if not on_demand_by_region:
        return {
            "error": f"No regions with valid pricing found for {display_sku}",
            "service_name": service_name,
            "sku_name": display_sku,
            "sku_input": sku_name,
            "recommendations": [],
        }

    recommendations = list(on_demand_by_region.values())
    apply_discount = discount_percentage is not None and discount_percentage > 0

    # --- Discount --------------------------------------------------------------
    if apply_discount:
        for rec in recommendations:
            list_price = rec["retail_price"]
            reduced_price = list_price * (1 - discount_percentage / 100)
            rec["original_price"] = list_price
            rec["retail_price"] = round(reduced_price, 6)

    # --- Rank cheapest first ---------------------------------------------------
    recommendations.sort(key=lambda rec: rec.get("retail_price", float("inf")))

    # --- Savings relative to the most expensive region --------------------------
    if recommendations:
        max_price = max(rec.get("retail_price", 0) for rec in recommendations)
        for rec in recommendations:
            if max_price > 0:
                gap = ((max_price - rec.get("retail_price", 0)) / max_price) * 100
                rec["savings_vs_most_expensive"] = round(gap, 2)
            else:
                rec["savings_vs_most_expensive"] = 0.0

    # --- Assemble the response ---------------------------------------------------
    result: dict[str, Any] = {
        "service_name": service_name,
        "sku_name": display_sku,
        "sku_input": sku_name,  # what the caller originally typed
        "currency": currency_code,
        "total_regions_found": len(recommendations),
        "showing_top": min(top_n, len(recommendations)),
        "recommendations": recommendations[:top_n],
    }

    if recommendations:
        cheapest = recommendations[0]
        priciest = recommendations[-1]
        result["summary"] = {
            "cheapest_region": cheapest["region"],
            "cheapest_location": cheapest["location"],
            "cheapest_price": cheapest["retail_price"],
            "most_expensive_region": priciest["region"],
            "most_expensive_location": priciest["location"],
            "most_expensive_price": priciest["retail_price"],
            "max_savings_percentage": cheapest.get("savings_vs_most_expensive", 0),
        }

    if apply_discount:
        result["discount_applied"] = {
            "percentage": discount_percentage,
            "note": "Prices shown are after discount",
        }
    return result
async def estimate_costs(
self,
service_name: str,
sku_name: str,
region: str,
hours_per_month: float = 730, # Default to full month
currency_code: str = "USD",
discount_percentage: float | None = None,
) -> dict[str, Any]:
"""Estimate monthly costs based on usage."""
# Get pricing information
result = await self.search_azure_prices(
service_name=service_name, sku_name=sku_name, region=region, currency_code=currency_code, limit=5
)
if not result["items"]:
return {
"error": f"No pricing found for {sku_name} in {region}",
"service_name": service_name,
"sku_name": sku_name,
"region": region,
}
item = result["items"][0]
hourly_rate = item.get("retailPrice", 0)
# Apply discount if provided
if discount_percentage is not None and discount_percentage > 0:
original_hourly_rate = hourly_rate
hourly_rate = hourly_rate * (1 - discount_percentage / 100)
# Calculate estimates
monthly_cost = hourly_rate * hours_per_month
daily_cost = hourly_rate * 24
yearly_cost = monthly_cost * 12
# Check for savings plans
savings_plans = item.get("savingsPlan", [])
savings_estimates = []
for plan in savings_plans:
plan_hourly = plan.get("retailPrice", 0)
# Apply discount to savings plan prices too
if discount_percentage is not None and discount_percentage > 0:
original_plan_hourly = plan_hourly
plan_hourly = plan_hourly * (1 - discount_percentage / 100)
plan_monthly = plan_hourly * hours_per_month
plan_yearly = plan_monthly * 12
savings_percent = ((hourly_rate - plan_hourly) / hourly_rate) * 100 if hourly_rate > 0 else 0
plan_data = {
"term": plan.get("term"),
"hourly_rate": round(plan_hourly, 6),
"monthly_cost": round(plan_monthly, 2),
"yearly_cost": round(plan_yearly, 2),
"savings_percent": round(savings_percent, 2),
"annual_savings": round((yearly_cost - plan_yearly), 2),
}
# Add original prices if discount was applied
if discount_percentage is not None and discount_percentage > 0:
plan_data["original_hourly_rate"] = original_plan_hourly
plan_data["original_monthly_cost"] = round(original_plan_hourly * hours_per_month, 2)
plan_data["original_yearly_cost"] = round(original_plan_hourly * hours_per_month * 12, 2)
savings_estimates.append(plan_data)
result = {
"service_name": service_name,
"sku_name": item.get("skuName"),
"region": region,
"product_name": item.get("productName"),
"unit_of_measure": item.get("unitOfMeasure"),
"currency": currency_code,
"on_demand_pricing": {
"hourly_rate": round(hourly_rate, 6),
"daily_cost": round(daily_cost, 2),
"monthly_cost": round(monthly_cost, 2),
"yearly_cost": round(yearly_cost, 2),
},
"usage_assumptions": {
"hours_per_month": hours_per_month,
"hours_per_day": round(hours_per_month / 30.44, 2), # Average days per month
},
"savings_plans": savings_estimates,
}
# Add discount info and original prices if discount was applied
if discount_percentage is not None and discount_percentage > 0:
result["discount_applied"] = {
"percentage": discount_percentage,
"note": "All prices shown are after discount",
}
result["on_demand_pricing"]["original_hourly_rate"] = original_hourly_rate
result["on_demand_pricing"]["original_daily_cost"] = round(original_hourly_rate * 24, 2)
result["on_demand_pricing"]["original_monthly_cost"] = round(original_hourly_rate * hours_per_month, 2)
result["on_demand_pricing"]["original_yearly_cost"] = round(original_hourly_rate * hours_per_month * 12, 2)
return result
async def discover_skus(
self, service_name: str, region: str | None = None, price_type: str = "Consumption", limit: int = 100
) -> dict[str, Any]:
"""Discover available SKUs for a specific Azure service."""
# Build filter conditions
filter_conditions = [f"serviceName eq '{service_name}'"]
if region:
filter_conditions.append(f"armRegionName eq '{region}'")
if price_type:
filter_conditions.append(f"priceType eq '{price_type}'")
# Construct query parameters
params = {"api-version": DEFAULT_API_VERSION, "currencyCode": "USD"}
if filter_conditions:
params["$filter"] = " and ".join(filter_conditions)
# Limit results
if limit < MAX_RESULTS_PER_REQUEST:
params["$top"] = str(limit)
# Make request
data = await self._make_request(AZURE_PRICING_BASE_URL, params)
# Process and deduplicate SKUs
skus = {}
items = data.get("Items", [])
for item in items:
sku_name = item.get("skuName")
arm_sku_name = item.get("armSkuName")
product_name = item.get("productName")
region = item.get("armRegionName")
price = item.get("retailPrice", 0)
unit = item.get("unitOfMeasure")
meter_name = item.get("meterName")
if sku_name and sku_name not in skus:
skus[sku_name] = {
"sku_name": sku_name,
"arm_sku_name": arm_sku_name,
"product_name": product_name,
"sample_price": price,
"unit_of_measure": unit,
"meter_name": meter_name,
"sample_region": region,
"available_regions": [region] if region else [],
}
elif sku_name and region and region not in skus[sku_name]["available_regions"]:
# Add region to existing SKU
skus[sku_name]["available_regions"].append(region)
# Convert to list and sort by SKU name
sku_list = list(skus.values())
sku_list.sort(key=lambda x: x["sku_name"])
return {
"service_name": service_name,
"skus": sku_list,
"total_skus": len(sku_list),
"price_type": price_type,
"region_filter": region,
}
async def search_azure_prices_with_fuzzy_matching(
self,
service_name: str | None = None,
service_family: str | None = None,
region: str | None = None,
sku_name: str | None = None,
price_type: str | None = None,
currency_code: str = "USD",
limit: int = 50,
suggest_alternatives: bool = True,
) -> dict[str, Any]:
"""
Search Azure retail prices with fuzzy matching and suggestions.
If exact matches aren't found, suggests similar services.
"""
# First try exact search
exact_result = await self.search_azure_prices(
service_name=service_name,
service_family=service_family,
region=region,
sku_name=sku_name,
price_type=price_type,
currency_code=currency_code,
limit=limit,
)
# If we got results, return them
if exact_result["items"]:
return exact_result
# If no results and suggest_alternatives is True, try fuzzy matching
if suggest_alternatives and (service_name or service_family):
return await self._find_similar_services(
service_name=service_name, service_family=service_family, currency_code=currency_code, limit=limit
)
return exact_result
async def _find_similar_services(
self,
service_name: str | None = None,
service_family: str | None = None,
currency_code: str = "USD",
limit: int = 50,
) -> dict[str, Any]:
"""Find services with similar names or suggest alternatives."""
suggestions = []
search_term = service_name.lower() if service_name else ""
# Try exact mapping first
if search_term in SERVICE_NAME_MAPPINGS:
correct_name = SERVICE_NAME_MAPPINGS[search_term]
result = await self.search_azure_prices(service_name=correct_name, currency_code=currency_code, limit=limit)
if result["items"]:
result["suggestion_used"] = correct_name
result["original_search"] = service_name
result["match_type"] = "exact_mapping"
return result
# Try partial matching for common terms
partial_matches = []
for user_term, azure_service in SERVICE_NAME_MAPPINGS.items():
if search_term in user_term or user_term in search_term:
partial_matches.append(azure_service)
# Remove duplicates and try each match
for azure_service in list(set(partial_matches)):
result = await self.search_azure_prices(service_name=azure_service, currency_code=currency_code, limit=5)
if result["items"]:
suggestions.append(
{
"service_name": azure_service,
"match_reason": f"Partial match for '{service_name}'",
"sample_items": result["items"][:3],
}
)
# If still no matches, do a broad search and look for similar services
if not suggestions:
broad_result = await self.search_azure_prices(
service_family=service_family, currency_code=currency_code, limit=100
)
# Find services that contain the search term
matching_services = set()
for item in broad_result.get("items", []):
service = item.get("serviceName", "")
product = item.get("productName", "")
if (
search_term in service.lower()
or search_term in product.lower()
or any(word in service.lower() for word in search_term.split())
):
matching_services.add(service)
# Create suggestions from found services
for service in list(matching_services)[:5]: # Limit to top 5
service_result = await self.search_azure_prices(
service_name=service, currency_code=currency_code, limit=3
)
if service_result["items"]:
suggestions.append(
{
"service_name": service,
"match_reason": f"Contains '{search_term}'",
"sample_items": service_result["items"][:2],
}
)
return {
"items": [],
"count": 0,
"has_more": False,
"currency": currency_code,
"original_search": service_name or service_family,
"suggestions": suggestions,
"match_type": "suggestions_only",
}
async def discover_service_skus(
self, service_hint: str, region: str | None = None, currency_code: str = "USD", limit: int = 30
) -> dict[str, Any]:
"""
Discover SKUs for a service with intelligent service name matching.
Args:
service_hint: User's description of the service (e.g., "app service", "web app")
region: Optional specific region to filter by
currency_code: Currency for pricing
limit: Maximum number of results
"""
# Use fuzzy matching to find the right service
result = await self.search_azure_prices_with_fuzzy_matching(
service_name=service_hint, region=region, currency_code=currency_code, limit=limit
)
# If we found exact matches, process SKUs
if result["items"]:
skus = {}
service_used = result.get("suggestion_used", service_hint)
for item in result["items"]:
sku_name = item.get("skuName", "Unknown")
arm_sku = item.get("armSkuName", "Unknown")
product = item.get("productName", "Unknown")
price = item.get("retailPrice", 0)
unit = item.get("unitOfMeasure", "Unknown")
item_region = item.get("armRegionName", "Unknown")
if sku_name not in skus:
skus[sku_name] = {
"sku_name": sku_name,
"arm_sku_name": arm_sku,
"product_name": product,
"prices": [],
"regions": set(),
}
skus[sku_name]["prices"].append({"price": price, "unit": unit, "region": item_region})
skus[sku_name]["regions"].add(item_region)
# Convert sets to lists for JSON serialization
for sku_data in skus.values():
sku_data["regions"] = list(sku_data["regions"])
# Keep only the cheapest price for summary - handle empty sequences safely
if not sku_data["prices"]:
sku_data["min_price"] = 0
sku_data["sample_unit"] = "Unknown"
else:
valid_prices = [p["price"] for p in sku_data["prices"] if p.get("price", 0) > 0]
if valid_prices:
sku_data["min_price"] = min(valid_prices)
else:
# If no valid prices > 0, use the first price (even if 0)
sku_data["min_price"] = sku_data["prices"][0].get("price", 0)
sku_data["sample_unit"] = sku_data["prices"][0].get("unit", "Unknown")
return {
"service_found": service_used,
"original_search": service_hint,
"skus": skus,
"total_skus": len(skus),
"currency": currency_code,
"match_type": result.get("match_type", "exact"),
}
# If no exact matches, return suggestions
return {
"service_found": None,
"original_search": service_hint,
"skus": {},
"total_skus": 0,
"currency": currency_code,
"suggestions": result.get("suggestions", []),
"match_type": "no_match",
}
def create_server() -> Server:
    """Build the MCP server, declare its tool catalogue, and attach the tool handlers."""
    server = Server("azure-pricing")
    pricing_server = AzurePricingServer()

    def _prop(json_type: str, description: str, **extra: Any) -> dict[str, Any]:
        # One JSON-schema property; keys come out as type, description, then extras.
        return {"type": json_type, "description": description, **extra}

    def _currency() -> dict[str, Any]:
        # Currency property shared by most tools (fresh dict per use).
        return _prop("string", "Currency code (default: USD)", default="USD")

    def _discount() -> dict[str, Any]:
        # Discount property shared by the pricing tools (fresh dict per use).
        return _prop("number", "Discount percentage to apply to prices (e.g., 10 for 10% discount)")

    def _schema(properties: dict[str, Any], required: list[str] | None = None) -> dict[str, Any]:
        # Object schema; "required" is only emitted when the tool has required fields.
        schema: dict[str, Any] = {"type": "object", "properties": properties}
        if required is not None:
            schema["required"] = required
        return schema

    @server.list_tools()
    async def handle_list_tools() -> list[Tool]:
        """Return the tool definitions exposed by this server."""
        price_search = Tool(
            name="azure_price_search",
            description="Search Azure retail prices with various filters",
            inputSchema=_schema(
                {
                    "service_name": _prop("string", "Azure service name (e.g., 'Virtual Machines', 'Storage')"),
                    "service_family": _prop("string", "Service family (e.g., 'Compute', 'Storage', 'Networking')"),
                    "region": _prop("string", "Azure region (e.g., 'eastus', 'westeurope')"),
                    "sku_name": _prop("string", "SKU name to search for (partial matches supported)"),
                    "price_type": _prop(
                        "string", "Price type: 'Consumption', 'Reservation', or 'DevTestConsumption'"
                    ),
                    "currency_code": _currency(),
                    "limit": _prop("integer", "Maximum number of results (default: 50)", default=50),
                    "discount_percentage": _discount(),
                    "validate_sku": _prop(
                        "boolean",
                        "Whether to validate SKU names and provide suggestions (default: true)",
                        default=True,
                    ),
                }
            ),
        )

        price_compare = Tool(
            name="azure_price_compare",
            description="Compare Azure prices across regions or SKUs",
            inputSchema=_schema(
                {
                    "service_name": _prop("string", "Azure service name to compare"),
                    "sku_name": _prop("string", "Specific SKU to compare (optional)"),
                    # Written out literally: "items" sits before "description" here.
                    "regions": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of regions to compare (if not provided, compares SKUs)",
                    },
                    "currency_code": _currency(),
                    "discount_percentage": _discount(),
                },
                required=["service_name"],
            ),
        )

        cost_estimate = Tool(
            name="azure_cost_estimate",
            description="Estimate Azure costs based on usage patterns",
            inputSchema=_schema(
                {
                    "service_name": _prop("string", "Azure service name"),
                    "sku_name": _prop("string", "SKU name"),
                    "region": _prop("string", "Azure region"),
                    "hours_per_month": _prop(
                        "number",
                        "Expected hours of usage per month (default: 730 for full month)",
                        default=730,
                    ),
                    "currency_code": _currency(),
                    "discount_percentage": _discount(),
                },
                required=["service_name", "sku_name", "region"],
            ),
        )

        discover_skus = Tool(
            name="azure_discover_skus",
            description="Discover available SKUs for a specific Azure service",
            inputSchema=_schema(
                {
                    "service_name": _prop("string", "Azure service name"),
                    "region": _prop("string", "Azure region (optional)"),
                    "price_type": _prop("string", "Price type (default: 'Consumption')", default="Consumption"),
                    "limit": _prop("integer", "Maximum number of SKUs to return (default: 100)", default=100),
                },
                required=["service_name"],
            ),
        )

        sku_discovery = Tool(
            name="azure_sku_discovery",
            description="Discover available SKUs for Azure services with intelligent name matching",
            inputSchema=_schema(
                {
                    "service_hint": _prop(
                        "string",
                        "Service name or description (e.g., 'app service', 'web app', 'vm', 'storage'). Supports fuzzy matching.",
                    ),
                    "region": _prop("string", "Optional Azure region to filter results"),
                    "currency_code": _currency(),
                    "limit": _prop("integer", "Maximum number of results (default: 30)", default=30),
                },
                required=["service_hint"],
            ),
        )

        region_recommend = Tool(
            name="azure_region_recommend",
            description="Find the cheapest Azure regions for a given service and SKU. Dynamically discovers all available regions, compares prices, and returns ranked recommendations with savings percentages.",
            inputSchema=_schema(
                {
                    "service_name": _prop(
                        "string", "Azure service name (e.g., 'Virtual Machines', 'Azure App Service')"
                    ),
                    "sku_name": _prop("string", "SKU name to price across regions (e.g., 'D4s v3', 'P1v3')"),
                    "top_n": _prop(
                        "integer", "Number of top recommendations to return (default: 10)", default=10
                    ),
                    "currency_code": _currency(),
                    "discount_percentage": _discount(),
                },
                required=["service_name", "sku_name"],
            ),
        )

        ri_pricing = Tool(
            name="azure_ri_pricing",
            description="Get Reserved Instance pricing and savings analysis",
            inputSchema=_schema(
                {
                    "service_name": _prop("string", "Azure service name (e.g., 'Virtual Machines')"),
                    "sku_name": _prop("string", "SKU name (e.g., 'D4s v3')"),
                    "region": _prop("string", "Azure region (e.g., 'eastus')"),
                    "reservation_term": _prop(
                        "string", "Reservation term ('1 Year' or '3 Years')", enum=["1 Year", "3 Years"]
                    ),
                    "currency_code": _currency(),
                    "compare_on_demand": _prop(
                        "boolean",
                        "Compare with On-Demand prices to calculate savings (default: true)",
                        default=True,
                    ),
                    "limit": _prop("integer", "Maximum number of results (default: 50)", default=50),
                },
                required=["service_name"],
            ),
        )

        customer_discount = Tool(
            name="get_customer_discount",
            description="Get customer discount information. Returns default 10% discount for all customers.",
            inputSchema=_schema(
                {
                    "customer_id": _prop("string", "Customer ID (optional, defaults to 'default' customer)"),
                }
            ),
        )

        return [
            price_search,
            price_compare,
            cost_estimate,
            discover_skus,
            sku_discovery,
            region_recommend,
            ri_pricing,
            customer_discount,
        ]

    # The handlers module is imported here, inside the function, and wired to this server.
    from .handlers import register_tool_handlers

    register_tool_handlers(server, pricing_server)
    return server
async def main():
    """Main entry point for the server.

    Parses ``--transport`` (``stdio`` or ``http``), ``--host`` and ``--port``
    from the command line, creates the MCP server and runs it over the chosen
    transport: stdio streams, or an SSE endpoint served by uvicorn.
    """
    import argparse

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Azure Pricing MCP Server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "http"],
        default="stdio",
        help="Transport type: stdio (for local MCP clients) or http (for remote access)",
    )
    parser.add_argument(
        "--host", default="127.0.0.1", help="Host to bind HTTP server (default: 127.0.0.1, use 0.0.0.0 for Docker)"
    )
    parser.add_argument("--port", type=int, default=8080, help="Port for HTTP server (default: 8080)")
    # Only parse known args to avoid issues with MCP passing additional args
    # (unrecognised arguments are returned separately and discarded here).
    args, _ = parser.parse_known_args()
    server = create_server()
    if args.transport == "http":
        # Use HTTP transport for remote access (Docker use case).
        # The web-stack modules are imported only on this branch, so the stdio
        # branch never imports them.
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.requests import Request
        from starlette.responses import Response
        from starlette.routing import Mount, Route

        logger.info(f"Starting HTTP MCP server on {args.host}:{args.port}")
        # Create SSE transport; "/messages/" matches the Mount path registered below.
        sse = SseServerTransport("/messages/")

        async def handle_sse(request: Request):
            # Runs one MCP session over the stream pair yielded by the transport.
            # NOTE(review): `request._send` is an underscore-prefixed attribute of
            # the request object — confirm it is stable across Starlette versions.
            async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
                initialization_options = server.create_initialization_options(
                    notification_options=NotificationOptions(tools_changed=True)
                )
                await server.run(streams[0], streams[1], initialization_options)
            # Returned once the session above has finished.
            return Response()

        # Two routes: "/sse" starts a session via handle_sse, "/messages/" is
        # handled by the transport's POST handler.
        app = Starlette(
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ]
        )
        import uvicorn

        # The uvicorn server is awaited directly from this coroutine.
        config = uvicorn.Config(app, host=args.host, port=args.port, log_level="info")
        server_instance = uvicorn.Server(config)
        await server_instance.serve()
    else:
        # Use stdio transport for local MCP clients (VS Code, Claude Desktop)
        logger.info("Starting stdio MCP server")
        async with stdio_server() as (read_stream, write_stream):
            initialization_options = server.create_initialization_options(
                notification_options=NotificationOptions(tools_changed=True)
            )
            await server.run(read_stream, write_stream, initialization_options)