datadog

import json
import os
from typing import Generator, Optional

import mcp.server.stdio
import mcp.types as types
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v1.api.monitors_api import MonitorsApi
from datadog_api_client.v2.api.logs_api import LogsApi
from datadog_api_client.v2.models import LogsListResponse
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp.prompts.base import AssistantMessage, UserMessage
from icecream import ic
from loguru import logger

load_dotenv()

mcp = FastMCP(
    "Datadog-MCP-Server",
    dependencies=[
        "loguru",
        "icecream",
        "python-dotenv",
        "datadog-api-client",
    ],
)


def fetch_logs_paginated(
    api_instance: LogsApi, query_params: dict, max_results: int = 1000
) -> Generator[LogsListResponse, None, None]:
    """Fetch logs with cursor-based pagination support."""
    cursor = None
    total_logs = 0
    while total_logs < max_results:
        query_params["page"] = {"limit": min(100, max_results - total_logs)}
        if cursor:
            query_params["page"]["cursor"] = cursor
        response = api_instance.list_logs(body=query_params)
        if not response.data:
            break
        yield response
        total_logs += len(response.data)
        # The Logs API returns the cursor for the next page in the response
        # metadata; stop when no further cursor is provided.
        cursor = getattr(getattr(response.meta, "page", None), "after", None)
        if not cursor:
            break


def extract_tag_value(tags: list, prefix: str) -> Optional[str]:
    """Helper to extract a tag value for a given prefix."""
    for tag in tags:
        if tag.startswith(prefix):
            return tag.split(":", 1)[1]
    return None


@mcp.tool()
def get_monitor_states(
    name: str,
    timeframe: int = 1,
) -> list[types.TextContent]:
    """
    Get monitor states for monitors whose name matches.

    Args:
        name: monitor name (substring match, case insensitive)
        timeframe: Hours to look back (default: 1; currently not applied
            to the monitor query)
    """

    def serialize_monitor(monitor) -> dict:
        """Helper to serialize monitor data."""
        return {
            "id": str(monitor.id),
            "name": monitor.name,
            "query": monitor.query,
            "status": str(monitor.overall_state),
            "last_triggered": getattr(monitor, "last_triggered_ts", None),
            "message": getattr(monitor, "message", None),
            "type": getattr(monitor, "type", None),
            "created": str(monitor.created) if hasattr(monitor, "created") else None,
            "modified": str(monitor.modified) if hasattr(monitor, "modified") else None,
        }

    def fetch_monitors():
        with ApiClient(configuration) as api_client:
            monitors_api = MonitorsApi(api_client)
            # Get all monitors (larger page size), then filter by name
            # (case insensitive)
            response = monitors_api.list_monitors(page_size=100)
            return [
                monitor for monitor in response
                if name.lower() in monitor.name.lower()
            ]

    try:
        configuration = Configuration()
        api_key = os.getenv("DD_API_KEY")
        app_key = os.getenv("DD_APP_KEY")
        if not api_key or not app_key:
            return [
                types.TextContent(
                    type="text", text="Error: Missing Datadog API credentials"
                )
            ]
        configuration.api_key["DD-API-KEY"] = api_key
        configuration.api_key["DD-APPLICATION-KEY"] = app_key
        configuration.server_variables["site"] = "datadoghq.eu"

        monitors = fetch_monitors()
        if not monitors:
            return [
                types.TextContent(
                    type="text",
                    text=f"No monitors found with name containing '{name}'",
                )
            ]

        monitor_states = [serialize_monitor(monitor) for monitor in monitors]
        return [
            types.TextContent(
                type="text",
                # default=str handles values json cannot serialize natively
                text=json.dumps(monitor_states, indent=2, default=str),
            )
        ]
    except ValueError as ve:
        return [types.TextContent(type="text", text=str(ve))]
    except Exception as e:
        logger.error(f"Error fetching monitor states: {str(e)}")
        return [
            types.TextContent(
                type="text", text=f"Error fetching monitor states: {str(e)}"
            )
        ]
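
# A minimal usage sketch for fetch_logs_paginated, which is defined above but
# not exercised by the tools in this file. The function name and query values
# below are illustrative assumptions; credentials and site are configured the
# same way as in the tools.
def _example_paginated_error_fetch() -> list:
    configuration = Configuration()
    configuration.api_key["DD-API-KEY"] = os.getenv("DD_API_KEY")
    configuration.api_key["DD-APPLICATION-KEY"] = os.getenv("DD_APP_KEY")
    configuration.server_variables["site"] = "datadoghq.eu"
    collected = []
    with ApiClient(configuration) as api_client:
        api_instance = LogsApi(api_client)
        params = {
            "filter": {"query": "status:error", "from": "now-1h", "to": "now"},
            "sort": "-timestamp",
        }
        # Each yielded response is one page; stop after at most 300 logs.
        for page in fetch_logs_paginated(api_instance, params, max_results=300):
            collected.extend(page.data)
    return collected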
@mcp.tool()
def get_k8s_logs(
    cluster: str, timeframe: int = 5, namespace: Optional[str] = None
) -> list[types.TextContent]:
    """Get error logs from a Kubernetes cluster over the last `timeframe` hours."""
    try:
        configuration = Configuration()
        api_key = os.getenv("DD_API_KEY")
        app_key = os.getenv("DD_APP_KEY")
        configuration.server_variables["site"] = "datadoghq.eu"
        configuration.api_key["DD-API-KEY"] = api_key
        configuration.api_key["DD-APPLICATION-KEY"] = app_key

        with ApiClient(configuration) as api_client:
            api_instance = LogsApi(api_client)

            # Build a precise query for errors; the OR group is parenthesized
            # so it combines correctly with the AND terms
            query_components = [
                # "source:kubernetes",
                f"kube_cluster_name:{cluster}",
                "(status:error OR level:error OR severity:error)",
            ]
            if namespace:
                query_components.append(f"kube_namespace:{namespace}")
            query = " AND ".join(query_components)

            response = api_instance.list_logs(
                body={
                    "filter": {
                        "query": query,
                        "from": f"now-{timeframe}h",  # dynamic timeframe
                        "to": "now",
                    },
                    "sort": "-timestamp",  # most recent first
                    "page": {
                        "limit": 100,  # raised to surface more errors
                    },
                }
            )

            # Reduce the response to the fields that matter for analysis
            ic(f"Query: {query}")  # log the query
            # ic(f"Response: {response}")  # log the raw response
            logs_data = response.to_dict()
            # ic(f"Logs data: {logs_data}")  # log the parsed data
            formatted_logs = []
            for log in logs_data.get("data", []):
                attributes = log.get("attributes", {})
                ic(attributes)
                formatted_logs.append(
                    {
                        "timestamp": attributes.get("timestamp"),
                        "host": attributes.get("host"),
                        "service": attributes.get("service"),
                        "pod_name": extract_tag_value(
                            attributes.get("tags", []), "pod_name:"
                        ),
                        "namespace": extract_tag_value(
                            attributes.get("tags", []), "kube_namespace:"
                        ),
                        "container_name": extract_tag_value(
                            attributes.get("tags", []), "kube_container_name:"
                        ),
                        "message": attributes.get("message"),
                        "status": attributes.get("status"),
                    }
                )

            return [
                types.TextContent(
                    type="text", text=json.dumps(formatted_logs, indent=2)
                )
            ]
    except Exception as e:
        logger.error(f"Error fetching logs: {str(e)}")
        return [types.TextContent(type="text", text=f"Error: {str(e)}")]


@mcp.prompt()
def analyze_monitors_data(name: str, timeframe: int = 3) -> list:
    """
    Analyze monitor data for a specific monitor.

    Parameters:
        name (str): The name of the monitor to analyze
        timeframe (int): Hours to look back for data

    Returns:
        list: Structured monitor analysis
    """
    try:
        monitor_data = get_monitor_states(name=name, timeframe=timeframe)
        if not monitor_data:
            return [
                AssistantMessage(
                    f"No monitor data found for '{name}' in the last {timeframe} hours."
                )
            ]

        # get_monitor_states returns TextContent whose text is JSON on success,
        # or a plain-text status message otherwise
        text = monitor_data[0].text
        try:
            monitors = json.loads(text)
        except json.JSONDecodeError:
            return [AssistantMessage(text)]

        # Format the response more naturally
        messages = [
            UserMessage(f"Monitor Analysis for '{name}' (last {timeframe} hours):")
        ]
        for data in monitors:
            messages.append(
                AssistantMessage(
                    f"Monitor State: {data['status']}, "
                    f"Last Triggered: {data['last_triggered']}"
                )
            )
        return messages
    except Exception as e:
        logger.error(f"Error analyzing monitor data: {str(e)}")
        return [AssistantMessage(f"Error: {str(e)}")]


@mcp.prompt()
def analyze_error_logs(
    cluster: str = "rke2", timeframe: int = 3, namespace: Optional[str] = None
) -> list:
    """
    Analyze error logs from a Kubernetes cluster.

    Parameters:
        cluster (str): The cluster name to analyze
        timeframe (int): Hours to look back for errors
        namespace (str): Optional namespace filter

    Returns:
        list: Structured error analysis
    """
    logs = get_k8s_logs(cluster=cluster, namespace=namespace, timeframe=timeframe)
    if not logs:
        return [
            AssistantMessage(
                f"No error logs found for cluster '{cluster}' in the last {timeframe} hours."
            )
        ]

    # Format the response more naturally
    messages = [
        UserMessage(f"Error Analysis for cluster '{cluster}' (last {timeframe} hours):")
    ]
    try:
        log_data = json.loads(logs[0].text)
        if not log_data:
            messages.append(
                AssistantMessage("No errors found in the specified timeframe.")
            )
        else:
            # Group errors by service for better analysis
            errors_by_service = {}
            for log in log_data:
                service = log.get("service", "unknown")
                errors_by_service.setdefault(service, []).append(log)

            for service, errors in errors_by_service.items():
                # Logs are sorted most-recent-first, so errors[0] is the latest;
                # the formatted logs store the text under the "message" key
                messages.append(
                    AssistantMessage(
                        f"Service {service}: Found {len(errors)} errors\n"
                        f"Most recent error: {errors[0].get('message', 'No message')}"
                    )
                )
    except json.JSONDecodeError:
        messages.append(AssistantMessage("Error parsing log data."))

    return messages
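
# A minimal entry point sketch, assuming the server is launched directly with
# `python <this file>`. mcp.run() starts the FastMCP server over its default
# stdio transport; DD_API_KEY and DD_APP_KEY must be set in the environment
# (or in a .env file picked up by load_dotenv above).
if __name__ == "__main__":
    mcp.run()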