#!/usr/bin/env python3
"""
Azure Security MCP Server (FastMCP)
MCP server for Azure infrastructure security analysis using FastMCP.
"""
import json
import logging
import os
from pathlib import Path
from typing import Optional
from fastmcp import FastMCP
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.compute import ComputeManagementClient
# Configure logging once at import time. basicConfig's default handler writes
# to stderr, which keeps log lines off stdout (stdout carries the JSON-RPC
# traffic when this server runs as a subprocess).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastMCP server for Azure security; the decorators below register
# the prompt and tools on this instance.
mcp = FastMCP("azure-security-analyzer")
@mcp.prompt()
def system_prompt() -> str:
    """Instructions for the Azure security agent."""
    # The docstring is kept short and unchanged: FastMCP publishes it as the
    # prompt description.
    #
    # Returns the contents of prompts/system_instructions.md (located next to
    # this script) when it can be read, otherwise a built-in default prompt.
    fallback = "You are an Azure Cloud Security Analyst. Use the MCP tools to discover and analyze Azure resources for security issues. Provide severity ratings and actionable recommendations."
    prompt_path = Path(__file__).resolve().parent / "prompts" / "system_instructions.md"
    # Read first and handle failure afterwards: an exists() check followed by a
    # read can still fail (file removed in between, path is a directory,
    # permissions), which would make the prompt unavailable.
    try:
        return prompt_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # No custom prompt shipped; the built-in default is the normal path.
        return fallback
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning(
            "Could not read system prompt %s (%s); using built-in default",
            prompt_path,
            exc,
        )
        return fallback
# Module-wide registry of Azure SDK objects. Every slot starts out as None and
# is filled in by initialize_azure_clients() for the subscription in use.
azure_clients = dict.fromkeys(
    (
        "credential",
        "resource_client",
        "network_client",
        "storage_client",
        "compute_client",
        "subscription_id",
    )
)
def initialize_azure_clients(subscription_id: str) -> bool:
    """Make sure ``azure_clients`` holds SDK clients for ``subscription_id``.

    If the registry is already fully populated for the same subscription it is
    reused as-is; otherwise a new credential and new management clients are
    built and swapped in together.

    Args:
        subscription_id: Azure subscription the clients should target.

    Returns:
        True when the registry is ready for ``subscription_id``; False when
        construction failed. On failure the registry is left exactly as it
        was, and the error is logged with its traceback.
    """
    # Tools call this on every invocation; skip the rebuild when nothing changed.
    if azure_clients["subscription_id"] == subscription_id and all(
        value is not None for value in azure_clients.values()
    ):
        return True

    try:
        credential = DefaultAzureCredential()
        # Build into a local dict first so a failure half-way through cannot
        # leave the shared registry with a mix of old and new entries.
        fresh = {
            "credential": credential,
            "subscription_id": subscription_id,
            "resource_client": ResourceManagementClient(credential, subscription_id),
            "network_client": NetworkManagementClient(credential, subscription_id),
            "storage_client": StorageManagementClient(credential, subscription_id),
            "compute_client": ComputeManagementClient(credential, subscription_id),
        }
    except Exception as exc:
        # Boundary handler: callers only need a yes/no answer.
        logger.exception("Failed to initialize Azure clients: %s", exc)
        return False

    azure_clients.update(fresh)
    logger.info("Azure clients initialized for subscription: %s", subscription_id)
    return True
