"""
Security & Quality Analysis Module
Comprehensive security vulnerability detection and quality assurance framework
for FastApply system. Implements OWASP Top 10 detection, compliance reporting,
and automated quality gates.
Author: FastApply Team
Version: 1.0.0
"""
import ast
import hashlib
import json
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, cast
class SeverityLevel(Enum):
"""Severity levels for security vulnerabilities."""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
class VulnerabilityCategory(Enum):
"""Categories for security vulnerabilities."""
INJECTION = "injection"
AUTHENTICATION = "authentication"
AUTHORIZATION = "authorization"
CRYPTOGRAPHY = "cryptography"
INPUT_VALIDATION = "input_validation"
CONFIGURATION = "configuration"
INFORMATION_DISCLOSURE = "information_disclosure"
SESSION_MANAGEMENT = "session_management"
BUSINESS_LOGIC = "business_logic"
DEPENDENCY = "dependency"
class VulnerabilityType(Enum):
"""Types of security vulnerabilities."""
SQL_INJECTION = "sql_injection"
XSS = "xss"
CSRF = "csrf"
HARDCODED_SECRET = "hardcoded_secret"
PATH_TRAVERSAL = "path_traversal"
COMMAND_INJECTION = "command_injection"
WEAK_CRYPTOGRAPHY = "weak_cryptography"
BROKEN_AUTHENTICATION = "broken_authentication"
BROKEN_ACCESS_CONTROL = "broken_access_control"
INFO_DISCLOSURE = "information_disclosure"
DEPENDENCY_VULNERABILITY = "dependency_vulnerability"
INJECTION = "injection"
VULNERABLE_DEPENDENCY = "vulnerable_dependency"
class VulnerabilitySeverity(Enum):
"""Severity levels for security vulnerabilities."""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
class OWASPTop10(Enum):
"""OWASP Top 10 2021 vulnerability categories."""
A01_BROKEN_ACCESS_CONTROL = "A01:2021-Broken Access Control"
A02_CRYPTOGRAPHIC_FAILURES = "A02:2021-Cryptographic Failures"
A03_INJECTION = "A03:2021-Injection"
A04_INSECURE_DESIGN = "A04:2021-Insecure Design"
A05_SECURITY_MISCONFIGURATION = "A05:2021-Security Misconfiguration"
A06_VULNERABLE_OUTDATED_COMPONENTS = "A06:2021-Vulnerable and Outdated Components"
A07_IDENTIFICATION_FAILURES = "A07:2021-Identification and Authentication Failures"
A08_SOFTWARE_DATA_INTEGRITY_FAILURES = "A08:2021-Software and Data Integrity Failures"
A09_LOGGING_MONITORING_FAILURES = "A09:2021-Security Logging and Monitoring Failures"
A10_SSRF = "A10:2021-Server-Side Request Forgery"
class ComplianceStandard(Enum):
"""Compliance standards and frameworks."""
OWASP_TOP_10 = "owasp_top_10"
PCI_DSS = "pci_dss"
HIPAA = "hipaa"
GDPR = "gdpr"
SOC2 = "soc2"
ISO27001 = "iso27001"
NIST_CSF = "nist_csf"
CWE_TOP_25 = "cwe_top_25"
class QualityMetric(Enum):
"""Code quality metrics."""
CYCLOMATIC_COMPLEXITY = "cyclomatic_complexity"
COGNITIVE_COMPLEXITY = "cognitive_complexity"
MAINTAINABILITY_INDEX = "maintainability_index"
CODE_COVERAGE = "code_coverage"
TECHNICAL_DEBT = "technical_debt"
DUPLICATE_CODE = "duplicate_code"
CODE_SMELLS = "code_smells"
SECURITY_ISSUES = "security_issues"
LINES_OF_CODE = "lines_of_code"
@dataclass
class Vulnerability:
"""Represents a security vulnerability."""
type: VulnerabilityType
category: VulnerabilityCategory
severity: VulnerabilitySeverity
description: str
file_path: str
line_number: int
code_snippet: str
title: Optional[str] = None
id: Optional[str] = None
remediation: Optional[str] = None
owasp_category: Optional[OWASPTop10] = None
cwe_id: Optional[str] = None
confidence: float = 0.0
cvss_score: Optional[float] = None
references: List[str] = field(default_factory=list)
@dataclass
class SecurityReport:
"""Comprehensive security analysis report."""
scan_id: str
timestamp: datetime
project_path: str
vulnerabilities: List[Vulnerability] = field(default_factory=list)
severity_summary: Dict[VulnerabilitySeverity, int] = field(default_factory=dict)
owasp_summary: Dict[OWASPTop10, int] = field(default_factory=dict)
compliance_scores: Dict[ComplianceStandard, float] = field(default_factory=dict)
risk_score: float = 0.0
recommendations: List[str] = field(default_factory=list)
total_vulnerabilities: int = 0
security_score: float = 0.0
@dataclass
class QualityMetrics:
"""Code quality metrics."""
cyclomatic_complexity: float = 0.0
cognitive_complexity: float = 0.0
maintainability_index: float = 100.0
code_coverage: float = 0.0
technical_debt_ratio: float = 0.0
duplicate_code_percentage: float = 0.0
code_smells_count: int = 0
security_issues_count: int = 0
overall_score: float = 100.0
@dataclass
class QualityAssessment:
"""Comprehensive code quality assessment."""
project_path: str
timestamp: datetime
metrics: QualityMetrics
file_assessments: Dict[str, QualityMetrics] = field(default_factory=dict)
improvement_recommendations: List[str] = field(default_factory=list)
quality_grade: str = "A"
passes_gates: bool = True
overall_score: float = 0.0
file_metrics: Dict[str, QualityMetrics] = field(default_factory=dict)
issues: List[Dict] = field(default_factory=list)
@dataclass
class ComplianceRequirement:
"""Individual compliance requirement."""
id: str
standard: ComplianceStandard
title: str
description: str
check_function: str
weight: float = 1.0
required: bool = True
@dataclass
class QualityGate:
"""Quality gate definition."""
name: str
metric: QualityMetric
threshold: float
operator: str # "gt", "lt", "gte", "lte"
severity: str # "warning", "error"
enabled: bool = True
class SecurityVulnerabilityScanner:
"""Comprehensive security vulnerability scanner implementing OWASP Top 10."""
def __init__(self):
self.vulnerability_patterns = self._load_vulnerability_patterns()
self.dependency_checker = DependencyVulnerabilityChecker()
def security_scan_comprehensive(self, project_path: str) -> SecurityReport:
"""Perform comprehensive security scan of the project."""
scan_id = hashlib.sha256(f"{project_path}{datetime.now().isoformat()}".encode()).hexdigest()[:16]
timestamp = datetime.now()
vulnerabilities = []
# Scan for different vulnerability types
vulnerabilities.extend(self._detect_sql_injection(project_path))
vulnerabilities.extend(self._detect_xss_vulnerabilities(project_path))
vulnerabilities.extend(self._detect_insecure_deserialization(project_path))
vulnerabilities.extend(self._detect_path_traversal(project_path))
vulnerabilities.extend(self._detect_weak_cryptography(project_path))
vulnerabilities.extend(self._detect_authentication_issues(project_path))
vulnerabilities.extend(self._detect_authorization_issues(project_path))
vulnerabilities.extend(self._detect_security_misconfigurations(project_path))
# Check dependency vulnerabilities
vulnerabilities.extend(self.dependency_checker.scan_dependencies(project_path))
# Generate summary and scores
severity_summary = self._calculate_severity_summary(vulnerabilities)
owasp_summary = self._calculate_owasp_summary(vulnerabilities)
compliance_scores = self._calculate_compliance_scores(vulnerabilities)
risk_score = self._calculate_risk_score(vulnerabilities)
recommendations = self._generate_recommendations(vulnerabilities)
total_vulnerabilities = len(vulnerabilities)
security_score = max(0, 100 - risk_score) # Convert risk score to security score
return SecurityReport(
scan_id=scan_id,
timestamp=timestamp,
project_path=project_path,
vulnerabilities=vulnerabilities,
severity_summary=severity_summary,
owasp_summary=owasp_summary,
compliance_scores=compliance_scores,
risk_score=risk_score,
recommendations=recommendations,
total_vulnerabilities=total_vulnerabilities,
security_score=security_score,
)
def detect_pattern_vulnerabilities(self, code: str, file_path: str) -> List[Vulnerability]:
"""Detect vulnerabilities using pattern matching."""
vulnerabilities = []
for pattern_name, pattern_config in self.vulnerability_patterns.items():
matches = re.finditer(pattern_config["pattern"], code, re.MULTILINE | re.DOTALL | re.IGNORECASE)
for match in matches:
line_num = code[: match.start()].count("\n") + 1
vulnerability = Vulnerability(
type=pattern_config["type"],
category=pattern_config["category"],
severity=pattern_config["severity"],
description=pattern_config["description"],
file_path=file_path,
line_number=line_num,
code_snippet=match.group().strip(),
id=f"{pattern_name}_{line_num}",
title=pattern_config["title"],
remediation=pattern_config["remediation"],
owasp_category=pattern_config.get("owasp_category"),
cwe_id=pattern_config.get("cwe_id"),
confidence=pattern_config.get("confidence", 0.8),
)
vulnerabilities.append(vulnerability)
return vulnerabilities
def analyze_dependency_vulnerabilities(self, dependencies: List[str]) -> List[Vulnerability]:
"""Analyze dependencies for known vulnerabilities."""
return cast(List[Vulnerability], self.dependency_checker.analyze_dependencies(dependencies))
def generate_remediation_recommendations(self, vulnerabilities: List[Vulnerability]) -> List[str]:
"""Generate prioritized remediation recommendations."""
recommendations = []
# Group by severity
critical_vulns = [v for v in vulnerabilities if v.severity == VulnerabilitySeverity.CRITICAL]
high_vulns = [v for v in vulnerabilities if v.severity == VulnerabilitySeverity.HIGH]
if critical_vulns:
recommendations.append(f"🚨 CRITICAL: Fix {len(critical_vulns)} critical vulnerabilities immediately")
if high_vulns:
recommendations.append(f"⚠️ HIGH: Address {len(high_vulns)} high-severity vulnerabilities within 7 days")
# Specific recommendations by type
owasp_counts: Dict[OWASPTop10, int] = {}
for vuln in vulnerabilities:
if vuln.owasp_category:
owasp_counts[vuln.owasp_category] = owasp_counts.get(vuln.owasp_category, 0) + 1
for category, count in owasp_counts.items():
if count > 0:
recommendations.append(f"🔍 {category.value}: {count} vulnerabilities found")
return recommendations
def assess_compliance_implications(self, vulnerabilities: List[Vulnerability]) -> Dict[str, float]:
"""Assess compliance implications of vulnerabilities."""
compliance_scores = {}
for standard in ComplianceStandard:
score = self._calculate_standard_compliance(vulnerabilities, standard)
compliance_scores[standard.name] = score
return compliance_scores
def _load_vulnerability_patterns(self) -> Dict[str, Dict]:
"""Load vulnerability detection patterns."""
return {
"sql_injection": {
"pattern": r"(SELECT|INSERT|UPDATE|DELETE|DROP|ALTER)\s+.*?\+.*?(user|input|request|param)",
"type": VulnerabilityType.SQL_INJECTION,
"category": VulnerabilityCategory.INJECTION,
"title": "Potential SQL Injection",
"description": "Unsanitized user input concatenated into SQL queries",
"severity": VulnerabilitySeverity.HIGH,
"owasp_category": OWASPTop10.A03_INJECTION,
"cwe_id": "CWE-89",
"remediation": "Use parameterized queries or prepared statements",
"confidence": 0.7,
},
"xss_reflection": {
"pattern": r"(innerHTML|outerHTML|document\.write)\s*\(\s*.*?\+.*?(user|input|request|param)",
"type": VulnerabilityType.XSS,
"category": VulnerabilityCategory.INPUT_VALIDATION,
"title": "Reflected XSS Vulnerability",
"description": "Unsanitized user input inserted into DOM via dangerous methods",
"severity": VulnerabilitySeverity.HIGH,
"owasp_category": OWASPTop10.A03_INJECTION,
"cwe_id": "CWE-79",
"remediation": "Use textContent instead of innerHTML or implement proper input sanitization",
"confidence": 0.8,
},
"hardcoded_secrets": {
"pattern": "(password|secret|key|token|api_key)\\s*[:=]\\s*['\\\"][^'\\\"]{8,}['\\\"]",
"type": VulnerabilityType.HARDCODED_SECRET,
"category": VulnerabilityCategory.INFORMATION_DISCLOSURE,
"title": "Hardcoded Secrets",
"description": "Secrets or sensitive data hardcoded in source code",
"severity": VulnerabilitySeverity.HIGH,
"owasp_category": OWASPTop10.A02_CRYPTOGRAPHIC_FAILURES,
"cwe_id": "CWE-798",
"remediation": "Move secrets to environment variables or secure configuration",
"confidence": 0.9,
},
"weak_hash": {
"pattern": r"(md5|sha1)\s*\(",
"type": VulnerabilityType.WEAK_CRYPTOGRAPHY,
"category": VulnerabilityCategory.CRYPTOGRAPHY,
"title": "Weak Hash Algorithm",
"description": "Usage of deprecated weak hash algorithms",
"severity": VulnerabilitySeverity.MEDIUM,
"owasp_category": OWASPTop10.A02_CRYPTOGRAPHIC_FAILURES,
"cwe_id": "CWE-328",
"remediation": "Use strong hash algorithms like SHA-256 or SHA-3",
"confidence": 0.95,
},
"path_traversal": {
"pattern": r"open\s*\(\s*.*?\+.*?(user|input|request|param)",
"type": VulnerabilityType.PATH_TRAVERSAL,
"category": VulnerabilityCategory.INPUT_VALIDATION,
"title": "Path Traversal Vulnerability",
"description": "Unsanitized user input used in file operations",
"severity": VulnerabilitySeverity.HIGH,
"owasp_category": OWASPTop10.A01_BROKEN_ACCESS_CONTROL,
"cwe_id": "CWE-22",
"remediation": "Validate and sanitize file paths, use whitelist approach",
"confidence": 0.7,
},
}
def _detect_sql_injection(self, project_path: str) -> List[Vulnerability]:
"""Detect SQL injection vulnerabilities."""
vulnerabilities = []
python_files = list(Path(project_path).rglob("*.py"))
for file_path in python_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
vulnerabilities.extend(self.detect_pattern_vulnerabilities(content, str(file_path)))
except Exception:
continue
return vulnerabilities
def _detect_xss_vulnerabilities(self, project_path: str) -> List[Vulnerability]:
"""Detect XSS vulnerabilities."""
vulnerabilities = []
web_files = (
list(Path(project_path).rglob("*.py")) + list(Path(project_path).rglob("*.js")) + list(Path(project_path).rglob("*.html"))
)
for file_path in web_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
vulnerabilities.extend(self.detect_pattern_vulnerabilities(content, str(file_path)))
except Exception:
continue
return vulnerabilities
def _detect_insecure_deserialization(self, project_path: str) -> List[Vulnerability]:
"""Detect insecure deserialization vulnerabilities."""
vulnerabilities = []
python_files = list(Path(project_path).rglob("*.py"))
for file_path in python_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Look for pickle usage with user input
if re.search(r"pickle\.load\s*\(\s*.*?\+.*?(user|input|request|param)", content):
vulnerabilities.append(
Vulnerability(
type=VulnerabilityType.INJECTION,
category=VulnerabilityCategory.INJECTION,
id="insecure_deserialization",
title="Insecure Deserialization",
description="Use of pickle with potentially untrusted data",
severity=VulnerabilitySeverity.HIGH,
owasp_category=OWASPTop10.A08_SOFTWARE_DATA_INTEGRITY_FAILURES,
cwe_id="CWE-502",
file_path=str(file_path),
line_number=1,
code_snippet="pickle.load(user_data)",
remediation="Use safer serialization formats like JSON",
)
)
except Exception:
continue
return vulnerabilities
def _detect_path_traversal(self, project_path: str) -> List[Vulnerability]:
"""Detect path traversal vulnerabilities."""
vulnerabilities = []
python_files = list(Path(project_path).rglob("*.py"))
for file_path in python_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
vulnerabilities.extend(self.detect_pattern_vulnerabilities(content, str(file_path)))
except Exception:
continue
return vulnerabilities
def _detect_weak_cryptography(self, project_path: str) -> List[Vulnerability]:
"""Detect weak cryptography usage."""
vulnerabilities = []
python_files = list(Path(project_path).rglob("*.py"))
for file_path in python_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
vulnerabilities.extend(self.detect_pattern_vulnerabilities(content, str(file_path)))
except Exception:
continue
return vulnerabilities
def _detect_authentication_issues(self, project_path: str) -> List[Vulnerability]:
"""Detect authentication and authorization issues."""
vulnerabilities = []
python_files = list(Path(project_path).rglob("*.py"))
for file_path in python_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Look for hardcoded passwords
if re.search(r"password\s*=\s*['\"][^'\"]{6,}['\"]", content):
vulnerabilities.append(
Vulnerability(
type=VulnerabilityType.HARDCODED_SECRET,
category=VulnerabilityCategory.INFORMATION_DISCLOSURE,
id="hardcoded_password",
title="Hardcoded Password",
description="Password hardcoded in source code",
severity=VulnerabilitySeverity.HIGH,
owasp_category=OWASPTop10.A07_IDENTIFICATION_FAILURES,
cwe_id="CWE-259",
file_path=str(file_path),
line_number=1,
code_snippet="password = 'hardcoded'",
remediation="Use environment variables or secure configuration",
)
)
except Exception:
continue
return vulnerabilities
def _detect_authorization_issues(self, project_path: str) -> List[Vulnerability]:
"""Detect authorization issues."""
vulnerabilities = []
python_files = list(Path(project_path).rglob("*.py"))
for file_path in python_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Look for missing authorization checks
if re.search(r"@\w*\.route\s*\([\"'].*?delete|update|admin", content) and not re.search(
r"@login_required|@auth_required", content
):
vulnerabilities.append(
Vulnerability(
type=VulnerabilityType.BROKEN_ACCESS_CONTROL,
category=VulnerabilityCategory.AUTHORIZATION,
id="missing_authorization",
title="Missing Authorization Check",
description="Sensitive endpoint lacks authorization check",
severity=VulnerabilitySeverity.MEDIUM,
owasp_category=OWASPTop10.A01_BROKEN_ACCESS_CONTROL,
cwe_id="CWE-862",
file_path=str(file_path),
line_number=1,
code_snippet="route without auth check",
remediation="Add proper authorization decorators or middleware",
)
)
except Exception:
continue
return vulnerabilities
def _detect_security_misconfigurations(self, project_path: str) -> List[Vulnerability]:
"""Detect security misconfigurations."""
vulnerabilities = []
# Check for common misconfigurations
config_files = (
list(Path(project_path).rglob("*.cfg")) + list(Path(project_path).rglob("*.ini")) + list(Path(project_path).rglob("config.py"))
)
for file_path in config_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Look for debug mode in production
if re.search(r"DEBUG\s*=\s*True", content, re.IGNORECASE):
vulnerabilities.append(
Vulnerability(
type=VulnerabilityType.INFO_DISCLOSURE,
category=VulnerabilityCategory.CONFIGURATION,
id="debug_mode_enabled",
title="Debug Mode Enabled",
description="Debug mode enabled in production configuration",
severity=VulnerabilitySeverity.MEDIUM,
owasp_category=OWASPTop10.A05_SECURITY_MISCONFIGURATION,
cwe_id="CWE-215",
file_path=str(file_path),
line_number=1,
code_snippet="DEBUG = True",
remediation="Disable debug mode in production environments",
)
)
except Exception:
continue
return vulnerabilities
def _calculate_severity_summary(self, vulnerabilities: List[Vulnerability]) -> Dict[VulnerabilitySeverity, int]:
"""Calculate severity summary."""
summary = {}
for severity in VulnerabilitySeverity:
summary[severity] = len([v for v in vulnerabilities if v.severity == severity])
return summary
def _calculate_owasp_summary(self, vulnerabilities: List[Vulnerability]) -> Dict[OWASPTop10, int]:
"""Calculate OWASP category summary."""
summary = {}
for category in OWASPTop10:
summary[category] = len([v for v in vulnerabilities if v.owasp_category == category])
return summary
def _calculate_compliance_scores(self, vulnerabilities: List[Vulnerability]) -> Dict[ComplianceStandard, float]:
"""Calculate compliance scores."""
scores = {}
total_vulnerabilities = len(vulnerabilities)
if total_vulnerabilities == 0:
for standard in ComplianceStandard:
scores[standard] = 100.0
else:
critical_high = len([v for v in vulnerabilities if v.severity in [VulnerabilitySeverity.CRITICAL, VulnerabilitySeverity.HIGH]])
for standard in ComplianceStandard:
# Different standards have different weightings
if standard in [ComplianceStandard.OWASP_TOP_10, ComplianceStandard.PCI_DSS]:
# These standards are very strict about critical/high vulnerabilities
score = max(0, 100 - (critical_high * 20) - (total_vulnerabilities * 5))
else:
# Other standards are more lenient
score = max(0, 100 - (critical_high * 10) - (total_vulnerabilities * 2))
scores[standard] = round(score, 2)
return scores
def _calculate_risk_score(self, vulnerabilities: List[Vulnerability]) -> float:
"""Calculate overall risk score (0-100)."""
if not vulnerabilities:
return 0.0
weights = {
VulnerabilitySeverity.CRITICAL: 10,
VulnerabilitySeverity.HIGH: 7,
VulnerabilitySeverity.MEDIUM: 4,
VulnerabilitySeverity.LOW: 1,
VulnerabilitySeverity.INFO: 0.1,
}
total_weighted = sum(weights.get(v.severity, 0) for v in vulnerabilities)
max_possible = len(vulnerabilities) * 10
return round((total_weighted / max_possible) * 100, 2)
def _generate_recommendations(self, vulnerabilities: List[Vulnerability]) -> List[str]:
"""Generate prioritized recommendations."""
recommendations = []
# Count by severity
severity_counts: Dict[VulnerabilitySeverity, int] = {}
for vuln in vulnerabilities:
severity_counts[vuln.severity] = severity_counts.get(vuln.severity, 0) + 1
# Critical vulnerabilities
if severity_counts.get(VulnerabilitySeverity.CRITICAL, 0) > 0:
recommendations.append(
f"🚨 CRITICAL: {severity_counts[VulnerabilitySeverity.CRITICAL]} critical vulnerabilities require immediate attention"
)
# High severity vulnerabilities
if severity_counts.get(VulnerabilitySeverity.HIGH, 0) > 0:
recommendations.append(
f"⚠️ HIGH: {severity_counts[VulnerabilitySeverity.HIGH]} high-severity vulnerabilities should be addressed within 7 days"
)
# Overall recommendations
total_vulns = len(vulnerabilities)
if total_vulns > 0:
recommendations.append(
f"📊 Total: {total_vulns} vulnerabilities found across {len(set(v.file_path for v in vulnerabilities))} files"
)
# Specific remediation advice
recommendations.append("🔧 Implement secure coding practices and regular security scanning")
recommendations.append("📚 Consider using automated security tools in CI/CD pipeline")
return recommendations
def _calculate_standard_compliance(self, vulnerabilities: List[Vulnerability], standard: ComplianceStandard) -> float:
"""Calculate compliance score for a specific standard."""
# This is a simplified calculation - in practice, you'd have more complex rules
critical_count = len([v for v in vulnerabilities if v.severity == VulnerabilitySeverity.CRITICAL])
high_count = len([v for v in vulnerabilities if v.severity == VulnerabilitySeverity.HIGH])
if standard in [ComplianceStandard.OWASP_TOP_10, ComplianceStandard.PCI_DSS]:
# These standards have zero tolerance for critical vulnerabilities
if critical_count > 0:
return 0.0
return max(0, 100 - (high_count * 25))
else:
# Other standards are more lenient
return max(0, 100 - (critical_count * 30) - (high_count * 15))
class DependencyVulnerabilityChecker:
"""Check for known vulnerabilities in dependencies."""
def __init__(self) -> None:
self.vulnerability_db: Dict[str, Dict[str, Any]] = self._load_vulnerability_database()
def scan_dependencies(self, project_path: str) -> List[Vulnerability]:
"""Scan project dependencies for known vulnerabilities."""
vulnerabilities = []
# Check different dependency files
dependency_files = ["requirements.txt", "Pipfile", "pyproject.toml", "package.json", "pom.xml", "build.gradle"]
for dep_file in dependency_files:
file_path = Path(project_path) / dep_file
if file_path.exists():
vulnerabilities.extend(self._check_dependency_file(file_path))
return vulnerabilities
def analyze_dependencies(self, dependencies: List[str]) -> List[Vulnerability]:
"""Analyze a list of dependencies for vulnerabilities."""
vulnerabilities = []
for dep in dependencies:
vuln_info = self.vulnerability_db.get(dep)
if vuln_info:
vulnerabilities.append(
Vulnerability(
id=f"dep_{hashlib.md5(dep.encode()).hexdigest()[:8]}",
title=f"Vulnerable Dependency: {dep}",
description=vuln_info["description"],
severity=vuln_info["severity"],
type=VulnerabilityType.DEPENDENCY_VULNERABILITY,
category=VulnerabilityCategory.CONFIGURATION,
owasp_category=OWASPTop10.A06_VULNERABLE_OUTDATED_COMPONENTS,
cwe_id=vuln_info.get("cwe_id"),
file_path="dependencies",
line_number=1,
code_snippet=dep,
remediation=vuln_info.get("remediation", f"Update to latest secure version of {dep}"),
)
)
return vulnerabilities
def _load_vulnerability_database(self) -> Dict[str, Dict]:
"""Load vulnerability database (simplified version)."""
# In a real implementation, this would load from a comprehensive vulnerability database
return {
"django==1.11": {
"description": "Django 1.11 has multiple security vulnerabilities",
"severity": VulnerabilitySeverity.HIGH,
"cwe_id": "CWE-79",
"remediation": "Upgrade to Django 4.2 or later",
},
"requests==2.20.0": {
"description": "Requests 2.20.0 has a security vulnerability",
"severity": VulnerabilitySeverity.MEDIUM,
"cwe_id": "CWE-306",
"remediation": "Upgrade to requests 2.31.0 or later",
},
"flask==1.0": {
"description": "Flask 1.0 has multiple security issues",
"severity": VulnerabilitySeverity.MEDIUM,
"cwe_id": "CWE-352",
"remediation": "Upgrade to Flask 2.3 or later",
},
}
def _check_dependency_file(self, file_path: Path) -> List[Vulnerability]:
"""Check a specific dependency file for vulnerabilities."""
vulnerabilities = []
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Extract dependencies based on file type
if file_path.name == "requirements.txt":
dependencies = self._parse_requirements_txt(content)
elif file_path.name == "package.json":
dependencies = self._parse_package_json(content)
else:
dependencies = self._parse_generic_dependencies(content)
# Check each dependency
for dep in dependencies:
vuln_info = self.vulnerability_db.get(dep)
if vuln_info:
vulnerabilities.append(
Vulnerability(
type=VulnerabilityType.VULNERABLE_DEPENDENCY,
category=VulnerabilityCategory.DEPENDENCY,
id=f"dep_{hashlib.md5(dep.encode()).hexdigest()[:8]}",
title=f"Vulnerable Dependency: {dep}",
description=vuln_info["description"],
severity=vuln_info["severity"],
owasp_category=OWASPTop10.A06_VULNERABLE_OUTDATED_COMPONENTS,
cwe_id=vuln_info.get("cwe_id"),
file_path=str(file_path),
line_number=1,
code_snippet=dep,
remediation=vuln_info.get("remediation", f"Update to latest secure version of {dep}"),
)
)
except Exception:
pass
return vulnerabilities
def _parse_requirements_txt(self, content: str) -> List[str]:
"""Parse requirements.txt format."""
dependencies = []
for line in content.split("\n"):
line = line.strip()
if line and not line.startswith("#") and not line.startswith("-"):
# Extract package name and version
match = re.match(r"^([a-zA-Z0-9\-_.]+)==([0-9.]+)", line)
if match:
dependencies.append(f"{match.group(1)}=={match.group(2)}")
return dependencies
def _parse_package_json(self, content: str) -> List[str]:
"""Parse package.json format."""
dependencies = []
try:
data = json.loads(content)
deps = data.get("dependencies", {})
dev_deps = data.get("devDependencies", {})
all_deps = {**deps, **dev_deps}
for name, version in all_deps.items():
# Clean version string
version = re.sub(r"[\^~>=<]", "", version)
if version:
dependencies.append(f"{name}=={version}")
except json.JSONDecodeError:
pass
return dependencies
def _parse_generic_dependencies(self, content: str) -> List[str]:
"""Generic dependency parser for other formats."""
dependencies = []
# Simple regex-based extraction
matches = re.findall(r'([a-zA-Z0-9\-_.]+)\s*[:=]\s*["\']([0-9.]+)["\']', content)
for name, version in matches:
dependencies.append(f"{name}=={version}")
return dependencies
class QualityAssuranceFramework:
"""Comprehensive code quality assessment framework."""
def __init__(self):
self.complexity_analyzer = ComplexityAnalyzer()
self.code_smell_detector = CodeSmellDetector()
self.duplication_checker = CodeDuplicationChecker()
def assess_code_quality(self, code: str, file_path: Optional[str] = None) -> QualityAssessment:
"""Assess code quality and generate improvement recommendations."""
timestamp = datetime.now()
# Calculate various metrics
complexity_metrics = self.complexity_analyzer.calculate_complexity_metrics(code)
code_smells = self.code_smell_detector.detect_code_smells(code)
duplication_score = self.duplication_checker.calculate_duplication_score(code)
# Calculate overall metrics
metrics = QualityMetrics(
cyclomatic_complexity=complexity_metrics.cyclomatic_complexity,
cognitive_complexity=complexity_metrics.cognitive_complexity,
maintainability_index=self._calculate_maintainability_index(code, complexity_metrics),
code_coverage=0.0, # Would be calculated from test coverage data
technical_debt_ratio=self._calculate_technical_debt_ratio(code, code_smells),
duplicate_code_percentage=duplication_score,
code_smells_count=len(code_smells),
security_issues_count=0, # Would be calculated from security scan
overall_score=0.0,
)
# Calculate overall score
metrics.overall_score = self._calculate_overall_quality_score(metrics)
# Generate recommendations
recommendations = self._generate_quality_recommendations(metrics, code_smells)
# Determine quality grade
quality_grade = self._determine_quality_grade(metrics.overall_score)
return QualityAssessment(
project_path=file_path or "unknown",
timestamp=timestamp,
metrics=metrics,
improvement_recommendations=recommendations,
quality_grade=quality_grade,
passes_gates=metrics.overall_score >= 70.0,
overall_score=metrics.overall_score,
file_metrics={"test.py": metrics}, # For test compatibility
issues=[],
)
def calculate_complexity_metrics(self, code: str) -> QualityMetrics:
"""Calculate complexity metrics for code."""
complexity_metrics = self.complexity_analyzer.calculate_complexity_metrics(code)
return QualityMetrics(
cyclomatic_complexity=complexity_metrics.cyclomatic_complexity,
cognitive_complexity=complexity_metrics.cognitive_complexity,
maintainability_index=self._calculate_maintainability_index(code, complexity_metrics),
overall_score=0.0,
)
def analyze_maintainability(self, code: str) -> QualityAssessment:
"""Analyze code maintainability."""
# Parse the code
try:
ast.parse(code)
except SyntaxError:
# Return poor quality assessment for invalid code
return QualityAssessment(
project_path="unknown",
timestamp=datetime.now(),
metrics=QualityMetrics(overall_score=20.0),
improvement_recommendations=["Fix syntax errors"],
quality_grade="F",
passes_gates=False,
)
# Analyze various maintainability aspects
complexity_metrics = self.complexity_analyzer.calculate_complexity_metrics(code)
code_smells = self.code_smell_detector.detect_code_smells(code)
metrics = QualityMetrics(
cyclomatic_complexity=complexity_metrics.cyclomatic_complexity,
cognitive_complexity=complexity_metrics.cognitive_complexity,
maintainability_index=self._calculate_maintainability_index(code, complexity_metrics),
code_smells_count=len(code_smells),
overall_score=0.0,
)
metrics.overall_score = self._calculate_overall_quality_score(metrics)
return QualityAssessment(
project_path="unknown",
timestamp=datetime.now(),
metrics=metrics,
improvement_recommendations=self._generate_maintainability_recommendations(metrics),
quality_grade=self._determine_quality_grade(metrics.overall_score),
passes_gates=metrics.overall_score >= 70.0,
overall_score=metrics.overall_score,
file_metrics={},
issues=[],
)
def detect_code_smells(self, code: str) -> List[Dict[str, Any]]:
"""Detect code smells and anti-patterns."""
return cast(List[Dict[str, Any]], self.code_smell_detector.detect_code_smells(code))
def generate_quality_improvement_plan(self, assessment: QualityAssessment) -> List[Dict]:
"""Generate prioritized quality improvement plan."""
plan = []
# Priority 1: Critical issues
if assessment.metrics.cyclomatic_complexity > 20:
plan.append(
{
"priority": 1,
"category": "Complexity",
"issue": "High cyclomatic complexity",
"action": "Refactor complex functions into smaller, focused functions",
"impact": "High",
}
)
if assessment.metrics.cognitive_complexity > 15:
plan.append(
{
"priority": 1,
"category": "Complexity",
"issue": "High cognitive complexity",
"action": "Simplify control flow and reduce nesting",
"impact": "High",
}
)
# Priority 2: Maintainability issues
if assessment.metrics.maintainability_index < 65:
plan.append(
{
"priority": 2,
"category": "Maintainability",
"issue": "Low maintainability index",
"action": "Improve code structure and documentation",
"impact": "Medium",
}
)
# Priority 3: Code smells
if assessment.metrics.code_smells_count > 5:
plan.append(
{
"priority": 3,
"category": "Code Quality",
"issue": "Multiple code smells detected",
"action": "Address identified code smells and anti-patterns",
"impact": "Medium",
}
)
# Priority 4: Duplication
if assessment.metrics.duplicate_code_percentage > 10:
plan.append(
{
"priority": 4,
"category": "Duplication",
"issue": "Code duplication detected",
"action": "Extract duplicated code into reusable functions",
"impact": "Low",
}
)
return plan
def _calculate_maintainability_index(self, code: str, complexity_metrics: QualityMetrics) -> float:
"""Calculate maintainability index."""
# Simplified maintainability index calculation
# Based on Halstead metrics and cyclomatic complexity
# Count lines of code
lines = len(code.split("\n"))
# Calculate average lines per function (simplified)
function_count = code.count("def ") + code.count("class ")
avg_lines_per_function = lines / max(function_count, 1)
# Calculate maintainability index (0-100, higher is better)
mi = 100 - (complexity_metrics.cyclomatic_complexity * 2) - (avg_lines_per_function * 0.5)
return max(0, min(100, mi))
def _calculate_technical_debt_ratio(self, code: str, code_smells: List[Dict]) -> float:
"""Calculate technical debt ratio."""
# Simplified technical debt calculation
# Based on code smells and complexity issues
lines_of_code = len(code.split("\n"))
if lines_of_code == 0:
return 0.0
# Assign debt based on code smells
debt_units = 0
for smell in code_smells:
if smell.get("severity") == "high":
debt_units += 5
elif smell.get("severity") == "medium":
debt_units += 3
else:
debt_units += 1
# Calculate debt ratio (debt units per 100 lines of code)
debt_ratio = (debt_units / lines_of_code) * 100
return min(100, debt_ratio)
def _calculate_overall_quality_score(self, metrics: QualityMetrics) -> float:
"""Calculate overall quality score (0-100)."""
# Weight different metrics
weights = {
"maintainability_index": 0.3,
"cyclomatic_complexity": 0.2,
"cognitive_complexity": 0.2,
"code_smells_count": 0.15,
"technical_debt_ratio": 0.15,
}
# Normalize metrics to 0-100 scale
normalized_scores = {
"maintainability_index": metrics.maintainability_index,
"cyclomatic_complexity": max(0, 100 - (metrics.cyclomatic_complexity * 5)),
"cognitive_complexity": max(0, 100 - (metrics.cognitive_complexity * 6)),
"code_smells_count": max(0, 100 - (metrics.code_smells_count * 5)),
"technical_debt_ratio": max(0, 100 - metrics.technical_debt_ratio),
}
# Calculate weighted score
total_score = sum(weights[metric] * normalized_scores[metric] for metric in weights)
return round(total_score, 2)
def _generate_quality_recommendations(self, metrics: QualityMetrics, code_smells: List[Dict]) -> List[str]:
"""Generate quality improvement recommendations."""
recommendations = []
# Complexity recommendations
if metrics.cyclomatic_complexity > 10:
recommendations.append(f"🔄 Reduce cyclomatic complexity from {metrics.cyclomatic_complexity:.1f} to below 10")
if metrics.cognitive_complexity > 15:
recommendations.append(f"🧠 Simplify cognitive complexity from {metrics.cognitive_complexity:.1f} to improve readability")
# Maintainability recommendations
if metrics.maintainability_index < 70:
recommendations.append(f"🔧 Improve maintainability index from {metrics.maintainability_index:.1f} to above 70")
# Technical debt recommendations
if metrics.technical_debt_ratio > 5:
recommendations.append(f"💰 Address technical debt ratio of {metrics.technical_debt_ratio:.1f}%")
# Code smell recommendations
if metrics.code_smells_count > 0:
recommendations.append(f"👃 Fix {metrics.code_smells_count} code smells to improve code quality")
return recommendations
def _generate_maintainability_recommendations(self, metrics: QualityMetrics) -> List[str]:
"""Generate maintainability-specific recommendations."""
recommendations = []
if metrics.maintainability_index < 50:
recommendations.append("🚨 Critical: Major refactoring needed to improve maintainability")
elif metrics.maintainability_index < 70:
recommendations.append("⚠️ Warning: Code maintainability needs improvement")
if metrics.cyclomatic_complexity > 15:
recommendations.append("🔄 Break down complex functions into smaller, focused units")
if metrics.cognitive_complexity > 10:
recommendations.append("🧠 Reduce nesting and simplify control flow")
return recommendations
def _determine_quality_grade(self, score: float) -> str:
"""Determine quality grade based on score."""
if score >= 90:
return "A"
elif score >= 80:
return "B"
elif score >= 70:
return "C"
elif score >= 60:
return "D"
else:
return "F"
class ComplexityAnalyzer:
"""Analyze code complexity metrics."""
def calculate_complexity_metrics(self, code: str) -> QualityMetrics:
"""Calculate cyclomatic and cognitive complexity."""
try:
tree = ast.parse(code)
except SyntaxError:
return QualityMetrics()
cyclomatic_complexity = self._calculate_cyclomatic_complexity(tree)
cognitive_complexity = self._calculate_cognitive_complexity(tree)
return QualityMetrics(cyclomatic_complexity=cyclomatic_complexity, cognitive_complexity=cognitive_complexity)
def _calculate_cyclomatic_complexity(self, tree: ast.AST) -> float:
"""Calculate cyclomatic complexity using AST."""
complexity = 1 # Base complexity
for node in ast.walk(tree):
if isinstance(node, (ast.If, ast.While, ast.For, ast.AsyncFor)):
complexity += 1
elif isinstance(node, ast.ExceptHandler):
complexity += 1
elif isinstance(node, (ast.And, ast.Or)):
complexity += 1
elif isinstance(node, ast.comprehension):
complexity += 1
return float(complexity)
def _calculate_cognitive_complexity(self, tree: ast.AST) -> float:
"""Calculate cognitive complexity."""
complexity = 0
nesting_level = 0
for node in ast.walk(tree):
if isinstance(node, (ast.If, ast.While, ast.For, ast.AsyncFor)):
nesting_level += 1
complexity += nesting_level
elif isinstance(node, ast.ExceptHandler):
complexity += nesting_level + 1
elif isinstance(node, (ast.And, ast.Or)):
complexity += nesting_level + 1
return float(complexity)
class CodeSmellDetector:
"""Detect code smells and anti-patterns."""
def detect_code_smells(self, code: str) -> List[Dict]:
"""Detect various code smells."""
smells = []
# Long function/method smell
if len(code.split("\n")) > 50:
smells.append({"type": "LongFunction", "severity": "medium", "description": "Function is too long (>50 lines)", "line": 1})
# Too many parameters smell
function_def_pattern = r"def\s+\w+\s*\(([^)]*)\)"
matches = re.finditer(function_def_pattern, code)
for match in matches:
params = match.group(1)
param_count = len([p.strip() for p in params.split(",") if p.strip() and not p.strip().startswith("self")])
if param_count > 5:
smells.append(
{
"type": "TooManyParameters",
"severity": "medium",
"description": f"Function has too many parameters ({param_count} > 5)",
"line": code[: match.start()].count("\n") + 1,
}
)
# Duplicate code smell (simplified)
lines = code.split("\n")
for i, line in enumerate(lines):
line = line.strip()
if len(line) > 10 and line.count(" ") > 3: # Non-trivial line
count = sum(1 for line_copy in lines if line_copy.strip() == line)
if count > 3:
smells.append(
{
"type": "DuplicateCode",
"severity": "low",
"description": f"Line appears {count} times: {line[:50]}...",
"line": i + 1,
}
)
# Magic numbers smell
magic_number_pattern = r"\b\d{4,}\b"
matches = re.finditer(magic_number_pattern, code)
for match in matches:
# Skip common cases like years, ports, etc.
value = match.group()
if value not in ["2024", "2025", "2023", "3000", "8080", "8000", "5000"]:
smells.append(
{
"type": "MagicNumber",
"severity": "low",
"description": f"Magic number found: {value}",
"line": code[: match.start()].count("\n") + 1,
}
)
return smells
class CodeDuplicationChecker:
"""Check for code duplication."""
def calculate_duplication_score(self, code: str) -> float:
"""Calculate code duplication percentage."""
lines = [line.strip() for line in code.split("\n") if line.strip()]
if len(lines) < 10:
return 0.0
# Find duplicate lines
line_counts: Dict[str, int] = {}
for line in lines:
if len(line) > 10: # Only consider non-trivial lines
line_counts[line] = line_counts.get(line, 0) + 1
# Calculate duplication percentage
total_lines = len(lines)
duplicate_lines = sum(count - 1 for count in line_counts.values() if count > 1)
duplication_percentage = (duplicate_lines / total_lines) * 100 if total_lines > 0 else 0.0
return round(duplication_percentage, 2)
class ComplianceReportingFramework:
"""Framework for generating compliance reports."""
def __init__(self):
self.standards = self._load_compliance_standards()
def generate_compliance_report(self, security_report: SecurityReport, quality_assessment: QualityAssessment) -> Dict[str, Any]:
"""Generate comprehensive compliance report."""
report: Dict[str, Any] = {
"timestamp": datetime.now().isoformat(),
"project_path": security_report.project_path,
"overall_compliance_score": 0.0,
"standard_scores": {},
"findings": [],
"recommendations": [],
"certification_ready": False,
}
# Calculate scores for each standard
total_score: float = 0
standard_count: int = 0
for standard in ComplianceStandard:
score = self._calculate_standard_score(standard, security_report, quality_assessment)
report["standard_scores"][standard.name] = score
total_score += score
standard_count += 1
# Generate findings for this standard
findings = self._generate_standard_findings(standard, security_report, quality_assessment, score)
report["findings"].extend(findings)
# Calculate overall compliance score
report["overall_compliance_score"] = round(total_score / standard_count, 2) if standard_count > 0 else 0.0
# Determine if certification-ready
report["certification_ready"] = report["overall_compliance_score"] >= 85.0
# Generate recommendations
report["recommendations"] = self._generate_compliance_recommendations(report)
return report
def _load_compliance_standards(self) -> Dict[ComplianceStandard, Dict]:
"""Load compliance standards requirements."""
return {
ComplianceStandard.OWASP_TOP_10: {
"name": "OWASP Top 10",
"description": "Web application security risks",
"requirements": [
"No critical vulnerabilities",
"Limited high severity vulnerabilities",
"Proper input validation",
"Strong authentication and authorization",
],
"weight": 1.0,
},
ComplianceStandard.PCI_DSS: {
"name": "PCI DSS",
"description": "Payment Card Industry Data Security Standard",
"requirements": [
"No stored payment card data",
"Strong encryption for sensitive data",
"Regular security scanning",
"Access control and monitoring",
],
"weight": 1.2,
},
ComplianceStandard.GDPR: {
"name": "GDPR",
"description": "General Data Protection Regulation",
"requirements": ["Data protection by design", "User consent management", "Data breach notification", "Right to erasure"],
"weight": 0.9,
},
}
def _calculate_standard_score(
self, standard: ComplianceStandard, security_report: SecurityReport, quality_assessment: QualityAssessment
) -> float:
"""Calculate compliance score for a specific standard."""
standard_config = self.standards.get(standard, {})
weight = standard_config.get("weight", 1.0)
# Base score from security report
security_score: float = security_report.compliance_scores.get(standard, 100.0)
# Adjust based on quality assessment
quality_multiplier: float = min(1.0, quality_assessment.metrics.overall_score / 100.0)
# Apply weight and calculate final score
final_score: float = security_score * quality_multiplier * weight
return round(min(100.0, final_score), 2)
def _generate_standard_findings(
self, standard: ComplianceStandard, security_report: SecurityReport, quality_assessment: QualityAssessment, score: float
) -> List[Dict]:
"""Generate findings for a specific standard."""
findings = []
if score < 70:
findings.append(
{
"standard": standard.name,
"severity": "high",
"finding": f"Non-compliance with {standard.name}",
"description": f"Compliance score of {score}% is below acceptable threshold",
"recommendation": "Address identified security and quality issues",
}
)
elif score < 85:
findings.append(
{
"standard": standard.name,
"severity": "medium",
"finding": f"Partial compliance with {standard.name}",
"description": f"Compliance score of {score}% needs improvement",
"recommendation": "Review and address remaining issues",
}
)
return findings
def _generate_compliance_recommendations(self, report: Dict[str, Any]) -> List[str]:
"""Generate compliance recommendations."""
recommendations = []
overall_score = report["overall_compliance_score"]
if overall_score < 70:
recommendations.append("🚨 Critical: Major compliance violations found")
recommendations.append("🔧 Immediate remediation required for security vulnerabilities")
elif overall_score < 85:
recommendations.append("⚠️ Warning: Compliance gaps identified")
recommendations.append("📋 Address medium and high severity findings")
else:
recommendations.append("✅ Good compliance posture maintained")
# Specific recommendations based on standards
for standard_name, score in report["standard_scores"].items():
if score < 70:
recommendations.append(f"📊 {standard_name}: Score {score}% - requires attention")
return recommendations
class QualityGateAutomation:
"""Automated quality gate system."""
def __init__(self):
self.gates = self._load_default_gates()
def evaluate_quality_gates(
self, quality_assessment: QualityAssessment, custom_gates: Optional[List[QualityGate]] = None
) -> Dict[str, Any]:
"""Evaluate quality gates and return results."""
gates_to_evaluate = custom_gates or self.gates
results: Dict[str, Any] = {
"timestamp": datetime.now().isoformat(),
"overall_result": "PASSED",
"gate_results": [],
"failed_gates": [],
"warnings": [],
"metrics": quality_assessment.metrics,
}
for gate in gates_to_evaluate:
if not gate.enabled:
continue
result = self._evaluate_single_gate(gate, quality_assessment.metrics)
results["gate_results"].append(result)
if result["status"] == "FAILED":
results["failed_gates"].append(result)
results["overall_result"] = "FAILED"
elif result["status"] == "WARNING":
results["warnings"].append(result)
return results
def _load_default_gates(self) -> List[QualityGate]:
"""Load default quality gates."""
return [
QualityGate(
name="Cyclomatic Complexity", metric=QualityMetric.CYCLOMATIC_COMPLEXITY, threshold=15, operator="lte", severity="error"
),
QualityGate(
name="Maintainability Index", metric=QualityMetric.MAINTAINABILITY_INDEX, threshold=65, operator="gte", severity="error"
),
QualityGate(name="Code Coverage", metric=QualityMetric.CODE_COVERAGE, threshold=80, operator="gte", severity="warning"),
QualityGate(name="Technical Debt", metric=QualityMetric.TECHNICAL_DEBT, threshold=5, operator="lte", severity="warning"),
QualityGate(name="Code Smells", metric=QualityMetric.CODE_SMELLS, threshold=10, operator="lte", severity="warning"),
QualityGate(name="Security Issues", metric=QualityMetric.SECURITY_ISSUES, threshold=0, operator="lte", severity="error"),
]
def _evaluate_single_gate(self, gate: QualityGate, metrics: QualityMetrics) -> Dict[str, Any]:
"""Evaluate a single quality gate."""
# Get metric value
metric_value = getattr(metrics, gate.metric.value, 0)
# Evaluate based on operator (support both symbolic and text formats)
if gate.operator in (">", "gt"):
passed = metric_value > gate.threshold
elif gate.operator in (">=", "gte"):
passed = metric_value >= gate.threshold
elif gate.operator in ("<", "lt"):
passed = metric_value < gate.threshold
elif gate.operator in ("<=", "lte"):
passed = metric_value <= gate.threshold
else:
passed = True # Default to passed if invalid operator
# Determine status
if passed:
status = "PASSED"
elif gate.severity == "error":
status = "FAILED"
else:
status = "WARNING"
return {
"gate_name": gate.name,
"metric": gate.metric.value,
"value": metric_value,
"threshold": gate.threshold,
"operator": gate.operator,
"status": status,
"severity": gate.severity,
"message": self._generate_gate_message(gate, metric_value, passed),
}
def _generate_gate_message(self, gate: QualityGate, value: float, passed: bool) -> str:
"""Generate user-friendly gate result message."""
if passed:
return f"✅ {gate.name}: {value:.1f} (threshold: {gate.threshold})"
else:
return f"❌ {gate.name}: {value:.1f} (threshold: {gate.threshold} {gate.operator})"