# arg.py (1.5 kB)
from typing import Any, Dict, List, Optional
from azure.mgmt.resourcegraph import ResourceGraphClient
from azure.mgmt.resourcegraph.models import QueryRequest, QueryRequestOptions
def execute_kql(
    credential,
    subscriptions: Optional[List[str]],
    kql_query: str,
    top: int = 100,
    management_groups: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Execute a KQL query against Azure Resource Graph.

    This function does not raise for failures of client construction,
    request building, or the query itself; they are reported through the
    returned dict.

    Args:
        credential: Credential object handed to ``ResourceGraphClient``.
        subscriptions: Subscription IDs to scope the query to, or ``None``.
        kql_query: The KQL query text; echoed back under ``"query"``.
        top: Maximum number of rows requested from the service.
        management_groups: Management group IDs to scope the query to,
            or ``None``.

    Returns:
        A dict with keys ``status`` (``"success"`` or ``"error"``),
        ``results`` (list of rows), ``result_count``, ``warnings`` (list of
        human-readable strings) and ``query``. On failure an additional
        ``error`` key carries the exception message and ``results`` is empty.
    """
    try:
        # Client/request construction is inside the try block on purpose:
        # the SDK can reject bad input (e.g. a missing credential) before any
        # network call, and that must surface as an error dict as well.
        client = ResourceGraphClient(credential=credential)

        # Build typed request for reliability.
        options = QueryRequestOptions(result_format="objectArray", top=top)

        # NOTE(review): the scope lists are forwarded as given, without
        # validation. Whether the service accepts a request with neither
        # subscriptions nor management_groups is not checked here - confirm.
        request = QueryRequest(
            subscriptions=subscriptions,
            management_groups=management_groups,
            query=kql_query,
            options=options,
        )

        response = client.resources(request)

        # response.data is a list[dict] when result_format=objectArray.
        rows = list(response.data or [])
    except Exception as e:
        # Deliberate catch-all: this function is a reporting boundary and
        # converts any failure into a structured error result.
        return {
            "status": "error",
            "error": str(e),
            "results": [],
            "result_count": 0,
            "warnings": [],
            "query": kql_query,
        }

    warnings: List[str] = []
    # The SDK's QueryResponse model exposes a "true"/"false" truncation flag;
    # getattr keeps this tolerant of response objects that lack it.
    if getattr(response, "result_truncated", None) == "true":
        warnings.append(
            "Results were truncated by Azure Resource Graph; "
            "refine the query or increase 'top' to retrieve more rows."
        )

    return {
        "status": "success",
        "results": rows,
        "result_count": len(rows),
        "warnings": warnings,
        "query": kql_query,
    }