# --- Discovery tools ---
@mcp.tool()
async def azure_list_resource_groups(subscription_id: str) -> str:
    """List all resource groups in the subscription. Use this first to discover what resource groups exist before analyzing them."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        group_ops = azure_clients["resource_client"].resource_groups
        groups = [
            {"name": group.name, "location": group.location, "id": group.id}
            for group in group_ops.list()
        ]
        count = len(groups)
        payload = {
            "total_resource_groups": count,
            "resource_groups": groups,
            "summary": f"Found {count} resource group(s). Use these names with other tools to analyze resources.",
        }
        return json.dumps(payload, indent=2)
    except Exception as exc:
        # Any SDK or serialization failure is reported to the caller as text.
        return f"Error listing resource groups: {str(exc)}"
@mcp.tool()
async def azure_list_nsgs(subscription_id: str, resource_group: Optional[str] = None) -> str:
    """List all Network Security Groups in the subscription or in a specific resource group. Use to discover NSG names for security analysis."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        nsg_ops = azure_clients["network_client"].network_security_groups
        # Scope the listing to one resource group when given, else the whole subscription.
        found = nsg_ops.list(resource_group) if resource_group else nsg_ops.list_all()
        entries = [
            {
                "name": item.name,
                "location": item.location,
                # Segment 4 of the resource ID is taken as the resource group name.
                "resource_group": item.id.split("/")[4] if "/" in item.id else "N/A",
            }
            for item in found
        ]
        count = len(entries)
        payload = {
            "total_nsgs": count,
            "nsgs": entries,
            "summary": f"Found {count} NSG(s). Use azure_check_nsg_rules with resource_group and nsg_name to analyze each.",
        }
        return json.dumps(payload, indent=2)
    except Exception as exc:
        return f"Error listing NSGs: {str(exc)}"
@mcp.tool()
async def azure_list_storage_accounts(subscription_id: str, resource_group: Optional[str] = None) -> str:
    """List all storage accounts in the subscription or in a specific resource group. Use to discover storage account names for security audit."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        account_ops = azure_clients["storage_client"].storage_accounts
        # Scope the listing to one resource group when given, else the whole subscription.
        found = (
            account_ops.list_by_resource_group(resource_group)
            if resource_group
            else account_ops.list()
        )
        entries = [
            {
                "name": account.name,
                "location": account.location,
                # Segment 4 of the resource ID is taken as the resource group name.
                "resource_group": account.id.split("/")[4] if "/" in account.id else "N/A",
            }
            for account in found
        ]
        count = len(entries)
        payload = {
            "total_storage_accounts": count,
            "storage_accounts": entries,
            "summary": f"Found {count} storage account(s). Use azure_check_storage_security to audit each.",
        }
        return json.dumps(payload, indent=2)
    except Exception as exc:
        return f"Error listing storage accounts: {str(exc)}"
@mcp.tool()
async def azure_list_resources(subscription_id: str, resource_group: Optional[str] = None) -> str:
    """List all resources in a resource group or subscription. Returns resource names, types, and locations."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        resource_ops = azure_clients["resource_client"].resources
        # Scope the listing to one resource group when given, else the whole subscription.
        found = (
            resource_ops.list_by_resource_group(resource_group)
            if resource_group
            else resource_ops.list()
        )
        entries = []
        for item in found:
            # Segment 4 of the resource ID is taken as the resource group name.
            group_name = item.id.split("/")[4] if "/" in item.id else "N/A"
            entry = {
                "name": item.name,
                "type": item.type,
                "location": item.location,
                "resource_group": group_name,
            }
            entries.append(entry)
        payload = {"total_resources": len(entries), "resources": entries}
        return json.dumps(payload, indent=2)
    except Exception as exc:
        return f"Error listing resources: {str(exc)}"
# --- Analysis tools ---
# Ports flagged when an inbound allow rule covers them: SSH, RDP, SQL Server,
# MySQL, PostgreSQL, MongoDB, Redis.
_NSG_DANGEROUS_PORTS = (22, 3389, 1433, 3306, 5432, 27017, 6379)

# Source prefixes (lower-cased) treated as "any source on the internet".
_NSG_ANY_SOURCE = frozenset({"*", "internet", "0.0.0.0/0", "::/0"})


def _nsg_merge_values(single, multiple):
    """Combine a rule's singular field and its plural list into one list."""
    merged = [single] if single else []
    merged.extend(value for value in (multiple or []) if value)
    return merged


def _nsg_port_bounds(spec):
    """Return (low, high) for a spec like "22" or "1000-2000"; None if unparsable."""
    low_text, dash, high_text = spec.partition("-")
    try:
        low = int(low_text)
        high = int(high_text) if dash else low
    except ValueError:
        return None
    return (low, high) if low <= high else (high, low)


