Skip to main content
Glama
test_scan.py12.6 kB
"""Unit tests for scan router.""" import os import tempfile from datetime import UTC, datetime from io import BytesIO from unittest.mock import MagicMock, Mock, patch from uuid import UUID, uuid4 import pytest from fastapi import FastAPI from fastapi.testclient import TestClient from yaraflux_mcp_server.auth import get_current_active_user from yaraflux_mcp_server.models import ScanRequest, User, UserRole, YaraScanResult from yaraflux_mcp_server.routers.scan import router from yaraflux_mcp_server.yara_service import YaraError # Create test app app = FastAPI() app.include_router(router) @pytest.fixture def test_user(): """Test user fixture.""" return User(username="testuser", role=UserRole.USER, disabled=False, email="test@example.com") @pytest.fixture def client_with_user(test_user): """TestClient with normal user dependency override.""" app.dependency_overrides[get_current_active_user] = lambda: test_user with TestClient(app) as client: yield client # Clear overrides after test app.dependency_overrides = {} @pytest.fixture def sample_scan_result(): """Sample scan result fixture.""" pytest.skip("YaraScanResult model needs updating for tests") return YaraScanResult( scan_id=str(uuid4()), timestamp=datetime.now(UTC).isoformat(), scan_time=123.45, # Needs to be a float, not string status="completed", file_name="test_file.exe", file_size=1024, file_hash="d41d8cd98f00b204e9800998ecf8427e", file_type="application/x-executable", matches=[ { "rule": "test_rule", "namespace": "default", "tags": ["test", "malware"], "meta": {"description": "Test rule", "author": "Test Author"}, "strings": [{"offset": 100, "name": "$a", "value": "suspicious string"}], } ], duration_ms=123, ) class TestScanUrl: """Tests for scan_url endpoint.""" @patch("yaraflux_mcp_server.routers.scan.yara_service") def test_scan_url_success(self, mock_yara_service, client_with_user, sample_scan_result): """Test scanning URL successfully.""" # Setup mock mock_yara_service.fetch_and_scan.return_value = sample_scan_result # Prepare request data scan_request = {"url": "https://example.com/test_file.exe", "rule_names": ["rule1", "rule2"], "timeout": 60} # Make request response = client_with_user.post("/scan/url", json=scan_request) # Check response assert response.status_code == 200 result = response.json() assert result["result"]["scan_id"] == str(sample_scan_result.scan_id) # Convert UUID to string for comparison assert len(result["result"]["matches"]) == 1 assert result["result"]["matches"][0]["rule"] == "test_rule" # Verify service was called correctly mock_yara_service.fetch_and_scan.assert_called_once_with( url="https://example.com/test_file.exe", rule_names=["rule1", "rule2"], timeout=60 ) @patch("yaraflux_mcp_server.routers.scan.yara_service") def test_scan_url_without_optional_params(self, mock_yara_service, client_with_user, sample_scan_result): """Test scanning URL without optional parameters.""" # Setup mock mock_yara_service.fetch_and_scan.return_value = sample_scan_result # Prepare request data with only required URL scan_request = {"url": "https://example.com/test_file.exe"} # Make request response = client_with_user.post("/scan/url", json=scan_request) # Check response assert response.status_code == 200 # Verify service was called with only URL and default values for others mock_yara_service.fetch_and_scan.assert_called_once_with( url="https://example.com/test_file.exe", rule_names=None, timeout=None ) def test_scan_url_missing_url(self, client_with_user): """Test scanning without URL.""" # Prepare request data without URL scan_request = {"rule_names": ["rule1", "rule2"], "timeout": 60} # Make request response = client_with_user.post("/scan/url", json=scan_request) # Check response assert response.status_code == 400 assert "URL is required" in response.json()["detail"] @patch("yaraflux_mcp_server.routers.scan.yara_service") def test_scan_url_yara_error(self, mock_yara_service, client_with_user): """Test scanning URL with YARA error.""" # Setup mock with YARA error mock_yara_service.fetch_and_scan.side_effect = YaraError("YARA scanning error") # Prepare request data scan_request = {"url": "https://example.com/test_file.exe"} # Make request response = client_with_user.post("/scan/url", json=scan_request) # Check response assert response.status_code == 400 assert "YARA scanning error" in response.json()["detail"] @patch("yaraflux_mcp_server.routers.scan.yara_service") def test_scan_url_generic_error(self, mock_yara_service, client_with_user): """Test scanning URL with generic error.""" # Setup mock with generic error mock_yara_service.fetch_and_scan.side_effect = Exception("Generic error") # Prepare request data scan_request = {"url": "https://example.com/test_file.exe"} # Make request response = client_with_user.post("/scan/url", json=scan_request) # Check response assert response.status_code == 500 assert "Generic error" in response.json()["detail"] class TestScanFile: """Tests for scan_file endpoint.""" @patch("yaraflux_mcp_server.routers.scan.tempfile.NamedTemporaryFile") @patch("yaraflux_mcp_server.routers.scan.yara_service") def test_scan_file_success(self, mock_yara_service, mock_temp_file, client_with_user, sample_scan_result): """Test scanning uploaded file successfully.""" # Setup mocks mock_temp = Mock() mock_temp.name = "/tmp/testfile" mock_temp_file.return_value = mock_temp mock_yara_service.match_file.return_value = sample_scan_result # Create test file file_content = b"Test file content" file = {"file": ("test_file.exe", BytesIO(file_content), "application/octet-stream")} # Additional form data data = {"rule_names": "rule1,rule2", "timeout": "60"} # Make request response = client_with_user.post("/scan/file", files=file, data=data) # Check response assert response.status_code == 200 result = response.json() assert result["result"]["scan_id"] == str(sample_scan_result.scan_id) assert len(result["result"]["matches"]) == 1 # Verify temp file was written to and service was called mock_temp.write.assert_called_once_with(file_content) mock_yara_service.match_file.assert_called_once_with( file_path="/tmp/testfile", rule_names=["rule1", "rule2"], timeout=60 ) # Verify cleanup was attempted assert mock_temp.close.called @patch("yaraflux_mcp_server.routers.scan.tempfile.NamedTemporaryFile") @patch("yaraflux_mcp_server.routers.scan.yara_service") def test_scan_file_without_optional_params( self, mock_yara_service, mock_temp_file, client_with_user, sample_scan_result ): """Test scanning file without optional parameters.""" # Setup mocks mock_temp = Mock() mock_temp.name = "/tmp/testfile" mock_temp_file.return_value = mock_temp mock_yara_service.match_file.return_value = sample_scan_result # Create test file file_content = b"Test file content" file = {"file": ("test_file.exe", BytesIO(file_content), "application/octet-stream")} # Make request without optional form data response = client_with_user.post("/scan/file", files=file) # Check response assert response.status_code == 200 # Verify service was called with right params mock_yara_service.match_file.assert_called_once_with( file_path="/tmp/testfile", rule_names=None, timeout=None # No rules specified # No timeout specified ) def test_scan_file_missing_file(self, client_with_user): """Test scanning without file.""" # Make request without file response = client_with_user.post("/scan/file") # Check response assert response.status_code == 422 # Validation error assert "field required" in response.text.lower() @patch("yaraflux_mcp_server.routers.scan.tempfile.NamedTemporaryFile") @patch("yaraflux_mcp_server.routers.scan.yara_service") def test_scan_file_yara_error(self, mock_yara_service, mock_temp_file, client_with_user): """Test scanning file with YARA error.""" # Setup mocks mock_temp = Mock() mock_temp.name = "/tmp/testfile" mock_temp_file.return_value = mock_temp mock_yara_service.match_file.side_effect = YaraError("YARA scanning error") # Create test file file_content = b"Test file content" file = {"file": ("test_file.exe", BytesIO(file_content), "application/octet-stream")} # Make request response = client_with_user.post("/scan/file", files=file) # Check response assert response.status_code == 400 assert "YARA scanning error" in response.json()["detail"] # Verify cleanup was attempted assert mock_temp.close.called @patch("yaraflux_mcp_server.routers.scan.tempfile.NamedTemporaryFile") @patch("yaraflux_mcp_server.routers.scan.yara_service") @patch("yaraflux_mcp_server.routers.scan.os.unlink") def test_scan_file_cleanup_error( self, mock_unlink, mock_yara_service, mock_temp_file, client_with_user, sample_scan_result ): """Test scanning file with cleanup error.""" # Setup mocks mock_temp = Mock() mock_temp.name = "/tmp/testfile" mock_temp_file.return_value = mock_temp mock_yara_service.match_file.return_value = sample_scan_result mock_unlink.side_effect = OSError("Cannot delete temp file") # Create test file file_content = b"Test file content" file = {"file": ("test_file.exe", BytesIO(file_content), "application/octet-stream")} # Make request - should still succeed despite cleanup error response = client_with_user.post("/scan/file", files=file) # Check response assert response.status_code == 200 # Verify cleanup was attempted but error was handled mock_unlink.assert_called_once_with("/tmp/testfile") class TestGetScanResult: """Tests for get_scan_result endpoint.""" @patch("yaraflux_mcp_server.routers.scan.get_storage_client") def test_get_scan_result_success(self, mock_get_storage, client_with_user, sample_scan_result): """Test getting scan result successfully.""" # Setup mock mock_storage = Mock() mock_get_storage.return_value = mock_storage mock_storage.get_result.return_value = sample_scan_result.model_dump() # Make request scan_id = sample_scan_result.scan_id response = client_with_user.get(f"/scan/result/{scan_id}") # Check response assert response.status_code == 200 result = response.json() assert result["result"]["scan_id"] == str(scan_id) # Convert UUID to string for comparison assert len(result["result"]["matches"]) == 1 assert result["result"]["matches"][0]["rule"] == "test_rule" # Verify storage was accessed correctly mock_storage.get_result.assert_called_once_with(str(scan_id)) # String is used in the API call @patch("yaraflux_mcp_server.routers.scan.get_storage_client") def test_get_scan_result_not_found(self, mock_get_storage, client_with_user): """Test getting non-existent scan result.""" # Setup mock with error mock_storage = Mock() mock_get_storage.return_value = mock_storage mock_storage.get_result.side_effect = Exception("Scan result not found") # Make request with random UUID scan_id = str(uuid4()) response = client_with_user.get(f"/scan/result/{scan_id}") # Check response assert response.status_code == 404 assert "Scan result not found" in response.json()["detail"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreatFlux/YaraFlux'

If you have feedback or need assistance with the MCP directory API, please join our Discord server