Skip to main content
Glama
CrowdStrike

CrowdStrike Falcon MCP Server

Official
by CrowdStrike
test_detections.py9.37 kB
""" E2E tests for the Detections module. """ import json import unittest import pytest from tests.e2e.utils.base_e2e_test import BaseE2ETest @pytest.mark.e2e class TestDetectionsModuleE2E(BaseE2ETest): """ End-to-end test suite for the Falcon MCP Server Detections Module. """ def test_get_top_3_high_severity_detections(self): """Verify the agent can retrieve the top 3 high-severity detections.""" async def test_logic(): fixtures = [ { "operation": "GetQueriesAlertsV2", "validator": lambda kwargs: "severity:" in kwargs.get("parameters", {}).get("filter", "").lower() and kwargs.get("parameters", {}).get("limit", 0) <= 10, "response": { "status_code": 200, "body": { "resources": ["detection-1", "detection-2", "detection-3"] }, }, }, { "operation": "PostEntitiesAlertsV2", "validator": lambda kwargs: "detection-1" in kwargs.get("body", {}).get("composite_ids", []), "response": { "status_code": 200, "body": { "resources": [ { "id": "detection-1", "composite_id": "detection-1", "status": "new", "severity": 90, "severity_name": "Critical", "confidence": 85, "description": "A critical detection for E2E testing.", "created_timestamp": "2024-01-20T10:00:00Z", "agent_id": "test-agent-001", }, { "id": "detection-2", "composite_id": "detection-2", "status": "new", "severity": 70, "severity_name": "High", "confidence": 80, "description": "A high severity detection for E2E testing.", "created_timestamp": "2024-01-20T09:30:00Z", "agent_id": "test-agent-002", }, { "id": "detection-3", "composite_id": "detection-3", "status": "new", "severity": 70, "severity_name": "High", "confidence": 75, "description": "Another high severity detection for E2E testing.", "created_timestamp": "2024-01-20T09:00:00Z", "agent_id": "test-agent-003", }, ] }, }, }, ] self._mock_api_instance.command.side_effect = ( self._create_mock_api_side_effect(fixtures) ) prompt = "Give me the details of the top 3 high severity detections, return only detection id and descriptions" return await self._run_agent_stream(prompt) def assertions(tools, result): self.assertGreaterEqual(len(tools), 1, "Expected 1 tool call") used_tool = tools[len(tools) - 1] self.assertEqual( used_tool["input"]["tool_name"], "falcon_search_detections" ) # Check for severity-related filtering (numeric or text-based) tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower() self.assertTrue( "severity:" in tool_input_str or "high" in tool_input_str, f"Expected severity filtering in tool input: {tool_input_str}", ) self.assertIn("detection-1", used_tool["output"]) self.assertIn("detection-2", used_tool["output"]) self.assertIn("detection-3", used_tool["output"]) self.assertGreaterEqual( self._mock_api_instance.command.call_count, 2, "Expected 2 API calls" ) api_call_1_params = self._mock_api_instance.command.call_args_list[0][ 1 ].get("parameters", {}) filter_str = api_call_1_params.get("filter", "").lower() # Accept either numeric severity filters or text-based filters self.assertTrue( "severity:" in filter_str or "high" in filter_str, f"Expected severity filtering in API call: {filter_str}", ) self.assertEqual(api_call_1_params.get("limit"), 3) self.assertIn("severity.desc", api_call_1_params.get("sort", "")) api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get( "body", {} ) self.assertEqual( api_call_2_body.get("composite_ids"), ["detection-1", "detection-2", "detection-3"], ) self.assertIn("detection-1", result) self.assertIn("detection-2", result) self.assertIn("detection-3", result) self.run_test_with_retries( "test_get_top_3_high_severity_detections", test_logic, assertions, ) def test_get_highest_detection_for_ip(self): """Verify the agent can find the highest-severity detection for a specific IP.""" async def test_logic(): fixtures = [ { "operation": "GetQueriesAlertsV2", "validator": lambda kwargs: "10.0.0.1" in kwargs.get("parameters", {}).get("filter", ""), "response": { "status_code": 200, "body": {"resources": ["detection-4"]}, }, }, { "operation": "PostEntitiesAlertsV2", "validator": lambda kwargs: "detection-4" in kwargs.get("body", {}).get("composite_ids", []), "response": { "status_code": 200, "body": { "resources": [ { "id": "detection-4", "composite_id": "detection-4", "status": "new", "severity": 90, "severity_name": "Critical", "confidence": 95, "description": "A critical detection on a specific IP.", "created_timestamp": "2024-01-20T11:00:00Z", "agent_id": "test-agent-004", "local_ip": "10.0.0.1", } ] }, }, }, ] self._mock_api_instance.command.side_effect = ( self._create_mock_api_side_effect(fixtures) ) prompt = "What is the highest detection for the device with local_ip 10.0.0.1? Return the detection id as well" return await self._run_agent_stream(prompt) def assertions(tools, result): self.assertGreaterEqual( len(tools), 1, f"Expected 1 tool call, but got {len(tools)}" ) used_tool = tools[len(tools) - 1] self.assertEqual( used_tool["input"]["tool_name"], "falcon_search_detections" ) self.assertIn("10.0.0.1", json.dumps(used_tool["input"]["tool_input"])) self.assertIn("detection-4", used_tool["output"]) self.assertGreaterEqual( self._mock_api_instance.command.call_count, 2, "Expected 2 API calls" ) api_call_1_params = self._mock_api_instance.command.call_args_list[0][ 1 ].get("parameters", {}) self.assertIn("10.0.0.1", api_call_1_params.get("filter")) api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get( "body", {} ) self.assertEqual(api_call_2_body.get("composite_ids"), ["detection-4"]) self.assertIn("detection-4", result) self.assertNotIn("detection-1", result) self.run_test_with_retries( "test_get_highest_detection_for_ip", test_logic, assertions ) if __name__ == "__main__": unittest.main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CrowdStrike/falcon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server