@mcp.tool()
async def azure_check_nsg_rules(subscription_id: str, resource_group: str, nsg_name: str) -> str:
    """Analyze Network Security Group (NSG) rules for security issues like open ports (22, 3389, etc.) and overly permissive rules."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    #
    # For every inbound "Allow" rule this reports:
    #   * HIGH     - the source is any address (*, Internet, 0.0.0.0/0, ::/0)
    #   * CRITICAL - the destination port is "*"
    #   * HIGH     - a port from _NSG_DANGEROUS_PORTS is covered, either as a
    #                single port or inside a "low-high" range
    # Both the singular and the plural rule fields are inspected.
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        nsg = azure_clients["network_client"].network_security_groups.get(resource_group, nsg_name)
        # An NSG without custom rules may report None rather than an empty list.
        rules = nsg.security_rules or []
        issues = []
        for rule in rules:
            if not (rule.direction == "Inbound" and rule.access == "Allow"):
                continue

            sources = _nsg_merge_values(
                rule.source_address_prefix,
                getattr(rule, "source_address_prefixes", None),
            )
            port_specs = _nsg_merge_values(
                rule.destination_port_range,
                getattr(rule, "destination_port_ranges", None),
            )
            # Prefer the singular value in the report (as before); fall back
            # to the joined plural list when only that is populated.
            source_label = rule.source_address_prefix or (", ".join(sources) or None)
            port_label = rule.destination_port_range or (", ".join(port_specs) or None)

            if any(str(source).lower() in _NSG_ANY_SOURCE for source in sources):
                issues.append({
                    "severity": "HIGH",
                    "rule_name": rule.name,
                    "issue": "Inbound rule allows traffic from any source (*)",
                    "destination_port": port_label,
                    "protocol": rule.protocol,
                    "recommendation": "Restrict source IP addresses to specific ranges",
                })

            for spec in port_specs:
                if spec == "*":
                    issues.append({
                        "severity": "CRITICAL",
                        "rule_name": rule.name,
                        "issue": "Inbound rule allows all ports (*)",
                        "source": source_label,
                        "recommendation": "Specify exact ports needed",
                    })
                    continue
                bounds = _nsg_port_bounds(str(spec))
                if bounds is None:
                    # Best effort: a spec that is neither a port nor a range
                    # cannot be evaluated, so it is skipped rather than failing
                    # the whole analysis.
                    continue
                low, high = bounds
                for port in _NSG_DANGEROUS_PORTS:
                    if low <= port <= high:
                        issues.append({
                            "severity": "HIGH",
                            "rule_name": rule.name,
                            "issue": f"Potentially dangerous port {port} is exposed",
                            "port": port,
                            "source": source_label,
                            "recommendation": f"Consider restricting access to port {port} or using VPN/bastion",
                        })

        result = {
            "nsg_name": nsg_name,
            "location": nsg.location,
            "total_rules": len(rules),
            "issues_found": len(issues),
            "security_issues": issues,
            "summary": f"Found {len(issues)} security issues in NSG rules",
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Error checking NSG rules: {str(e)}"
@mcp.tool()
async def azure_check_storage_security(
    subscription_id: str, resource_group: str, storage_account_name: str
) -> str:
    """Check storage account security configuration including public access, encryption, and secure transfer requirements."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    #
    # Checks performed:
    #   * HIGH     - public blob access is enabled
    #   * CRITICAL - HTTPS-only (secure transfer) is not enforced
    #   * MEDIUM   - minimum TLS version is below TLS 1.2 (or unset)
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        storage_account = azure_clients["storage_client"].storage_accounts.get_properties(
            resource_group, storage_account_name
        )
        issues = []
        # NOTE(review): a None value here is treated as "not enabled"; confirm
        # that matches the API default for the accounts being audited.
        if storage_account.allow_blob_public_access:
            issues.append({
                "severity": "HIGH",
                "issue": "Public blob access is enabled",
                "recommendation": "Disable public blob access unless specifically required",
            })
        if not storage_account.enable_https_traffic_only:
            issues.append({
                "severity": "CRITICAL",
                "issue": "HTTPS-only traffic is not enforced",
                "recommendation": "Enable HTTPS-only traffic (secure transfer required)",
            })
        # Accept TLS 1.2 and anything stronger. Comparing with != "TLS1_2"
        # alone would report an account that requires TLS 1.3 as a weakness
        # and recommend lowering it.
        acceptable_tls = ("TLS1_2", "TLS1_3")
        if (
            hasattr(storage_account, "minimum_tls_version")
            and storage_account.minimum_tls_version not in acceptable_tls
        ):
            issues.append({
                "severity": "MEDIUM",
                "issue": f"Minimum TLS version is {storage_account.minimum_tls_version}",
                "recommendation": "Set minimum TLS version to TLS 1.2",
            })
        encryption_status = "Enabled" if storage_account.encryption else "Disabled"
        result = {
            "storage_account": storage_account_name,
            "location": storage_account.location,
            "encryption": encryption_status,
            "https_only": storage_account.enable_https_traffic_only,
            "public_access_enabled": storage_account.allow_blob_public_access,
            "issues_found": len(issues),
            "security_issues": issues,
            "summary": f"Found {len(issues)} security issues in storage account",
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Error checking storage security: {str(e)}"
@mcp.tool()
async def azure_list_public_ips(subscription_id: str, resource_group: Optional[str] = None) -> str:
    """List all public IP addresses in a resource group or subscription to identify exposed resources."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        ip_ops = azure_clients["network_client"].public_ip_addresses
        # Scope the listing to one resource group when given, else the whole subscription.
        found = ip_ops.list(resource_group) if resource_group else ip_ops.list_all()
        entries = [
            {
                "name": address.name,
                # Resources without an allocated address get a placeholder.
                "ip_address": address.ip_address or "Not assigned",
                "allocation_method": address.public_ip_allocation_method,
                "location": address.location,
                # Segment 4 of the resource ID is taken as the resource group name.
                "resource_group": address.id.split("/")[4] if "/" in address.id else "N/A",
            }
            for address in found
        ]
        payload = {
            "total_public_ips": len(entries),
            "public_ips": entries,
            "security_note": "Public IPs expose resources to the internet. Ensure proper NSG rules are in place.",
        }
        return json.dumps(payload, indent=2)
    except Exception as exc:
        return f"Error listing public IPs: {str(exc)}"
@mcp.tool()
async def azure_check_vm_security(subscription_id: str, resource_group: str, vm_name: str) -> str:
    """Check virtual machine security configuration including disk encryption, managed identities, and extensions."""
    # Docstring left verbatim: FastMCP publishes it as the tool description.
    #
    # Checks performed by this body:
    #   * MEDIUM - no managed identity on the VM
    #   * HIGH   - OS disk encryption settings missing, or present but disabled
    # Returns pretty-printed JSON on success, or a plain-text error message.
    if not initialize_azure_clients(subscription_id):
        return "Failed to initialize Azure clients. Check credentials."
    try:
        vm_ops = azure_clients["compute_client"].virtual_machines
        vm = vm_ops.get(resource_group, vm_name, expand="instanceView")
        findings = []

        # An identity object whose type is the string "None" counts as absent.
        has_identity = bool(vm.identity and vm.identity.type != "None")
        if not has_identity:
            findings.append({
                "severity": "MEDIUM",
                "issue": "VM does not have a managed identity configured",
                "recommendation": "Enable managed identity for secure access to Azure resources",
            })

        os_disk = vm.storage_profile.os_disk
        disk_encryption = os_disk.encryption_settings
        if not disk_encryption:
            findings.append({
                "severity": "HIGH",
                "issue": "OS disk encryption settings not configured",
                "recommendation": "Enable Azure Disk Encryption",
            })
        elif not disk_encryption.enabled:
            findings.append({
                "severity": "HIGH",
                "issue": "OS disk encryption is not enabled",
                "recommendation": "Enable Azure Disk Encryption for OS disk",
            })

        finding_count = len(findings)
        payload = {
            "vm_name": vm_name,
            "location": vm.location,
            "vm_size": vm.hardware_profile.vm_size,
            "os_type": vm.storage_profile.os_disk.os_type,
            "managed_identity": "Enabled" if has_identity else "Disabled",
            "issues_found": finding_count,
            "security_issues": findings,
            "summary": f"Found {finding_count} security issues in VM configuration",
        }
        return json.dumps(payload, indent=2)
    except Exception as exc:
        return f"Error checking VM security: {str(exc)}"
# Script entry point: start the MCP server only when run directly, not on import.
if __name__ == "__main__":
    # No banner when used as subprocess: agent reads stdout for JSON-RPC only.
    mcp.run(show_banner=False)