start_ai_alert_triage
Initiate AI-powered analysis of security alerts to assess risk, analyze events, and provide investigation recommendations.
Instructions
Start an AI-powered triage analysis for a Panther alert with intelligent insights and recommendations.
This tool initiates Panther's embedded AI agent to triage an alert and provide an intelligent report about the events, risk level, potential impact, and recommended next steps for investigation.
The AI triage includes analysis of:
Alert metadata (severity, detection rule, timestamps)
Related events and logs (if available)
Comments from previous investigations
Contextual security analysis and recommendations
Returns: Dict containing: - success: Boolean indicating if triage was generated successfully - summary: The AI-generated triage summary text - stream_id: The stream ID used for this analysis - metadata: Information about the analysis request - message: Error message if unsuccessful
Permissions:{'all_of': ['Run Panther AI']}
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| alert_id | Yes | The ID of the alert to start AI triage for | |
| prompt | No | Optional additional prompt to provide context for the AI triage | |
| timeout_seconds | No | Timeout in seconds to wait for AI triage completion |
Implementation Reference
- src/mcp_panther/panther_mcp_core/tools/alerts.py:947-952 (registration)The @mcp_tool decorator that registers the 'start_ai_alert_triage' tool, specifying permissions (RUN_PANTHER_AI) and readOnlyHint.@mcp_tool( annotations={ "permissions": all_perms(Permission.RUN_PANTHER_AI), "readOnlyHint": True, } )
- Input schema defined via Annotated Pydantic fields: alert_id (str, required), prompt (str | None, optional), timeout_seconds (int, default=180, ge=120 le=300).async def start_ai_alert_triage( alert_id: Annotated[ str, Field(min_length=1, description="The ID of the alert to start AI triage for"), ], prompt: Annotated[ str | None, Field( min_length=1, description="Optional additional prompt to provide context for the AI triage", ), ] = None, timeout_seconds: Annotated[ int, Field( description="Timeout in seconds to wait for AI triage completion", ge=120, le=300, ), ] = 180, ) -> dict[str, Any]:
- The core handler logic: constructs GraphQL variables for AI_SUMMARIZE_ALERT_MUTATION to start triage stream, polls AI_INFERENCE_STREAM_QUERY until finished or timeout, accumulates responseText, handles errors and returns summary with metadata."""Start an AI-powered triage analysis for a Panther alert with intelligent insights and recommendations. This tool initiates Panther's embedded AI agent to triage an alert and provide an intelligent report about the events, risk level, potential impact, and recommended next steps for investigation. The AI triage includes analysis of: - Alert metadata (severity, detection rule, timestamps) - Related events and logs (if available) - Comments from previous investigations - Contextual security analysis and recommendations Returns: Dict containing: - success: Boolean indicating if triage was generated successfully - summary: The AI-generated triage summary text - stream_id: The stream ID used for this analysis - metadata: Information about the analysis request - message: Error message if unsuccessful """ logger.info(f"Starting AI triage for alert {alert_id}") try: # Set output length to medium (fixed, not configurable by AI) output_length = "medium" # Prepare the AI summarize request with minimal required fields request_input = { "alertId": alert_id, "outputLength": output_length, "metadata": { "kind": "alert" # Required: tells AI this is an alert analysis }, } # Add optional prompt if provided if prompt: request_input["prompt"] = prompt variables = {"input": request_input} if prompt: logger.info(f"Using additional prompt: {prompt[:100]}...") logger.info(f"Initiating AI triage with output_length={output_length}") # Step 1: Start the AI triage result = await _execute_query(AI_SUMMARIZE_ALERT_MUTATION, variables) if not result or "aiSummarizeAlert" not in result: logger.error("Failed to initiate AI triage") return { "success": False, "message": "Failed to initiate AI triage", } stream_id = result["aiSummarizeAlert"]["streamId"] logger.info(f"AI triage started with stream ID: {stream_id}") # Step 2: Poll for results with timeout start_time = asyncio.get_event_loop().time() poll_interval = 2.0 # Start with 2 second intervals max_poll_interval = 10.0 # Maximum 10 second intervals accumulated_response = "" while True: current_time = asyncio.get_event_loop().time() elapsed = current_time - start_time if elapsed > timeout_seconds: logger.warning(f"AI triage timed out after {timeout_seconds} seconds") return { "success": False, "message": f"AI triage generation timed out after {timeout_seconds} seconds", "stream_id": stream_id, "partial_summary": accumulated_response if accumulated_response else None, } # Poll the inference stream stream_variables = {"streamId": stream_id} stream_result = await _execute_query( AI_INFERENCE_STREAM_QUERY, stream_variables ) if not stream_result or "aiInferenceStream" not in stream_result: logger.error("Failed to poll AI inference stream") await asyncio.sleep(poll_interval) continue inference_data = stream_result["aiInferenceStream"] # Check for errors if inference_data.get("error"): logger.error(f"AI inference error: {inference_data['error']}") return { "success": False, "message": f"AI inference failed: {inference_data['error']}", "stream_id": stream_id, } # Get the latest response text (AI streams the full response each time) response_text = inference_data.get("responseText", "") if response_text: accumulated_response = response_text # Check if finished if inference_data.get("finished", False): logger.info(f"AI triage completed after {elapsed:.1f} seconds") break # Wait before next poll, with exponential backoff await asyncio.sleep(poll_interval) poll_interval = min(poll_interval * 1.2, max_poll_interval) # Return the completed triage return { "success": True, "summary": accumulated_response, "stream_id": stream_id, "metadata": { "alert_id": alert_id, "output_length": output_length, "generation_time_seconds": round(elapsed, 1), "prompt_included": prompt is not None, }, } except Exception as e: logger.error(f"Failed to start AI alert triage: {str(e)}") return { "success": False, "message": f"Failed to start AI alert triage: {str(e)}", }