"""
Proactive Quality Framework - Policy-as-Code Implementation
This framework transforms reactive bug fixing into proactive quality assurance
by generating dynamic policies and tests that prevent issues before they occur.
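
Example (illustrative sketch; when no LLM backend is available, the agent falls
back to its built-in policy set):

    agent = ProactiveQualityAgent()
    policies = await agent.generate_quality_policies(requirements, architecture)
    report = await agent.execute_quality_checks(policies, artifacts)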
"""
import json
import uuid
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, timezone
from dataclasses import dataclass
from enum import Enum
import structlog

try:
    from src.core.llm_manager import LLMManager
except ImportError:
    # Optional dependency: fallback policy generation is used when the LLM
    # manager cannot be imported.
    LLMManager = None
logger = structlog.get_logger()
class PolicyType(Enum):
"""Types of quality policies"""
SECURITY = "security"
PERFORMANCE = "performance"
ACCESSIBILITY = "accessibility"
CODE_QUALITY = "code_quality"
BUSINESS_LOGIC = "business_logic"
DATA_INTEGRITY = "data_integrity"
USER_EXPERIENCE = "user_experience"
COMPLIANCE = "compliance"
class PolicySeverity(Enum):
"""Policy violation severity levels"""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
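
# Severity ranking used for sorting and prioritization; PolicySeverity is
# defined from most to least severe, so definition order doubles as the rank.
SEVERITY_ORDER: Dict[PolicySeverity, int] = {
    severity: rank for rank, severity in enumerate(PolicySeverity)
}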
@dataclass
class QualityPolicy:
"""Individual quality policy definition"""
id: str
name: str
type: PolicyType
description: str
severity: PolicySeverity
rule_definition: str
automated_check: str # Code to automatically verify the policy
violation_message: str
remediation_guidance: str
tags: List[str]
applicable_components: List[str] # frontend, backend, database, etc.
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"type": self.type.value,
"description": self.description,
"severity": self.severity.value,
"rule_definition": self.rule_definition,
"automated_check": self.automated_check,
"violation_message": self.violation_message,
"remediation_guidance": self.remediation_guidance,
"tags": self.tags,
"applicable_components": self.applicable_components
}
@dataclass
class QualityCheck:
"""Individual quality check execution result"""
policy_id: str
component: str
passed: bool
details: str
evidence: Optional[str] = None
remediation_applied: Optional[str] = None
    execution_time: Optional[datetime] = None  # Timestamp of when the check ran, not a duration
def to_dict(self) -> Dict[str, Any]:
return {
"policy_id": self.policy_id,
"component": self.component,
"passed": self.passed,
"details": self.details,
"evidence": self.evidence,
"remediation_applied": self.remediation_applied,
"execution_time": self.execution_time.isoformat() if self.execution_time else None
}
@dataclass
class QualityReport:
"""Comprehensive quality assessment report"""
report_id: str
project_id: str
timestamp: datetime
overall_score: float
policies_total: int
policies_passed: int
policies_failed: int
checks_by_severity: Dict[str, int]
component_scores: Dict[str, float]
violations: List[QualityCheck]
recommendations: List[str]
def to_dict(self) -> Dict[str, Any]:
return {
"report_id": self.report_id,
"project_id": self.project_id,
"timestamp": self.timestamp.isoformat(),
"overall_score": self.overall_score,
"policies_total": self.policies_total,
"policies_passed": self.policies_passed,
"policies_failed": self.policies_failed,
"checks_by_severity": self.checks_by_severity,
"component_scores": self.component_scores,
"violations": [v.to_dict() for v in self.violations],
"recommendations": self.recommendations
}
class ProactiveQualityAgent:
"""
Proactive Quality Agent - The Quality Oracle
This agent generates dynamic quality policies based on project requirements
and creates automated checks to prevent issues before they occur.
"""
def __init__(self, correlation_id: Optional[str] = None):
self.correlation_id = correlation_id or str(uuid.uuid4())
self.logger = logger.bind(correlation_id=self.correlation_id)
        self.llm_manager = None
        if LLMManager is not None:
            try:
                self.llm_manager = LLMManager()
            except Exception as e:
                self.logger.warning("llm_manager_init_failed", error=str(e))
        if self.llm_manager is None:
            self.logger.warning("LLM manager not available - using fallback policy generation")
# Policy templates and knowledge base
self.policy_templates = self._load_policy_templates()
self.compliance_frameworks = self._load_compliance_frameworks()
self.logger.info(
"proactive_quality_agent_initialized",
correlation_id=self.correlation_id,
policy_templates=len(self.policy_templates),
compliance_frameworks=len(self.compliance_frameworks)
)
async def generate_quality_policies(
self,
project_requirements: Dict[str, Any],
architecture_design: Dict[str, Any],
compliance_requirements: Optional[List[str]] = None
) -> List[QualityPolicy]:
"""
Generate dynamic quality policies based on project specifics
Args:
project_requirements: Project requirements and specifications
architecture_design: Technical architecture details
compliance_requirements: Specific compliance needs (GDPR, SOX, etc.)
Returns:
List[QualityPolicy]: Generated policies for the project
"""
        generation_id = str(uuid.uuid4())
try:
self.logger.info(
"policy_generation_started",
                generation_id=generation_id,
project_type=project_requirements.get("project_type"),
compliance_count=len(compliance_requirements or [])
)
# Phase 1: Analyze project for quality risks
quality_risks = await self._analyze_quality_risks(
project_requirements, architecture_design
)
# Phase 2: Generate core policies
core_policies = await self._generate_core_policies(
project_requirements, architecture_design, quality_risks
)
# Phase 3: Add compliance-specific policies
compliance_policies = await self._generate_compliance_policies(
compliance_requirements, project_requirements
)
# Phase 4: Add component-specific policies
component_policies = await self._generate_component_policies(
architecture_design, project_requirements
)
# Phase 5: Combine and optimize policies
all_policies = core_policies + compliance_policies + component_policies
optimized_policies = await self._optimize_policy_set(all_policies)
self.logger.info(
"policy_generation_completed",
                generation_id=generation_id,
total_policies=len(optimized_policies),
core_policies=len(core_policies),
compliance_policies=len(compliance_policies),
component_policies=len(component_policies)
)
return optimized_policies
except Exception as e:
self.logger.error(
"policy_generation_failed",
                generation_id=generation_id,
error=str(e)
)
raise
async def execute_quality_checks(
self,
policies: List[QualityPolicy],
project_artifacts: Dict[str, Any],
components_to_check: Optional[List[str]] = None
) -> QualityReport:
"""
Execute quality checks against generated policies
Args:
policies: Quality policies to check against
project_artifacts: Code, configs, and other project artifacts
components_to_check: Specific components to check (optional)
Returns:
QualityReport: Comprehensive quality assessment
"""
report_id = str(uuid.uuid4())
try:
self.logger.info(
"quality_checks_started",
report_id=report_id,
policies_count=len(policies),
components=components_to_check
)
check_results = []
component_scores = {}
# Execute each policy check
for policy in policies:
if components_to_check and not any(
comp in policy.applicable_components
for comp in components_to_check
):
continue
for component in policy.applicable_components:
if components_to_check and component not in components_to_check:
continue
check_result = await self._execute_single_check(
policy, component, project_artifacts
)
check_results.append(check_result)
# Update component scores
if component not in component_scores:
component_scores[component] = []
component_scores[component].append(1.0 if check_result.passed else 0.0)
# Calculate metrics
total_checks = len(check_results)
passed_checks = sum(1 for check in check_results if check.passed)
failed_checks = total_checks - passed_checks
overall_score = passed_checks / total_checks if total_checks > 0 else 1.0
# Calculate component scores
for component in component_scores:
scores = component_scores[component]
component_scores[component] = sum(scores) / len(scores) if scores else 0.0
# Categorize by severity
checks_by_severity = {severity.value: 0 for severity in PolicySeverity}
            violations = []
            policy_by_id = {p.id: p for p in policies}
            for check in check_results:
                policy = policy_by_id.get(check.policy_id)
                if policy:
                    checks_by_severity[policy.severity.value] += 1
                    if not check.passed:
                        violations.append(check)
# Generate recommendations
recommendations = await self._generate_recommendations(violations, policies)
quality_report = QualityReport(
report_id=report_id,
project_id=project_artifacts.get("project_id", "unknown"),
timestamp=datetime.now(timezone.utc),
overall_score=overall_score,
                policies_total=len(policies),
                # Pass/fail totals are counted per check; one policy may be
                # checked against several components.
                policies_passed=passed_checks,
                policies_failed=failed_checks,
checks_by_severity=checks_by_severity,
component_scores=component_scores,
violations=violations,
recommendations=recommendations
)
self.logger.info(
"quality_checks_completed",
report_id=report_id,
overall_score=overall_score,
violations=len(violations)
)
return quality_report
except Exception as e:
self.logger.error(
"quality_checks_failed",
report_id=report_id,
error=str(e)
)
raise
async def auto_remediate_violations(
self,
quality_report: QualityReport,
policies: List[QualityPolicy],
project_artifacts: Dict[str, Any],
auto_fix_enabled: bool = True
) -> Dict[str, Any]:
"""
Automatically remediate quality violations where possible
Args:
quality_report: Quality report with violations
policies: Original policies for context
project_artifacts: Project artifacts to modify
auto_fix_enabled: Whether to automatically apply fixes
Returns:
Dict with remediation results
"""
remediation_id = str(uuid.uuid4())
try:
self.logger.info(
"auto_remediation_started",
remediation_id=remediation_id,
violations=len(quality_report.violations),
auto_fix_enabled=auto_fix_enabled
)
remediation_results = {
"remediation_id": remediation_id,
"violations_processed": 0,
"fixes_applied": 0,
"fixes_suggested": 0,
"fixes_failed": 0,
"remediation_details": []
}
for violation in quality_report.violations:
policy = next((p for p in policies if p.id == violation.policy_id), None)
if not policy:
continue
remediation_results["violations_processed"] += 1
# Attempt automatic remediation
fix_result = await self._attempt_auto_fix(
violation, policy, project_artifacts, auto_fix_enabled
)
remediation_results["remediation_details"].append(fix_result)
if fix_result["status"] == "applied":
remediation_results["fixes_applied"] += 1
elif fix_result["status"] == "suggested":
remediation_results["fixes_suggested"] += 1
else:
remediation_results["fixes_failed"] += 1
self.logger.info(
"auto_remediation_completed",
remediation_id=remediation_id,
fixes_applied=remediation_results["fixes_applied"],
fixes_suggested=remediation_results["fixes_suggested"]
)
return remediation_results
except Exception as e:
self.logger.error(
"auto_remediation_failed",
remediation_id=remediation_id,
error=str(e)
)
raise
async def _analyze_quality_risks(
self,
project_requirements: Dict[str, Any],
architecture_design: Dict[str, Any]
) -> Dict[str, Any]:
"""Analyze project for potential quality risks"""
if not self.llm_manager:
return self._fallback_risk_analysis(project_requirements, architecture_design)
risk_analysis_prompt = f"""
You are a Senior Quality Engineer. Analyze this project for potential quality risks.
PROJECT REQUIREMENTS:
{json.dumps(project_requirements, indent=2)}
ARCHITECTURE DESIGN:
{json.dumps(architecture_design, indent=2)}
Identify potential quality risks in these categories:
1. SECURITY_RISKS:
- Authentication vulnerabilities
- Data exposure risks
- API security concerns
- Input validation gaps
2. PERFORMANCE_RISKS:
- Scalability bottlenecks
- Database performance issues
- Frontend loading problems
- API response time concerns
3. ACCESSIBILITY_RISKS:
- UI accessibility barriers
- Mobile responsiveness issues
- Keyboard navigation problems
- Screen reader compatibility
4. CODE_QUALITY_RISKS:
- Maintainability concerns
- Technical debt potential
- Testing gaps
- Documentation needs
5. BUSINESS_LOGIC_RISKS:
- Data integrity issues
- Workflow complexity
- Error handling gaps
- Edge case handling
For each risk, provide:
- Description of the risk
- Likelihood (low/medium/high)
- Impact (low/medium/high)
- Affected components
- Mitigation strategies
        Respond with a single JSON object.
"""
try:
result = await self.llm_manager.generate_completion(
prompt=risk_analysis_prompt,
model="claude-3-opus",
temperature=0.3,
max_tokens=4000,
metadata={
"operation": "quality_risk_analysis",
"correlation_id": self.correlation_id
}
)
return json.loads(result.content)
except Exception as e:
self.logger.warning(
"llm_risk_analysis_failed",
error=str(e),
fallback="using_fallback_analysis"
)
return self._fallback_risk_analysis(project_requirements, architecture_design)
async def _generate_core_policies(
self,
project_requirements: Dict[str, Any],
architecture_design: Dict[str, Any],
quality_risks: Dict[str, Any]
) -> List[QualityPolicy]:
"""Generate core quality policies based on project analysis"""
if not self.llm_manager:
return self._fallback_core_policies()
policy_generation_prompt = f"""
You are a Quality Policy Expert. Generate specific quality policies for this project.
PROJECT REQUIREMENTS:
{json.dumps(project_requirements, indent=2)}
ARCHITECTURE DESIGN:
{json.dumps(architecture_design, indent=2)}
IDENTIFIED RISKS:
{json.dumps(quality_risks, indent=2)}
Generate quality policies in JSON format. Each policy must include:
{{
"name": "Clear policy name",
"type": "security|performance|accessibility|code_quality|business_logic|data_integrity|user_experience|compliance",
"description": "Detailed description of what this policy ensures",
"severity": "critical|high|medium|low|info",
"rule_definition": "Formal definition of the rule",
"automated_check": "Specific check that can be automated (code snippet, tool command, etc.)",
"violation_message": "Message to show when policy is violated",
"remediation_guidance": "How to fix violations of this policy",
"tags": ["relevant", "tags"],
"applicable_components": ["frontend", "backend", "database", "api", "deployment"]
}}
Focus on:
1. Policies that prevent the identified risks
2. Specific, measurable, and automatable rules
3. Clear remediation guidance
4. Appropriate severity levels
Generate 10-15 comprehensive policies that cover all major quality aspects.
Return as JSON array.
"""
try:
result = await self.llm_manager.generate_completion(
prompt=policy_generation_prompt,
model="claude-3-sonnet",
temperature=0.2,
max_tokens=6000,
metadata={
"operation": "core_policy_generation",
"correlation_id": self.correlation_id
}
)
policies_data = json.loads(result.content)
# Convert to QualityPolicy objects
policies = []
for policy_data in policies_data:
policy = QualityPolicy(
id=str(uuid.uuid4()),
name=policy_data["name"],
type=PolicyType(policy_data["type"]),
description=policy_data["description"],
severity=PolicySeverity(policy_data["severity"]),
rule_definition=policy_data["rule_definition"],
automated_check=policy_data["automated_check"],
violation_message=policy_data["violation_message"],
remediation_guidance=policy_data["remediation_guidance"],
tags=policy_data["tags"],
applicable_components=policy_data["applicable_components"]
)
policies.append(policy)
return policies
except Exception as e:
self.logger.warning(
"llm_policy_generation_failed",
error=str(e),
fallback="using_fallback_policies"
)
return self._fallback_core_policies()
async def _generate_compliance_policies(
self,
compliance_requirements: Optional[List[str]],
project_requirements: Dict[str, Any]
) -> List[QualityPolicy]:
"""Generate compliance-specific policies"""
if not compliance_requirements:
return []
compliance_policies = []
for compliance in compliance_requirements:
if compliance.upper() == "GDPR":
compliance_policies.extend(self._generate_gdpr_policies())
elif compliance.upper() == "SOX":
compliance_policies.extend(self._generate_sox_policies())
elif compliance.upper() == "HIPAA":
compliance_policies.extend(self._generate_hipaa_policies())
elif compliance.upper() == "PCI":
compliance_policies.extend(self._generate_pci_policies())
return compliance_policies
async def _generate_component_policies(
self,
architecture_design: Dict[str, Any],
project_requirements: Dict[str, Any]
) -> List[QualityPolicy]:
"""Generate component-specific policies"""
component_policies = []
components = architecture_design.get("components", [])
for component in components:
component_type = component.get("type", "")
if component_type == "frontend":
component_policies.extend(self._generate_frontend_policies())
elif component_type == "api":
component_policies.extend(self._generate_api_policies())
elif component_type == "database":
component_policies.extend(self._generate_database_policies())
elif component_type == "microservice":
component_policies.extend(self._generate_microservice_policies())
return component_policies
async def _execute_single_check(
self,
policy: QualityPolicy,
component: str,
project_artifacts: Dict[str, Any]
) -> QualityCheck:
"""Execute a single quality check"""
check_start = datetime.now(timezone.utc)
try:
# Execute the automated check
check_passed, details, evidence = await self._run_automated_check(
policy.automated_check, component, project_artifacts
)
return QualityCheck(
policy_id=policy.id,
component=component,
passed=check_passed,
details=details,
evidence=evidence,
execution_time=check_start
)
except Exception as e:
return QualityCheck(
policy_id=policy.id,
component=component,
passed=False,
details=f"Check execution failed: {str(e)}",
evidence=None,
execution_time=check_start
)
async def _run_automated_check(
self,
check_code: str,
component: str,
project_artifacts: Dict[str, Any]
    ) -> Tuple[bool, str, Optional[str]]:
"""Run the automated check code"""
        # Placeholder: a real implementation would dispatch to linters, security
        # scanners, or test runners. For now, return simulated results keyed off
        # the check text.
if "authentication" in check_code.lower():
return True, "Authentication check passed", "Auth mechanisms found"
elif "sql injection" in check_code.lower():
return True, "SQL injection protection verified", "Parameterized queries detected"
elif "https" in check_code.lower():
return True, "HTTPS enforcement verified", "TLS configuration found"
elif "accessibility" in check_code.lower():
return False, "Accessibility issues found", "Missing alt tags on images"
else:
return True, "Generic check passed", "No issues detected"
async def _attempt_auto_fix(
self,
violation: QualityCheck,
policy: QualityPolicy,
project_artifacts: Dict[str, Any],
auto_fix_enabled: bool
) -> Dict[str, Any]:
"""Attempt to automatically fix a policy violation"""
if not auto_fix_enabled:
return {
"violation_id": violation.policy_id,
"status": "suggested",
"fix_description": policy.remediation_guidance,
"auto_fix_available": False
}
# Attempt different types of fixes based on policy type
if policy.type == PolicyType.SECURITY:
return await self._auto_fix_security_violation(violation, policy, project_artifacts)
elif policy.type == PolicyType.ACCESSIBILITY:
return await self._auto_fix_accessibility_violation(violation, policy, project_artifacts)
elif policy.type == PolicyType.CODE_QUALITY:
return await self._auto_fix_code_quality_violation(violation, policy, project_artifacts)
else:
return {
"violation_id": violation.policy_id,
"status": "suggested",
"fix_description": policy.remediation_guidance,
"auto_fix_available": False
}
async def _auto_fix_security_violation(
self,
violation: QualityCheck,
policy: QualityPolicy,
project_artifacts: Dict[str, Any]
) -> Dict[str, Any]:
"""Auto-fix security violations"""
# Simulate security fix
return {
"violation_id": violation.policy_id,
"status": "applied",
"fix_description": "Added security headers and input validation",
"changes_made": ["Added CORS configuration", "Implemented input sanitization"],
"verification_needed": True
}
async def _auto_fix_accessibility_violation(
self,
violation: QualityCheck,
policy: QualityPolicy,
project_artifacts: Dict[str, Any]
) -> Dict[str, Any]:
"""Auto-fix accessibility violations"""
# Simulate accessibility fix
return {
"violation_id": violation.policy_id,
"status": "applied",
"fix_description": "Added accessibility attributes and keyboard navigation",
"changes_made": ["Added alt tags to images", "Implemented keyboard navigation"],
"verification_needed": True
}
async def _auto_fix_code_quality_violation(
self,
violation: QualityCheck,
policy: QualityPolicy,
project_artifacts: Dict[str, Any]
) -> Dict[str, Any]:
"""Auto-fix code quality violations"""
# Simulate code quality fix
return {
"violation_id": violation.policy_id,
"status": "applied",
"fix_description": "Improved code structure and documentation",
"changes_made": ["Added type hints", "Improved function documentation"],
"verification_needed": True
}
# Fallback and template methods
def _fallback_risk_analysis(
self,
project_requirements: Dict[str, Any],
architecture_design: Dict[str, Any]
) -> Dict[str, Any]:
"""Fallback risk analysis when LLM is not available"""
return {
"SECURITY_RISKS": [
{
"description": "Authentication bypass vulnerabilities",
"likelihood": "medium",
"impact": "high",
"affected_components": ["backend", "api"],
"mitigation": "Implement robust authentication and authorization"
}
],
"PERFORMANCE_RISKS": [
{
"description": "Database query performance issues",
"likelihood": "medium",
"impact": "medium",
"affected_components": ["database", "backend"],
"mitigation": "Optimize queries and add proper indexing"
}
],
"ACCESSIBILITY_RISKS": [
{
"description": "UI accessibility barriers",
"likelihood": "high",
"impact": "medium",
"affected_components": ["frontend"],
"mitigation": "Follow WCAG guidelines and test with screen readers"
}
]
}
def _fallback_core_policies(self) -> List[QualityPolicy]:
"""Fallback core policies when LLM is not available"""
return [
QualityPolicy(
id=str(uuid.uuid4()),
name="Authentication Required",
type=PolicyType.SECURITY,
description="All API endpoints must require authentication",
severity=PolicySeverity.CRITICAL,
rule_definition="No public API endpoints without authentication",
automated_check="Check for @require_auth decorators on API routes",
violation_message="API endpoint found without authentication requirement",
remediation_guidance="Add authentication decorator to all API endpoints",
tags=["security", "authentication", "api"],
applicable_components=["backend", "api"]
),
QualityPolicy(
id=str(uuid.uuid4()),
name="Input Validation",
type=PolicyType.SECURITY,
description="All user inputs must be validated and sanitized",
severity=PolicySeverity.HIGH,
rule_definition="No direct use of user input without validation",
automated_check="Check for input validation on all user-facing functions",
violation_message="User input used without proper validation",
remediation_guidance="Add input validation and sanitization",
tags=["security", "validation", "input"],
applicable_components=["frontend", "backend"]
),
QualityPolicy(
id=str(uuid.uuid4()),
name="Accessibility Standards",
type=PolicyType.ACCESSIBILITY,
description="UI components must meet WCAG 2.1 AA standards",
severity=PolicySeverity.HIGH,
rule_definition="All UI elements must be accessible",
automated_check="Run accessibility scanner on all UI components",
violation_message="Accessibility violations found in UI components",
remediation_guidance="Add proper ARIA labels and keyboard navigation",
tags=["accessibility", "ui", "wcag"],
applicable_components=["frontend"]
)
]
def _generate_gdpr_policies(self) -> List[QualityPolicy]:
"""Generate GDPR compliance policies"""
return [
QualityPolicy(
id=str(uuid.uuid4()),
name="Data Minimization",
type=PolicyType.COMPLIANCE,
description="Collect only necessary personal data",
severity=PolicySeverity.CRITICAL,
rule_definition="No collection of unnecessary personal data",
automated_check="Review data collection forms and APIs",
violation_message="Unnecessary personal data collection detected",
remediation_guidance="Remove unnecessary data fields and implement data minimization",
tags=["gdpr", "privacy", "data"],
applicable_components=["frontend", "backend", "database"]
)
]
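    # Minimal placeholder sets for the remaining compliance generators referenced
    # by _generate_compliance_policies; full implementations would encode the
    # framework requirements listed in _load_compliance_frameworks.
    def _generate_sox_policies(self) -> List[QualityPolicy]:
        """Generate SOX compliance policies (placeholder set)"""
        return [
            QualityPolicy(
                id=str(uuid.uuid4()),
                name="Audit Trail Logging",
                type=PolicyType.COMPLIANCE,
                description="All data-changing operations must produce audit records",
                severity=PolicySeverity.CRITICAL,
                rule_definition="Every create/update/delete must be written to an audit trail",
                automated_check="Check for audit logging on data mutation paths",
                violation_message="Data mutation without an audit trail detected",
                remediation_guidance="Add audit logging to all data-changing operations",
                tags=["sox", "audit", "compliance"],
                applicable_components=["backend", "database"]
            )
        ]
    def _generate_hipaa_policies(self) -> List[QualityPolicy]:
        """Generate HIPAA compliance policies (placeholder set)"""
        return [
            QualityPolicy(
                id=str(uuid.uuid4()),
                name="PHI Encryption",
                type=PolicyType.COMPLIANCE,
                description="Protected health information must be encrypted at rest and in transit",
                severity=PolicySeverity.CRITICAL,
                rule_definition="No unencrypted storage or transmission of PHI",
                automated_check="Check encryption configuration for data stores and transport",
                violation_message="Unencrypted PHI storage or transmission detected",
                remediation_guidance="Enable encryption at rest and enforce TLS in transit",
                tags=["hipaa", "encryption", "compliance"],
                applicable_components=["backend", "database", "api"]
            )
        ]
    def _generate_pci_policies(self) -> List[QualityPolicy]:
        """Generate PCI compliance policies (placeholder set)"""
        return [
            QualityPolicy(
                id=str(uuid.uuid4()),
                name="Cardholder Data Protection",
                type=PolicyType.COMPLIANCE,
                description="Cardholder data must never be stored in plain text",
                severity=PolicySeverity.CRITICAL,
                rule_definition="No plaintext storage of PAN or card verification data",
                automated_check="Scan data stores and logs for plaintext card data patterns",
                violation_message="Plaintext cardholder data detected",
                remediation_guidance="Tokenize or encrypt cardholder data; never log card numbers",
                tags=["pci", "payments", "compliance"],
                applicable_components=["backend", "database"]
            )
        ]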
def _generate_frontend_policies(self) -> List[QualityPolicy]:
"""Generate frontend-specific policies"""
return [
QualityPolicy(
id=str(uuid.uuid4()),
name="Mobile Responsiveness",
type=PolicyType.USER_EXPERIENCE,
description="UI must be responsive across all device sizes",
severity=PolicySeverity.HIGH,
rule_definition="All UI components must work on mobile, tablet, and desktop",
automated_check="Test responsive design across viewport sizes",
violation_message="UI not responsive on mobile devices",
remediation_guidance="Implement responsive CSS and test across devices",
tags=["responsive", "mobile", "ui"],
applicable_components=["frontend"]
)
]
def _generate_api_policies(self) -> List[QualityPolicy]:
"""Generate API-specific policies"""
return [
QualityPolicy(
id=str(uuid.uuid4()),
name="API Rate Limiting",
type=PolicyType.PERFORMANCE,
description="All public APIs must implement rate limiting",
severity=PolicySeverity.HIGH,
rule_definition="Rate limiting required on all public endpoints",
automated_check="Check for rate limiting middleware on API routes",
violation_message="API endpoint without rate limiting found",
remediation_guidance="Implement rate limiting middleware",
tags=["api", "performance", "security"],
applicable_components=["backend", "api"]
)
]
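    # Placeholder sets for the component generators referenced by
    # _generate_component_policies.
    def _generate_database_policies(self) -> List[QualityPolicy]:
        """Generate database-specific policies (placeholder set)"""
        return [
            QualityPolicy(
                id=str(uuid.uuid4()),
                name="Query Indexing",
                type=PolicyType.PERFORMANCE,
                description="Frequently queried columns must be indexed",
                severity=PolicySeverity.MEDIUM,
                rule_definition="Indexes required for columns used in hot WHERE/JOIN clauses",
                automated_check="Analyze query plans for full table scans",
                violation_message="Unindexed query path detected",
                remediation_guidance="Add indexes for frequently filtered or joined columns",
                tags=["database", "performance", "indexing"],
                applicable_components=["database"]
            )
        ]
    def _generate_microservice_policies(self) -> List[QualityPolicy]:
        """Generate microservice-specific policies (placeholder set)"""
        return [
            QualityPolicy(
                id=str(uuid.uuid4()),
                name="Health Check Endpoint",
                type=PolicyType.CODE_QUALITY,
                description="Every microservice must expose a health check endpoint",
                severity=PolicySeverity.MEDIUM,
                rule_definition="A health endpoint reporting service status is required",
                automated_check="Check each service for a health endpoint route",
                violation_message="Microservice without a health check endpoint found",
                remediation_guidance="Add a health endpoint and wire it into orchestration probes",
                tags=["microservice", "observability", "operations"],
                applicable_components=["backend", "api"]
            )
        ]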
def _load_policy_templates(self) -> Dict[str, Any]:
"""Load predefined policy templates"""
return {
"web_security": ["authentication", "authorization", "input_validation", "https"],
"accessibility": ["wcag_aa", "keyboard_navigation", "screen_reader", "color_contrast"],
"performance": ["response_time", "caching", "optimization", "monitoring"],
"code_quality": ["documentation", "testing", "maintainability", "standards"]
}
def _load_compliance_frameworks(self) -> Dict[str, Any]:
"""Load compliance framework requirements"""
return {
"GDPR": ["data_minimization", "consent", "right_to_erasure", "data_portability"],
"SOX": ["access_control", "audit_trails", "data_integrity", "change_management"],
"HIPAA": ["encryption", "access_logs", "minimum_necessary", "breach_notification"],
"PCI": ["encryption", "secure_networks", "access_control", "monitoring"]
}
async def _generate_recommendations(
self,
violations: List[QualityCheck],
policies: List[QualityPolicy]
) -> List[str]:
"""Generate actionable recommendations based on violations"""
recommendations = []
        # Group violations by severity via a policy lookup (avoids repeated linear scans)
        severity_by_policy = {p.id: p.severity for p in policies}
        critical_violations = [v for v in violations if severity_by_policy.get(v.policy_id) == PolicySeverity.CRITICAL]
        high_violations = [v for v in violations if severity_by_policy.get(v.policy_id) == PolicySeverity.HIGH]
if critical_violations:
recommendations.append(f"URGENT: Address {len(critical_violations)} critical security/compliance violations immediately")
if high_violations:
recommendations.append(f"HIGH PRIORITY: Fix {len(high_violations)} high-impact quality issues")
# Component-specific recommendations
components_with_issues = set(v.component for v in violations)
for component in components_with_issues:
component_violations = [v for v in violations if v.component == component]
recommendations.append(f"Review {component} component - {len(component_violations)} policy violations found")
if not recommendations:
recommendations.append("All quality checks passed - excellent work!")
return recommendations
async def _optimize_policy_set(self, policies: List[QualityPolicy]) -> List[QualityPolicy]:
"""Optimize and deduplicate policy set"""
# Remove duplicates based on name and type
seen = set()
optimized = []
for policy in policies:
key = (policy.name, policy.type)
if key not in seen:
seen.add(key)
optimized.append(policy)
        # Sort by severity rank (critical first), then type; sorting on the raw
        # enum values would order severities alphabetically, not by importance.
        optimized.sort(key=lambda p: (SEVERITY_ORDER[p.severity], p.type.value))
return optimized
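# Illustrative usage sketch exercising the fallback (non-LLM) path end to end.
# The requirement/architecture/artifact shapes below are hypothetical examples,
# not a required schema.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        agent = ProactiveQualityAgent()
        policies = await agent.generate_quality_policies(
            project_requirements={"project_type": "web_app"},
            architecture_design={"components": [{"type": "frontend"}, {"type": "api"}]},
            compliance_requirements=["GDPR"],
        )
        report = await agent.execute_quality_checks(
            policies, project_artifacts={"project_id": "demo-project"}
        )
        print(json.dumps(report.to_dict(), indent=2))
        remediation = await agent.auto_remediate_violations(
            report, policies, project_artifacts={"project_id": "demo-project"}
        )
        print(json.dumps(remediation, indent=2))
    asyncio.run(_demo())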