Skip to main content
Glama

OCI MCP Server

by jopsis
oke.py (25.6 kB)
"""
Tools for managing OCI Container Engine for Kubernetes (OKE) resources.

Every public function takes an already-constructed ``ContainerEngineClient``,
calls one SDK operation and flattens the SDK response into plain dictionaries
and lists. Errors are logged with a traceback and re-raised unchanged so the
caller decides how to surface them.
"""

import logging
from typing import Dict, List, Any, Optional
import base64

import oci

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Private formatting helpers
# ---------------------------------------------------------------------------

def _timestamp(value: Any) -> Optional[str]:
    """Return ``str(value)`` when *value* is truthy, otherwise ``None``."""
    return str(value) if value else None


def _pick(obj: Any, *names: str) -> Optional[Dict[str, Any]]:
    """Copy the named attributes of *obj* into a dict.

    Returns ``None`` when *obj* itself is falsy. Attributes that *obj* does
    not have are reported as ``None`` rather than raising, because the set of
    fields on the SDK models differs between SDK versions.
    """
    if not obj:
        return None
    return {name: getattr(obj, name, None) for name in names}


def _cluster_timestamp(cluster: Any, field: str) -> Optional[str]:
    """Read ``time_created`` / ``time_updated`` for a cluster as a string.

    The attribute is looked up on the cluster first and, if absent or empty,
    on ``cluster.metadata``.

    NOTE(review): the SDK cluster models appear to expose these timestamps
    only on ``metadata``; confirm against the SDK version in use.
    """
    value = getattr(cluster, field, None)
    if not value:
        value = getattr(getattr(cluster, "metadata", None), field, None)
    return _timestamp(value)


def _format_cluster_summary(cluster: Any) -> Dict[str, Any]:
    """Flatten one entry of a ``list_clusters`` response."""
    endpoint_config = cluster.endpoint_config
    return {
        "id": cluster.id,
        "name": cluster.name,
        "compartment_id": cluster.compartment_id,
        "lifecycle_state": cluster.lifecycle_state,
        "lifecycle_details": cluster.lifecycle_details,
        "vcn_id": cluster.vcn_id,
        "kubernetes_version": cluster.kubernetes_version,
        "time_created": _cluster_timestamp(cluster, "time_created"),
        "time_updated": _cluster_timestamp(cluster, "time_updated"),
        "endpoint_config": {
            "subnet_id": endpoint_config.subnet_id,
            "is_public_ip_enabled": endpoint_config.is_public_ip_enabled,
        } if endpoint_config else None,
        "type": getattr(cluster, "type", None),
    }


def _format_cluster_metadata(metadata: Any) -> Optional[Dict[str, Any]]:
    """Flatten a cluster's ``metadata`` object; ``None`` when it is unset."""
    if not metadata:
        return None
    return {
        "time_created": _timestamp(getattr(metadata, "time_created", None)),
        "created_by_user_id": getattr(metadata, "created_by_user_id", None),
        "created_by_work_request_id": getattr(metadata, "created_by_work_request_id", None),
        "time_updated": _timestamp(getattr(metadata, "time_updated", None)),
        "updated_by_user_id": getattr(metadata, "updated_by_user_id", None),
        "updated_by_work_request_id": getattr(metadata, "updated_by_work_request_id", None),
    }


def _format_cluster_options(options: Any) -> Optional[Dict[str, Any]]:
    """Flatten a cluster's ``options`` object; ``None`` when it is unset."""
    if not options:
        return None

    network_config = getattr(options, "kubernetes_network_config", None)
    return {
        "service_lb_subnet_ids": getattr(options, "service_lb_subnet_ids", None),
        "kubernetes_network_config": {
            "pods_cidr": network_config.pods_cidr,
            "services_cidr": network_config.services_cidr,
        } if network_config else None,
        "add_ons": _pick(
            getattr(options, "add_ons", None),
            "is_kubernetes_dashboard_enabled",
            "is_tiller_enabled",
        ),
        "admission_controller_options": _pick(
            getattr(options, "admission_controller_options", None),
            "is_pod_security_policy_enabled",
        ),
        "persistent_volume_config": _pick(
            getattr(options, "persistent_volume_config", None),
            "defined_tags",
            "freeform_tags",
        ),
        "service_lb_config": _pick(
            getattr(options, "service_lb_config", None),
            "defined_tags",
            "freeform_tags",
        ),
    }


