Skip to main content
Glama
test_rules.py24.8 kB
"""Unit tests for rules router.""" from io import BytesIO from unittest.mock import MagicMock, Mock, patch import pytest from fastapi import FastAPI from fastapi.testclient import TestClient from yaraflux_mcp_server.auth import get_current_active_user, validate_admin from yaraflux_mcp_server.models import User, UserRole, YaraRuleMetadata from yaraflux_mcp_server.routers.rules import router from yaraflux_mcp_server.yara_service import YaraError # Create test app app = FastAPI() app.include_router(router) @pytest.fixture def test_user(): """Test user fixture.""" return User(username="testuser", role=UserRole.USER, disabled=False, email="test@example.com") @pytest.fixture def test_admin(): """Test admin user fixture.""" return User(username="testadmin", role=UserRole.ADMIN, disabled=False, email="admin@example.com") @pytest.fixture def client_with_user(test_user): """TestClient with normal user dependency override.""" app.dependency_overrides[get_current_active_user] = lambda: test_user with TestClient(app) as client: yield client # Clear overrides after test app.dependency_overrides = {} @pytest.fixture def client_with_admin(test_admin): """TestClient with admin user dependency override.""" app.dependency_overrides[get_current_active_user] = lambda: test_admin app.dependency_overrides[validate_admin] = lambda: test_admin with TestClient(app) as client: yield client # Clear overrides after test app.dependency_overrides = {} @pytest.fixture def sample_rule_metadata(): """Sample rule metadata fixture.""" return YaraRuleMetadata( name="test_rule", source="custom", type="text", description="Test rule", author="Test Author", created_at="2025-01-01T00:00:00", updated_at="2025-01-01T00:00:00", tags=["test"], ) @pytest.fixture def sample_rule_content(): """Sample rule content fixture.""" return """ rule test_rule { meta: description = "Test rule" author = "Test Author" strings: $a = "test string" condition: $a } """ class TestListRules: """Tests for list_rules endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_list_rules_success(self, mock_yara_service, client_with_user, sample_rule_metadata): """Test listing rules successfully.""" # Setup mock mock_yara_service.list_rules.return_value = [sample_rule_metadata] # Make request response = client_with_user.get("/rules/") # Check response assert response.status_code == 200 result = response.json() assert len(result) == 1 assert result[0]["name"] == "test_rule" assert result[0]["source"] == "custom" # Verify service was called mock_yara_service.list_rules.assert_called_once_with(None) @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_list_rules_with_source(self, mock_yara_service, client_with_user, sample_rule_metadata): """Test listing rules with source filter.""" # Setup mock mock_yara_service.list_rules.return_value = [sample_rule_metadata] # Make request response = client_with_user.get("/rules/?source=custom") # Check response assert response.status_code == 200 # Verify service was called with source mock_yara_service.list_rules.assert_called_once_with("custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_list_rules_error(self, mock_yara_service, client_with_user): """Test listing rules with error.""" # Setup mock with error mock_yara_service.list_rules.side_effect = YaraError("Failed to list rules") # Make request response = client_with_user.get("/rules/") # Check response assert response.status_code == 500 assert "Failed to list rules" in response.json()["detail"] class TestGetRule: """Tests for get_rule endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_get_rule_success(self, mock_yara_service, client_with_user, sample_rule_metadata, sample_rule_content): """Test getting rule successfully.""" # Setup mocks mock_yara_service.get_rule.return_value = sample_rule_content mock_yara_service.list_rules.return_value = [sample_rule_metadata] # Make request response = client_with_user.get("/rules/test_rule") # Check response assert response.status_code == 200 result = response.json() assert result["name"] == "test_rule" assert result["source"] == "custom" assert "test string" in result["content"] assert "metadata" in result # Verify service was called mock_yara_service.get_rule.assert_called_once_with("test_rule", "custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_get_rule_with_source(self, mock_yara_service, client_with_user, sample_rule_metadata, sample_rule_content): """Test getting rule with specific source.""" # Setup mocks mock_yara_service.get_rule.return_value = sample_rule_content mock_yara_service.list_rules.return_value = [sample_rule_metadata] # Make request response = client_with_user.get("/rules/test_rule?source=community") # Check response assert response.status_code == 200 # Verify service was called with correct source mock_yara_service.get_rule.assert_called_once_with("test_rule", "community") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_get_rule_not_found(self, mock_yara_service, client_with_user): """Test getting non-existent rule.""" # Setup mock with error mock_yara_service.get_rule.side_effect = YaraError("Rule not found") # Make request response = client_with_user.get("/rules/nonexistent_rule") # Check response assert response.status_code == 404 assert "Rule not found" in response.json()["detail"] class TestGetRuleRaw: """Tests for get_rule_raw endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_get_rule_raw_success(self, mock_yara_service, client_with_user, sample_rule_content): """Test getting raw rule content successfully.""" # Setup mock mock_yara_service.get_rule.return_value = sample_rule_content # Make request response = client_with_user.get("/rules/test_rule/raw") # Check response assert response.status_code == 200 assert "text/plain" in response.headers["content-type"] assert "test string" in response.text # Verify service was called mock_yara_service.get_rule.assert_called_once_with("test_rule", "custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_get_rule_raw_not_found(self, mock_yara_service, client_with_user): """Test getting raw content for non-existent rule.""" # Setup mock with error mock_yara_service.get_rule.side_effect = YaraError("Rule not found") # Make request response = client_with_user.get("/rules/nonexistent_rule/raw") # Check response assert response.status_code == 404 assert "Rule not found" in response.json()["detail"] class TestCreateRule: """Tests for create_rule endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_create_rule_success(self, mock_yara_service, client_with_user, sample_rule_metadata, sample_rule_content): """Test creating rule successfully.""" # Setup mock mock_yara_service.add_rule.return_value = sample_rule_metadata # Prepare request data rule_data = {"name": "test_rule", "content": sample_rule_content, "source": "custom"} # Make request response = client_with_user.post("/rules/", json=rule_data) # Check response assert response.status_code == 200 result = response.json() assert result["name"] == "test_rule" assert result["source"] == "custom" # Verify service was called mock_yara_service.add_rule.assert_called_once_with("test_rule", sample_rule_content) @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_create_rule_invalid(self, mock_yara_service, client_with_user): """Test creating invalid rule.""" # Setup mock with error mock_yara_service.add_rule.side_effect = YaraError("Invalid YARA syntax") # Prepare request data rule_data = {"name": "invalid_rule", "content": "invalid content", "source": "custom"} # Make request response = client_with_user.post("/rules/", json=rule_data) # Check response assert response.status_code == 400 assert "Invalid YARA syntax" in response.json()["detail"] class TestUploadRule: """Tests for upload_rule endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_upload_rule_success(self, mock_yara_service, client_with_user, sample_rule_metadata, sample_rule_content): """Test uploading rule file successfully.""" # Setup mock mock_yara_service.add_rule.return_value = sample_rule_metadata # Create test file file_content = sample_rule_content.encode("utf-8") file = {"rule_file": ("test_rule.yar", BytesIO(file_content), "text/plain")} # Additional form data data = {"source": "custom"} # Make request response = client_with_user.post("/rules/upload", files=file, data=data) # Check response assert response.status_code == 200 result = response.json() assert result["name"] == "test_rule" # Verify service was called correctly mock_yara_service.add_rule.assert_called_once_with("test_rule.yar", sample_rule_content, "custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_upload_rule_invalid(self, mock_yara_service, client_with_user): """Test uploading invalid rule file.""" # Setup mock with error mock_yara_service.add_rule.side_effect = YaraError("Invalid YARA syntax") # Create test file file_content = b"invalid rule content" file = {"rule_file": ("invalid.yar", BytesIO(file_content), "text/plain")} # Make request response = client_with_user.post("/rules/upload", files=file) # Check response assert response.status_code == 400 assert "Invalid YARA syntax" in response.json()["detail"] def test_upload_rule_no_file(self, client_with_user): """Test uploading without file.""" # Make request without file response = client_with_user.post("/rules/upload") # Check response assert response.status_code == 422 # Validation error assert "field required" in response.text.lower() class TestUpdateRule: """Tests for update_rule endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_update_rule_success(self, mock_yara_service, client_with_user, sample_rule_metadata, sample_rule_content): """Test updating rule successfully.""" # Setup mock mock_yara_service.update_rule.return_value = sample_rule_metadata # Make request response = client_with_user.put("/rules/test_rule", json=sample_rule_content) # Check response assert response.status_code == 200 result = response.json() assert result["name"] == "test_rule" # Verify service was called correctly mock_yara_service.update_rule.assert_called_once_with("test_rule", sample_rule_content, "custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_update_rule_not_found(self, mock_yara_service, client_with_user, sample_rule_content): """Test updating non-existent rule.""" # Setup mock with not found error mock_yara_service.update_rule.side_effect = YaraError("Rule not found") # Make request response = client_with_user.put("/rules/nonexistent_rule", json=sample_rule_content) # Check response assert response.status_code == 404 assert "Rule not found" in response.json()["detail"] @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_update_rule_invalid(self, mock_yara_service, client_with_user): """Test updating rule with invalid content.""" # Setup mock with validation error mock_yara_service.update_rule.side_effect = YaraError("Invalid YARA syntax") # Make request response = client_with_user.put("/rules/test_rule", json="invalid content") # Check response assert response.status_code == 400 assert "Invalid YARA syntax" in response.json()["detail"] class TestUpdateRulePlain: """Tests for update_rule_plain endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_update_rule_plain_success( self, mock_yara_service, client_with_user, sample_rule_metadata, sample_rule_content ): """Test updating rule with plain text successfully.""" # Setup mock mock_yara_service.update_rule.return_value = sample_rule_metadata # Make request with plain text content response = client_with_user.put( "/rules/test_rule/plain?source=custom", content=sample_rule_content, headers={"Content-Type": "text/plain"} ) # Check response assert response.status_code == 200 result = response.json() assert result["name"] == "test_rule" # Verify service was called correctly mock_yara_service.update_rule.assert_called_once_with("test_rule", sample_rule_content, "custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_update_rule_plain_not_found(self, mock_yara_service, client_with_user, sample_rule_content): """Test updating non-existent rule with plain text.""" # Setup mock with not found error mock_yara_service.update_rule.side_effect = YaraError("Rule not found") # Make request response = client_with_user.put( "/rules/nonexistent_rule/plain", content=sample_rule_content, headers={"Content-Type": "text/plain"} ) # Check response assert response.status_code == 404 assert "Rule not found" in response.json()["detail"] class TestDeleteRule: """Tests for delete_rule endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_delete_rule_success(self, mock_yara_service, client_with_user): """Test deleting rule successfully.""" # Setup mock mock_yara_service.delete_rule.return_value = True # Make request response = client_with_user.delete("/rules/test_rule") # Check response assert response.status_code == 200 result = response.json() assert "deleted" in result["message"] # Verify service was called correctly mock_yara_service.delete_rule.assert_called_once_with("test_rule", "custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_delete_rule_not_found(self, mock_yara_service, client_with_user): """Test deleting non-existent rule.""" # Setup mock with not found result mock_yara_service.delete_rule.return_value = False # Make request response = client_with_user.delete("/rules/nonexistent_rule") # Check response assert response.status_code == 404 assert "not found" in response.json()["detail"] @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_delete_rule_error(self, mock_yara_service, client_with_user): """Test deleting rule with error.""" # Setup mock with error mock_yara_service.delete_rule.side_effect = YaraError("Failed to delete rule") # Make request response = client_with_user.delete("/rules/test_rule") # Check response assert response.status_code == 500 assert "Failed to delete rule" in response.json()["detail"] class TestImportRules: """Tests for import_rules endpoint.""" @patch("yaraflux_mcp_server.routers.rules.import_rules_tool") def test_import_rules_success(self, mock_import_tool, client_with_admin): """Test importing rules successfully as admin.""" # Setup mock mock_import_tool.return_value = { "success": True, "message": "Rules imported successfully", "imported": 10, "failed": 0, } # Make request response = client_with_admin.post("/rules/import") # Check response assert response.status_code == 200 result = response.json() assert result["success"] is True assert result["imported"] == 10 # Verify tool was called with default parameters mock_import_tool.assert_called_once_with(None) @patch("yaraflux_mcp_server.routers.rules.import_rules_tool") def test_import_rules_with_params(self, mock_import_tool, client_with_admin): """Test importing rules with custom parameters.""" # Setup mock mock_import_tool.return_value = {"success": True, "message": "Rules imported successfully"} # Make request with custom parameters response = client_with_admin.post("/rules/import?url=https://example.com/repo&branch=develop") # Check response assert response.status_code == 200 # Verify tool was called with custom parameters mock_import_tool.assert_called_once_with("https://example.com/repo") @patch("yaraflux_mcp_server.routers.rules.import_rules_tool") def test_import_rules_failure(self, mock_import_tool, client_with_admin): """Test import failure.""" # Setup mock with failure result mock_import_tool.return_value = {"success": False, "message": "Import failed", "error": "Network error"} # Make request response = client_with_admin.post("/rules/import") # Check response assert response.status_code == 500 assert "Import failed" in response.json()["detail"] def test_import_rules_non_admin(self, client_with_user): """Test import attempt by non-admin user.""" # Make request with non-admin client response = client_with_user.post("/rules/import") # Check response - should be blocked by auth assert response.status_code == 403 class TestValidateRule: """Tests for validate_rule endpoint.""" @patch("yaraflux_mcp_server.routers.rules.validate_rule_tool") def test_validate_rule_json_success(self, mock_validate_tool, client_with_user, sample_rule_content): """Test validating rule successfully with JSON content.""" # Setup mock mock_validate_tool.return_value = {"valid": True, "message": "Rule is valid"} # Make request with JSON format response = client_with_user.post("/rules/validate", json={"content": sample_rule_content}) # Check response assert response.status_code == 200 result = response.json() assert result["valid"] is True # Verify validation was called mock_validate_tool.assert_called_once() @patch("yaraflux_mcp_server.routers.rules.validate_rule_tool") def test_validate_rule_plain_success(self, mock_validate_tool, client_with_user, sample_rule_content): """Test validating rule successfully with plain text content.""" # Setup mock mock_validate_tool.return_value = {"valid": True, "message": "Rule is valid"} # Make request with plain text response = client_with_user.post( "/rules/validate", content=sample_rule_content, headers={"Content-Type": "text/plain"} ) # Check response assert response.status_code == 200 result = response.json() assert result["valid"] is True # Verify validation was called with the plain text content mock_validate_tool.assert_called_once_with(sample_rule_content) @patch("yaraflux_mcp_server.routers.rules.validate_rule_tool") def test_validate_rule_invalid(self, mock_validate_tool, client_with_user): """Test validating invalid rule.""" # Setup mock for invalid rule mock_validate_tool.return_value = { "valid": False, "message": "Syntax error", "error_details": "line 3: syntax error, unexpected identifier", } # Make request with invalid content response = client_with_user.post( "/rules/validate", content="invalid rule", headers={"Content-Type": "text/plain"} ) # Check response assert response.status_code == 200 # Still 200 even for invalid rules result = response.json() assert result["valid"] is False assert "Syntax error" in result["message"] class TestValidateRulePlain: """Tests for validate_rule_plain endpoint.""" @patch("yaraflux_mcp_server.routers.rules.validate_rule_tool") def test_validate_rule_plain_success(self, mock_validate_tool, client_with_user, sample_rule_content): """Test validating rule with plain text endpoint.""" # Setup mock mock_validate_tool.return_value = {"valid": True, "message": "Rule is valid"} # Make request response = client_with_user.post( "/rules/validate/plain", content=sample_rule_content, headers={"Content-Type": "text/plain"} ) # Check response assert response.status_code == 200 result = response.json() assert result["valid"] is True # Verify tool was called with correct content mock_validate_tool.assert_called_once_with(sample_rule_content) @patch("yaraflux_mcp_server.routers.rules.validate_rule_tool") def test_validate_rule_plain_invalid(self, mock_validate_tool, client_with_user): """Test validating invalid rule with plain text endpoint.""" # Setup mock for invalid rule mock_validate_tool.return_value = {"valid": False, "message": "Syntax error at line 5"} # Make request with invalid content invalid_content = 'rule invalid { strings: $a = "test condition: invalid }' response = client_with_user.post( "/rules/validate/plain", content=invalid_content, headers={"Content-Type": "text/plain"} ) # Check response assert response.status_code == 200 # Still 200 for invalid rules result = response.json() assert result["valid"] is False assert "Syntax error" in result["message"] class TestCreateRulePlain: """Tests for create_rule_plain endpoint.""" @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_create_rule_plain_success( self, mock_yara_service, client_with_user, sample_rule_metadata, sample_rule_content ): """Test creating rule with plain text successfully.""" # Setup mock mock_yara_service.add_rule.return_value = sample_rule_metadata # Make request response = client_with_user.post( "/rules/plain?rule_name=test_rule&source=custom", content=sample_rule_content, headers={"Content-Type": "text/plain"}, ) # Check response assert response.status_code == 200 result = response.json() assert result["name"] == "test_rule" assert result["source"] == "custom" # Verify service was called correctly mock_yara_service.add_rule.assert_called_once_with("test_rule", sample_rule_content, "custom") @patch("yaraflux_mcp_server.routers.rules.yara_service") def test_create_rule_plain_invalid(self, mock_yara_service, client_with_user): """Test creating rule with invalid plain text.""" # Setup mock with error mock_yara_service.add_rule.side_effect = YaraError("Invalid YARA syntax") # Make request with invalid content response = client_with_user.post( "/rules/plain?rule_name=invalid_rule", content="invalid rule content", headers={"Content-Type": "text/plain"}, ) # Check response assert response.status_code == 400 assert "Invalid YARA syntax" in response.json()["detail"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreatFlux/YaraFlux'

If you have feedback or need assistance with the MCP directory API, please join our Discord server