We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreatFlux/YaraFlux'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Fixed tests for rule tools to improve coverage."""
import json
from unittest.mock import MagicMock, Mock, patch
import pytest
from fastapi import HTTPException
from yaraflux_mcp_server.mcp_tools.rule_tools import (
add_yara_rule,
delete_yara_rule,
get_yara_rule,
import_threatflux_rules,
list_yara_rules,
update_yara_rule,
validate_yara_rule,
)
from yaraflux_mcp_server.yara_service import YaraError
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_list_yara_rules_success(mock_yara_service):
"""Test list_yara_rules successfully returns rules."""
# Setup mocks
rule1 = Mock()
rule1.model_dump.return_value = {"name": "rule1.yar", "source": "custom"}
rule2 = Mock()
rule2.model_dump.return_value = {"name": "rule2.yar", "source": "community"}
mock_yara_service.list_rules.return_value = [rule1, rule2]
# Call the function (without filters)
result = list_yara_rules()
# Verify results
assert len(result) == 2
assert {"name": "rule1.yar", "source": "custom"} in result
assert {"name": "rule2.yar", "source": "community"} in result
# Verify mocks were called correctly
mock_yara_service.list_rules.assert_called_once_with(None)
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_list_yara_rules_filtered(mock_yara_service):
"""Test list_yara_rules with source filtering."""
# Setup mocks
rule1 = Mock()
rule1.model_dump.return_value = {"name": "rule1.yar", "source": "custom"}
rule2 = Mock()
rule2.model_dump.return_value = {"name": "rule2.yar", "source": "custom"}
mock_yara_service.list_rules.return_value = [rule1, rule2]
# Call the function with source filter
result = list_yara_rules("custom")
# Verify results
assert len(result) == 2
# Verify mocks were called correctly
mock_yara_service.list_rules.assert_called_once_with("custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_list_yara_rules_all_source(mock_yara_service):
"""Test list_yara_rules with 'all' source."""
# Setup mocks
rule1 = Mock()
rule1.model_dump.return_value = {"name": "rule1.yar", "source": "custom"}
rule2 = Mock()
rule2.model_dump.return_value = {"name": "rule2.yar", "source": "community"}
mock_yara_service.list_rules.return_value = [rule1, rule2]
# Call the function with 'all' source
result = list_yara_rules("all")
# Verify results
assert len(result) == 2
# Verify mocks were called correctly
mock_yara_service.list_rules.assert_called_once_with(None)
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_list_yara_rules_error(mock_yara_service):
"""Test list_yara_rules with an error."""
# Setup mock to raise an exception
mock_yara_service.list_rules.side_effect = Exception("Test error")
# Call the function
result = list_yara_rules()
# Verify results
assert result == []
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_get_yara_rule_success(mock_yara_service):
"""Test get_yara_rule successfully retrieves a rule."""
# Setup mocks
mock_yara_service.get_rule.return_value = "rule test { condition: true }"
rule = Mock()
rule.name = "test.yar"
rule.model_dump.return_value = {"name": "test.yar", "source": "custom"}
mock_yara_service.list_rules.return_value = [rule]
# Call the function
result = get_yara_rule(rule_name="test.yar", source="custom")
# Verify results
assert result["success"] is True
assert result["result"]["name"] == "test.yar"
assert result["result"]["source"] == "custom"
assert result["result"]["content"] == "rule test { condition: true }"
assert result["result"]["metadata"] == {"name": "test.yar", "source": "custom"}
# Verify mocks were called correctly
mock_yara_service.get_rule.assert_called_once_with("test.yar", "custom")
mock_yara_service.list_rules.assert_called_once_with("custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_get_yara_rule_invalid_source(mock_yara_service):
"""Test get_yara_rule with invalid source."""
# Call the function with invalid source
result = get_yara_rule(rule_name="test.yar", source="invalid")
# Verify results
assert result["success"] is False
assert "Invalid source" in result["message"]
# Verify mock was not called
mock_yara_service.get_rule.assert_not_called()
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_get_yara_rule_no_metadata(mock_yara_service):
"""Test get_yara_rule with no matching metadata."""
# Setup mocks
mock_yara_service.get_rule.return_value = "rule test { condition: true }"
rule = Mock()
rule.name = "other_rule.yar"
rule.model_dump.return_value = {"name": "other_rule.yar", "source": "custom"}
mock_yara_service.list_rules.return_value = [rule] # Different rule name
# Call the function
result = get_yara_rule(rule_name="test.yar", source="custom")
# Verify results
assert result["success"] is True
assert result["result"]["name"] == "test.yar"
assert result["result"]["metadata"] == {} # No metadata found
# Verify mocks were called correctly
mock_yara_service.get_rule.assert_called_once_with("test.yar", "custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_get_yara_rule_error(mock_yara_service):
"""Test get_yara_rule with error."""
# Setup mock to raise an exception
mock_yara_service.get_rule.side_effect = YaraError("Rule not found")
# Call the function
result = get_yara_rule(rule_name="test.yar", source="custom")
# Verify results
assert result["success"] is False
assert "Rule not found" in result["message"]
assert result["name"] == "test.yar"
assert result["source"] == "custom"
@patch("builtins.__import__")
def test_validate_yara_rule_valid(mock_import):
"""Test validate_yara_rule with valid rule."""
# Setup mock for the yara import
mock_yara_module = Mock()
mock_import.return_value = mock_yara_module
# Call the function
result = validate_yara_rule(content="rule test { condition: true }")
# Verify results
assert "valid" in result
assert result["valid"] is True
assert result["message"] == "Rule is valid"
@patch("builtins.__import__")
def test_validate_yara_rule_invalid(mock_import):
"""Test validate_yara_rule with invalid rule."""
# Setup mocks for the yara import to raise an exception
mock_yara_module = Mock()
mock_yara_module.compile.side_effect = Exception('line 1: undefined identifier "invalid"')
mock_import.return_value = mock_yara_module
# Call the function
result = validate_yara_rule(content="rule test { condition: invalid }")
# Verify results
assert "valid" in result
assert result["valid"] is False
assert "undefined identifier" in result["message"]
assert result["error_type"] == "YaraError"
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_add_yara_rule_success(mock_yara_service):
"""Test add_yara_rule successfully adds a rule."""
# Setup mock
metadata = Mock()
metadata.model_dump.return_value = {"name": "test.yar", "source": "custom"}
mock_yara_service.add_rule.return_value = metadata
# Call the function
result = add_yara_rule(name="test.yar", content="rule test { condition: true }", source="custom")
# Verify results
assert result["success"] is True
assert "added successfully" in result["message"]
assert result["metadata"] == {"name": "test.yar", "source": "custom"}
# Verify mock was called correctly
mock_yara_service.add_rule.assert_called_once_with("test.yar", "rule test { condition: true }", "custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_add_yara_rule_adds_extension(mock_yara_service):
"""Test add_yara_rule adds .yar extension if missing."""
# Setup mock
metadata = Mock()
metadata.model_dump.return_value = {"name": "test.yar", "source": "custom"}
mock_yara_service.add_rule.return_value = metadata
# Call the function without .yar extension
result = add_yara_rule(name="test", content="rule test { condition: true }", source="custom") # No .yar extension
# Verify results
assert result["success"] is True
# Verify mock was called with .yar extension
mock_yara_service.add_rule.assert_called_once_with("test.yar", "rule test { condition: true }", "custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_add_yara_rule_invalid_source(mock_yara_service):
"""Test add_yara_rule with invalid source."""
# Call the function with invalid source
result = add_yara_rule(name="test.yar", content="rule test { condition: true }", source="invalid")
# Verify results
assert result["success"] is False
assert "Invalid source" in result["message"]
# Verify mock was not called
mock_yara_service.add_rule.assert_not_called()
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_add_yara_rule_empty_content(mock_yara_service):
"""Test add_yara_rule with empty content."""
# Call the function with empty content
result = add_yara_rule(name="test.yar", content=" ", source="custom") # Empty after strip
# Verify results
assert result["success"] is False
assert "content cannot be empty" in result["message"]
# Verify mock was not called
mock_yara_service.add_rule.assert_not_called()
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_add_yara_rule_error(mock_yara_service):
"""Test add_yara_rule with error."""
# Setup mock to raise an exception
mock_yara_service.add_rule.side_effect = YaraError("Compilation error")
# Call the function
result = add_yara_rule(name="test.yar", content="rule test { condition: true }", source="custom")
# Verify results
assert result["success"] is False
assert "Compilation error" in result["message"]
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_update_yara_rule_success(mock_yara_service):
"""Test update_yara_rule successfully updates a rule."""
# Setup mocks
metadata = Mock()
metadata.model_dump.return_value = {"name": "test.yar", "source": "custom"}
mock_yara_service.update_rule.return_value = metadata
# Call the function
result = update_yara_rule(name="test.yar", content="rule test { condition: true }", source="custom")
# Verify results
assert result["success"] is True
assert "updated successfully" in result["message"]
assert result["metadata"] == {"name": "test.yar", "source": "custom"}
# Verify mocks were called correctly
mock_yara_service.get_rule.assert_called_once_with("test.yar", "custom")
mock_yara_service.update_rule.assert_called_once_with("test.yar", "rule test { condition: true }", "custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_update_yara_rule_not_found(mock_yara_service):
"""Test update_yara_rule with rule not found."""
# Setup mock to raise an exception
mock_yara_service.get_rule.side_effect = YaraError("Rule not found")
# Call the function
result = update_yara_rule(name="test.yar", content="rule test { condition: true }", source="custom")
# Verify results
assert result["success"] is False
assert "Rule not found" in result["message"]
# Verify only get_rule was called, not update_rule
mock_yara_service.get_rule.assert_called_once_with("test.yar", "custom")
mock_yara_service.update_rule.assert_not_called()
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_delete_yara_rule_success(mock_yara_service):
"""Test delete_yara_rule successfully deletes a rule."""
# Setup mock
mock_yara_service.delete_rule.return_value = True
# Call the function
result = delete_yara_rule(name="test.yar", source="custom")
# Verify results
assert result["success"] is True
assert "deleted successfully" in result["message"]
# Verify mock was called correctly
mock_yara_service.delete_rule.assert_called_once_with("test.yar", "custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_delete_yara_rule_not_found(mock_yara_service):
"""Test delete_yara_rule with rule not found."""
# Setup mock
mock_yara_service.delete_rule.return_value = False
# Call the function
result = delete_yara_rule(name="test.yar", source="custom")
# Verify results
assert result["success"] is False
assert "not found" in result["message"]
# Verify mock was called correctly
mock_yara_service.delete_rule.assert_called_once_with("test.yar", "custom")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_delete_yara_rule_error(mock_yara_service):
"""Test delete_yara_rule with error."""
# Setup mock to raise an exception
mock_yara_service.delete_rule.side_effect = YaraError("Permission denied")
# Call the function
result = delete_yara_rule(name="test.yar", source="custom")
# Verify results
assert result["success"] is False
assert "Permission denied" in result["message"]
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.httpx")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_import_threatflux_rules_success(mock_yara_service, mock_httpx):
"""Test import_threatflux_rules successfully imports rules."""
# Setup mock test response
mock_test_response = MagicMock()
mock_test_response.status_code = 200
# Setup mock index response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"rules": ["rule1.yar", "rule2.yar"]}
# Setup mock response for rule files
mock_rule_response = MagicMock()
mock_rule_response.status_code = 200
mock_rule_response.text = "rule test { condition: true }"
# Configure httpx mock to return different responses for different calls
mock_httpx.get.side_effect = [mock_test_response, mock_response, mock_rule_response, mock_rule_response]
# Call the function
result = import_threatflux_rules()
# Verify results
assert result["success"] is True
# Verify yara_service was called
assert mock_yara_service.add_rule.call_count >= 1
mock_yara_service.load_rules.assert_called_once()
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.httpx")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_import_threatflux_rules_with_custom_url(mock_yara_service, mock_httpx):
"""Test import_threatflux_rules with custom URL."""
# Setup mock test response
mock_test_response = MagicMock()
mock_test_response.status_code = 200
# Setup mock response for index.json
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"rules": ["rule1.yar"]}
# Setup mock response for rule file
mock_rule_response = MagicMock()
mock_rule_response.status_code = 200
mock_rule_response.text = "rule test { condition: true }"
# Configure httpx mock to return different responses
mock_httpx.get.side_effect = [mock_test_response, mock_response, mock_rule_response]
# Call the function with custom URL
result = import_threatflux_rules(url="https://github.com/custom/repo")
# Verify results
assert result["success"] is True
# Verify connection test was made first
mock_httpx.get.assert_any_call("https://raw.githubusercontent.com/custom/repo", timeout=10)
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.httpx")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_import_threatflux_rules_no_index(mock_yara_service, mock_httpx):
"""Test import_threatflux_rules with no index.json."""
# Setup initial test response (success)
mock_test_response = MagicMock()
mock_test_response.status_code = 200
# Setup mock response for index.json (not found)
mock_response = MagicMock()
mock_response.status_code = 404
# Setup mock response for rule file
mock_rule_response = MagicMock()
mock_rule_response.status_code = 200
mock_rule_response.text = "rule test { condition: true }"
# Configure httpx mock to return different responses
# First 200 for test, then 404 for index, then a few 200s for rule files
mock_httpx.get.side_effect = [mock_test_response, mock_response, mock_rule_response, mock_rule_response]
# Call the function
result = import_threatflux_rules()
# Still should successfully import some rules
assert result["success"] is True
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.httpx")
@patch("yaraflux_mcp_server.mcp_tools.rule_tools.yara_service")
def test_import_threatflux_rules_error(mock_yara_service, mock_httpx):
"""Test import_threatflux_rules with error."""
# Setup httpx to raise an exception for the first get call
mock_httpx.get.side_effect = Exception("Connection error")
# Call the function
result = import_threatflux_rules()
# Verify results - with our new connection test implementation
assert isinstance(result, dict)
assert "success" in result
assert not result["success"] # Should be False
assert "message" in result
assert "Connection error" in result["message"]
assert "error" in result