import pytest
from src.asl_mcp_server.ocsf.validators import OCSFValidator
class TestOCSFValidator:
    """Test cases for OCSF validation functionality.

    Every test feeds ``OCSFValidator`` a hand-built event dictionary (or a
    shared fixture) and checks the returned validity flag and messages.

    NOTE(review): the ``sample_ocsf_event`` and ``invalid_ocsf_event``
    fixtures are not defined in this class; presumably they come from a
    ``conftest.py`` -- confirm.
    """

    @staticmethod
    def _base_event(**overrides):
        """Return a fresh minimal event with ``overrides`` merged on top.

        A new dictionary (including the nested ``metadata``) is built on every
        call, so a test may add, replace or delete keys without leaking the
        change into another scenario.

        Args:
            **overrides: Top-level keys to add to, or replace in, the event.

        Returns:
            dict: The event dictionary.
        """
        event = {
            "time": "2024-01-15T10:30:00Z",
            "type_name": "Test",
            "type_uid": 1001,
            "class_name": "Test",
            "class_uid": 1001,
            "metadata": {"version": "1.0", "product": {"name": "Test"}},
        }
        event.update(overrides)
        return event

    @pytest.fixture
    def validator(self):
        """Create validator instance for testing."""
        return OCSFValidator()

    def test_valid_event_validation(self, validator, sample_ocsf_event):
        """A valid OCSF event validates with no errors."""
        is_valid, errors = validator.validate_event(sample_ocsf_event)

        assert is_valid is True
        assert not errors

    def test_invalid_event_validation(self, validator, invalid_ocsf_event):
        """An invalid OCSF event is rejected with missing-field errors."""
        is_valid, errors = validator.validate_event(invalid_ocsf_event)

        assert is_valid is False
        assert errors

        # Check for specific error types
        error_messages = " ".join(errors)
        assert "Required field" in error_messages
        assert "missing" in error_messages

    def test_required_fields_validation(self, validator):
        """An empty event yields an error naming each required field."""
        # Missing all required fields
        is_valid, errors = validator.validate_event({})

        assert is_valid is False

        required_fields = [
            "time",
            "type_name",
            "type_uid",
            "class_name",
            "class_uid",
            "metadata",
        ]
        for field in required_fields:
            assert any(field in error for error in errors), (
                f"no validation error mentions required field {field!r}"
            )

    def test_field_type_validation(self, validator):
        """String values in integer fields are reported per field."""
        event_with_wrong_types = {
            "time": "2024-01-15T10:30:00Z",
            "type_name": "Network Activity",
            "type_uid": "should_be_int",  # Wrong type
            "class_name": "Network Activity",
            "class_uid": 4001,
            "severity_id": "should_be_int",  # Wrong type
            "metadata": {
                "version": "1.1.0",
                "product": {"name": "Test"},
            },
        }

        is_valid, errors = validator.validate_event(event_with_wrong_types)

        assert is_valid is False
        assert any(
            "type_uid" in error and "should be int" in error for error in errors
        )
        assert any(
            "severity_id" in error and "should be int" in error
            for error in errors
        )

    def test_timestamp_validation(self, validator):
        """An ISO timestamp is accepted; an unparseable one is rejected."""
        # Valid timestamp
        is_valid, errors = validator.validate_event(self._base_event())
        assert is_valid is True

        # Invalid timestamp format
        event_invalid_time = self._base_event(time="not-a-timestamp")
        is_valid, errors = validator.validate_event(event_invalid_time)
        assert is_valid is False
        assert any("timestamp format" in error.lower() for error in errors)

    def test_metadata_validation(self, validator):
        """Missing, mistyped and incomplete metadata are each rejected."""
        # Missing metadata
        event_no_metadata = self._base_event()
        del event_no_metadata["metadata"]
        is_valid, errors = validator.validate_event(event_no_metadata)
        assert is_valid is False
        assert any("Required field 'metadata'" in error for error in errors)

        # Invalid metadata type
        event_invalid_metadata = self._base_event(metadata="should_be_dict")
        is_valid, errors = validator.validate_event(event_invalid_metadata)
        assert is_valid is False
        assert any("Metadata must be a dictionary" in error for error in errors)

        # Missing required metadata fields (no "product")
        event_incomplete_metadata = self._base_event(
            metadata={"version": "1.0"}
        )
        is_valid, errors = validator.validate_event(event_incomplete_metadata)
        assert is_valid is False
        assert any(
            "Metadata missing required field 'product'" in error
            for error in errors
        )

    def test_cloud_context_validation(self, validator):
        """A dict cloud context is accepted; a string one is rejected."""
        # Valid cloud context
        event_valid_cloud = self._base_event(
            cloud={
                "account": {"uid": "123456789012"},
                "region": "us-east-1",
            }
        )
        is_valid, errors = validator.validate_event(event_valid_cloud)
        assert is_valid is True

        # Invalid cloud context type
        event_invalid_cloud = self._base_event(cloud="should_be_dict")
        is_valid, errors = validator.validate_event(event_invalid_cloud)
        assert is_valid is False
        assert any(
            "Cloud context must be a dictionary" in error for error in errors
        )

    def test_endpoint_validation(self, validator):
        """Endpoint IP and port values are checked."""
        # Valid endpoint
        event_valid_endpoint = self._base_event(
            src_endpoint={"ip": "192.168.1.1", "port": 8080}
        )
        is_valid, errors = validator.validate_event(event_valid_endpoint)
        assert is_valid is True

        # Invalid IP address
        event_invalid_ip = self._base_event(src_endpoint={"ip": "invalid-ip"})
        is_valid, errors = validator.validate_event(event_invalid_ip)
        assert is_valid is False
        assert any("Invalid IP address" in error for error in errors)

        # Invalid port number
        event_invalid_port = self._base_event(dst_endpoint={"port": 99999})
        is_valid, errors = validator.validate_event(event_invalid_port)
        assert is_valid is False
        assert any("Invalid port number" in error for error in errors)

    def test_finding_validation(self, validator):
        """A finding with a uid is accepted; one without is rejected."""
        # Valid finding
        event_valid_finding = self._base_event(
            finding={
                "uid": "finding-123",
                "title": "Test Finding",
                "types": ["malware", "network"],
            }
        )
        is_valid, errors = validator.validate_event(event_valid_finding)
        assert is_valid is True

        # Missing finding UID
        event_no_uid = self._base_event(finding={"title": "Test"})
        is_valid, errors = validator.validate_event(event_no_uid)
        assert is_valid is False
        assert any(
            "Finding missing required 'uid' field" in error for error in errors
        )

    def test_security_lake_record_validation(self, validator, sample_ocsf_event):
        """The full record report is valid and carries every section."""
        result = validator.validate_security_lake_record(sample_ocsf_event)

        assert result["is_valid"] is True
        assert not result["errors"]
        assert "ocsf_compliance" in result
        assert "suggestions" in result
        assert "warnings" in result

        # Check OCSF compliance scoring
        compliance = result["ocsf_compliance"]
        assert "score" in compliance
        assert "compliance_level" in compliance
        assert "found_fields" in compliance
        assert compliance["score"] > 0

    def test_ocsf_compliance_scoring(self, validator):
        """Adding optional fields raises the compliance score and level."""
        # Minimal event
        minimal_event = self._base_event()
        compliance = validator._check_ocsf_compliance(minimal_event)
        assert compliance["compliance_level"] in ["MINIMAL", "LOW"]

        # More complete event
        complete_event = self._base_event(
            severity="Medium",
            severity_id=3,
            activity_name="Test Activity",
            cloud={"account": {"uid": "123456789012"}},
            src_endpoint={"ip": "192.168.1.1"},
        )
        compliance = validator._check_ocsf_compliance(complete_event)
        assert compliance["score"] > 50
        assert compliance["compliance_level"] in ["MEDIUM", "HIGH"]

    def test_improvement_suggestions(self, validator):
        """Suggestions mention the absent ``time`` and ``severity`` fields."""
        minimal_event = self._base_event()
        # This scenario needs an event that has no timestamp at all.
        del minimal_event["time"]

        suggestions = validator._generate_improvement_suggestions(
            minimal_event, []
        )

        assert suggestions
        assert any("time" in suggestion for suggestion in suggestions)
        assert any("severity" in suggestion for suggestion in suggestions)

    def test_warning_detection(self, validator):
        """Non-critical issues are surfaced as warnings."""
        event_with_issues = {
            "time": "2024-01-15T10:30:00Z",
            "eventTime": "2024-01-15T10:30:00Z",  # Deprecated field
            "type_name": "Test",
            "type_uid": 1001,
            "class_name": "Test",
            "class_uid": 1001,
            "severity": "High",
            "severity_id": 2,  # Mismatch with severity
            "metadata": {"version": "1.0", "product": {"name": "Test"}},
        }

        warnings = validator._check_for_warnings(event_with_issues)

        assert warnings
        assert any("eventTime" in warning for warning in warnings)
        assert any(
            "severity" in warning and "doesn't match" in warning
            for warning in warnings
        )

    def test_ip_validation_helper(self, validator):
        """The IP helper accepts IPv4/IPv6 literals and rejects junk."""
        assert validator._is_valid_ip("192.168.1.1") is True
        assert validator._is_valid_ip("2001:db8::1") is True
        assert validator._is_valid_ip("invalid-ip") is False
        assert validator._is_valid_ip("999.999.999.999") is False

    def test_severity_id_mapping(self, validator):
        """Each severity name maps to its expected numeric id."""
        assert validator._get_expected_severity_id("Critical") == 5
        assert validator._get_expected_severity_id("High") == 4
        assert validator._get_expected_severity_id("Medium") == 3
        assert validator._get_expected_severity_id("Low") == 2
        assert validator._get_expected_severity_id("Informational") == 1
        assert validator._get_expected_severity_id("Unknown") == 99

    def test_future_timestamp_warning(self, validator):
        """A timestamp two days ahead is reported.

        Despite the test name, the assertions are on the validity flag and
        the error list returned by ``validate_event``.
        """
        from datetime import datetime, timedelta, timezone

        # datetime.utcnow() is deprecated since Python 3.12. Use an aware UTC
        # value and render its "+00:00" offset as "Z", which produces the same
        # string the naive-plus-"Z" form did.
        future_time = (
            (datetime.now(timezone.utc) + timedelta(days=2))
            .isoformat()
            .replace("+00:00", "Z")
        )

        event_future_time = self._base_event(time=future_time)
        is_valid, errors = validator.validate_event(event_future_time)

        assert is_valid is False
        assert any("days in the future" in error for error in errors)

    def test_validation_error_accumulation(self, validator):
        """Several independent problems are all reported in one pass."""
        completely_invalid_event = {
            "type_uid": "not_an_int",
            "severity_id": "not_an_int",
            "time": "invalid-timestamp",
            "metadata": "not_a_dict",
            "cloud": "not_a_dict",
            "src_endpoint": {"ip": "invalid-ip", "port": -1},
        }

        is_valid, errors = validator.validate_event(completely_invalid_event)

        assert is_valid is False
        assert len(errors) >= 5  # Should have multiple validation errors

        # Verify different types of errors are present
        error_text = " ".join(errors)
        assert "Required field" in error_text
        assert "should be int" in error_text
        # Matched case-insensitively, as in test_timestamp_validation.
        assert "timestamp format" in error_text.lower()
        assert "must be a dictionary" in error_text