monitoring.py•12.4 kB
"""
Tools for managing OCI Monitoring and Observability resources.
"""
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import oci
logger = logging.getLogger(__name__)
def list_alarms(monitoring_client: oci.monitoring.MonitoringClient, compartment_id: str) -> List[Dict[str, Any]]:
    """
    List all alarms in a compartment.
    Args:
        monitoring_client: OCI Monitoring client
        compartment_id: OCID of the compartment
    Returns:
        List of alarms with their details
    """
    try:
        alarms_response = oci.pagination.list_call_get_all_results(
            monitoring_client.list_alarms,
            compartment_id
        )
        alarms = []
        for alarm in alarms_response.data:
            alarms.append({
                "id": alarm.id,
                "display_name": alarm.display_name,
                "compartment_id": alarm.compartment_id,
                "metric_compartment_id": alarm.metric_compartment_id,
                "namespace": alarm.namespace,
                "query": alarm.query,
                "severity": alarm.severity,
                "lifecycle_state": alarm.lifecycle_state,
                "is_enabled": alarm.is_enabled,
                "destinations": alarm.destinations,
                "time_created": str(alarm.time_created),
                "time_updated": str(alarm.time_updated),
            })
        logger.info(f"Found {len(alarms)} alarms in compartment {compartment_id}")
        return alarms
    except Exception as e:
        logger.exception(f"Error listing alarms: {e}")
        raise
def get_alarm(monitoring_client: oci.monitoring.MonitoringClient, alarm_id: str) -> Dict[str, Any]:
    """
    Get details of a specific alarm.
    Args:
        monitoring_client: OCI Monitoring client
        alarm_id: OCID of the alarm
    Returns:
        Details of the alarm
    """
    try:
        alarm = monitoring_client.get_alarm(alarm_id).data
        alarm_details = {
            "id": alarm.id,
            "display_name": alarm.display_name,
            "compartment_id": alarm.compartment_id,
            "metric_compartment_id": alarm.metric_compartment_id,
            "metric_compartment_id_in_subtree": alarm.metric_compartment_id_in_subtree,
            "namespace": alarm.namespace,
            "resource_group": alarm.resource_group,
            "query": alarm.query,
            "resolution": alarm.resolution,
            "pending_duration": alarm.pending_duration,
            "severity": alarm.severity,
            "body": alarm.body,
            "is_enabled": alarm.is_enabled,
            "lifecycle_state": alarm.lifecycle_state,
            "suppress": {
                "time_suppress_from": str(alarm.suppression.time_suppress_from) if alarm.suppression else None,
                "time_suppress_until": str(alarm.suppression.time_suppress_until) if alarm.suppression else None,
                "description": alarm.suppression.description if alarm.suppression else None,
            } if alarm.suppression else None,
            "destinations": alarm.destinations,
            "repeat_notification_duration": alarm.repeat_notification_duration,
            "time_created": str(alarm.time_created),
            "time_updated": str(alarm.time_updated),
        }
        logger.info(f"Retrieved details for alarm {alarm_id}")
        return alarm_details
    except Exception as e:
        logger.exception(f"Error getting alarm details: {e}")
        raise
def get_alarm_history(monitoring_client: oci.monitoring.MonitoringClient,
                      alarm_id: str,
                      alarm_historytype: str = "STATE_TRANSITION_HISTORY") -> List[Dict[str, Any]]:
    """
    Get alarm state history.
    Args:
        monitoring_client: OCI Monitoring client
        alarm_id: OCID of the alarm
        alarm_historytype: Type of history (STATE_TRANSITION_HISTORY, STATE_HISTORY, RULE_HISTORY)
    Returns:
        List of alarm history entries
    """
    try:
        history_response = oci.pagination.list_call_get_all_results(
            monitoring_client.get_alarm_history,
            alarm_id,
            alarm_historytype=alarm_historytype
        )
        history = []
        for entry in history_response.data:
            history.append({
                "summary": entry.summary,
                "timestamp": str(entry.timestamp),
                "timestamp_triggered": str(entry.timestamp_triggered) if hasattr(entry, 'timestamp_triggered') and entry.timestamp_triggered else None,
            })
        logger.info(f"Retrieved {len(history)} history entries for alarm {alarm_id}")
        return history
    except Exception as e:
        logger.exception(f"Error getting alarm history: {e}")
        raise
