We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/docdyhr/simplenote-mcp-server'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Tests for security alerting system."""
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, Mock, patch
import pytest
from simplenote_mcp.server.alerting import (
AlertSeverity,
AlertType,
SecurityAlert,
SecurityAlerter,
alert_authentication_failure,
alert_dangerous_input,
alert_rate_limit_violation,
alert_suspicious_pattern,
get_alerter,
)
from simplenote_mcp.server.errors import SecurityError
class TestSecurityAlert:
"""Test SecurityAlert class."""
def test_alert_creation(self):
"""Test creating a security alert."""
alert = SecurityAlert(
alert_type=AlertType.DANGEROUS_INPUT,
severity=AlertSeverity.HIGH,
message="Test alert message",
context={"key": "value"},
user_id="test_user",
client_info={"ip": "127.0.0.1"},
)
assert alert.alert_type == AlertType.DANGEROUS_INPUT
assert alert.severity == AlertSeverity.HIGH
assert alert.message == "Test alert message"
assert alert.context == {"key": "value"}
assert alert.user_id == "test_user"
assert alert.client_info == {"ip": "127.0.0.1"}
assert isinstance(alert.timestamp, datetime)
assert alert.alert_id.startswith("dangerous_input_")
def test_alert_to_dict(self):
"""Test converting alert to dictionary."""
alert = SecurityAlert(
alert_type=AlertType.RATE_LIMIT_VIOLATION,
severity=AlertSeverity.MEDIUM,
message="Rate limit exceeded",
context={"requests": 100},
)
alert_dict = alert.to_dict()
assert alert_dict["alert_type"] == "rate_limit_violation"
assert alert_dict["severity"] == "MEDIUM"
assert alert_dict["message"] == "Rate limit exceeded"
assert alert_dict["context"] == {"requests": 100}
assert "timestamp" in alert_dict
assert "alert_id" in alert_dict
def test_alert_string_representation(self):
"""Test string representation of alert."""
alert = SecurityAlert(
alert_type=AlertType.AUTHENTICATION_FAILURE,
severity=AlertSeverity.LOW,
message="Login failed",
context={},
)
alert_str = str(alert)
assert alert_str == "[LOW] authentication_failure: Login failed"
class TestSecurityAlerter:
"""Test SecurityAlerter class."""
def setup_method(self):
"""Set up test environment."""
self.test_config = {
"failed_auth_threshold": 3,
"rate_limit_threshold": 2,
"suspicious_pattern_threshold": 2,
"time_window_minutes": 5,
"enable_file_alerts": False, # Disable file alerts for tests
"enable_email_alerts": False,
"enable_webhook_alerts": False,
}
self.alerter = SecurityAlerter(self.test_config)
@pytest.mark.asyncio
async def test_create_alert(self):
"""Test creating an alert."""
alert = await self.alerter.create_alert(
AlertType.DANGEROUS_INPUT,
AlertSeverity.HIGH,
"Test dangerous input",
{"pattern": "SELECT * FROM"},
user_id="test_user",
)
assert alert.alert_type == AlertType.DANGEROUS_INPUT
assert alert.severity == AlertSeverity.HIGH
assert alert.message == "Test dangerous input"
assert alert.user_id == "test_user"
# Check it was added to history
assert len(self.alerter.alert_history) == 1
assert self.alerter.alert_history[0] == alert
@pytest.mark.asyncio
async def test_authentication_failure_pattern(self):
"""Test authentication failure pattern detection."""
user_id = "test_user"
# Create multiple authentication failures
for i in range(self.test_config["failed_auth_threshold"]):
await self.alerter.create_alert(
AlertType.AUTHENTICATION_FAILURE,
AlertSeverity.MEDIUM,
f"Auth failure {i}",
{"attempt": i},
user_id=user_id,
)
# Should have created additional alert for repeated failures
# Initial alerts + 1 escalation alert
expected_alerts = self.test_config["failed_auth_threshold"] + 1
assert len(self.alerter.alert_history) == expected_alerts
# Check the escalation alert
escalation_alert = self.alerter.alert_history[-1]
assert escalation_alert.alert_type == AlertType.REPEATED_FAILURES
assert escalation_alert.severity == AlertSeverity.HIGH
@pytest.mark.asyncio
async def test_rate_limit_violation_pattern(self):
"""Test rate limit violation pattern detection."""
user_id = "test_user"
# Create multiple rate limit violations
for i in range(self.test_config["rate_limit_threshold"]):
await self.alerter.create_alert(
AlertType.RATE_LIMIT_VIOLATION,
AlertSeverity.MEDIUM,
f"Rate limit violation {i}",
{"requests": 100 + i},
user_id=user_id,
)
# Should have created additional alert for anomalous behavior
expected_alerts = self.test_config["rate_limit_threshold"] + 1
assert len(self.alerter.alert_history) == expected_alerts
# Check the escalation alert
escalation_alert = self.alerter.alert_history[-1]
assert escalation_alert.alert_type == AlertType.ANOMALOUS_BEHAVIOR
assert escalation_alert.severity == AlertSeverity.HIGH
@pytest.mark.asyncio
async def test_suspicious_input_pattern(self):
"""Test suspicious input pattern detection."""
user_id = "test_user"
# Create multiple dangerous input alerts
for i in range(self.test_config["suspicious_pattern_threshold"]):
await self.alerter.create_alert(
AlertType.DANGEROUS_INPUT,
AlertSeverity.HIGH,
f"Dangerous input {i}",
{"pattern": f"pattern_{i}"},
user_id=user_id,
)
# Should have created additional alert for security threshold exceeded
expected_alerts = self.test_config["suspicious_pattern_threshold"] + 1
assert len(self.alerter.alert_history) == expected_alerts
# Check the escalation alert
escalation_alert = self.alerter.alert_history[-1]
assert escalation_alert.alert_type == AlertType.SECURITY_THRESHOLD_EXCEEDED
assert escalation_alert.severity == AlertSeverity.CRITICAL
def test_get_recent_alerts(self):
"""Test getting recent alerts."""
# Add some test alerts manually
old_alert = SecurityAlert(
AlertType.DANGEROUS_INPUT,
AlertSeverity.LOW,
"Old alert",
{},
)
old_alert.timestamp = datetime.utcnow() - timedelta(hours=2)
recent_alert = SecurityAlert(
AlertType.RATE_LIMIT_VIOLATION,
AlertSeverity.HIGH,
"Recent alert",
{},
)
self.alerter.alert_history.extend([old_alert, recent_alert])
# Get recent alerts (last 60 minutes)
recent_alerts = self.alerter.get_recent_alerts(minutes=60)
assert len(recent_alerts) == 1
assert recent_alerts[0] == recent_alert
# Get alerts with severity filter
high_alerts = self.alerter.get_recent_alerts(
minutes=180, severity=AlertSeverity.HIGH
)
assert len(high_alerts) == 1
assert high_alerts[0] == recent_alert
# Get alerts with type filter
dangerous_alerts = self.alerter.get_recent_alerts(
minutes=180, alert_type=AlertType.DANGEROUS_INPUT
)
assert len(dangerous_alerts) == 1
assert dangerous_alerts[0] == old_alert
def test_get_alert_summary(self):
"""Test getting alert summary."""
# Add test alerts
alerts = [
SecurityAlert(
AlertType.DANGEROUS_INPUT, AlertSeverity.HIGH, "Alert 1", {}, "user1"
),
SecurityAlert(
AlertType.RATE_LIMIT_VIOLATION,
AlertSeverity.MEDIUM,
"Alert 2",
{},
"user2",
),
SecurityAlert(
AlertType.DANGEROUS_INPUT, AlertSeverity.HIGH, "Alert 3", {}, "user1"
),
]
self.alerter.alert_history.extend(alerts)
summary = self.alerter.get_alert_summary(minutes=60)
assert summary["total_alerts"] == 3
assert summary["by_severity"]["HIGH"] == 2
assert summary["by_severity"]["MEDIUM"] == 1
assert summary["by_type"]["dangerous_input"] == 2
assert summary["by_type"]["rate_limit_violation"] == 1
assert len(summary["affected_users"]) == 2
assert "user1" in summary["affected_users"]
assert "user2" in summary["affected_users"]
assert summary["latest_alert"] is not None
def test_cleanup_old_data(self):
"""Test cleaning up old tracking data."""
# Add some test data
self.alerter.failure_counts["user1"] = 5
self.alerter.failure_counts["user2"] = 3
# Add many entries to trigger cleanup
for i in range(1001):
self.alerter.failure_counts[f"user_{i}"] = 1
self.alerter.cleanup_old_data(days=7)
# Should have cleared the data
assert len(self.alerter.failure_counts) == 0
class TestAlertingFunctions:
"""Test alerting convenience functions."""
@pytest.mark.asyncio
async def test_alert_authentication_failure(self):
"""Test authentication failure alert function."""
with patch("simplenote_mcp.server.alerting.get_alerter") as mock_get_alerter:
mock_alerter = Mock()
mock_alerter.create_alert = AsyncMock()
mock_get_alerter.return_value = mock_alerter
await alert_authentication_failure(
"test_user", "Invalid password", {"ip": "127.0.0.1"}
)
mock_alerter.create_alert.assert_called_once()
call_args = mock_alerter.create_alert.call_args
assert call_args[0][0] == AlertType.AUTHENTICATION_FAILURE
assert call_args[0][1] == AlertSeverity.MEDIUM
assert "test_user" in call_args[0][2]
assert call_args[1]["user_id"] == "test_user"
@pytest.mark.asyncio
async def test_alert_rate_limit_violation(self):
"""Test rate limit violation alert function."""
with patch("simplenote_mcp.server.alerting.get_alerter") as mock_get_alerter:
mock_alerter = Mock()
mock_alerter.create_alert = AsyncMock()
mock_get_alerter.return_value = mock_alerter
await alert_rate_limit_violation("test_user", 150, 100, {"source": "api"})
mock_alerter.create_alert.assert_called_once()
call_args = mock_alerter.create_alert.call_args
assert call_args[0][0] == AlertType.RATE_LIMIT_VIOLATION
assert call_args[0][1] == AlertSeverity.MEDIUM
assert "150/100" in call_args[0][2]
@pytest.mark.asyncio
async def test_alert_dangerous_input(self):
"""Test dangerous input alert function."""
with patch("simplenote_mcp.server.alerting.get_alerter") as mock_get_alerter:
mock_alerter = Mock()
mock_alerter.create_alert = AsyncMock()
mock_get_alerter.return_value = mock_alerter
await alert_dangerous_input(
"test_user", "note_content", "SELECT * FROM users", {"client": "web"}
)
mock_alerter.create_alert.assert_called_once()
call_args = mock_alerter.create_alert.call_args
assert call_args[0][0] == AlertType.DANGEROUS_INPUT
assert call_args[0][1] == AlertSeverity.HIGH
assert "note_content" in call_args[0][2]
@pytest.mark.asyncio
async def test_alert_suspicious_pattern(self):
"""Test suspicious pattern alert function."""
with patch("simplenote_mcp.server.alerting.get_alerter") as mock_get_alerter:
mock_alerter = Mock()
mock_alerter.create_alert = AsyncMock()
mock_get_alerter.return_value = mock_alerter
await alert_suspicious_pattern(
"test_user",
"Rapid sequential requests",
{"request_count": 200, "time_window": "1 minute"},
{"user_agent": "bot"},
)
mock_alerter.create_alert.assert_called_once()
call_args = mock_alerter.create_alert.call_args
assert call_args[0][0] == AlertType.SUSPICIOUS_PATTERN
assert call_args[0][1] == AlertSeverity.MEDIUM
assert "Rapid sequential requests" in call_args[0][2]
class TestGlobalAlerter:
"""Test global alerter functionality."""
def test_get_global_alerter(self):
"""Test getting global alerter instance."""
alerter1 = get_alerter()
alerter2 = get_alerter()
# Should return the same instance
assert alerter1 is alerter2
assert isinstance(alerter1, SecurityAlerter)
def test_global_alerter_functionality(self):
"""Test global alerter works correctly."""
alerter = get_alerter()
# Should have default configuration
assert alerter.thresholds["failed_auth_threshold"] == 5
assert alerter.thresholds["rate_limit_threshold"] == 3
assert alerter.enable_file_alerts is True
class TestAlertIntegration:
"""Test integration between alerting and other security components."""
@pytest.mark.asyncio
async def test_security_event_triggers_alert(self):
"""Test that security events trigger alerts correctly."""
from simplenote_mcp.server.security import security_validator
# Test dangerous content detection triggers alert
with pytest.raises(SecurityError): # SecurityError expected
security_validator.validate_note_content(
"SELECT * FROM users WHERE 1=1", "test content"
)
# Check that alert was triggered (this tests the integration)
# In a real test, we would mock the alerting system
def test_rate_limit_triggers_alert(self):
"""Test that rate limit violations trigger alerts."""
# Trigger multiple rate limit violations
# This would test the integration with alerting
# In a real test, we would mock the alerting system