def _format_placement_configs(node_config_details: Any, *, include_fault_domains: bool) -> List[Dict[str, Any]]:
    """Flatten ``node_config_details.placement_configs`` into a list of dicts.

    Returns an empty list when the attribute is missing or empty. The
    ``fault_domains`` key is only emitted when *include_fault_domains* is true
    (the detail view includes it, the list view does not).
    """
    formatted = []
    for placement in getattr(node_config_details, "placement_configs", None) or []:
        entry = {
            "availability_domain": placement.availability_domain,
            "subnet_id": placement.subnet_id,
            "capacity_reservation_id": getattr(placement, "capacity_reservation_id", None),
        }
        if include_fault_domains:
            entry["fault_domains"] = getattr(placement, "fault_domains", None)
        formatted.append(entry)
    return formatted


def _format_node_pool_summary(node_pool: Any) -> Dict[str, Any]:
    """Flatten one entry of a ``list_node_pools`` response."""
    config = getattr(node_pool, "node_config_details", None)
    return {
        "id": node_pool.id,
        "name": node_pool.name,
        "compartment_id": node_pool.compartment_id,
        "cluster_id": node_pool.cluster_id,
        "lifecycle_state": node_pool.lifecycle_state,
        "lifecycle_details": node_pool.lifecycle_details,
        "kubernetes_version": node_pool.kubernetes_version,
        "node_image_name": getattr(node_pool, "node_image_name", None),
        "node_shape": node_pool.node_shape,
        "quantity_per_subnet": getattr(node_pool, "quantity_per_subnet", None),
        "subnet_ids": getattr(node_pool, "subnet_ids", None),
        "node_config_details": {
            "size": config.size,
            "placement_configs": _format_placement_configs(config, include_fault_domains=False),
        } if config else None,
        "time_created": _timestamp(getattr(node_pool, "time_created", None)),
    }


def _format_node(node: Any) -> Dict[str, Any]:
    """Flatten one node of a node pool's ``nodes`` list."""
    return {
        "id": getattr(node, "id", None),
        "name": getattr(node, "name", None),
        "availability_domain": getattr(node, "availability_domain", None),
        "subnet_id": getattr(node, "subnet_id", None),
        "lifecycle_state": getattr(node, "lifecycle_state", None),
        "fault_domain": getattr(node, "fault_domain", None),
        "private_ip": getattr(node, "private_ip", None),
        "public_ip": getattr(node, "public_ip", None),
        "node_error": _pick(getattr(node, "node_error", None), "code", "message"),
    }


def _format_work_request(work_request: Any) -> Dict[str, Any]:
    """Flatten a work request (shared by the list and get operations)."""
    return {
        "id": work_request.id,
        "operation_type": work_request.operation_type,
        "status": work_request.status,
        "compartment_id": work_request.compartment_id,
        "resources": [
            _pick(resource, "action_type", "entity_type", "identifier", "entity_uri")
            if resource else {
                # A falsy resource entry cannot carry attributes; keep the
                # same four keys the original per-attribute lookups produced.
                "action_type": getattr(resource, "action_type", None),
                "entity_type": getattr(resource, "entity_type", None),
                "identifier": getattr(resource, "identifier", None),
                "entity_uri": getattr(resource, "entity_uri", None),
            }
            for resource in (getattr(work_request, "resources", None) or [])
        ],
        "percent_complete": getattr(work_request, "percent_complete", None),
        "time_accepted": _timestamp(getattr(work_request, "time_accepted", None)),
        "time_started": _timestamp(getattr(work_request, "time_started", None)),
        "time_finished": _timestamp(getattr(work_request, "time_finished", None)),
    }


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def list_clusters(container_engine_client: oci.container_engine.ContainerEngineClient,
                  compartment_id: str) -> List[Dict[str, Any]]:
    """
    List all OKE clusters in a compartment.

    Args:
        container_engine_client: OCI ContainerEngine client
        compartment_id: OCID of the compartment

    Returns:
        List of clusters with their details

    Raises:
        Exception: any error raised by the SDK call is logged and re-raised.
    """
    try:
        clusters_response = oci.pagination.list_call_get_all_results(
            container_engine_client.list_clusters,
            compartment_id
        )

        clusters = [_format_cluster_summary(cluster) for cluster in clusters_response.data]

        logger.info("Found %d OKE clusters in compartment %s", len(clusters), compartment_id)
        return clusters

    except Exception as e:
        logger.exception("Error listing OKE clusters: %s", e)
        raise


