Skip to main content
Glama

OCI MCP Server

by jopsis
security.py16.6 kB
""" Tools for managing OCI Security resources. """ import logging from typing import Dict, List, Any, Optional import oci logger = logging.getLogger(__name__) def list_security_lists(network_client: oci.core.VirtualNetworkClient, compartment_id: str, vcn_id: Optional[str] = None) -> List[Dict[str, Any]]: """ List all security lists in a compartment, optionally filtered by VCN. Args: network_client: OCI VirtualNetwork client compartment_id: OCID of the compartment vcn_id: Optional OCID of the VCN to filter by Returns: List of security lists with their details """ try: security_lists_response = oci.pagination.list_call_get_all_results( network_client.list_security_lists, compartment_id, vcn_id=vcn_id ) security_lists = [] for security_list in security_lists_response.data: security_lists.append({ "id": security_list.id, "display_name": security_list.display_name, "compartment_id": security_list.compartment_id, "vcn_id": security_list.vcn_id, "lifecycle_state": security_list.lifecycle_state, "time_created": str(security_list.time_created), "ingress_security_rules_count": len(security_list.ingress_security_rules) if security_list.ingress_security_rules else 0, "egress_security_rules_count": len(security_list.egress_security_rules) if security_list.egress_security_rules else 0, }) logger.info(f"Found {len(security_lists)} security lists in compartment {compartment_id}") return security_lists except Exception as e: logger.exception(f"Error listing security lists: {e}") raise def get_security_list(network_client: oci.core.VirtualNetworkClient, security_list_id: str) -> Dict[str, Any]: """ Get details of a specific security list. Args: network_client: OCI VirtualNetwork client security_list_id: OCID of the security list Returns: Details of the security list """ try: security_list = network_client.get_security_list(security_list_id).data # Format ingress rules ingress_rules = [] if security_list.ingress_security_rules: for rule in security_list.ingress_security_rules: ingress_rules.append({ "protocol": rule.protocol, "source": rule.source, "source_type": rule.source_type, "is_stateless": rule.is_stateless, "description": rule.description, "tcp_options": { "destination_port_range": { "min": rule.tcp_options.destination_port_range.min if rule.tcp_options and rule.tcp_options.destination_port_range else None, "max": rule.tcp_options.destination_port_range.max if rule.tcp_options and rule.tcp_options.destination_port_range else None, } if rule.tcp_options and rule.tcp_options.destination_port_range else None, "source_port_range": { "min": rule.tcp_options.source_port_range.min if rule.tcp_options and rule.tcp_options.source_port_range else None, "max": rule.tcp_options.source_port_range.max if rule.tcp_options and rule.tcp_options.source_port_range else None, } if rule.tcp_options and rule.tcp_options.source_port_range else None, } if rule.tcp_options else None, "udp_options": { "destination_port_range": { "min": rule.udp_options.destination_port_range.min if rule.udp_options and rule.udp_options.destination_port_range else None, "max": rule.udp_options.destination_port_range.max if rule.udp_options and rule.udp_options.destination_port_range else None, } if rule.udp_options and rule.udp_options.destination_port_range else None, "source_port_range": { "min": rule.udp_options.source_port_range.min if rule.udp_options and rule.udp_options.source_port_range else None, "max": rule.udp_options.source_port_range.max if rule.udp_options and rule.udp_options.source_port_range else None, } if rule.udp_options and rule.udp_options.source_port_range else None, } if rule.udp_options else None, "icmp_options": { "type": rule.icmp_options.type if rule.icmp_options else None, "code": rule.icmp_options.code if rule.icmp_options else None, } if rule.icmp_options else None, }) # Format egress rules egress_rules = [] if security_list.egress_security_rules: for rule in security_list.egress_security_rules: egress_rules.append({ "protocol": rule.protocol, "destination": rule.destination, "destination_type": rule.destination_type, "is_stateless": rule.is_stateless, "description": rule.description, "tcp_options": { "destination_port_range": { "min": rule.tcp_options.destination_port_range.min if rule.tcp_options and rule.tcp_options.destination_port_range else None, "max": rule.tcp_options.destination_port_range.max if rule.tcp_options and rule.tcp_options.destination_port_range else None, } if rule.tcp_options and rule.tcp_options.destination_port_range else None, "source_port_range": { "min": rule.tcp_options.source_port_range.min if rule.tcp_options and rule.tcp_options.source_port_range else None, "max": rule.tcp_options.source_port_range.max if rule.tcp_options and rule.tcp_options.source_port_range else None, } if rule.tcp_options and rule.tcp_options.source_port_range else None, } if rule.tcp_options else None, "udp_options": { "destination_port_range": { "min": rule.udp_options.destination_port_range.min if rule.udp_options and rule.udp_options.destination_port_range else None, "max": rule.udp_options.destination_port_range.max if rule.udp_options and rule.udp_options.destination_port_range else None, } if rule.udp_options and rule.udp_options.destination_port_range else None, "source_port_range": { "min": rule.udp_options.source_port_range.min if rule.udp_options and rule.udp_options.source_port_range else None, "max": rule.udp_options.source_port_range.max if rule.udp_options and rule.udp_options.source_port_range else None, } if rule.udp_options and rule.udp_options.source_port_range else None, } if rule.udp_options else None, "icmp_options": { "type": rule.icmp_options.type if rule.icmp_options else None, "code": rule.icmp_options.code if rule.icmp_options else None, } if rule.icmp_options else None, }) security_list_details = { "id": security_list.id, "display_name": security_list.display_name, "compartment_id": security_list.compartment_id, "vcn_id": security_list.vcn_id, "lifecycle_state": security_list.lifecycle_state, "time_created": str(security_list.time_created), "ingress_security_rules": ingress_rules, "egress_security_rules": egress_rules, } logger.info(f"Retrieved details for security list {security_list_id}") return security_list_details except Exception as e: logger.exception(f"Error getting security list details: {e}") raise def list_network_security_groups(network_client: oci.core.VirtualNetworkClient, compartment_id: str, vcn_id: Optional[str] = None) -> List[Dict[str, Any]]: """ List all network security groups in a compartment, optionally filtered by VCN. Args: network_client: OCI VirtualNetwork client compartment_id: OCID of the compartment vcn_id: Optional OCID of the VCN to filter by Returns: List of network security groups with their details """ try: nsgs_response = oci.pagination.list_call_get_all_results( network_client.list_network_security_groups, compartment_id, vcn_id=vcn_id ) nsgs = [] for nsg in nsgs_response.data: nsgs.append({ "id": nsg.id, "display_name": nsg.display_name, "compartment_id": nsg.compartment_id, "vcn_id": nsg.vcn_id, "lifecycle_state": nsg.lifecycle_state, "time_created": str(nsg.time_created), }) logger.info(f"Found {len(nsgs)} network security groups in compartment {compartment_id}") return nsgs except Exception as e: logger.exception(f"Error listing network security groups: {e}") raise def get_network_security_group(network_client: oci.core.VirtualNetworkClient, nsg_id: str) -> Dict[str, Any]: """ Get details of a specific network security group. Args: network_client: OCI VirtualNetwork client nsg_id: OCID of the network security group Returns: Details of the network security group """ try: nsg = network_client.get_network_security_group(nsg_id).data nsg_details = { "id": nsg.id, "display_name": nsg.display_name, "compartment_id": nsg.compartment_id, "vcn_id": nsg.vcn_id, "lifecycle_state": nsg.lifecycle_state, "time_created": str(nsg.time_created), } logger.info(f"Retrieved details for network security group {nsg_id}") return nsg_details except Exception as e: logger.exception(f"Error getting network security group details: {e}") raise def list_vaults(kms_vault_client: oci.key_management.KmsVaultClient, compartment_id: str) -> List[Dict[str, Any]]: """ List all vaults in a compartment. Args: kms_vault_client: OCI KmsVault client compartment_id: OCID of the compartment Returns: List of vaults with their details """ try: vaults_response = oci.pagination.list_call_get_all_results( kms_vault_client.list_vaults, compartment_id ) vaults = [] for vault in vaults_response.data: vaults.append({ "id": vault.id, "display_name": vault.display_name, "compartment_id": vault.compartment_id, "lifecycle_state": vault.lifecycle_state, "time_created": str(vault.time_created), "vault_type": vault.vault_type, "crypto_endpoint": vault.crypto_endpoint, "management_endpoint": vault.management_endpoint, "is_primary": vault.is_primary, }) logger.info(f"Found {len(vaults)} vaults in compartment {compartment_id}") return vaults except Exception as e: logger.exception(f"Error listing vaults: {e}") raise def get_vault(kms_vault_client: oci.key_management.KmsVaultClient, vault_id: str) -> Dict[str, Any]: """ Get details of a specific vault. Args: kms_vault_client: OCI KmsVault client vault_id: OCID of the vault Returns: Details of the vault """ try: vault = kms_vault_client.get_vault(vault_id).data vault_details = { "id": vault.id, "display_name": vault.display_name, "compartment_id": vault.compartment_id, "lifecycle_state": vault.lifecycle_state, "time_created": str(vault.time_created), "vault_type": vault.vault_type, "crypto_endpoint": vault.crypto_endpoint, "management_endpoint": vault.management_endpoint, "is_primary": vault.is_primary, "replica_details": { "replication_id": vault.replica_details.replication_id if vault.replica_details else None, } if vault.replica_details else None, } logger.info(f"Retrieved details for vault {vault_id}") return vault_details except Exception as e: logger.exception(f"Error getting vault details: {e}") raise def list_keys(config: dict, management_endpoint: str, compartment_id: str) -> List[Dict[str, Any]]: """ List all keys in a vault's compartment. Args: config: OCI config dict management_endpoint: Management endpoint from the vault compartment_id: OCID of the compartment Returns: List of keys with their details """ try: # Create KMS Management client with the vault's management endpoint kms_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=management_endpoint) keys_response = oci.pagination.list_call_get_all_results( kms_management_client.list_keys, compartment_id ) keys = [] for key in keys_response.data: keys.append({ "id": key.id, "display_name": key.display_name, "compartment_id": key.compartment_id, "lifecycle_state": key.lifecycle_state, "time_created": str(key.time_created), "vault_id": key.vault_id, "protection_mode": key.protection_mode, "algorithm": key.algorithm, }) logger.info(f"Found {len(keys)} keys in compartment {compartment_id}") return keys except Exception as e: logger.exception(f"Error listing keys: {e}") raise def get_key(config: dict, management_endpoint: str, key_id: str) -> Dict[str, Any]: """ Get details of a specific key. Args: config: OCI config dict management_endpoint: Management endpoint from the vault key_id: OCID of the key Returns: Details of the key """ try: # Create KMS Management client with the vault's management endpoint kms_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=management_endpoint) key = kms_management_client.get_key(key_id).data key_details = { "id": key.id, "display_name": key.display_name, "compartment_id": key.compartment_id, "lifecycle_state": key.lifecycle_state, "time_created": str(key.time_created), "vault_id": key.vault_id, "protection_mode": key.protection_mode, "algorithm": key.algorithm, "current_key_version": key.current_key_version, "key_shape": { "algorithm": key.key_shape.algorithm if key.key_shape else None, "length": key.key_shape.length if key.key_shape else None, "curve_id": key.key_shape.curve_id if key.key_shape else None, } if key.key_shape else None, "is_primary": key.is_primary, "replica_details": { "replication_id": key.replica_details.replication_id if key.replica_details else None, } if key.replica_details else None, } logger.info(f"Retrieved details for key {key_id}") return key_details except Exception as e: logger.exception(f"Error getting key details: {e}") raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jopsis/mcp-server-oci'

If you have feedback or need assistance with the MCP directory API, please join our Discord server