list_detections
Retrieve and filter security detections from Panther by type, severity, state, tags, and other criteria to monitor rules and policies.
Instructions
List detections from your Panther instance with support for multiple detection types and filtering.
Note: The output_ids filter is applied client-side after fetching all results from the API, as the Panther REST API does not support server-side filtering by outputID. For more efficient API-level filtering, consider using the 'tag' parameter if your detections are tagged by environment.
Permissions:{'all_of': ['View Rules', 'View Policies']}
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| detection_types | No | One or more detection types - rules, scheduled_rules, simple_rules, or policies. | |
| cursor | No | Optional cursor for pagination from a previous query (only supported for single detection type) | |
| limit | No | Maximum number of results to return per detection type | |
| name_contains | No | Substring search by name (case-insensitive) | |
| state | No | Filter by state - 'enabled' or 'disabled' | |
| severity | No | Filter by severity levels - INFO, LOW, MEDIUM, HIGH, or CRITICAL. | |
| tag | No | A case-insensitive list of tags to filter by. | |
| log_type | No | A list of log types to filter by (applies to rules and simple-rules only). | |
| resource_type | No | Filter by resource types (applies to policies only) - list of resource type names | |
| compliance_status | No | Filter by compliance status (applies to policies only) - 'PASS', 'FAIL', or 'ERROR' | |
| created_by | No | Filter by creator user ID or actor ID | |
| last_modified_by | No | Filter by last modifier user ID or actor ID | |
| output_ids | No | Client-side filter by destination output IDs. Filters results after fetching from API to include only detections with at least one matching outputID. |
Implementation Reference
- Core handler for list_detections tool. Includes @mcp_tool decorator, input schema via Annotated[Field], validation helpers integrated, API interaction logic, client-side filtering, and structured response formatting.@mcp_tool( annotations={ "permissions": all_perms(Permission.RULE_READ, Permission.POLICY_READ), "readOnlyHint": True, } ) async def list_detections( detection_types: Annotated[ list[str], Field( description="One or more detection types - rules, scheduled_rules, simple_rules, or policies.", examples=[ ["rules", "simple_rules", "scheduled_rules"], ["policies"], ], ), ] = ["rules"], cursor: Annotated[ str | None, Field( description="Optional cursor for pagination from a previous query (only supported for single detection type)" ), ] = None, limit: Annotated[ int, Field( description="Maximum number of results to return per detection type", default=100, ge=1, le=1000, ), ] = 100, name_contains: Annotated[ str | None, Field(description="Substring search by name (case-insensitive)") ] = None, state: Annotated[ str, Field( description="Filter by state - 'enabled' or 'disabled'", default="enabled" ), ] = "", severity: Annotated[ list[str], Field( description="Filter by severity levels - INFO, LOW, MEDIUM, HIGH, or CRITICAL.", examples=[ ["MEDIUM", "HIGH", "CRITICAL"], ["INFO", "LOW"], ], ), ] = [], tag: Annotated[ list[str], Field( description="A case-insensitive list of tags to filter by.", examples=[["Initial Access", "Persistence"]], ), ] = [], log_type: Annotated[ list[str], Field( description="A list of log types to filter by (applies to rules and simple-rules only).", examples=[["AWS.CloudTrail", "GCP.AuditLog"]], ), ] = [], resource_type: Annotated[ list[str], Field( description="Filter by resource types (applies to policies only) - list of resource type names", examples=[["AWS.S3.Bucket", "AWS.EC2.SecurityGroup"]], ), ] = [], compliance_status: Annotated[ str | None, Field( description="Filter by compliance status (applies to policies only) - 'PASS', 'FAIL', or 'ERROR'" ), ] = None, created_by: Annotated[ str | None, Field(description="Filter by creator user ID or actor ID") ] = None, last_modified_by: Annotated[ str | None, Field(description="Filter by last modifier user ID or actor ID") ] = None, output_ids: Annotated[ list[str], Field( description="Client-side filter by destination output IDs. Filters results after fetching from API to include only detections with at least one matching outputID.", examples=[["destination-id-123"], ["prod-slack", "prod-pagerduty"]], ), ] = [], ) -> dict[str, Any]: """List detections from your Panther instance with support for multiple detection types and filtering. Note: The output_ids filter is applied client-side after fetching all results from the API, as the Panther REST API does not support server-side filtering by outputID. For more efficient API-level filtering, consider using the 'tag' parameter if your detections are tagged by environment. """ # Validate detection types validation_error = validate_detection_types(detection_types) if validation_error: return validation_error logger.info(f"Fetching {limit} detections per type for types: {detection_types}") # For multiple detection types, cursor pagination is not supported if len(detection_types) > 1 and cursor: return { "success": False, "message": "Cursor pagination is not supported when querying multiple detection types. Please query one type at a time for pagination.", } # Validate filtering parameters if state and state not in VALID_STATES: return { "success": False, "message": f"Invalid state value. Must be one of: {', '.join(VALID_STATES)}", } if severity: invalid_severities = [s for s in severity if s not in VALID_SEVERITIES] if invalid_severities: return { "success": False, "message": f"Invalid severity values: {invalid_severities}. Valid values are: {', '.join(VALID_SEVERITIES)}", } if compliance_status and compliance_status not in VALID_COMPLIANCE_STATUSES: return { "success": False, "message": f"Invalid compliance_status value. Must be one of: {', '.join(VALID_COMPLIANCE_STATUSES)}", } # Validate detection-type-specific parameters if log_type and not any(dt in ["rules", "simple_rules"] for dt in detection_types): return { "success": False, "message": "log_type parameter is only valid for 'rules' and 'simple_rules' detection types.", } if resource_type and "policies" not in detection_types: return { "success": False, "message": "resource_type parameter is only valid for 'policies' detection type.", } if compliance_status and "policies" not in detection_types: return { "success": False, "message": "compliance_status parameter is only valid for 'policies' detection type.", } # Use the centralized field mapping field_map = LIST_FIELD_MAP try: all_results = {} has_next_pages = {} next_cursors = {} async with get_rest_client() as client: for detection_type in detection_types: # Build query parameters using helper function params = build_detection_params( limit, cursor, detection_types, name_contains, state, severity, tag, created_by, last_modified_by, log_type, resource_type, compliance_status, detection_type, ) result, _ = await client.get( get_endpoint_for_detection(detection_type), params=params ) # Extract detections and pagination info detections = result.get("results", []) next_cursor = result.get("next") # Store results for this detection type all_results[detection_type] = detections next_cursors[detection_type] = next_cursor has_next_pages[detection_type] = bool(next_cursor) # Process results for each detection type response_data = {"success": True} for detection_type in detection_types: detections = all_results[detection_type] # Keep only specific fields for each detection to limit the amount of data returned if detection_type == "policies": filtered_metadata = [ { "id": item["id"], "description": item.get("description"), "displayName": item.get("displayName"), "enabled": item.get("enabled", False), "severity": item.get("severity"), "resourceTypes": item.get("resourceTypes", []), "tags": item.get("tags", []), "reports": item.get("reports", {}), "managed": item.get("managed", False), "outputIDs": item.get("outputIDs", []), "createdBy": item.get("createdBy"), "createdAt": item.get("createdAt"), "lastModified": item.get("lastModified"), } for item in detections ] elif detection_type == "scheduled_rules": filtered_metadata = [ { "id": item["id"], "description": item.get("description"), "displayName": item.get("displayName"), "enabled": item.get("enabled", False), "severity": item.get("severity"), "scheduledQueries": item.get("scheduledQueries", []), "tags": item.get("tags", []), "reports": item.get("reports", {}), "managed": item.get("managed", False), "outputIDs": item.get("outputIDs", []), "threshold": item.get("threshold"), "dedupPeriodMinutes": item.get("dedupPeriodMinutes"), "createdBy": item.get("createdBy"), "createdAt": item.get("createdAt"), "lastModified": item.get("lastModified"), } for item in detections ] else: # rules and simple_rules filtered_metadata = [ { "id": item["id"], "description": item.get("description"), "displayName": item.get("displayName"), "enabled": item.get("enabled"), "severity": item.get("severity"), "logTypes": item.get("logTypes"), "tags": item.get("tags"), "reports": item.get("reports", {}), "managed": item.get("managed"), "outputIDs": item.get("outputIDs", []), "threshold": item.get("threshold"), "dedupPeriodMinutes": item.get("dedupPeriodMinutes"), "createdBy": item.get("createdBy"), "createdAt": item.get("createdAt"), "lastModified": item.get("lastModified"), } for item in detections ] # Apply client-side output_ids filtering if requested if output_ids: filtered_metadata = [ item for item in filtered_metadata if any( output_id in item.get("outputIDs", []) for output_id in output_ids ) ] logger.info( f"Applied client-side output_ids filter for {detection_type}: {len(filtered_metadata)} results matched" ) # Add to response response_data[field_map[detection_type]] = filtered_metadata response_data[f"total_{field_map[detection_type]}"] = len(filtered_metadata) # Add pagination info (only for single detection type queries) if len(detection_types) == 1: response_data["has_next_page"] = has_next_pages[detection_type] response_data["next_cursor"] = next_cursors[detection_type] else: response_data[f"{detection_type}_has_next_page"] = has_next_pages[ detection_type ] response_data[f"{detection_type}_next_cursor"] = next_cursors[ detection_type ] # Add overall summary for multi-type queries if len(detection_types) > 1: total_detections = sum(len(all_results[dt]) for dt in detection_types) response_data["total_all_detections"] = total_detections response_data["detection_types_queried"] = detection_types logger.info(f"Successfully retrieved detections for types: {detection_types}") return response_data except Exception as e: logger.error(f"Failed to list detection types {detection_types}: {str(e)}") return { "success": False, "message": f"Failed to list detection types {detection_types}: {str(e)}", }
- src/mcp_panther/server.py:71-76 (registration)Central point where register_all_tools(mcp) is called, which iterates over all @mcp_tool decorated functions (including list_detections) and registers them with the FastMCP server instance.# Register all tools with MCP using the registry register_all_tools(mcp) # Register all prompts with MCP using the registry register_all_prompts(mcp) # Register all resources with MCP using the registry register_all_resources(mcp)
- Pydantic input schema defined via Annotated parameters with Field metadata for description, examples, validation (e.g., limit ge=1 le=1000), making it self-describing for MCP clients.async def list_detections( detection_types: Annotated[ list[str], Field( description="One or more detection types - rules, scheduled_rules, simple_rules, or policies.", examples=[ ["rules", "simple_rules", "scheduled_rules"], ["policies"], ], ), ] = ["rules"], cursor: Annotated[ str | None, Field( description="Optional cursor for pagination from a previous query (only supported for single detection type)" ), ] = None, limit: Annotated[ int, Field( description="Maximum number of results to return per detection type", default=100, ge=1, le=1000, ), ] = 100, name_contains: Annotated[ str | None, Field(description="Substring search by name (case-insensitive)") ] = None, state: Annotated[ str, Field( description="Filter by state - 'enabled' or 'disabled'", default="enabled" ), ] = "", severity: Annotated[ list[str], Field( description="Filter by severity levels - INFO, LOW, MEDIUM, HIGH, or CRITICAL.", examples=[ ["MEDIUM", "HIGH", "CRITICAL"], ["INFO", "LOW"], ], ), ] = [], tag: Annotated[ list[str], Field( description="A case-insensitive list of tags to filter by.", examples=[["Initial Access", "Persistence"]], ), ] = [], log_type: Annotated[ list[str], Field( description="A list of log types to filter by (applies to rules and simple-rules only).", examples=[["AWS.CloudTrail", "GCP.AuditLog"]], ), ] = [], resource_type: Annotated[ list[str], Field( description="Filter by resource types (applies to policies only) - list of resource type names", examples=[["AWS.S3.Bucket", "AWS.EC2.SecurityGroup"]], ), ] = [], compliance_status: Annotated[ str | None, Field( description="Filter by compliance status (applies to policies only) - 'PASS', 'FAIL', or 'ERROR'" ), ] = None, created_by: Annotated[ str | None, Field(description="Filter by creator user ID or actor ID") ] = None, last_modified_by: Annotated[ str | None, Field(description="Filter by last modifier user ID or actor ID") ] = None, output_ids: Annotated[ list[str], Field( description="Client-side filter by destination output IDs. Filters results after fetching from API to include only detections with at least one matching outputID.", examples=[["destination-id-123"], ["prod-slack", "prod-pagerduty"]], ), ] = [], ) -> dict[str, Any]:
- Input validation helper for detection_types parameter used in list_detections.def validate_detection_types(detection_types: list[str]) -> dict[str, Any] | None: """Validate detection types and return error dict if invalid, None if valid.""" if not detection_types: return { "success": False, "message": "At least one detection type must be specified.", } invalid_types = [dt for dt in detection_types if dt not in DETECTION_TYPES] if invalid_types: valid_types = ", ".join(DETECTION_TYPES.keys()) return { "success": False, "message": f"Invalid detection_types {invalid_types}. Valid values are: {valid_types}", } return None
- Helper to build API request parameters, ensuring compatibility across detection types and validating param applicability.def build_detection_params( limit: int, cursor: str | None, detection_types: list[str], name_contains: str | None, state: str | None, severity: list[str] | None, tag: list[str] | None, created_by: str | None, last_modified_by: str | None, log_type: list[str] | None, resource_type: list[str] | None, compliance_status: str | None, detection_type: str, ) -> dict[str, Any]: """Build query parameters for detection API calls.""" params = {"limit": limit} # Add cursor for single detection type queries only if cursor and len(detection_types) == 1: params["cursor"] = cursor logger.info(f"Using cursor for pagination: {cursor}") # Add common filtering parameters if name_contains: params["name-contains"] = name_contains if state: params["state"] = state if severity: params["severity"] = severity if tag: params["tag"] = tag if created_by: params["created-by"] = created_by if last_modified_by: params["last-modified-by"] = last_modified_by # Add detection-type-specific parameters if detection_type in ["rules", "simple_rules"] and log_type: params["log-type"] = log_type elif detection_type == "policies": if resource_type: params["resource-type"] = resource_type if compliance_status: params["compliance-status"] = compliance_status return params