#!/usr/bin/env python3
"""
Demo script showing the agent analyzing mock Azure infrastructure.

This demonstrates the agent's capabilities without requiring actual Azure resources.
"""
import json
import asyncio
from groq import Groq
import os
from dotenv import load_dotenv
load_dotenv()
# Canned Azure security findings. These stand in for the payloads the real
# MCP tools would return, so the demo never touches a live subscription.

# Result served for the "azure_check_nsg_rules" tool.
MOCK_NSG_ANALYSIS = {
    "nsg_name": "nsg-webapp-prod",
    "location": "eastus",
    "total_rules": 5,
    "issues_found": 3,
    "security_issues": [
        # Wildcard inbound rule: every port, every source.
        {
            "severity": "CRITICAL",
            "rule_name": "allow-all-inbound",
            "issue": "Inbound rule allows all ports (*) from any source",
            "source": "*",
            "destination_port": "*",
            "recommendation": "Specify exact ports needed and restrict source IPs",
        },
        # RDP reachable from anywhere.
        {
            "severity": "HIGH",
            "rule_name": "allow-rdp-any",
            "issue": "RDP port 3389 exposed to the internet",
            "port": 3389,
            "source": "*",
            "recommendation": "Use Azure Bastion for secure RDP access, never expose directly",
        },
        # SQL Server reachable from anywhere.
        {
            "severity": "HIGH",
            "rule_name": "allow-sql-public",
            "issue": "SQL Server port 1433 is publicly accessible",
            "port": 1433,
            "source": "0.0.0.0/0",
            "recommendation": "Restrict to application subnet or use Private Link",
        },
    ],
    "summary": "Found 3 security issues in NSG rules",
}
# Result served for the "azure_check_storage_security" tool.
MOCK_STORAGE_ANALYSIS = {
    "storage_account": "prodwebappstorage",
    "location": "eastus",
    "encryption": "Enabled",
    # The two flags below are what the listed issues are about.
    "https_only": False,
    "public_access_enabled": True,
    "issues_found": 2,
    "security_issues": [
        {
            "severity": "CRITICAL",
            "issue": "HTTPS-only traffic is not enforced",
            "recommendation": "Enable HTTPS-only traffic (secure transfer required) immediately",
        },
        {
            "severity": "HIGH",
            "issue": "Public blob access is enabled",
            "recommendation": "Disable public blob access unless specifically required. Use SAS tokens or private endpoints instead.",
        },
    ],
    "summary": "Found 2 security issues in storage account",
}
# Result served for the "azure_list_public_ips" tool. Every record has the
# same five fields, so the rows are kept as tuples and expanded into dicts.
MOCK_PUBLIC_IPS = {
    "total_public_ips": 3,
    "public_ips": [
        {
            "name": pip_name,
            "ip_address": address,
            "allocation_method": allocation,
            "location": region,
            "resource_group": group,
        }
        for pip_name, address, allocation, region, group in (
            ("pip-webapp-prod", "20.185.123.45", "Static", "eastus", "rg-webapp-prod"),
            ("pip-database-server", "52.142.67.89", "Dynamic", "eastus", "rg-database-prod"),
            ("pip-admin-vm", "40.117.234.12", "Static", "westus", "rg-management"),
        )
    ],
    "security_note": "Public IPs expose resources to the internet. Ensure proper NSG rules are in place.",
}
# Result served for the "azure_list_resources" tool. Rows are
# (name, resource type, location, resource group).
MOCK_RESOURCES = {
    "total_resources": 8,
    "resources": [
        {
            "name": resource_name,
            "type": resource_type,
            "location": region,
            "resource_group": group,
        }
        for resource_name, resource_type, region, group in (
            ("vm-webapp-prod-01", "Microsoft.Compute/virtualMachines", "eastus", "rg-webapp-prod"),
            ("nsg-webapp-prod", "Microsoft.Network/networkSecurityGroups", "eastus", "rg-webapp-prod"),
            ("prodwebappstorage", "Microsoft.Storage/storageAccounts", "eastus", "rg-webapp-prod"),
            ("pip-webapp-prod", "Microsoft.Network/publicIPAddresses", "eastus", "rg-webapp-prod"),
            ("db-server-sql", "Microsoft.Sql/servers", "eastus", "rg-database-prod"),
            ("pip-database-server", "Microsoft.Network/publicIPAddresses", "eastus", "rg-database-prod"),
            ("vm-admin-01", "Microsoft.Compute/virtualMachines", "westus", "rg-management"),
            ("kv-secrets-prod", "Microsoft.KeyVault/vaults", "eastus", "rg-webapp-prod"),
        )
    ],
}
# Function-calling schemas advertised to the model. All four tools share one
# shape (an object of string parameters), so each is described by a compact
# (name, description, parameters, required parameters) row and expanded below.
MOCK_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": tool_name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {param: {"type": "string"} for param in params},
                "required": list(required),
            },
        },
    }
    for tool_name, description, params, required in (
        (
            "azure_list_resources",
            "List all resources in a resource group or subscription",
            ("subscription_id", "resource_group"),
            ("subscription_id",),
        ),
        (
            "azure_check_nsg_rules",
            "Analyze Network Security Group rules for security issues",
            ("subscription_id", "resource_group", "nsg_name"),
            ("subscription_id", "resource_group", "nsg_name"),
        ),
        (
            "azure_check_storage_security",
            "Check storage account security configuration",
            ("subscription_id", "resource_group", "storage_account_name"),
            ("subscription_id", "resource_group", "storage_account_name"),
        ),
        (
            "azure_list_public_ips",
            "List all public IP addresses",
            ("subscription_id", "resource_group"),
            ("subscription_id",),
        ),
    )
]
def mock_tool_execution(tool_name: str, arguments: dict) -> str:
    """Return the canned JSON payload for a mock Azure tool.

    Args:
        tool_name: Name of the tool the model asked for.
        arguments: Arguments supplied with the call. They are accepted for
            interface parity but do not influence the mock result.

    Returns:
        The matching fixture serialized as indented JSON, or a compact JSON
        object with an ``"error"`` key when the tool name is not recognized.
    """
    # Each entry is a zero-argument callable so a fixture is only looked up
    # when its tool is actually requested.
    fixtures = {
        "azure_list_resources": lambda: MOCK_RESOURCES,
        "azure_check_nsg_rules": lambda: MOCK_NSG_ANALYSIS,
        "azure_check_storage_security": lambda: MOCK_STORAGE_ANALYSIS,
        "azure_list_public_ips": lambda: MOCK_PUBLIC_IPS,
    }
    load_fixture = fixtures.get(tool_name)
    if load_fixture is None:
        return json.dumps({"error": f"Unknown tool: {tool_name}"})
    return json.dumps(load_fixture(), indent=2)
