Skip to main content
Glama
kebabmane

Amazon Security Lake MCP Server

by kebabmane
test_ocsf_validators.py (13.7 kB)
"""Unit tests for ``OCSFValidator``."""

from datetime import datetime, timedelta, timezone

import pytest

from src.asl_mcp_server.ocsf.validators import OCSFValidator


def _base_event(**overrides):
    """Return a fresh minimal event dict, updated with *overrides*.

    A new dict (including a new nested ``metadata`` dict) is built on every
    call so that one test variant can never leak mutations into another.
    Keyword arguments replace or append top-level keys.
    """
    event = {
        "time": "2024-01-15T10:30:00Z",
        "type_name": "Test",
        "type_uid": 1001,
        "class_name": "Test",
        "class_uid": 1001,
        "metadata": {"version": "1.0", "product": {"name": "Test"}},
    }
    event.update(overrides)
    return event


class TestOCSFValidator:
    """Test cases for OCSF validation functionality.

    The ``sample_ocsf_event`` and ``invalid_ocsf_event`` fixtures are not
    defined in this module (presumably they come from a ``conftest.py`` --
    verify against the test suite layout).
    """

    @pytest.fixture
    def validator(self):
        """Create validator instance for testing."""
        return OCSFValidator()

    def test_valid_event_validation(self, validator, sample_ocsf_event):
        """Test validation of a valid OCSF event."""
        is_valid, errors = validator.validate_event(sample_ocsf_event)

        assert is_valid is True
        assert not errors

    def test_invalid_event_validation(self, validator, invalid_ocsf_event):
        """Test validation of an invalid OCSF event."""
        is_valid, errors = validator.validate_event(invalid_ocsf_event)

        assert is_valid is False
        assert len(errors) > 0

        # Check for specific error types
        error_messages = " ".join(errors)
        assert "Required field" in error_messages
        assert "missing" in error_messages

    def test_required_fields_validation(self, validator):
        """Test validation of required fields."""
        # Missing all required fields
        empty_event = {}

        is_valid, errors = validator.validate_event(empty_event)

        assert is_valid is False

        required_fields = [
            "time",
            "type_name",
            "type_uid",
            "class_name",
            "class_uid",
            "metadata",
        ]
        for field in required_fields:
            # Name the offending field so a failure is diagnosable at a glance.
            assert any(field in error for error in errors), (
                f"no validation error mentions required field {field!r}"
            )

    def test_field_type_validation(self, validator):
        """Test validation of field data types."""
        event_with_wrong_types = {
            "time": "2024-01-15T10:30:00Z",
            "type_name": "Network Activity",
            "type_uid": "should_be_int",  # Wrong type
            "class_name": "Network Activity",
            "class_uid": 4001,
            "severity_id": "should_be_int",  # Wrong type
            "metadata": {
                "version": "1.1.0",
                "product": {"name": "Test"},
            },
        }

        is_valid, errors = validator.validate_event(event_with_wrong_types)

        assert is_valid is False
        assert any(
            "type_uid" in error and "should be int" in error for error in errors
        )
        assert any(
            "severity_id" in error and "should be int" in error for error in errors
        )

    def test_timestamp_validation(self, validator):
        """Test timestamp field validation."""
        # Valid timestamp
        event_valid_time = _base_event()

        is_valid, errors = validator.validate_event(event_valid_time)
        assert is_valid is True

        # Invalid timestamp format
        event_invalid_time = _base_event(time="not-a-timestamp")

        is_valid, errors = validator.validate_event(event_invalid_time)
        assert is_valid is False
        assert any("timestamp format" in error.lower() for error in errors)

    def test_metadata_validation(self, validator):
        """Test metadata structure validation."""
        # Missing metadata
        event_no_metadata = _base_event()
        del event_no_metadata["metadata"]

        is_valid, errors = validator.validate_event(event_no_metadata)
        assert is_valid is False
        assert any("Required field 'metadata'" in error for error in errors)

        # Invalid metadata type
        event_invalid_metadata = _base_event(metadata="should_be_dict")

        is_valid, errors = validator.validate_event(event_invalid_metadata)
        assert is_valid is False
        assert any("Metadata must be a dictionary" in error for error in errors)

        # Missing required metadata fields
        event_incomplete_metadata = _base_event(
            metadata={"version": "1.0"}  # Missing product
        )

        is_valid, errors = validator.validate_event(event_incomplete_metadata)
        assert is_valid is False
        assert any(
            "Metadata missing required field 'product'" in error for error in errors
        )

    def test_cloud_context_validation(self, validator):
        """Test cloud context validation."""
        # Valid cloud context
        event_valid_cloud = _base_event(
            cloud={
                "account": {"uid": "123456789012"},
                "region": "us-east-1",
            }
        )

        is_valid, errors = validator.validate_event(event_valid_cloud)
        assert is_valid is True

        # Invalid cloud context type
        event_invalid_cloud = _base_event(cloud="should_be_dict")

        is_valid, errors = validator.validate_event(event_invalid_cloud)
        assert is_valid is False
        assert any("Cloud context must be a dictionary" in error for error in errors)

    def test_endpoint_validation(self, validator):
        """Test endpoint validation."""
        # Valid endpoint
        event_valid_endpoint = _base_event(
            src_endpoint={"ip": "192.168.1.1", "port": 8080}
        )

        is_valid, errors = validator.validate_event(event_valid_endpoint)
        assert is_valid is True

        # Invalid IP address
        event_invalid_ip = _base_event(src_endpoint={"ip": "invalid-ip"})

        is_valid, errors = validator.validate_event(event_invalid_ip)
        assert is_valid is False
        assert any("Invalid IP address" in error for error in errors)

        # Invalid port number
        event_invalid_port = _base_event(dst_endpoint={"port": 99999})

        is_valid, errors = validator.validate_event(event_invalid_port)
        assert is_valid is False
        assert any("Invalid port number" in error for error in errors)

    def test_finding_validation(self, validator):
        """Test finding structure validation."""
        # Valid finding
        event_valid_finding = _base_event(
            finding={
                "uid": "finding-123",
                "title": "Test Finding",
                "types": ["malware", "network"],
            }
        )

        is_valid, errors = validator.validate_event(event_valid_finding)
        assert is_valid is True

        # Missing finding UID
        event_no_uid = _base_event(finding={"title": "Test"})

        is_valid, errors = validator.validate_event(event_no_uid)
        assert is_valid is False
        assert any(
            "Finding missing required 'uid' field" in error for error in errors
        )

    def test_security_lake_record_validation(self, validator, sample_ocsf_event):
        """Test complete Security Lake record validation."""
        result = validator.validate_security_lake_record(sample_ocsf_event)

        assert result["is_valid"] is True
        assert not result["errors"]
        assert "ocsf_compliance" in result
        assert "suggestions" in result
        assert "warnings" in result

        # Check OCSF compliance scoring
        compliance = result["ocsf_compliance"]
        assert "score" in compliance
        assert "compliance_level" in compliance
        assert "found_fields" in compliance
        assert compliance["score"] > 0

    def test_ocsf_compliance_scoring(self, validator):
        """Test OCSF compliance scoring logic."""
        # Minimal event
        minimal_event = _base_event()

        compliance = validator._check_ocsf_compliance(minimal_event)
        assert compliance["compliance_level"] in ["MINIMAL", "LOW"]

        # More complete event
        complete_event = _base_event(
            severity="Medium",
            severity_id=3,
            activity_name="Test Activity",
            cloud={"account": {"uid": "123456789012"}},
            src_endpoint={"ip": "192.168.1.1"},
        )

        compliance = validator._check_ocsf_compliance(complete_event)
        assert compliance["score"] > 50
        assert compliance["compliance_level"] in ["MEDIUM", "HIGH"]

    def test_improvement_suggestions(self, validator):
        """Test improvement suggestions generation."""
        # Deliberately has no "time" (and no severity) so both get suggested.
        minimal_event = _base_event()
        del minimal_event["time"]

        suggestions = validator._generate_improvement_suggestions(minimal_event, [])

        assert len(suggestions) > 0
        assert any("time" in suggestion for suggestion in suggestions)
        assert any("severity" in suggestion for suggestion in suggestions)

    def test_warning_detection(self, validator):
        """Test warning detection for non-critical issues."""
        event_with_issues = {
            "time": "2024-01-15T10:30:00Z",
            "eventTime": "2024-01-15T10:30:00Z",  # Deprecated field
            "type_name": "Test",
            "type_uid": 1001,
            "class_name": "Test",
            "class_uid": 1001,
            "severity": "High",
            "severity_id": 2,  # Mismatch with severity
            "metadata": {"version": "1.0", "product": {"name": "Test"}},
        }

        warnings = validator._check_for_warnings(event_with_issues)

        assert len(warnings) > 0
        assert any("eventTime" in warning for warning in warnings)
        assert any(
            "severity" in warning and "doesn't match" in warning
            for warning in warnings
        )

    def test_ip_validation_helper(self, validator):
        """Test IP address validation helper method."""
        assert validator._is_valid_ip("192.168.1.1") is True
        assert validator._is_valid_ip("2001:db8::1") is True
        assert validator._is_valid_ip("invalid-ip") is False
        assert validator._is_valid_ip("999.999.999.999") is False

    def test_severity_id_mapping(self, validator):
        """Test severity ID mapping helper."""
        assert validator._get_expected_severity_id("Critical") == 5
        assert validator._get_expected_severity_id("High") == 4
        assert validator._get_expected_severity_id("Medium") == 3
        assert validator._get_expected_severity_id("Low") == 2
        assert validator._get_expected_severity_id("Informational") == 1
        assert validator._get_expected_severity_id("Unknown") == 99

    def test_future_timestamp_warning(self, validator):
        """Test detection of future timestamps."""
        # datetime.utcnow() is deprecated since Python 3.12; build an aware
        # UTC datetime instead and render its offset as the "Z" suffix so the
        # resulting string has the same shape as before.
        future_time = (
            (datetime.now(timezone.utc) + timedelta(days=2))
            .isoformat()
            .replace("+00:00", "Z")
        )
        event_future_time = _base_event(time=future_time)

        is_valid, errors = validator.validate_event(event_future_time)

        assert is_valid is False
        assert any("days in the future" in error for error in errors)

    def test_validation_error_accumulation(self, validator):
        """Test that validation errors are properly accumulated."""
        completely_invalid_event = {
            "type_uid": "not_an_int",
            "severity_id": "not_an_int",
            "time": "invalid-timestamp",
            "metadata": "not_a_dict",
            "cloud": "not_a_dict",
            "src_endpoint": {"ip": "invalid-ip", "port": -1},
        }

        is_valid, errors = validator.validate_event(completely_invalid_event)

        assert is_valid is False
        assert len(errors) >= 5  # Should have multiple validation errors

        # Verify different types of errors are present
        error_text = " ".join(errors)
        assert "Required field" in error_text
        assert "should be int" in error_text
        assert "timestamp format" in error_text
        assert "must be a dictionary" in error_text

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kebabmane/asl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.