def get_cluster(container_engine_client: oci.container_engine.ContainerEngineClient,
                cluster_id: str) -> Dict[str, Any]:
    """
    Get details of a specific OKE cluster.

    Args:
        container_engine_client: OCI ContainerEngine client
        cluster_id: OCID of the cluster

    Returns:
        Details of the cluster

    Raises:
        Exception: any error raised by the SDK call is logged and re-raised.
    """
    try:
        cluster = container_engine_client.get_cluster(cluster_id).data

        # Format endpoint config
        endpoint_config = None
        if cluster.endpoint_config:
            endpoint_config = {
                "subnet_id": cluster.endpoint_config.subnet_id,
                "nsg_ids": cluster.endpoint_config.nsg_ids,
                "is_public_ip_enabled": cluster.endpoint_config.is_public_ip_enabled,
            }

        cluster_details = {
            "id": cluster.id,
            "name": cluster.name,
            "compartment_id": cluster.compartment_id,
            "lifecycle_state": cluster.lifecycle_state,
            "lifecycle_details": cluster.lifecycle_details,
            "vcn_id": cluster.vcn_id,
            "kubernetes_version": cluster.kubernetes_version,
            "time_created": _cluster_timestamp(cluster, "time_created"),
            "time_updated": _cluster_timestamp(cluster, "time_updated"),
            "endpoint_config": endpoint_config,
            "endpoints": _pick(
                cluster.endpoints,
                "kubernetes",
                "public_endpoint",
                "private_endpoint",
                "vcn_hostname_endpoint",
            ),
            "metadata": _format_cluster_metadata(cluster.metadata),
            "options": _format_cluster_options(cluster.options),
            "available_kubernetes_upgrades": getattr(cluster, "available_kubernetes_upgrades", None),
            "image_policy_config": _pick(
                getattr(cluster, "image_policy_config", None),
                "is_policy_enabled",
            ),
            # NOTE(review): passed through exactly as the SDK returns it;
            # presumably SDK model objects rather than plain dicts - verify
            # before JSON-serialising the result.
            "cluster_pod_network_options": getattr(cluster, "cluster_pod_network_options", None),
            "type": getattr(cluster, "type", None),
            "freeform_tags": getattr(cluster, "freeform_tags", None),
            "defined_tags": getattr(cluster, "defined_tags", None),
        }

        logger.info("Retrieved details for OKE cluster %s", cluster_id)
        return cluster_details

    except Exception as e:
        logger.exception("Error getting OKE cluster details: %s", e)
        raise


