Skip to main content
Glama
compute_manager.py27.6 kB
from typing import Any, Dict, List, Optional import base64 from ..clients.client_factory import AzureClientFactory class AzureComputeManager: """Class for managing Azure compute resources.""" def __init__(self, client_factory: AzureClientFactory): self.client_factory = client_factory async def create_vm( self, resource_group: str, vm_name: str, location: str = "westeurope", vm_size: str = "Standard_DS1_v2", admin_username: str = "azureuser", os_disk_size_gb: int = 30, image_reference: Dict[str, str] = { "publisher": "Canonical", "offer": "UbuntuServer", "sku": "18.04-LTS", "version": "latest" } ) -> Dict[str, Any]: """Create a new virtual machine.""" compute_client = self.client_factory.get_compute_client() network_client = self.client_factory.get_network_client() import asyncio loop = asyncio.get_event_loop() # Create unique names for networking components vnet_name = f"{vm_name}-vnet" subnet_name = f"{vm_name}-subnet" ip_name = f"{vm_name}-ip" nsg_name = f"{vm_name}-nsg" nic_name = f"{vm_name}-nic" try: # Create public IP address def create_public_ip(): poller = network_client.public_ip_addresses.begin_create_or_update( resource_group, ip_name, { "location": location, "sku": {"name": "Standard"}, "public_ip_allocation_method": "Static", } ) return poller.result() public_ip = await loop.run_in_executor(None, create_public_ip) # Create VNet and subnet def create_vnet_and_subnet(): poller = network_client.virtual_networks.begin_create_or_update( resource_group, vnet_name, { "location": location, "address_space": { "address_prefixes": ["10.0.0.0/16"] }, "subnets": [ { "name": subnet_name, "address_prefix": "10.0.0.0/24" } ] } ) return poller.result() vnet = await loop.run_in_executor(None, create_vnet_and_subnet) # Get subnet def get_subnet(): return network_client.subnets.get( resource_group, vnet_name, subnet_name ) subnet = await loop.run_in_executor(None, get_subnet) # Create NSG def create_nsg(): poller = network_client.network_security_groups.begin_create_or_update( resource_group, nsg_name, { "location": location, "security_rules": [ { "name": "SSH", "priority": 1000, "protocol": "Tcp", "access": "Allow", "direction": "Inbound", "source_address_prefix": "*", "source_port_range": "*", "destination_address_prefix": "*", "destination_port_range": "22" } ] } ) return poller.result() nsg = await loop.run_in_executor(None, create_nsg) # Create NIC def create_nic(): poller = network_client.network_interfaces.begin_create_or_update( resource_group, nic_name, { "location": location, "network_security_group": {"id": nsg.id}, "ip_configurations": [ { "name": "ipconfig1", "subnet": {"id": subnet.id}, "public_ip_address": {"id": public_ip.id} } ] } ) return poller.result() nic = await loop.run_in_executor(None, create_nic) # Create VM def create_virtual_machine(): import uuid vm_parameters = { "location": location, "hardware_profile": { "vm_size": vm_size }, "storage_profile": { "image_reference": image_reference, "os_disk": { "name": f"{vm_name}-os-disk", "caching": "ReadWrite", "create_option": "FromImage", "disk_size_gb": os_disk_size_gb, "managed_disk": { "storage_account_type": "Premium_LRS" } } }, "os_profile": { "computer_name": vm_name, "admin_username": admin_username, "linux_configuration": { "disable_password_authentication": True, "ssh": { "public_keys": [ { "path": f"/home/{admin_username}/.ssh/authorized_keys", "key_data": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC+wWK73dCr+jgQOAxNsHAnNNNMEMWOHYEccp6wJm2gotpr9katuF/ZAdou5AaW1C61slRkHRkpRRX9FA9CYBiitZgvCCz+3nWNN7l/Up54Zps/pHWGZLHNJZRYyAB6j5yVLMVHIHriY49d/GZTZVNB8GoJv9Gakwc/fuEZYYl4YDFiGMBP///TzlI4jhiJzjKnEvqPFki5p2ZRJqcbCiF4pJrxUQR/RXqVFQdbRLZgYfJ8xGB878RENq3yQ39d8dVOkq4edbkzwcUmwwwkYVPIoDGsYLaRHnG+To7FvMeyO7xDVQkMKzopTQV8AuKpyvpqu0a9pWOMaiCyDytO7GGN example@example.com" } ] } } }, "network_profile": { "network_interfaces": [ { "id": nic.id } ] } } poller = compute_client.virtual_machines.begin_create_or_update( resource_group, vm_name, vm_parameters ) return poller.result() vm = await loop.run_in_executor(None, create_virtual_machine) return { "id": vm.id, "name": vm.name, "resource_group": resource_group, "location": location, "vm_size": vm_size, "public_ip": public_ip.ip_address, "status": "provisioning" } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "vm_name": vm_name } async def create_app_gateway_with_waf( self, resource_group: str, app_gateway_name: str, backend_pool_name: str, backend_fqdns: List[str] = None, backend_ips: List[str] = None, location: str = "westeurope", sku_name: str = "WAF_v2", capacity: int = 2, waf_enabled: bool = True, waf_mode: str = "Prevention", frontend_port: int = 80, backend_port: int = 80, tags: Optional[Dict[str, str]] = None ) -> Dict[str, Any]: """Create an Application Gateway with WAF and configure backend pools.""" network_client = self.client_factory.get_network_client() import asyncio loop = asyncio.get_event_loop() # Create unique names for networking components vnet_name = f"{app_gateway_name}-vnet" subnet_name = f"{app_gateway_name}-subnet" pip_name = f"{app_gateway_name}-pip" frontend_ip_config_name = "frontend-ip-config" frontend_port_name = "frontend-port" http_listener_name = "http-listener" backend_http_settings_name = "backend-http-settings" http_routing_rule_name = "http-routing-rule" try: # Create public IP def create_public_ip(): return network_client.public_ip_addresses.begin_create_or_update( resource_group, pip_name, { "location": location, "sku": {"name": "Standard"}, "public_ip_allocation_method": "Static", "tags": tags } ).result() public_ip = await loop.run_in_executor(None, create_public_ip) # Create VNet and subnet def create_vnet_and_subnet(): # Create VNet vnet_poller = network_client.virtual_networks.begin_create_or_update( resource_group, vnet_name, { "location": location, "address_space": { "address_prefixes": ["10.0.0.0/16"] }, "tags": tags } ) vnet = vnet_poller.result() # Create subnet subnet_poller = network_client.subnets.begin_create_or_update( resource_group, vnet_name, subnet_name, { "address_prefix": "10.0.0.0/24" } ) return subnet_poller.result() subnet = await loop.run_in_executor(None, create_vnet_and_subnet) # Prepare backend address pool configuration backend_addresses = [] if backend_fqdns: for fqdn in backend_fqdns: backend_addresses.append({"fqdn": fqdn}) if backend_ips: for ip in backend_ips: backend_addresses.append({"ip_address": ip}) # Create the Application Gateway def create_app_gateway(): app_gateway_params = { "location": location, "tags": tags, "sku": { "name": sku_name, "tier": sku_name.split('_')[0], "capacity": capacity }, "gateway_ip_configurations": [ { "name": "gateway-ip-configuration", "subnet": {"id": subnet.id} } ], "frontend_ip_configurations": [ { "name": frontend_ip_config_name, "public_ip_address": {"id": public_ip.id} } ], "frontend_ports": [ { "name": frontend_port_name, "port": frontend_port } ], "backend_address_pools": [ { "name": backend_pool_name, "backend_addresses": backend_addresses } ], "backend_http_settings_collection": [ { "name": backend_http_settings_name, "port": backend_port, "protocol": "Http", "cookie_based_affinity": "Disabled", "request_timeout": 30 } ], "http_listeners": [ { "name": http_listener_name, "frontend_ip_configuration": {"id": f"[resourceId('Microsoft.Network/applicationGateways/frontendIPConfigurations', '{app_gateway_name}', '{frontend_ip_config_name}')]"}, "frontend_port": {"id": f"[resourceId('Microsoft.Network/applicationGateways/frontendPorts', '{app_gateway_name}', '{frontend_port_name}')]"}, "protocol": "Http" } ], "request_routing_rules": [ { "name": http_routing_rule_name, "rule_type": "Basic", "http_listener": {"id": f"[resourceId('Microsoft.Network/applicationGateways/httpListeners', '{app_gateway_name}', '{http_listener_name}')]"}, "backend_address_pool": {"id": f"[resourceId('Microsoft.Network/applicationGateways/backendAddressPools', '{app_gateway_name}', '{backend_pool_name}')]"}, "backend_http_settings": {"id": f"[resourceId('Microsoft.Network/applicationGateways/backendHttpSettingsCollection', '{app_gateway_name}', '{backend_http_settings_name}')]"} } ] } # Add WAF configuration if WAF is enabled if waf_enabled and "WAF" in sku_name: app_gateway_params["web_application_firewall_configuration"] = { "enabled": True, "firewall_mode": waf_mode, "rule_set_type": "OWASP", "rule_set_version": "3.2" } poller = network_client.application_gateways.begin_create_or_update( resource_group, app_gateway_name, app_gateway_params ) return poller.result() app_gateway = await loop.run_in_executor(None, create_app_gateway) return { "id": app_gateway.id, "name": app_gateway.name, "resource_group": resource_group, "location": location, "public_ip": public_ip.ip_address, "backend_pool": backend_pool_name, "sku": sku_name, "waf_enabled": waf_enabled, "waf_mode": waf_mode if waf_enabled else None, "status": "provisioning" } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "app_gateway_name": app_gateway_name } async def create_vnet_peering( self, resource_group: str, vnet_name: str, remote_vnet_name: str, remote_vnet_resource_group: str, peering_name: str = None ) -> Dict[str, Any]: """Create VNet peering between two virtual networks.""" network_client = self.client_factory.get_network_client() import asyncio loop = asyncio.get_event_loop() try: # Generate a default peering name if not provided if not peering_name: peering_name = f"{vnet_name}-{remote_vnet_name}-peering" # Get the remote VNet ID def get_remote_vnet(): return network_client.virtual_networks.get( resource_group_name=remote_vnet_resource_group, virtual_network_name=remote_vnet_name ) remote_vnet = await loop.run_in_executor(None, get_remote_vnet) # Create the VNet peering def create_peering(): poller = network_client.virtual_network_peerings.begin_create_or_update( resource_group_name=resource_group, virtual_network_name=vnet_name, virtual_network_peering_name=peering_name, parameters={ "allow_virtual_network_access": True, "allow_forwarded_traffic": True, "allow_gateway_transit": False, "use_remote_gateways": False, "remote_virtual_network": {"id": remote_vnet.id} } ) return poller.result() peering = await loop.run_in_executor(None, create_peering) return { "result": f"VNet peering '{peering_name}' created successfully", "peering_name": peering_name, "vnet_name": vnet_name, "remote_vnet_name": remote_vnet_name, "resource_group": resource_group, "remote_vnet_resource_group": remote_vnet_resource_group } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "vnet_name": vnet_name, "remote_vnet_name": remote_vnet_name, "remote_vnet_resource_group": remote_vnet_resource_group } async def create_sql_server( self, resource_group: str, server_name: str, location: str = "westeurope", admin_username: str = "sqladmin", admin_password: str = "P@ssw0rd1234", tags: Optional[Dict[str, str]] = None ) -> Dict[str, Any]: """Create a new Azure SQL Server with optional firewall rules.""" sql_client = self.client_factory.get_sql_client() import asyncio loop = asyncio.get_event_loop() try: # Create the SQL Server def create_server(): poller = sql_client.servers.begin_create_or_update( resource_group_name=resource_group, server_name=server_name, parameters={ "location": location, "administrator_login": admin_username, "administrator_login_password": admin_password, "tags": tags } ) return poller.result() server = await loop.run_in_executor(None, create_server) return { "result": f"SQL Server '{server_name}' created successfully", "server_name": server_name, "resource_group": resource_group, "location": location } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "server_name": server_name } async def create_sql_database( self, resource_group: str, server_name: str, database_name: str, collation: str = "SQL_Latin1_General_CP1_CI_AS", max_size_bytes: int = 2147483648, edition: str = "Basic", tags: Optional[Dict[str, str]] = None ) -> Dict[str, Any]: """Create a new database in an existing Azure SQL Server.""" sql_client = self.client_factory.get_sql_client() import asyncio loop = asyncio.get_event_loop() try: # Create the SQL Database def create_database(): poller = sql_client.databases.begin_create_or_update( resource_group_name=resource_group, server_name=server_name, database_name=database_name, parameters={ "location": sql_client.servers.get(resource_group, server_name).location, "collation": collation, "max_size_bytes": max_size_bytes, "sku": { "name": edition }, "tags": tags } ) return poller.result() database = await loop.run_in_executor(None, create_database) return { "result": f"SQL Database '{database_name}' created successfully", "database_name": database_name, "server_name": server_name, "resource_group": resource_group } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "server_name": server_name, "database_name": database_name } async def create_acr( self, resource_group: str, registry_name: str, location: str = "westeurope", sku: str = "Basic", admin_user_enabled: bool = True, tags: Optional[Dict[str, str]] = None ) -> Dict[str, Any]: """Create a new Azure Container Registry.""" container_registry_client = self.client_factory.get_container_registry_client() import asyncio loop = asyncio.get_event_loop() try: # Create the Container Registry def create_registry(): poller = container_registry_client.registries.begin_create( resource_group_name=resource_group, registry_name=registry_name, registry={ "location": location, "sku": {"name": sku}, "admin_user_enabled": admin_user_enabled, "tags": tags } ) return poller.result() registry = await loop.run_in_executor(None, create_registry) return { "result": f"Azure Container Registry '{registry_name}' created successfully", "registry_name": registry_name, "resource_group": resource_group, "location": location, "sku": sku, "admin_user_enabled": admin_user_enabled } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "registry_name": registry_name } async def get_acr_credentials( self, resource_group: str, registry_name: str ) -> Dict[str, Any]: """Get the login credentials for an Azure Container Registry.""" container_registry_client = self.client_factory.get_container_registry_client() import asyncio loop = asyncio.get_event_loop() try: # Get the registry credentials def list_credentials(): return container_registry_client.registries.list_credentials( resource_group_name=resource_group, registry_name=registry_name ) credentials = await loop.run_in_executor(None, list_credentials) return { "result": "Credentials retrieved successfully", "registry_name": registry_name, "resource_group": resource_group, "username": credentials.username, "passwords": [password.value for password in credentials.passwords] } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "registry_name": registry_name } async def list_acr_repositories( self, resource_group: str, registry_name: str ) -> Dict[str, Any]: """List repositories in an Azure Container Registry.""" container_registry_client = self.client_factory.get_container_registry_client() import asyncio loop = asyncio.get_event_loop() try: # List repositories def list_repositories(): return container_registry_client.registries.list_repositories( resource_group_name=resource_group, registry_name=registry_name ) repositories = await loop.run_in_executor(None, list_repositories) return { "result": "Repositories listed successfully", "registry_name": registry_name, "resource_group": resource_group, "repositories": repositories } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "registry_name": registry_name } async def list_acr_tags( self, resource_group: str, registry_name: str, repository_name: str ) -> Dict[str, Any]: """List tags for a repository in an Azure Container Registry.""" container_registry_client = self.client_factory.get_container_registry_client() import asyncio loop = asyncio.get_event_loop() try: # List tags for the repository def list_tags(): return container_registry_client.registries.get_repository_tags( resource_group_name=resource_group, registry_name=registry_name, repository_name=repository_name ) tags = await loop.run_in_executor(None, list_tags) return { "result": "Tags listed successfully", "registry_name": registry_name, "repository_name": repository_name, "resource_group": resource_group, "tags": tags } except Exception as e: return { "error": str(e), "status": "failed", "resource_group": resource_group, "registry_name": registry_name, "repository_name": repository_name }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/idofrizler/azure-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server