#!/usr/bin/env python3
"""
Comprehensive test suite for Vulnerability Intelligence MCP Server
This module tests all 7 vulnerability intelligence tools with proper pytest structure.
"""
import asyncio
import sys
from pathlib import Path
import pytest
# Add parent directory to path so we can import mcp_simple_tool
sys.path.append(str(Path(__file__).parent.parent))
from mcp_simple_tool.tools.cve_lookup import lookup_cve
from mcp_simple_tool.tools.cvss_calculator import calculate_cvss_score
from mcp_simple_tool.tools.epss_lookup import get_epss_score
from mcp_simple_tool.tools.exploit_availability import get_exploit_availability
from mcp_simple_tool.tools.vex_status import get_vex_status
from mcp_simple_tool.tools.vulnerability_search import search_vulnerabilities
from mcp_simple_tool.tools.vulnerability_timeline import (
get_vulnerability_timeline,
)
class TestVulnerabilityIntelligence:
    """Async pytest cases, one per vulnerability intelligence tool.

    Each case awaits a single tool coroutine, checks that the returned
    sequence is non-empty and starts with a text item, and then looks for
    the marker strings expected in that item's text.
    """

    # Sample inputs shared by the cases below. Log4Shell is used because it
    # is well-documented.
    TEST_CVE = "CVE-2021-44228"
    TEST_CVSS_VECTOR = "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H"

    @staticmethod
    def _leading_text(items):
        """Assert *items* is non-empty and opens with a text item; return its text."""
        assert len(items) > 0
        head = items[0]
        assert head.type == "text"
        return head.text

    @pytest.mark.asyncio
    async def test_cve_lookup(self):
        """The CVE lookup report mentions the CVE ID and the report heading."""
        report = self._leading_text(await lookup_cve(self.TEST_CVE))
        for marker in (self.TEST_CVE, "CVE Vulnerability Report"):
            assert marker in report
        print(f"✅ CVE Lookup: {self.TEST_CVE} found")

    @pytest.mark.asyncio
    async def test_epss_score(self):
        """The EPSS lookup output mentions the score and FIRST.org."""
        report = self._leading_text(await get_epss_score(self.TEST_CVE))
        for marker in ("EPSS Score", "FIRST.org"):
            assert marker in report
        print(f"✅ EPSS Score: Retrieved for {self.TEST_CVE}")

    @pytest.mark.asyncio
    async def test_cvss_calculator(self):
        """The CVSS calculator reports a base score for the sample vector."""
        response = await calculate_cvss_score(self.TEST_CVSS_VECTOR)
        report = self._leading_text(response)
        assert "Base Score" in report
        assert "10.0" in report  # Log4Shell should be 10.0
        print("✅ CVSS Calculator: Calculated score for vector")

    @pytest.mark.asyncio
    async def test_vulnerability_search(self):
        """A keyword/severity/date-range search returns a results report."""
        query = {"keywords": "apache", "severity": "HIGH", "date_range": "90d"}
        report = self._leading_text(await search_vulnerabilities(**query))
        assert "Search Results" in report
        print("✅ Vulnerability Search: Found Apache vulnerabilities")

    @pytest.mark.asyncio
    async def test_exploit_availability(self):
        """The exploit availability report includes a risk assessment."""
        response = await get_exploit_availability(self.TEST_CVE)
        report = self._leading_text(response)
        for marker in ("Exploit Availability Report", "Risk Assessment"):
            assert marker in report
        print(f"✅ Exploit Availability: Checked for {self.TEST_CVE}")

    @pytest.mark.asyncio
    async def test_vulnerability_timeline(self):
        """The timeline report includes a published entry."""
        response = await get_vulnerability_timeline(self.TEST_CVE)
        report = self._leading_text(response)
        for marker in ("Timeline Report", "Published"):
            assert marker in report
        print(f"✅ Vulnerability Timeline: Retrieved for {self.TEST_CVE}")

    @pytest.mark.asyncio
    async def test_vex_status(self):
        """The VEX status report names the product that was queried."""
        response = await get_vex_status(self.TEST_CVE, "Apache Log4j")
        report = self._leading_text(response)
        for marker in ("VEX Status Report", "Apache Log4j"):
            assert marker in report
        print(f"✅ VEX Status: Checked for {self.TEST_CVE} on Apache Log4j")
async def run_all_tests(tests=None):
    """Run the vulnerability intelligence tests manually, outside pytest.

    Each test is awaited in order; any exception counts as a failure and is
    reported on stdout without stopping the remaining tests.

    Args:
        tests: Optional iterable of ``(label, func)`` pairs, where ``func``
            takes no arguments and returns an awaitable. When omitted, the
            seven ``TestVulnerabilityIntelligence`` methods are run.

    Returns:
        ``True`` when no test raised, ``False`` otherwise.
    """
    print("🔍 **Running Vulnerability Intelligence Test Suite**\n")

    if tests is None:
        # Default suite is built lazily so custom runs never touch it.
        test_suite = TestVulnerabilityIntelligence()
        tests = [
            ("CVE Lookup", test_suite.test_cve_lookup),
            ("EPSS Score", test_suite.test_epss_score),
            ("CVSS Calculator", test_suite.test_cvss_calculator),
            ("Vulnerability Search", test_suite.test_vulnerability_search),
            ("Exploit Availability", test_suite.test_exploit_availability),
            ("Vulnerability Timeline", test_suite.test_vulnerability_timeline),
            ("VEX Status", test_suite.test_vex_status),
        ]

    passed = 0
    failed = 0
    for test_name, test_func in tests:
        try:
            print(f"🧪 Testing {test_name}...")
            await test_func()
            passed += 1
        except Exception as e:
            # Broad catch is deliberate: one failing test must not abort the
            # run. A bare ``assert`` raises AssertionError with an empty
            # message, so fall back to repr() to keep the report informative.
            print(f"❌ {test_name}: FAILED - {str(e) or repr(e)}")
            failed += 1
        print()

    print(f"📊 **Test Results: {passed} passed, {failed} failed**")
    if failed == 0:
        print("🎉 **All vulnerability intelligence tools working perfectly!**")
    return failed == 0
if __name__ == "__main__":
    # Direct execution: drive the manual runner and report the outcome to
    # the shell as the process exit status (0 = all passed, 1 = failures).
    all_passed = asyncio.run(run_all_tests())
    exit_code = 0 if all_passed else 1
    sys.exit(exit_code)