async def run_demo_analysis(groq_api_key: str, query: str):
    """Run one demo query through Groq, answering tool calls with mock data.

    The model is queried in a loop of at most five rounds. Whenever it asks
    for tools, each call is answered through ``mock_tool_execution`` and the
    results are appended to the conversation. The first reply that contains
    no tool calls is printed as the security report.

    Args:
        groq_api_key: API key handed to the ``Groq`` client.
        query: The user request sent to the model.

    Returns:
        None. The report, or "Reached max iterations" when the model never
        stops requesting tools, is printed to stdout.
    """
    client = Groq(api_key=groq_api_key)
    model = "llama-3.3-70b-versatile"
    banner = "=" * 70

    print(f"\n{banner}")
    print(f"DEMO: {query}")
    print(f"{banner}\n")

    system_message = {
        "role": "system",
        "content": """You are an expert Azure cloud security analyst. Analyze the infrastructure data provided by the tools and:
1. Identify security risks with severity levels (CRITICAL, HIGH, MEDIUM, LOW)
2. Explain the potential impact of each issue
3. Provide specific, actionable recommendations
4. Prioritize issues by severity
5. Give an overall security score (0-100) and summary
Be thorough but concise. Use clear formatting with sections."""
    }
    messages = [
        system_message,
        {"role": "user", "content": query}
    ]

    max_iterations = 5
    for _ in range(max_iterations):
        # NOTE(review): this call is not awaited, so it presumably blocks the
        # event loop while the request is in flight. Confirm that is
        # acceptable for the demo.
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=MOCK_TOOLS,
            tool_choice="auto",
            temperature=0.1,
            max_tokens=2000
        )
        assistant_message = response.choices[0].message

        # No tool calls means the model has produced its final answer.
        if not assistant_message.tool_calls:
            print(f"\n{banner}")
            print("SECURITY ANALYSIS REPORT")
            print(f"{banner}\n")
            print(assistant_message.content)
            print(f"\n{banner}\n")
            return

        # Echo the assistant turn back, including its tool calls, so the
        # tool results appended below can be matched to them by id.
        messages.append({
            "role": "assistant",
            "content": assistant_message.content,
            "tool_calls": [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {
                        "name": tc.function.name,
                        "arguments": tc.function.arguments
                    }
                }
                for tc in assistant_message.tool_calls
            ]
        })

        # Answer every requested tool call with mock data.
        for tool_call in assistant_message.tool_calls:
            tool_name = tool_call.function.name
            print(f"🔧 Tool: {tool_name}")
            try:
                # An empty argument string is treated as "no arguments".
                # Malformed JSON is reported back to the model instead of
                # aborting the whole analysis.
                tool_args = json.loads(tool_call.function.arguments or "{}")
            except json.JSONDecodeError as exc:
                print(f" ✗ Invalid arguments: {exc}\n")
                result = json.dumps(
                    {"error": f"Invalid JSON arguments for {tool_name}: {exc}"}
                )
            else:
                print(f" Args: {json.dumps(tool_args, indent=8)}")
                result = mock_tool_execution(tool_name, tool_args)
                print(" ✓ Completed\n")

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result
            })

    print("Reached max iterations")
