nl2kql.py•3.93 kB
from typing import Dict, Tuple
import re
from .queries import (
list_subscriptions_kql,
list_resource_groups_kql,
untagged_resource_groups_kql,
advisor_recommendations_kql,
policy_noncompliant_kql,
health_advisories_kql,
resource_changes_recent_kql,
resource_container_changes_recent_kql,
public_ips_kql,
unencrypted_storage_kql,
keyvault_network_kql,
generic_list_resources_kql,
stopped_vms_kql,
manual_changes_kql,
)
def generate_kql(question: str) -> Tuple[str, Dict[str, str]]:
"""Very simple heuristic NL -> KQL generator.
Returns (kql_query, metadata) where metadata includes 'intent'.
"""
q = question.strip().lower()
meta: Dict[str, str] = {"intent": "generic"}
# Common VM queries
if ("vm" in q or "virtual machine" in q) and ("stopped" in q or "deallocated" in q):
meta["intent"] = "stopped_vms"
kql = stopped_vms_kql()
return kql, meta
# Subscriptions list
if ("subscription" in q) and ("list" in q or "all" in q or "show" in q):
meta["intent"] = "list_subscriptions"
kql = list_subscriptions_kql()
return kql, meta
# Resource groups without tags
if ("resource group" in q or "resource groups" in q) and ("without tag" in q or "no tag" in q or "untagged" in q):
meta["intent"] = "untagged_resource_groups"
kql = untagged_resource_groups_kql()
return kql, meta
# Generic resource group listing
if ("resource group" in q or "resource groups" in q) and ("list" in q or "show" in q or "all" in q or "any" in q or "find" in q):
meta["intent"] = "list_resource_groups"
kql = list_resource_groups_kql(limit=50)
return kql, meta
# Advisor recommendations
if ("advisor" in q or "recommendation" in q):
meta["intent"] = "advisor_recommendations"
kql = advisor_recommendations_kql()
return kql, meta
# Policy non-compliance
if ("policy" in q) and ("non-compliant" in q or "noncompliant" in q or "non compliant" in q or "compliance" in q):
meta["intent"] = "policy_non_compliance"
kql = policy_noncompliant_kql()
return kql, meta
# Health incidents/advisories
if ("health" in q or "incident" in q or "advisories" in q or "advisory" in q):
meta["intent"] = "service_health"
kql = health_advisories_kql()
return kql, meta
# Manual changes last 30 days (resourcechanges)
if ("manual" in q and "change" in q) and ("30" in q or "30d" in q or "last 30" in q or "month" in q):
meta["intent"] = "manual_changes_30d"
kql = manual_changes_kql(days=30)
return kql, meta
# Recent changes (resource level)
if ("change" in q or "changed" in q or "changes" in q) and not ("resource group" in q):
meta["intent"] = "resource_changes_recent"
kql = resource_changes_recent_kql(days=30)
return kql, meta
# Recent changes (resource group / subscription level)
if ("change" in q or "changed" in q or "changes" in q) and ("resource group" in q or "subscription" in q):
meta["intent"] = "resource_container_changes_recent"
kql = resource_container_changes_recent_kql(days=30)
return kql, meta
if ("public ip" in q or "internet" in q) and ("exposed" in q or "open" in q):
meta["intent"] = "public_ips"
kql = public_ips_kql()
return kql, meta
if ("storage" in q and ("unencrypted" in q or "not encrypted" in q)):
meta["intent"] = "unencrypted_storage"
kql = unencrypted_storage_kql()
return kql, meta
if ("keyvault" in q or "key vault" in q) and ("firewall" in q or "public" in q):
meta["intent"] = "keyvault_network"
kql = keyvault_network_kql()
return kql, meta
# Default generic listing
kql = generic_list_resources_kql(limit=50)
return kql, meta