def list_node_pools(container_engine_client: oci.container_engine.ContainerEngineClient,
                    compartment_id: str, cluster_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    List all node pools in a compartment, optionally filtered by cluster.

    Args:
        container_engine_client: OCI ContainerEngine client
        compartment_id: OCID of the compartment
        cluster_id: Optional OCID of the cluster to filter by

    Returns:
        List of node pools with their details

    Raises:
        Exception: any error raised by the SDK call is logged and re-raised.
    """
    try:
        kwargs = {"compartment_id": compartment_id}
        if cluster_id:
            kwargs["cluster_id"] = cluster_id

        node_pools_response = oci.pagination.list_call_get_all_results(
            container_engine_client.list_node_pools,
            **kwargs
        )

        node_pools = [_format_node_pool_summary(pool) for pool in node_pools_response.data]

        logger.info(
            "Found %d node pools in compartment %s%s",
            len(node_pools),
            compartment_id,
            f" for cluster {cluster_id}" if cluster_id else "",
        )
        return node_pools

    except Exception as e:
        logger.exception("Error listing node pools: %s", e)
        raise


def get_node_pool(container_engine_client: oci.container_engine.ContainerEngineClient,
                  node_pool_id: str) -> Dict[str, Any]:
    """
    Get details of a specific node pool.

    Args:
        container_engine_client: OCI ContainerEngine client
        node_pool_id: OCID of the node pool

    Returns:
        Details of the node pool

    Raises:
        Exception: any error raised by the SDK call is logged and re-raised.
    """
    try:
        node_pool = container_engine_client.get_node_pool(node_pool_id).data

        # Format node config details
        node_config_details = None
        config = getattr(node_pool, "node_config_details", None)
        if config:
            node_config_details = {
                "size": config.size,
                "placement_configs": _format_placement_configs(config, include_fault_domains=True),
                "nsg_ids": getattr(config, "nsg_ids", None),
                "kms_key_id": getattr(config, "kms_key_id", None),
                "is_pv_encryption_in_transit_enabled": getattr(config, "is_pv_encryption_in_transit_enabled", None),
                "freeform_tags": getattr(config, "freeform_tags", None),
                "defined_tags": getattr(config, "defined_tags", None),
            }

        # Format node shape config
        node_shape_config = _pick(
            getattr(node_pool, "node_shape_config", None),
            "ocpus",
            "memory_in_gbs",
        )

        # Format node source details (source_type is read unguarded, as before)
        node_source_details = None
        source = getattr(node_pool, "node_source_details", None)
        if source:
            node_source_details = {
                "source_type": source.source_type,
                "image_id": getattr(source, "image_id", None),
                "boot_volume_size_in_gbs": getattr(source, "boot_volume_size_in_gbs", None),
            }

        # Format initial node labels
        initial_node_labels = [
            {
                "key": getattr(label, "key", None),
                "value": getattr(label, "value", None),
            }
            for label in (getattr(node_pool, "initial_node_labels", None) or [])
        ]

        # Format node eviction node pool settings
        node_eviction_node_pool_settings = _pick(
            getattr(node_pool, "node_eviction_node_pool_settings", None),
            "eviction_grace_duration",
            "is_force_delete_after_grace_duration",
        )

        # Format node pool cycling details
        node_pool_cycling_details = _pick(
            getattr(node_pool, "node_pool_cycling_details", None),
            "maximum_unavailable",
            "maximum_surge",
            "is_node_cycling_enabled",
        )

        node_pool_details = {
            "id": node_pool.id,
            "name": node_pool.name,
            "compartment_id": node_pool.compartment_id,
            "cluster_id": node_pool.cluster_id,
            "lifecycle_state": node_pool.lifecycle_state,
            "lifecycle_details": node_pool.lifecycle_details,
            "kubernetes_version": node_pool.kubernetes_version,
            "node_image_id": getattr(node_pool, "node_image_id", None),
            "node_image_name": getattr(node_pool, "node_image_name", None),
            "node_shape": node_pool.node_shape,
            "node_shape_config": node_shape_config,
            "node_source_details": node_source_details,
            "node_config_details": node_config_details,
            "initial_node_labels": initial_node_labels,
            "ssh_public_key": getattr(node_pool, "ssh_public_key", None),
            "quantity_per_subnet": getattr(node_pool, "quantity_per_subnet", None),
            "subnet_ids": getattr(node_pool, "subnet_ids", None),
            "nodes": [_format_node(node) for node in (getattr(node_pool, "nodes", None) or [])],
            "node_eviction_node_pool_settings": node_eviction_node_pool_settings,
            "node_pool_cycling_details": node_pool_cycling_details,
            "freeform_tags": getattr(node_pool, "freeform_tags", None),
            "defined_tags": getattr(node_pool, "defined_tags", None),
            "time_created": _timestamp(getattr(node_pool, "time_created", None)),
        }

        logger.info("Retrieved details for node pool %s", node_pool_id)
        return node_pool_details

    except Exception as e:
        logger.exception("Error getting node pool details: %s", e)
        raise


def get_cluster_kubeconfig(container_engine_client: oci.container_engine.ContainerEngineClient,
                           cluster_id: str) -> Dict[str, Any]:
    """
    Get the kubeconfig for a specific OKE cluster.

    Args:
        container_engine_client: OCI ContainerEngine client
        cluster_id: OCID of the cluster

    Returns:
        Kubeconfig content and metadata

    Raises:
        Exception: any error raised by the SDK call is logged and re-raised.
    """
    try:
        # Create kubeconfig request
        create_kubeconfig_details = oci.container_engine.models.CreateClusterKubeconfigContentDetails()

        # Get the kubeconfig. The details object must be passed by keyword:
        # the SDK method accepts only cluster_id positionally, so a second
        # positional argument raises TypeError before any request is sent.
        kubeconfig_response = container_engine_client.create_kubeconfig(
            cluster_id,
            create_cluster_kubeconfig_content_details=create_kubeconfig_details
        )

        # Read the kubeconfig content: prefer the raw bytes in `content`,
        # fall back to `text` when the response body has no such attribute.
        body = kubeconfig_response.data
        if hasattr(body, "content"):
            kubeconfig_content = body.content.decode('utf-8')
        else:
            kubeconfig_content = body.text

        result = {
            "cluster_id": cluster_id,
            "kubeconfig": kubeconfig_content,
            "format": "yaml",
            "usage": "Save this content to ~/.kube/config or use with kubectl --kubeconfig flag",
        }

        logger.info("Retrieved kubeconfig for cluster %s", cluster_id)
        return result

    except Exception as e:
        logger.exception("Error getting cluster kubeconfig: %s", e)
        raise


def list_work_requests(container_engine_client: oci.container_engine.ContainerEngineClient,
                       compartment_id: str, resource_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    List work requests in a compartment, optionally filtered by resource.

    Args:
        container_engine_client: OCI ContainerEngine client
        compartment_id: OCID of the compartment
        resource_id: Optional OCID of the resource to filter by (cluster or node pool)

    Returns:
        List of work requests with their details

    Raises:
        Exception: any error raised by the SDK call is logged and re-raised.
    """
    try:
        kwargs = {"compartment_id": compartment_id}
        if resource_id:
            kwargs["resource_id"] = resource_id

        work_requests_response = oci.pagination.list_call_get_all_results(
            container_engine_client.list_work_requests,
            **kwargs
        )

        work_requests = [_format_work_request(wr) for wr in work_requests_response.data]

        logger.info(
            "Found %d work requests in compartment %s%s",
            len(work_requests),
            compartment_id,
            f" for resource {resource_id}" if resource_id else "",
        )
        return work_requests

    except Exception as e:
        logger.exception("Error listing work requests: %s", e)
        raise


def get_work_request(container_engine_client: oci.container_engine.ContainerEngineClient,
                     work_request_id: str) -> Dict[str, Any]:
    """
    Get details of a specific work request.

    Args:
        container_engine_client: OCI ContainerEngine client
        work_request_id: OCID of the work request

    Returns:
        Details of the work request

    Raises:
        Exception: any error raised by the SDK call is logged and re-raised.
    """
    try:
        wr = container_engine_client.get_work_request(work_request_id).data

        work_request_details = _format_work_request(wr)

        logger.info("Retrieved details for work request %s", work_request_id)
        return work_request_details

    except Exception as e:
        logger.exception("Error getting work request details: %s", e)
        raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jopsis/mcp-server-oci'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.