def list_metrics(monitoring_client: oci.monitoring.MonitoringClient,
                 compartment_id: str,
                 namespace: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    List available metrics in a compartment.
    Args:
        monitoring_client: OCI Monitoring client
        compartment_id: OCID of the compartment
        namespace: Optional namespace to filter metrics
    Returns:
        List of available metrics
    """
    try:
        list_metrics_details = oci.monitoring.models.ListMetricsDetails(
            compartment_id=compartment_id,
            namespace=namespace
        )
        metrics_response = oci.pagination.list_call_get_all_results(
            monitoring_client.list_metrics,
            compartment_id,
            list_metrics_details
        )
        metrics = []
        for metric in metrics_response.data:
            metrics.append({
                "name": metric.name,
                "namespace": metric.namespace,
                "resource_group": metric.resource_group,
                "compartment_id": metric.compartment_id,
                "dimensions": metric.dimensions,
            })
        logger.info(f"Found {len(metrics)} metrics in compartment {compartment_id}")
        return metrics
    except Exception as e:
        logger.exception(f"Error listing metrics: {e}")
        raise
def query_metric_data(monitoring_client: oci.monitoring.MonitoringClient,
                      compartment_id: str,
                      query: str,
                      start_time: str,
                      end_time: str,
                      resolution: str = "1m") -> List[Dict[str, Any]]:
    """
    Query metric data for a time range.
    Args:
        monitoring_client: OCI Monitoring client
        compartment_id: OCID of the compartment
        query: Metric query in MQL format
        start_time: Start time in ISO format (YYYY-MM-DDTHH:MM:SSZ)
        end_time: End time in ISO format (YYYY-MM-DDTHH:MM:SSZ)
        resolution: Data resolution (1m, 5m, 1h)
    Returns:
        List of metric data points
    """
    try:
        summarize_metrics_data_details = oci.monitoring.models.SummarizeMetricsDataDetails(
            namespace="oci_computeagent",  # This will be overridden by the query
            query=query,
            start_time=datetime.fromisoformat(start_time.replace('Z', '+00:00')),
            end_time=datetime.fromisoformat(end_time.replace('Z', '+00:00')),
            resolution=resolution
        )
        metrics_data_response = monitoring_client.summarize_metrics_data(
            compartment_id,
            summarize_metrics_data_details
        )
        data_points = []
        for metric_data in metrics_data_response.data:
            data_points.append({
                "namespace": metric_data.namespace,
                "resource_group": metric_data.resource_group,
                "compartment_id": metric_data.compartment_id,
                "name": metric_data.name,
                "dimensions": metric_data.dimensions,
                "aggregated_datapoints": [
                    {
                        "timestamp": str(dp.timestamp),
                        "value": dp.value
                    }
                    for dp in metric_data.aggregated_datapoints
                ] if metric_data.aggregated_datapoints else [],
                "resolution": metric_data.resolution,
            })
        logger.info(f"Retrieved metric data with {len(data_points)} series")
        return data_points
    except Exception as e:
        logger.exception(f"Error querying metric data: {e}")
        raise
def search_logs(logging_search_client: oci.loggingsearch.LogSearchClient,
                search_logs_details: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Search logs using the Logging Search API.
    Args:
        logging_search_client: OCI LoggingSearch client
        search_logs_details: Search parameters including query, time range, and limit
    Returns:
        List of log entries matching the search criteria
    """
    try:
        search_details = oci.loggingsearch.models.SearchLogsDetails(
            time_start=datetime.fromisoformat(search_logs_details['time_start'].replace('Z', '+00:00')),
            time_end=datetime.fromisoformat(search_logs_details['time_end'].replace('Z', '+00:00')),
            search_query=search_logs_details['search_query'],
            is_return_field_info=search_logs_details.get('is_return_field_info', False)
        )
        logs_response = logging_search_client.search_logs(search_details)
        log_entries = []
        for result in logs_response.data.results:
            log_entries.append({
                "time": str(result.time),
                "log_content": result.data,
            })
        logger.info(f"Found {len(log_entries)} log entries")
        return log_entries
    except Exception as e:
        logger.exception(f"Error searching logs: {e}")
        raise
def list_log_groups(logging_client: oci.logging.LoggingManagementClient,
                    compartment_id: str) -> List[Dict[str, Any]]:
    """
    List all log groups in a compartment.
    Args:
        logging_client: OCI Logging Management client
        compartment_id: OCID of the compartment
    Returns:
        List of log groups with their details
    """
    try:
        log_groups_response = oci.pagination.list_call_get_all_results(
            logging_client.list_log_groups,
            compartment_id
        )
        log_groups = []
        for log_group in log_groups_response.data:
            log_groups.append({
                "id": log_group.id,
                "display_name": log_group.display_name,
                "description": log_group.description,
                "compartment_id": log_group.compartment_id,
                "time_created": str(log_group.time_created),
                "time_last_modified": str(log_group.time_last_modified),
                "lifecycle_state": log_group.lifecycle_state,
            })
        logger.info(f"Found {len(log_groups)} log groups in compartment {compartment_id}")
        return log_groups
    except Exception as e:
        logger.exception(f"Error listing log groups: {e}")
        raise
def list_logs(logging_client: oci.logging.LoggingManagementClient,
              log_group_id: str) -> List[Dict[str, Any]]:
    """
    List all logs in a log group.
    Args:
        logging_client: OCI Logging Management client
        log_group_id: OCID of the log group
    Returns:
        List of logs with their details
    """
    try:
        logs_response = oci.pagination.list_call_get_all_results(
            logging_client.list_logs,
            log_group_id
        )
        logs = []
        for log in logs_response.data:
            logs.append({
                "id": log.id,
                "log_group_id": log.log_group_id,
                "display_name": log.display_name,
                "log_type": log.log_type,
                "lifecycle_state": log.lifecycle_state,
                "is_enabled": log.is_enabled,
                "retention_duration": log.retention_duration,
                "compartment_id": log.compartment_id,
                "time_created": str(log.time_created),
                "time_last_modified": str(log.time_last_modified),
            })
        logger.info(f"Found {len(logs)} logs in log group {log_group_id}")
        return logs
    except Exception as e:
        logger.exception(f"Error listing logs: {e}")
        raise