async def main():
    """Drive the interactive demo menu.

    Reads ``GROQ_API_KEY`` from the environment. Without it, setup
    instructions are printed and the function returns. Otherwise the demo
    scenarios are listed, a selection is read from stdin, and the chosen
    scenario (or all of them for "0") is run through ``run_demo_analysis``.
    """
    rule = "=" * 70

    groq_api_key = os.getenv("GROQ_API_KEY")
    if not groq_api_key:
        # Nothing can run without a key, so explain how to provide one.
        for line in (
            rule,
            "ERROR: GROQ_API_KEY not found",
            rule,
            "\nTo run this demo:",
            "1. Get a free API key from: https://console.groq.com",
            "2. Create a .env file:",
            " echo 'GROQ_API_KEY=your_key_here' > .env",
            "\nOr run with:",
            " GROQ_API_KEY=your_key python demo.py",
        ):
            print(line)
        return

    print("\n" + rule)
    print("AZURE SECURITY AGENT - DEMO MODE")
    print(rule)
    print("\nThis demo shows the agent analyzing mock Azure infrastructure.")
    print("No actual Azure resources are accessed.\n")

    # (menu title, query sent to the model)
    scenarios = [
        (
            "Comprehensive Security Audit",
            "Perform a comprehensive security audit of the production resource group 'rg-webapp-prod'. Check all resources, NSGs, storage accounts, and public IPs. Provide a detailed security report with prioritized recommendations.",
        ),
        (
            "Network Security Focus",
            "Analyze the NSG 'nsg-webapp-prod' in resource group 'rg-webapp-prod' and identify all network security vulnerabilities. Focus on exposed ports and overly permissive rules.",
        ),
        (
            "Storage Security Audit",
            "Audit the storage account 'prodwebappstorage' in resource group 'rg-webapp-prod'. Check for public access, encryption, and compliance with security best practices.",
        ),
    ]

    print("Available demos:")
    for number, (title, _) in enumerate(scenarios, 1):
        print(f" {number}. {title}")
    print(" 0. Run all demos")
    print()

    try:
        selection = input("Select demo (0-3): ").strip()
        if selection in ("1", "2", "3"):
            _, query = scenarios[int(selection) - 1]
            await run_demo_analysis(groq_api_key, query)
        elif selection == "0":
            for _, query in scenarios:
                await run_demo_analysis(groq_api_key, query)
                print("\n" + "▼" * 70 + "\n")
                await asyncio.sleep(2)  # Pause between demos
        else:
            print("Invalid choice")
    except KeyboardInterrupt:
        print("\n\nDemo interrupted")
    except Exception as exc:
        # Top-level boundary of the demo: report the failure rather than
        # showing a traceback.
        print(f"\nError: {exc}")


# Run the interactive demo only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())