server.py•28.8 kB
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
import asyncio
from typing import List, Dict, Any
import sys
import logging
from dotenv import load_dotenv
from .config import AzureConfig
from .arg import execute_kql
from .nl2kql import generate_kql
from .kql_loader import load_kql_template
from .queries import (
list_subscriptions_kql,
list_resource_groups_kql,
untagged_resource_groups_kql,
advisor_recommendations_kql,
health_advisories_kql,
policy_noncompliant_kql,
resource_changes_recent_kql,
resource_container_changes_recent_kql,
manual_changes_kql,
vm_count_kql,
)
server = Server("azure-assistant-mcp")
load_dotenv()
# Keep stdout clean for MCP JSON; send logs/warnings to stderr
logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
logging.getLogger("azure").setLevel(logging.ERROR)
try:
AZURE_CONFIG = AzureConfig()
except FileNotFoundError as e:
print(f"Warning: {e}", file=sys.stderr)
AZURE_CONFIG = None
def _enumerate_subscriptions_for_credential(credential) -> List[str]:
"""Return list of subscription IDs accessible by the given credential.
Falls back to empty list if enumeration fails or dependency is missing.
"""
try:
from azure.mgmt.subscription import SubscriptionClient # type: ignore
except Exception:
return []
try:
client = SubscriptionClient(credential=credential)
return [s.subscription_id for s in client.subscriptions.list()]
except Exception:
return []
def _normalize(s: str) -> str:
return "".join(ch.lower() for ch in s if ch.isalnum())
def _initialism(s: str) -> str:
caps = [c for c in s if c.isupper()]
return "".join(caps).lower()
def _guess_tenant_name_from_text(text: str) -> str | None:
"""Heuristically guess a tenant by matching configured names and initialisms.
Examples: 'Fabrikam' -> 'fabrikam', initialism 'fab'. Matches on containment.
"""
if not text:
return None
tnorm = _normalize(text)
for t in AZURE_CONFIG.get_tenants():
name = t.get("name") or t.get("id") or ""
n_norm = _normalize(name)
n_init = _initialism(name)
# Check direct name or initialism presence
if n_norm and n_norm in tnorm:
return t.get("name") or t.get("id")
if n_init and n_init in tnorm:
return t.get("name") or t.get("id")
return None
try:
client = SubscriptionClient(credential=credential)
return [s.subscription_id for s in client.subscriptions.list()]
except Exception:
return []
@server.list_tools()
async def list_tools() -> List[types.Tool]:
tools: List[types.Tool] = [
types.Tool(
name="ask-azure",
description="Answer a question by generating and running an Azure Resource Graph KQL query.",
inputSchema={
"type": "object",
"properties": {
"question": {"type": "string", "description": "Your natural-language question about Azure resources"},
"tenant_name": {"type": "string", "description": "Optional configured tenant name"},
"subscription_ids": {"type": "array", "items": {"type": "string"}, "description": "Optional explicit subscription IDs"},
"use_all_subscriptions": {"type": "boolean", "description": "If no subscriptions are provided, attempt to auto-discover all accessible subscriptions (default: true)", "default": True},
"auto_execute": {"type": "boolean", "description": "Execute the generated KQL automatically (default: true)", "default": True}
},
"required": ["question"],
},
),
types.Tool(
name="list-tenants",
description="List configured tenants from azure-config.json, including optional management group and default subscription info.",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
types.Tool(
name="run-arg-kql",
description="Run a provided KQL query against Azure Resource Graph.",
inputSchema={
"type": "object",
"properties": {
"kql_query": {"type": "string", "description": "KQL to execute (must reference valid ARG tables)"},
"tenant_name": {"type": "string", "description": "Optional configured tenant name"},
"subscription_ids": {"type": "array", "items": {"type": "string"}, "description": "Optional explicit subscription IDs"},
"use_all_subscriptions": {"type": "boolean", "description": "If no subscriptions are provided, attempt to auto-discover all accessible subscriptions (default: true)", "default": True},
"top": {"type": "integer", "description": "Max rows to return (default: 100)", "default": 100}
},
"required": ["kql_query"],
},
),
types.Tool(
name="run-kql-template",
description="Execute a KQL template from kql/ by name, with optional {{param}} replacements.",
inputSchema={
"type": "object",
"properties": {
"template_name": {"type": "string", "description": "Template filename without extension (looks for .md or .kql in kql folder)"},
"params": {"type": "object", "description": "Optional key/value replacements for {{key}} placeholders in the template"},
"tenant_name": {"type": "string", "description": "Optional configured tenant name"},
"subscription_ids": {"type": "array", "items": {"type": "string"}, "description": "Optional explicit subscription IDs"},
"use_all_subscriptions": {"type": "boolean", "description": "If no subscriptions are provided, attempt to auto-discover all accessible subscriptions (default: true)", "default": True},
"top": {"type": "integer", "description": "Max rows to return (default: 100)", "default": 100}
},
"required": ["template_name"],
},
),
types.Tool(
name="vm-count-by-tenant",
description="Count virtual machines per configured tenant (uses all subscriptions when available).",
inputSchema={
"type": "object",
"properties": {
"tenant_names": {"type": "array", "items": {"type": "string"}, "description": "Optional subset of tenant names to include"},
"use_all_subscriptions": {"type": "boolean", "description": "Try to include all subscriptions in each tenant (fallbacks to default)", "default": True}
},
"required": [],
},
),
types.Tool(
name="list-subscriptions",
description="List subscriptions accessible to the configured service principal for a tenant.",
inputSchema={
"type": "object",
"properties": {
"tenant_name": {"type": "string", "description": "Optional configured tenant name"}
},
"required": [],
},
),
types.Tool(
name="arg-tables",
description="Overview of common Azure Resource Graph tables, their purpose, and typical use cases.",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
types.Tool(
name="arg-examples",
description="Sample KQL snippets for common scenarios across ARG tables.",
inputSchema={
"type": "object",
"properties": {
"topic": {"type": "string", "description": "Optional topic filter, e.g., 'subscriptions', 'policy', 'advisor', 'health', 'changes', 'resourcegroups'"}
},
"required": [],
},
),
]
# Add diagnostics when debug is enabled in azure-config.json
try:
if AZURE_CONFIG and AZURE_CONFIG.is_debug_enabled():
tools.append(
types.Tool(
name="diagnostics",
description="Diagnostics for scoping: shows resolved tenant, config path, ARM enumeration vs ARG MG coverage.",
inputSchema={
"type": "object",
"properties": {
"tenant_name": {"type": "string", "description": "Optional configured tenant name"}
},
"required": [],
},
)
)
except Exception:
pass
return tools
def _format_rows(rows: List[Dict[str, Any]], limit: int = 10) -> str:
if not rows:
return "No results found."
out = []
for i, item in enumerate(rows[:limit], 1):
# Try a few common fields first
name = item.get("name")
typ = item.get("type")
rg = item.get("resourceGroup")
loc = item.get("location")
sid = item.get("subscriptionId")
rid = item.get("id")
if name or typ or rg or loc:
parts = []
if name: parts.append(f"name={name}")
if typ: parts.append(f"type={typ}")
if loc: parts.append(f"location={loc}")
if rg: parts.append(f"rg={rg}")
if sid: parts.append(f"subscriptionId={sid}")
if rid and not sid: parts.append(f"id={rid}")
out.append(f"{i}. "+", ".join(parts))
else:
out.append(f"{i}. {item}")
if len(rows) > limit:
out.append(f"... and {len(rows) - limit} more")
return "\n".join(out)
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any] | None) -> List[types.TextContent]:
if not AZURE_CONFIG:
return [types.TextContent(type="text", text="Error: azure-config.json not found.")]
arguments = arguments or {}
if name == "list-tenants":
tenants = AZURE_CONFIG.get_tenants()
if not tenants:
return [types.TextContent(type="text", text="No tenants configured.")]
lines = [
"Configured tenants:",
]
for t in tenants:
name = t.get("name") or t.get("id") or "(unnamed)"
tid = t.get("id") or "(no id)"
mg = t.get("management_group_id") or t.get("managementGroupId") or "(none)"
default_sub = t.get("default_subscription_id") or "(none)"
lines.append(f"- {name} ({tid}) | management_group_id={mg} | default_subscription_id={default_sub}")
return [types.TextContent(type="text", text="\n".join(lines))]
if name == "run-arg-kql":
tenant_name = arguments.get("tenant_name")
subs = arguments.get("subscription_ids")
use_all = bool(arguments.get("use_all_subscriptions", True))
top = int(arguments.get("top", 100))
kql = arguments.get("kql_query")
if not kql:
return [types.TextContent(type="text", text="Error: kql_query is required")]
# Auto-guess tenant if not provided
if not tenant_name:
tenant_name = _guess_tenant_name_from_text(kql)
cred, default_subs = AZURE_CONFIG.get_credentials(tenant_name)
mg = AZURE_CONFIG.get_management_group_id(tenant_name)
if subs:
pass
elif use_all and mg:
subs = [] # force MG usage below
elif use_all:
discovered = _enumerate_subscriptions_for_credential(cred)
subs = discovered or default_subs
else:
subs = default_subs
# Only error if neither subscriptions nor MG scope is available
if not subs and not (use_all and mg):
return [types.TextContent(type="text", text=(
"Error: No subscriptions available; provide subscription_ids, set default_subscription_id in config, "
"or ensure subscription discovery is possible (azure-mgmt-subscription installed and SP has access)."
))]
# Try management group scope if no subs and mg configured
if use_all and mg and not subs:
result = execute_kql(cred, None, kql, top=top, management_groups=[mg])
else:
result = execute_kql(cred, subs, kql, top=top)
if result["status"] != "success":
return [types.TextContent(type="text", text=f"ARG query failed: {result.get('error','unknown error')}")]
rows = result["results"]
scope_line = (
f"Scope: managementGroup={mg}" if (use_all and mg and not subs) else f"Subscriptions used: {len(subs)}"
)
body = [
"Azure Resource Graph Query Results:",
f"Rows: {result['result_count']}",
f"Tenant: {tenant_name or AZURE_CONFIG.get_default_tenant().get('name')}",
scope_line,
"",
_format_rows(rows),
"",
"KQL:",
result["query"][:1000],
]
return [types.TextContent(type="text", text="\n".join(body))]
if name == "run-kql-template":
tenant_name = arguments.get("tenant_name")
subs = arguments.get("subscription_ids")
use_all = bool(arguments.get("use_all_subscriptions", True))
top = int(arguments.get("top", 100))
template_name = arguments.get("template_name")
params: Dict[str, Any] = arguments.get("params") or {}
if not template_name:
return [types.TextContent(type="text", text="Error: template_name is required")]
kql = load_kql_template(template_name)
if not kql:
return [types.TextContent(type="text", text=f"Error: template '{template_name}' not found in kql templates.")]
# Simple {{key}} replacement
try:
for k, v in params.items():
kql = kql.replace(f"{{{{{k}}}}}", str(v))
except Exception:
pass
# Auto-guess tenant if not provided (best effort: look for tenant-like strings is skipped here)
cred, default_subs = AZURE_CONFIG.get_credentials(tenant_name)
mg = AZURE_CONFIG.get_management_group_id(tenant_name)
if subs:
pass
elif use_all and mg:
subs = [] # force MG usage below
elif use_all:
discovered = _enumerate_subscriptions_for_credential(cred)
subs = discovered or default_subs
else:
subs = default_subs
if not subs and not (use_all and mg):
return [types.TextContent(type="text", text=(
"Error: No subscriptions available; provide subscription_ids, set default_subscription_id in config, "
"or ensure subscription discovery is possible (azure-mgmt-subscription installed and SP has access)."
))]
if use_all and mg and not subs:
result = execute_kql(cred, None, kql, top=top, management_groups=[mg])
else:
result = execute_kql(cred, subs, kql, top=top)
if result["status"] != "success":
return [types.TextContent(type="text", text=f"ARG query failed: {result.get('error','unknown error')}")]
rows = result["results"]
scope_line = (
f"Scope: managementGroup={mg}" if (use_all and mg and not subs) else f"Subscriptions used: {len(subs)}"
)
body = [
f"KQL Template: {template_name}",
f"Rows: {result['result_count']}",
f"Tenant: {tenant_name or AZURE_CONFIG.get_default_tenant().get('name')}",
scope_line,
"",
_format_rows(rows),
"",
"KQL:",
kql[:1000],
]
return [types.TextContent(type="text", text="\n".join(body))]
if name == "arg-tables":
overview = [
"Azure Resource Graph Tables",
"",
"resourcecontainers: Subscription and resource group metadata. Inventory by scope: list subs/rgroups, tags, locations.",
"resourcecontainerchanges: Historical changes at subscription/resource group level. Auditing additions/deletions/updates.",
"resources: All deployed Azure resources. Core inventory: counts, types, locations, tags, SKUs.",
"resourcechanges: Historical changes at resource level. Troubleshoot drift and unexpected modifications.",
"advisorresources: Azure Advisor recommendations. Cost, performance, HA, security best practices.",
"healthresources: Service/Resource Health events. Outages, degraded services, maintenance impacts.",
"policyresources: Azure Policy compliance state. Non-compliant resources and policy details.",
]
return [types.TextContent(type="text", text="\n".join(overview))]
if name == "arg-examples":
topic = (arguments or {}).get("topic", "").strip().lower()
examples: List[str] = []
def add(title: str, kql: str) -> None:
examples.extend([title, "KQL:", kql, ""]) # blank line after each
if not topic or topic in ("subscriptions", "subs"):
add("List subscriptions", list_subscriptions_kql())
if not topic or topic in ("resourcegroups", "resource groups", "rg", "tags"):
add("Resource groups without tags", untagged_resource_groups_kql())
add("List resource groups", list_resource_groups_kql(limit=50))
if not topic or topic in ("changes", "resourcechanges"):
add("Recent resource changes (7d)", resource_changes_recent_kql(days=7))
add("Manual changes (last 30d) — filterable by RG", manual_changes_kql(days=30))
if not topic or topic in ("containerchanges", "resourcecontainerchanges"):
add("Recent subscription/resource group changes (30d)", resource_container_changes_recent_kql(days=30))
if not topic or topic in ("advisor", "recommendations"):
add("Advisor recommendations (all)", advisor_recommendations_kql())
if not topic or topic in ("health", "incidents"):
add("Service/Resource health advisories (recent)", health_advisories_kql())
if not topic or topic in ("policy", "compliance"):
add("Non-compliant policy resources", policy_noncompliant_kql())
if not examples:
examples = ["No examples matched the topic filter."]
header = ["ARG Examples", "", f"Topic filter: {topic or '(none)'}", ""]
return [types.TextContent(type="text", text="\n".join(header + examples))]
if name == "ask-azure":
tenant_name = arguments.get("tenant_name")
subs = arguments.get("subscription_ids")
use_all = bool(arguments.get("use_all_subscriptions", True))
auto_execute = bool(arguments.get("auto_execute", True))
question = arguments.get("question")
if not question:
return [types.TextContent(type="text", text="Error: question is required")]
kql, meta = generate_kql(question)
header = [
"Proposed Azure Resource Graph Query:",
f"Intent: {meta.get('intent','generic')}",
"",
"KQL:",
kql,
]
if not auto_execute:
header.append("\nSet auto_execute=true to run this query.")
return [types.TextContent(type="text", text="\n".join(header))]
# Auto-guess tenant if not provided
if not tenant_name:
tenant_name = _guess_tenant_name_from_text(question)
cred, default_subs = AZURE_CONFIG.get_credentials(tenant_name)
mg = AZURE_CONFIG.get_management_group_id(tenant_name)
if subs:
pass
elif use_all and mg:
subs = [] # force MG usage below
elif use_all:
discovered = _enumerate_subscriptions_for_credential(cred)
subs = discovered or default_subs
else:
subs = default_subs
if not subs and not (use_all and mg):
header.append(
"\nError: No subscriptions available; provide subscription_ids, set default_subscription_id in config, or ensure subscription discovery is possible (azure-mgmt-subscription installed and SP has access)."
)
return [types.TextContent(type="text", text="\n".join(header))]
if use_all and mg and not subs:
result = execute_kql(cred, None, kql, top=100, management_groups=[mg])
else:
result = execute_kql(cred, subs, kql, top=100)
if result["status"] != "success":
header.append(f"\nExecution failed: {result.get('error','unknown error')}")
return [types.TextContent(type="text", text="\n".join(header))]
rows = result["results"]
scope_line = (
f"Scope: managementGroup={mg}" if (use_all and mg and not subs) else f"Subscriptions used: {len(subs)}"
)
body = [
*header,
"",
f"Rows: {result['result_count']}",
f"Tenant: {tenant_name or AZURE_CONFIG.get_default_tenant().get('name')}",
scope_line,
_format_rows(rows),
]
return [types.TextContent(type="text", text="\n".join(body))]
if name == "vm-count-by-tenant":
# Optional: restrict to certain tenants
wanted = set(arguments.get("tenant_names", []) or [])
try_all = bool(arguments.get("use_all_subscriptions", True))
tenants = AZURE_CONFIG.get_tenants()
if wanted:
tenants = [t for t in tenants if t.get("name") in wanted]
if not tenants:
return [types.TextContent(type="text", text="No matching tenants found.")]
rows: List[str] = []
total = 0
for t in tenants:
tname = t.get("name") or t.get("id")
cred, default_subs = AZURE_CONFIG.get_credentials(tname)
mg = AZURE_CONFIG.get_management_group_id(tname)
subs = list(default_subs)
if try_all and mg:
subs = [] # force MG usage below
elif try_all:
discovered = _enumerate_subscriptions_for_credential(cred)
if discovered:
subs = discovered
if not subs:
rows.append(f"- {tname}: 0 (no subscriptions)")
continue
kql = vm_count_kql()
if try_all and mg and not subs:
result = execute_kql(cred, None, kql, top=1, management_groups=[mg])
else:
result = execute_kql(cred, subs, kql, top=1)
if result["status"] != "success" or not result.get("results"):
rows.append(f"- {tname}: error ({result.get('error','query failed')})")
continue
count = int(result["results"][0].get("VMCount", 0))
total += count
rows.append(f"- {tname}: {count}")
body = [
"VM counts by tenant:",
*rows,
"",
f"Total VMs across tenants: {total}",
]
return [types.TextContent(type="text", text="\n".join(body))]
if name == "list-subscriptions":
tenant_name = arguments.get("tenant_name")
cred, default_subs = AZURE_CONFIG.get_credentials(tenant_name)
discovered = _enumerate_subscriptions_for_credential(cred)
mg = AZURE_CONFIG.get_management_group_id(tenant_name)
subs = discovered or default_subs
if not subs and not mg:
return [types.TextContent(type="text", text=(
"No subscriptions found and no management_group_id configured. Configure management_group_id or ensure the SP has Reader at root."
))]
# If we were able to enumerate, we only have IDs. For user-friendly output,
# run a small ARG query to fetch display names.
total_count = len(subs)
scope_line = ""
try:
kql = list_subscriptions_kql()
# If a management group is configured, prefer listing by MG to include all contained subscriptions
if mg:
res = execute_kql(cred, None, kql, top=1000, management_groups=[mg])
scope_line = f"Scope: managementGroup={mg}"
else:
res = execute_kql(cred, subs, kql, top=1000)
scope_line = f"Scope: subscriptions={len(subs)}"
if res.get("status") == "success" and res.get("results"):
rows = res["results"]
lines = [f"- {r.get('name')} ({r.get('subscriptionId')})" for r in rows]
total_count = len(rows)
else:
lines = [f"- {sid}" for sid in subs]
except Exception:
lines = [f"- {sid}" for sid in subs]
scope_line = f"Scope: subscriptions={len(subs)} (fallback)"
body = [
f"Tenant: {tenant_name or AZURE_CONFIG.get_default_tenant().get('name')}",
f"Subscriptions: {total_count}",
scope_line,
*lines,
]
return [types.TextContent(type="text", text="\n".join(body))]
if name == "diagnostics":
tenant_name = arguments.get("tenant_name")
# Resolve tenant and credentials
try:
t = AZURE_CONFIG.get_tenant(tenant_name)
except Exception as e:
return [types.TextContent(type="text", text=f"Config error: {e}")]
tname = t.get("name") or t.get("id")
tid = t.get("id")
cfg_path = AZURE_CONFIG.get_config_path()
mg = AZURE_CONFIG.get_management_group_id(tenant_name)
cred, default_subs = AZURE_CONFIG.get_credentials(tenant_name)
# ARM subscription enumeration
discovered = _enumerate_subscriptions_for_credential(cred)
disc_count = len(discovered)
disc_preview = ", ".join(discovered[:5]) + (" ..." if disc_count > 5 else "") if discovered else "(none)"
# ARG via management group
kql = list_subscriptions_kql()
mg_lines: List[str] = []
mg_count = 0
if mg:
res_mg = execute_kql(cred, None, kql, top=1000, management_groups=[mg])
if res_mg.get("status") == "success" and res_mg.get("results"):
mg_rows = res_mg["results"]
mg_count = len(mg_rows)
mg_lines = [f"- {r.get('name')} ({r.get('subscriptionId')})" for r in mg_rows[:10]]
if mg_count > 10:
mg_lines.append(f"... and {mg_count-10} more")
else:
mg_lines = [f"MG query failed: {res_mg.get('error','unknown error')}"]
else:
mg_lines = ["No management_group_id configured for this tenant."]
# Scoping decision (default behavior with use_all_subscriptions=true, no explicit subs)
if mg:
scope_decision = f"managementGroup={mg}"
elif discovered:
scope_decision = f"subscriptions={disc_count} (enumerated)"
elif default_subs:
scope_decision = f"subscriptions={len(default_subs)} (default)"
else:
scope_decision = "none (would error)"
body = [
"Diagnostics",
f"Config: {cfg_path}",
f"Tenant: {tname} ({tid})",
f"management_group_id (normalized): {mg or '(none)'}",
f"ARM enumeration: {disc_count} -> {disc_preview}",
f"Default subs from config: {', '.join(default_subs) if default_subs else '(none)'}",
f"Scoping decision (use_all_subscriptions): {scope_decision}",
"",
"ARG subscriptions at MG scope:",
*mg_lines,
]
return [types.TextContent(type="text", text="\n".join(body))]
return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
async def main() -> None:
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="azure-assistant-mcp",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)