"""
Tools for managing OCI Security resources.
"""
import logging
from typing import Dict, List, Any, Optional
import oci
logger = logging.getLogger(__name__)
def list_security_lists(network_client: oci.core.VirtualNetworkClient, compartment_id: str,
vcn_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""
List all security lists in a compartment, optionally filtered by VCN.
Args:
network_client: OCI VirtualNetwork client
compartment_id: OCID of the compartment
vcn_id: Optional OCID of the VCN to filter by
Returns:
List of security lists with their details
"""
try:
security_lists_response = oci.pagination.list_call_get_all_results(
network_client.list_security_lists,
compartment_id,
vcn_id=vcn_id
)
security_lists = []
for security_list in security_lists_response.data:
security_lists.append({
"id": security_list.id,
"display_name": security_list.display_name,
"compartment_id": security_list.compartment_id,
"vcn_id": security_list.vcn_id,
"lifecycle_state": security_list.lifecycle_state,
"time_created": str(security_list.time_created),
"ingress_security_rules_count": len(security_list.ingress_security_rules) if security_list.ingress_security_rules else 0,
"egress_security_rules_count": len(security_list.egress_security_rules) if security_list.egress_security_rules else 0,
})
logger.info(f"Found {len(security_lists)} security lists in compartment {compartment_id}")
return security_lists
except Exception as e:
logger.exception(f"Error listing security lists: {e}")
raise
def get_security_list(network_client: oci.core.VirtualNetworkClient, security_list_id: str) -> Dict[str, Any]:
"""
Get details of a specific security list.
Args:
network_client: OCI VirtualNetwork client
security_list_id: OCID of the security list
Returns:
Details of the security list
"""
try:
security_list = network_client.get_security_list(security_list_id).data
# Format ingress rules
ingress_rules = []
if security_list.ingress_security_rules:
for rule in security_list.ingress_security_rules:
ingress_rules.append({
"protocol": rule.protocol,
"source": rule.source,
"source_type": rule.source_type,
"is_stateless": rule.is_stateless,
"description": rule.description,
"tcp_options": {
"destination_port_range": {
"min": rule.tcp_options.destination_port_range.min if rule.tcp_options and rule.tcp_options.destination_port_range else None,
"max": rule.tcp_options.destination_port_range.max if rule.tcp_options and rule.tcp_options.destination_port_range else None,
} if rule.tcp_options and rule.tcp_options.destination_port_range else None,
"source_port_range": {
"min": rule.tcp_options.source_port_range.min if rule.tcp_options and rule.tcp_options.source_port_range else None,
"max": rule.tcp_options.source_port_range.max if rule.tcp_options and rule.tcp_options.source_port_range else None,
} if rule.tcp_options and rule.tcp_options.source_port_range else None,
} if rule.tcp_options else None,
"udp_options": {
"destination_port_range": {
"min": rule.udp_options.destination_port_range.min if rule.udp_options and rule.udp_options.destination_port_range else None,
"max": rule.udp_options.destination_port_range.max if rule.udp_options and rule.udp_options.destination_port_range else None,
} if rule.udp_options and rule.udp_options.destination_port_range else None,
"source_port_range": {
"min": rule.udp_options.source_port_range.min if rule.udp_options and rule.udp_options.source_port_range else None,
"max": rule.udp_options.source_port_range.max if rule.udp_options and rule.udp_options.source_port_range else None,
} if rule.udp_options and rule.udp_options.source_port_range else None,
} if rule.udp_options else None,
"icmp_options": {
"type": rule.icmp_options.type if rule.icmp_options else None,
"code": rule.icmp_options.code if rule.icmp_options else None,
} if rule.icmp_options else None,
})
# Format egress rules
egress_rules = []
if security_list.egress_security_rules:
for rule in security_list.egress_security_rules:
egress_rules.append({
"protocol": rule.protocol,
"destination": rule.destination,
"destination_type": rule.destination_type,
"is_stateless": rule.is_stateless,
"description": rule.description,
"tcp_options": {
"destination_port_range": {
"min": rule.tcp_options.destination_port_range.min if rule.tcp_options and rule.tcp_options.destination_port_range else None,
"max": rule.tcp_options.destination_port_range.max if rule.tcp_options and rule.tcp_options.destination_port_range else None,
} if rule.tcp_options and rule.tcp_options.destination_port_range else None,
"source_port_range": {
"min": rule.tcp_options.source_port_range.min if rule.tcp_options and rule.tcp_options.source_port_range else None,
"max": rule.tcp_options.source_port_range.max if rule.tcp_options and rule.tcp_options.source_port_range else None,
} if rule.tcp_options and rule.tcp_options.source_port_range else None,
} if rule.tcp_options else None,
"udp_options": {
"destination_port_range": {
"min": rule.udp_options.destination_port_range.min if rule.udp_options and rule.udp_options.destination_port_range else None,
"max": rule.udp_options.destination_port_range.max if rule.udp_options and rule.udp_options.destination_port_range else None,
} if rule.udp_options and rule.udp_options.destination_port_range else None,
"source_port_range": {
"min": rule.udp_options.source_port_range.min if rule.udp_options and rule.udp_options.source_port_range else None,
"max": rule.udp_options.source_port_range.max if rule.udp_options and rule.udp_options.source_port_range else None,
} if rule.udp_options and rule.udp_options.source_port_range else None,
} if rule.udp_options else None,
"icmp_options": {
"type": rule.icmp_options.type if rule.icmp_options else None,
"code": rule.icmp_options.code if rule.icmp_options else None,
} if rule.icmp_options else None,
})
security_list_details = {
"id": security_list.id,
"display_name": security_list.display_name,
"compartment_id": security_list.compartment_id,
"vcn_id": security_list.vcn_id,
"lifecycle_state": security_list.lifecycle_state,
"time_created": str(security_list.time_created),
"ingress_security_rules": ingress_rules,
"egress_security_rules": egress_rules,
}
logger.info(f"Retrieved details for security list {security_list_id}")
return security_list_details
except Exception as e:
logger.exception(f"Error getting security list details: {e}")
raise
def list_network_security_groups(network_client: oci.core.VirtualNetworkClient, compartment_id: str,
vcn_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""
List all network security groups in a compartment, optionally filtered by VCN.
Args:
network_client: OCI VirtualNetwork client
compartment_id: OCID of the compartment
vcn_id: Optional OCID of the VCN to filter by
Returns:
List of network security groups with their details
"""
try:
nsgs_response = oci.pagination.list_call_get_all_results(
network_client.list_network_security_groups,
compartment_id,
vcn_id=vcn_id
)
nsgs = []
for nsg in nsgs_response.data:
nsgs.append({
"id": nsg.id,
"display_name": nsg.display_name,
"compartment_id": nsg.compartment_id,
"vcn_id": nsg.vcn_id,
"lifecycle_state": nsg.lifecycle_state,
"time_created": str(nsg.time_created),
})
logger.info(f"Found {len(nsgs)} network security groups in compartment {compartment_id}")
return nsgs
except Exception as e:
logger.exception(f"Error listing network security groups: {e}")
raise
def get_network_security_group(network_client: oci.core.VirtualNetworkClient, nsg_id: str) -> Dict[str, Any]:
"""
Get details of a specific network security group.
Args:
network_client: OCI VirtualNetwork client
nsg_id: OCID of the network security group
Returns:
Details of the network security group
"""
try:
nsg = network_client.get_network_security_group(nsg_id).data
nsg_details = {
"id": nsg.id,
"display_name": nsg.display_name,
"compartment_id": nsg.compartment_id,
"vcn_id": nsg.vcn_id,
"lifecycle_state": nsg.lifecycle_state,
"time_created": str(nsg.time_created),
}
logger.info(f"Retrieved details for network security group {nsg_id}")
return nsg_details
except Exception as e:
logger.exception(f"Error getting network security group details: {e}")
raise
def list_vaults(kms_vault_client: oci.key_management.KmsVaultClient, compartment_id: str) -> List[Dict[str, Any]]:
"""
List all vaults in a compartment.
Args:
kms_vault_client: OCI KmsVault client
compartment_id: OCID of the compartment
Returns:
List of vaults with their details
"""
try:
vaults_response = oci.pagination.list_call_get_all_results(
kms_vault_client.list_vaults,
compartment_id
)
vaults = []
for vault in vaults_response.data:
vaults.append({
"id": vault.id,
"display_name": vault.display_name,
"compartment_id": vault.compartment_id,
"lifecycle_state": vault.lifecycle_state,
"time_created": str(vault.time_created),
"vault_type": vault.vault_type,
"crypto_endpoint": vault.crypto_endpoint,
"management_endpoint": vault.management_endpoint,
"is_primary": vault.is_primary,
})
logger.info(f"Found {len(vaults)} vaults in compartment {compartment_id}")
return vaults
except Exception as e:
logger.exception(f"Error listing vaults: {e}")
raise
def get_vault(kms_vault_client: oci.key_management.KmsVaultClient, vault_id: str) -> Dict[str, Any]:
"""
Get details of a specific vault.
Args:
kms_vault_client: OCI KmsVault client
vault_id: OCID of the vault
Returns:
Details of the vault
"""
try:
vault = kms_vault_client.get_vault(vault_id).data
vault_details = {
"id": vault.id,
"display_name": vault.display_name,
"compartment_id": vault.compartment_id,
"lifecycle_state": vault.lifecycle_state,
"time_created": str(vault.time_created),
"vault_type": vault.vault_type,
"crypto_endpoint": vault.crypto_endpoint,
"management_endpoint": vault.management_endpoint,
"is_primary": vault.is_primary,
"replica_details": {
"replication_id": vault.replica_details.replication_id if vault.replica_details else None,
} if vault.replica_details else None,
}
logger.info(f"Retrieved details for vault {vault_id}")
return vault_details
except Exception as e:
logger.exception(f"Error getting vault details: {e}")
raise
def list_keys(config: dict, management_endpoint: str, compartment_id: str) -> List[Dict[str, Any]]:
"""
List all keys in a vault's compartment.
Args:
config: OCI config dict
management_endpoint: Management endpoint from the vault
compartment_id: OCID of the compartment
Returns:
List of keys with their details
"""
try:
# Create KMS Management client with the vault's management endpoint
kms_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=management_endpoint)
keys_response = oci.pagination.list_call_get_all_results(
kms_management_client.list_keys,
compartment_id
)
keys = []
for key in keys_response.data:
keys.append({
"id": key.id,
"display_name": key.display_name,
"compartment_id": key.compartment_id,
"lifecycle_state": key.lifecycle_state,
"time_created": str(key.time_created),
"vault_id": key.vault_id,
"protection_mode": key.protection_mode,
"algorithm": key.algorithm,
})
logger.info(f"Found {len(keys)} keys in compartment {compartment_id}")
return keys
except Exception as e:
logger.exception(f"Error listing keys: {e}")
raise
def get_key(config: dict, management_endpoint: str, key_id: str) -> Dict[str, Any]:
"""
Get details of a specific key.
Args:
config: OCI config dict
management_endpoint: Management endpoint from the vault
key_id: OCID of the key
Returns:
Details of the key
"""
try:
# Create KMS Management client with the vault's management endpoint
kms_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=management_endpoint)
key = kms_management_client.get_key(key_id).data
key_details = {
"id": key.id,
"display_name": key.display_name,
"compartment_id": key.compartment_id,
"lifecycle_state": key.lifecycle_state,
"time_created": str(key.time_created),
"vault_id": key.vault_id,
"protection_mode": key.protection_mode,
"algorithm": key.algorithm,
"current_key_version": key.current_key_version,
"key_shape": {
"algorithm": key.key_shape.algorithm if key.key_shape else None,
"length": key.key_shape.length if key.key_shape else None,
"curve_id": key.key_shape.curve_id if key.key_shape else None,
} if key.key_shape else None,
"is_primary": key.is_primary,
"replica_details": {
"replication_id": key.replica_details.replication_id if key.replica_details else None,
} if key.replica_details else None,
}
logger.info(f"Retrieved details for key {key_id}")
return key_details
except Exception as e:
logger.exception(f"Error getting key details: {e}")
raise