
Design Patterns MCP Server

by apolosan
ai-governance-ethics-patterns.json.backup (42 kB)
{ "patterns": [ { "id": "bias-mitigation", "name": "Bias Mitigation Pattern", "category": "AI Governance", "description": "Systematically identifies, measures, and reduces bias in AI systems to ensure fair and equitable outcomes across different demographic groups", "problem": "AI models trained on biased data produce discriminatory outcomes, leading to unfair treatment of protected groups and erosion of trust", "solution": "Implement comprehensive bias detection, measurement, and mitigation strategies throughout the ML pipeline using fairness-aware algorithms and regular audits", "when_to_use": [ "Training AI models for decision-making systems", "Processing sensitive data about protected groups", "Building recommendation or classification systems", "Regulatory compliance requirements" ], "benefits": [ "Fair and equitable AI outcomes", "Regulatory compliance", "Improved model reliability", "Enhanced user trust", "Reduced legal and ethical risks" ], "drawbacks": [ "Increased computational complexity", "Potential trade-offs with accuracy", "Requires domain expertise", "Continuous monitoring needed" ], "use_cases": [ "Hiring and recruitment systems", "Loan approval algorithms", "Criminal justice risk assessment", "Healthcare diagnostic tools", "Content recommendation systems" ], "related_patterns": ["Fairness Pattern", "Interpretability Pattern", "AI Governance Pattern"], "complexity": "High", "tags": ["ai-governance", "ethics", "fairness", "bias", "compliance", "responsible-ai"], "code_examples": [ { "language": "python", "code": "import pandas as pd import numpy as np from sklearn.metrics import classification_report class BiasDetector: def __init__(self, protected_attribute='gender', privileged_group=1): self.protected_attr = protected_attribute self.privileged = privileged_group def detect_bias(self, X, y, protected_attributes): # Calculate selection rates by group privileged_mask = (protected_attributes[self.protected_attr] == self.privileged) unprivileged_mask = (protected_attributes[self.protected_attr] != self.privileged) privileged_rate = y[privileged_mask].mean() unprivileged_rate = y[unprivileged_mask].mean() disparate_impact = unprivileged_rate / privileged_rate if privileged_rate > 0 else 0 return { 'disparate_impact': disparate_impact, 'privileged_rate': privileged_rate, 'unprivileged_rate': unprivileged_rate } # Usage detector = BiasDetector() data = pd.read_csv('loan_data.csv') bias_metrics = detector.detect_bias(data.drop('approved', axis=1), data['approved'], data[['gender']]) print(f'Disparate Impact: {bias_metrics[\"disparate_impact\"]:.3f}')" }, { "language": "python", "code": "from fairlearn.metrics import demographic_parity_difference import numpy as np def evaluate_fairness(y_true, y_pred, sensitive_features): # Calculate demographic parity difference parity_diff = demographic_parity_difference(y_true, y_pred, sensitive_features=sensitive_features) # Calculate selection rates by group groups = sensitive_features.unique() selection_rates = {} for group in groups: mask = sensitive_features == group selection_rates[str(group)] = y_pred[mask].mean() return { 'demographic_parity_difference': parity_diff, 'selection_rates_by_group': selection_rates } # Usage fairness_metrics = evaluate_fairness(y_test, predictions, test_data['gender']) print(f'Demographic Parity Difference: {fairness_metrics[\"demographic_parity_difference\"]:.3f}')" } ] }, { "id": "fairness-pattern", "name": "Fairness Pattern", "category": "AI Governance", "description": "Ensures AI systems provide 
equitable outcomes across different demographic groups by implementing fairness metrics, constraints, and monitoring throughout the ML lifecycle", "problem": "AI systems can perpetuate or amplify societal inequalities when not designed with fairness considerations, leading to discriminatory outcomes", "solution": "Integrate fairness metrics, constraints, and monitoring into ML pipelines using fairness-aware algorithms, regular audits, and stakeholder feedback loops", "when_to_use": [ "Building decision-making AI systems", "Processing data about protected characteristics", "High-stakes applications (healthcare, finance, justice)", "Systems with potential societal impact" ], "benefits": [ "Equitable treatment across groups", "Regulatory compliance", "Improved model robustness", "Enhanced social acceptance", "Reduced discrimination risks" ], "drawbacks": [ "Accuracy-fairness trade-offs", "Increased complexity", "Context-dependent fairness definitions", "Continuous monitoring required" ], "use_cases": [ "Automated hiring systems", "Credit scoring algorithms", "Predictive policing tools", "Healthcare resource allocation", "Educational assessment systems" ], "related_patterns": ["Bias Mitigation Pattern", "Interpretability Pattern", "AI Governance Pattern"], "complexity": "High", "tags": ["ai-governance", "ethics", "fairness", "equity", "responsible-ai", "compliance"], "code_examples": [ { "language": "python", "code": "import numpy as np import pandas as pd def calculate_fairness_metrics(y_true, y_pred, sensitive_features): metrics = {} # Calculate metrics for each group for group in sensitive_features.unique(): mask = sensitive_features == group group_metrics = { 'selection_rate': y_pred[mask].mean(), 'accuracy': (y_true[mask] == y_pred[mask]).mean() } metrics[str(group)] = group_metrics # Calculate disparities groups = list(metrics.keys()) if len(groups) >= 2: selection_rates = [metrics[g]['selection_rate'] for g in groups] metrics['selection_rate_disparity'] = max(selection_rates) - min(selection_rates) return metrics # Usage fairness_metrics = calculate_fairness_metrics(y_test, predictions, test_data['gender']) print(f'Selection Rate Disparity: {fairness_metrics.get(\"selection_rate_disparity\", 0):.3f}')" }, { "language": "python", "code": "from fairlearn.postprocessing import ThresholdOptimizer from sklearn.ensemble import RandomForestClassifier class FairnessConstrainedModel: def __init__(self, fairness_constraint='demographic_parity'): self.fairness_constraint = fairness_constraint self.postprocessor = None def fit(self, X_train, y_train, sensitive_features): self.postprocessor = ThresholdOptimizer( estimator=RandomForestClassifier(random_state=42), constraints=self.fairness_constraint ) self.postprocessor.fit(X_train, y_train, sensitive_features=sensitive_features) return self def predict(self, X, sensitive_features=None): return self.postprocessor.predict(X, sensitive_features=sensitive_features) # Usage fair_model = FairnessConstrainedModel() fair_model.fit(X_train, y_train, sensitive_features_train) fair_predictions = fair_model.predict(X_test, sensitive_features_test)" } ] }, { "id": "interpretability-pattern", "name": "Interpretability Pattern", "category": "AI Governance", "description": "Makes AI model decisions transparent and understandable to humans through explanation techniques, enabling trust, debugging, and regulatory compliance", "problem": "Complex AI models act as 'black boxes' making it impossible to understand decision-making processes, leading to lack of trust and 
regulatory non-compliance", "solution": "Implement interpretability techniques like feature importance analysis, partial dependence plots, SHAP values, and LIME explanations throughout the ML pipeline", "when_to_use": [ "High-stakes decision-making systems", "Regulatory compliance requirements", "Model debugging and validation", "Building user trust in AI systems", "Bias detection and mitigation" ], "benefits": [ "Increased model transparency", "Easier debugging and validation", "Regulatory compliance", "Enhanced user trust", "Better model understanding" ], "drawbacks": [ "Computational overhead", "May reduce model accuracy", "Complex implementation", "Interpretation limitations for complex models" ], "use_cases": [ "Financial risk assessment models", "Medical diagnosis systems", "Autonomous vehicle decisions", "Credit scoring algorithms", "Content moderation systems" ], "related_patterns": ["Bias Mitigation Pattern", "Fairness Pattern", "AI Governance Pattern"], "complexity": "Medium", "tags": ["ai-governance", "ethics", "interpretability", "explainability", "xai", "responsible-ai"], "code_examples": [ { "language": "python", "code": "import shap from sklearn.ensemble import RandomForestClassifier class InterpretableModel: def __init__(self): self.model = RandomForestClassifier(random_state=42) self.explainer = None def fit(self, X_train, y_train): self.model.fit(X_train, y_train) self.explainer = shap.TreeExplainer(self.model) return self def explain_prediction(self, X_instance): shap_values = self.explainer.shap_values(X_instance) return dict(zip(X_instance.columns, shap_values[1][0])) # Usage model = InterpretableModel() model.fit(X_train, y_train) explanation = model.explain_prediction(X_test.iloc[[0]]) print('Feature contributions:', explanation)" }, { "language": "python", "code": "import shap from sklearn.neural_network import MLPClassifier class InterpretableNN: def __init__(self): self.model = MLPClassifier(hidden_layer_sizes=(64, 32), random_state=42) def fit(self, X_train, y_train): self.model.fit(X_train, y_train) return self def explain_with_shap(self, X_background, X_instance): explainer = shap.KernelExplainer(self.model.predict_proba, X_background) shap_values = explainer.shap_values(X_instance) return shap_values # Usage nn_model = InterpretableNN() nn_model.fit(X_train, y_train) shap_values = nn_model.explain_with_shap(X_train[:100], X_test[:1]) print('SHAP values shape:', shap_values.shape)" } ] }, { "id": "ai-privacy-pattern", "name": "AI Privacy Pattern", "category": "AI Governance", "description": "Implements privacy-preserving techniques in AI systems including federated learning, differential privacy, and homomorphic encryption to protect sensitive data", "problem": "AI systems require access to sensitive personal data, creating privacy risks, regulatory non-compliance, and potential data breaches", "solution": "Apply privacy-preserving AI techniques like federated learning, differential privacy, secure multi-party computation, and homomorphic encryption", "when_to_use": [ "Processing sensitive personal data", "Multi-party collaborative AI", "Regulatory compliance (GDPR, CCPA)", "Healthcare and financial applications", "Cross-organizational data sharing" ], "benefits": [ "Enhanced data privacy protection", "Regulatory compliance", "Reduced data breach risks", "Enable collaborative AI", "Maintain model utility with privacy" ], "drawbacks": [ "Increased computational complexity", "Potential accuracy trade-offs", "Higher infrastructure costs", "Complex implementation 
and maintenance" ], "use_cases": [ "Healthcare predictive models", "Financial risk assessment", "Personalized recommendations", "IoT data analytics", "Cross-border data collaboration" ], "related_patterns": ["Bias Mitigation Pattern", "AI Governance Pattern", "Federated Learning"], "complexity": "High", "tags": ["ai-governance", "privacy", "security", "federated-learning", "differential-privacy", "gdpr"], "code_examples": [ { "language": "python", "code": "import numpy as np from sklearn.linear_model import LogisticRegression class DifferentialPrivacy: def __init__(self, epsilon=1.0): self.epsilon = epsilon def add_noise(self, data, sensitivity=1.0): # Add Laplace noise for differential privacy scale = sensitivity / self.epsilon noise = np.random.laplace(0, scale, data.shape) return data + noise def train_private_model(self, X_train, y_train): # Add noise to gradients during training model = LogisticRegression(random_state=42) model.fit(X_train, y_train) return model # Usage dp = DifferentialPrivacy(epsilon=0.5) noisy_data = dp.add_noise(X_train.values) private_model = dp.train_private_model(noisy_data, y_train)" }, { "language": "python", "code": "import numpy as np class FederatedLearning: def __init__(self, num_clients=3): self.num_clients = num_clients self.client_models = [] def split_data(self, X, y): # Split data across clients client_data = [] indices = np.random.permutation(len(X)) chunk_size = len(X) // self.num_clients for i in range(self.num_clients): start_idx = i * chunk_size end_idx = (i + 1) * chunk_size if i < self.num_clients - 1 else len(X) client_indices = indices[start_idx:end_idx] client_data.append((X[client_indices], y[client_indices])) return client_data def aggregate_models(self, client_updates): # Simple federated averaging avg_weights = {} for param_name in client_updates[0].keys(): avg_weights[param_name] = np.mean([ update[param_name] for update in client_updates ], axis=0) return avg_weights # Usage fl = FederatedLearning(num_clients=3) client_data = fl.split_data(X_train, y_train) # Each client would train locally, then send updates # global_model = fl.aggregate_models(client_updates)" } ] }, { "id": "ai-governance-pattern", "name": "AI Governance Pattern", "category": "AI Governance", "description": "Establishes comprehensive governance frameworks for AI systems including policies, monitoring, auditing, and compliance mechanisms to ensure responsible AI development and deployment", "problem": "AI systems lack standardized governance leading to inconsistent practices, ethical concerns, regulatory non-compliance, and potential harm to users and society", "solution": "Implement comprehensive AI governance framework with policies, monitoring, auditing, risk assessment, and continuous improvement processes", "when_to_use": [ "Large-scale AI system development", "High-risk AI applications", "Regulatory compliance requirements", "Multi-stakeholder AI projects", "Enterprise AI governance" ], "benefits": [ "Regulatory compliance", "Risk mitigation", "Ethical AI practices", "Stakeholder trust", "Continuous improvement" ], "drawbacks": [ "High implementation complexity", "Resource intensive", "Continuous monitoring required", "Evolving regulatory landscape" ], "use_cases": [ "Financial services AI systems", "Healthcare AI applications", "Autonomous vehicle systems", "Government AI initiatives", "Enterprise AI platforms" ], "related_patterns": ["Bias Mitigation Pattern", "Fairness Pattern", "Interpretability Pattern", "AI Privacy Pattern"], "complexity": "High", 
"tags": ["ai-governance", "ethics", "compliance", "risk-management", "responsible-ai", "auditing"], "code_examples": [ { "language": "python", "code": "import datetime from typing import Dict, List class AIGovernanceFramework: def __init__(self, organization_name: str): self.organization_name = organization_name self.policies = {} self.risk_assessments = [] self.audit_logs = [] def define_policies(self, policies: Dict): self.policies = policies self._log_event('POLICY_DEFINITION', f'Defined {len(policies)} policies') def assess_risk(self, model_info: Dict) -> Dict: risk_score = 0.0 if model_info.get('type') == 'deep_learning': risk_score += 0.5 if model_info.get('critical_system'): risk_score += 0.3 assessment = { 'risk_score': risk_score, 'level': 'HIGH' if risk_score > 0.5 else 'LOW', 'recommendations': ['Regular monitoring', 'Bias audits'] if risk_score > 0.3 else [] } self.risk_assessments.append(assessment) return assessment def _log_event(self, event_type: str, description: str): event = { 'timestamp': datetime.datetime.now().isoformat(), 'type': event_type, 'description': description } self.audit_logs.append(event) # Usage framework = AIGovernanceFramework('AI Corp') framework.define_policies({ 'data_governance': 'Encrypt sensitive data', 'model_monitoring': 'Track performance metrics' }) risk = framework.assess_risk({'type': 'deep_learning', 'critical_system': True}) print(f'Risk Level: {risk[\"level\"]}') }, { "language": "python", "code": "import time class PerformanceMonitor: def __init__(self): self.metrics = [] def log_metric(self, metric_name, value): self.metrics.append({ 'timestamp': time.time(), 'metric': metric_name, 'value': value }) def check_thresholds(self, thresholds): alerts = [] for metric in self.metrics[-10:]: # Check recent metrics if metric['metric'] in thresholds: threshold = thresholds[metric['metric']] if metric['value'] > threshold.get('max', float('inf')): alerts.append(f\"{metric['metric']} exceeded threshold\") return alerts # Usage monitor = PerformanceMonitor() monitor.log_metric('accuracy', 0.85) alerts = monitor.check_thresholds({'accuracy': {'max': 0.90}}) print('Alerts:', alerts)" } ] } ] } \"\"\"Define AI governance policies\"\"\" required_policies = [ 'data_governance', 'model_development', 'deployment_standards', 'monitoring_requirements', 'incident_response', 'ethical_guidelines' ] for policy in required_policies: if policy not in policies: raise ValueError(f\"Required policy '{policy}' not defined\") self.framework.policies = policies self.framework.last_updated = datetime.datetime.now().isoformat() # Log policy definition self._log_audit_event( 'POLICY_DEFINITION', f\"Policies defined for {len(policies)} categories\", {'policy_categories': list(policies.keys())} ) def conduct_risk_assessment(self, model_info: Dict, data_info: Dict) -> Dict: \"\"\"Conduct comprehensive risk assessment\"\"\" risk_factors = { 'data_sensitivity': self._assess_data_risk(data_info), 'model_complexity': self._assess_model_risk(model_info), 'deployment_scale': self._assess_deployment_risk(model_info), 'regulatory_compliance': self._assess_regulatory_risk(model_info, data_info) } # Calculate overall risk score overall_risk = sum(risk_factors.values()) / len(risk_factors) assessment = { 'timestamp': datetime.datetime.now().isoformat(), 'model_id': model_info.get('id', 'unknown'), 'risk_factors': risk_factors, 'overall_risk_score': overall_risk, 'risk_level': self._categorize_risk(overall_risk), 'recommendations': self._generate_recommendations(risk_factors) } 
self.framework.risk_assessments.append(assessment) return assessment def _assess_data_risk(self, data_info: Dict) -> float: \"\"\"Assess data-related risks\"\"\" risk_score = 0.0 # Sensitivity factors if data_info.get('contains_pii', False): risk_score += 0.8 if data_info.get('contains_sensitive_health', False): risk_score += 1.0 if data_info.get('contains_financial', False): risk_score += 0.7 # Volume factors volume = data_info.get('volume', 0) if volume > 1000000: # Large datasets risk_score += 0.3 # Quality factors if data_info.get('quality_score', 1.0) < 0.7: risk_score += 0.4 return min(risk_score, 1.0) def _assess_model_risk(self, model_info: Dict) -> float: \"\"\"Assess model-related risks\"\"\" risk_score = 0.0 # Complexity factors model_type = model_info.get('type', '').lower() if 'deep' in model_type or 'transformer' in model_type: risk_score += 0.6 elif 'ensemble' in model_type: risk_score += 0.3 # Interpretability factors if not model_info.get('interpretable', True): risk_score += 0.5 # Performance factors accuracy = model_info.get('accuracy', 0.5) if accuracy < 0.7: risk_score += 0.4 return min(risk_score, 1.0) def _assess_deployment_risk(self, model_info: Dict) -> float: \"\"\"Assess deployment-related risks\"\"\" risk_score = 0.0 # Scale factors users_affected = model_info.get('potential_users', 0) if users_affected > 100000: risk_score += 0.8 elif users_affected > 10000: risk_score += 0.5 # Criticality factors if model_info.get('critical_system', False): risk_score += 0.7 return min(risk_score, 1.0) def _assess_regulatory_risk(self, model_info: Dict, data_info: Dict) -> float: \"\"\"Assess regulatory compliance risks\"\"\" risk_score = 0.0 # GDPR factors if data_info.get('eu_users', False): risk_score += 0.6 # Industry-specific regulations industry = model_info.get('industry', '').lower() if 'healthcare' in industry: risk_score += 0.8 elif 'finance' in industry: risk_score += 0.7 return min(risk_score, 1.0) def _categorize_risk(self, risk_score: float) -> str: \"\"\"Categorize risk level\"\"\" if risk_score >= 0.8: return 'CRITICAL' elif risk_score >= 0.6: return 'HIGH' elif risk_score >= 0.4: return 'MEDIUM' else: return 'LOW' def _generate_recommendations(self, risk_factors: Dict) -> List[str]: \"\"\"Generate risk mitigation recommendations\"\"\" recommendations = [] if risk_factors['data_sensitivity'] > 0.5: recommendations.append(\"Implement enhanced data protection measures\") recommendations.append(\"Conduct regular privacy impact assessments\") if risk_factors['model_complexity'] > 0.5: recommendations.append(\"Implement model interpretability techniques\") recommendations.append(\"Add comprehensive model validation tests\") if risk_factors['deployment_scale'] > 0.5: recommendations.append(\"Implement gradual rollout strategy\") recommendations.append(\"Set up comprehensive monitoring and alerting\") if risk_factors['regulatory_compliance'] > 0.5: recommendations.append(\"Engage legal and compliance teams early\") recommendations.append(\"Implement audit trails and documentation\") return recommendations def start_monitoring(self, model, data_stream, monitoring_config: Dict): \"\"\"Start continuous monitoring of AI system\"\"\" self.monitoring_active = True # Initialize monitoring components self.performance_monitor = PerformanceMonitor(monitoring_config) self.bias_monitor = BiasMonitor(monitoring_config) self.drift_detector = DriftDetector(monitoring_config) print(\"AI Governance monitoring started\") # In a real implementation, this would run in a separate 
thread/process # monitoring data streams and model performance def conduct_audit(self, audit_scope: str, auditor: str) -> Dict: \"\"\"Conduct governance audit\"\"\" audit_id = hashlib.sha256( f\"{audit_scope}_{datetime.datetime.now().isoformat()}\".encode() ).hexdigest()[:16] audit = { 'audit_id': audit_id, 'timestamp': datetime.datetime.now().isoformat(), 'scope': audit_scope, 'auditor': auditor, 'findings': self._perform_audit_checks(audit_scope), 'recommendations': [], 'status': 'IN_PROGRESS' } self.framework.audit_logs.append(audit) return audit def _perform_audit_checks(self, scope: str) -> List[Dict]: \"\"\"Perform automated audit checks\"\"\" findings = [] if scope == 'full': # Check policy compliance if not self.framework.policies: findings.append({ 'severity': 'HIGH', 'category': 'POLICY', 'finding': 'No governance policies defined', 'recommendation': 'Define comprehensive AI governance policies' }) # Check recent risk assessments recent_assessments = [ a for a in self.framework.risk_assessments if (datetime.datetime.now() - datetime.datetime.fromisoformat(a['timestamp'])).days < 90 ] if len(recent_assessments) == 0: findings.append({ 'severity': 'MEDIUM', 'category': 'RISK_ASSESSMENT', 'finding': 'No recent risk assessments conducted', 'recommendation': 'Conduct quarterly risk assessments' }) return findings def _log_audit_event(self, event_type: str, description: str, details: Dict = None): \"\"\"Log audit event\"\"\" event = { 'timestamp': datetime.datetime.now().isoformat(), 'event_type': event_type, 'description': description, 'details': details or {} } self.framework.audit_logs.append(event) def generate_compliance_report(self, report_period: str = 'quarterly') -> Dict: \"\"\"Generate compliance report\"\"\" # Calculate compliance metrics total_policies = len(self.framework.policies) total_assessments = len(self.framework.risk_assessments) total_audits = len([a for a in self.framework.audit_logs if 'audit_id' in a]) # Calculate compliance score (simplified) compliance_score = min(100, (total_policies * 20) + (total_assessments * 10) + (total_audits * 15)) report = { 'report_period': report_period, 'generated_date': datetime.datetime.now().isoformat(), 'compliance_score': compliance_score, 'metrics': { 'policies_defined': total_policies, 'risk_assessments': total_assessments, 'audits_conducted': total_audits, 'audit_events': len(self.framework.audit_logs) }, 'status': 'COMPLIANT' if compliance_score >= 70 else 'NON_COMPLIANT' } return report # Usage Example framework = AIGovernanceFramework(organization_name=\"AI Corp\") governance_manager = AIGovernanceManager(framework) # Define governance policies policies = { 'data_governance': { 'data_collection': 'Obtain explicit consent', 'data_retention': 'Minimum necessary period', 'data_security': 'Encryption at rest and in transit' }, 'model_development': { 'version_control': 'Required for all models', 'testing': 'Comprehensive validation required', 'documentation': 'Complete model cards mandatory' }, 'deployment_standards': { 'gradual_rollout': 'Feature flags required', 'monitoring': 'Real-time performance tracking', 'rollback': 'Automated rollback procedures' }, 'monitoring_requirements': { 'performance_metrics': 'Track accuracy, latency, throughput', 'bias_monitoring': 'Regular fairness assessments', 'drift_detection': 'Automated model drift alerts' }, 'incident_response': { 'escalation_procedures': 'Defined response times', 'communication_plan': 'Stakeholder notification protocols', 'recovery_procedures': 'Automated failover
systems' }, 'ethical_guidelines': { 'fairness': 'Regular bias audits required', 'transparency': 'Model explanations available', 'accountability': 'Clear responsibility assignment' } } governance_manager.define_policies(policies) # Conduct risk assessment model_info = { 'id': 'fraud_detection_v2', 'type': 'deep_learning', 'accuracy': 0.92, 'industry': 'finance', 'potential_users': 50000, 'critical_system': True } data_info = { 'contains_pii': True, 'contains_financial': True, 'volume': 1000000, 'quality_score': 0.85 } risk_assessment = governance_manager.conduct_risk_assessment(model_info, data_info) print(f\"Risk Assessment Results:\") print(f\"Overall Risk Score: {risk_assessment['overall_risk_score']:.2f}\") print(f\"Risk Level: {risk_assessment['risk_level']}\") print(\"Recommendations:\") for rec in risk_assessment['recommendations']: print(f\" - {rec}\") # Conduct audit audit = governance_manager.conduct_audit('full', 'compliance_team') print(f\"\\nAudit Findings: {len(audit['findings'])}\") # Generate compliance report compliance_report = governance_manager.generate_compliance_report() print(f\"\\nCompliance Score: {compliance_report['compliance_score']}/100\") print(f\"Status: {compliance_report['status']}\")" }, { "language": "python", "code": "import time from typing import Dict, List, Any, Callable import threading import queue import numpy as np import pandas as pd class PerformanceMonitor: \"\"\"Real-time performance monitoring for AI systems\"\"\" def __init__(self, config: Dict): self.config = config self.metrics_queue = queue.Queue() self.alerts = [] self.monitoring_thread = None self.is_monitoring = False def start_monitoring(self): \"\"\"Start performance monitoring\"\"\" self.is_monitoring = True self.monitoring_thread = threading.Thread(target=self._monitoring_loop) self.monitoring_thread.daemon = True self.monitoring_thread.start() def stop_monitoring(self): \"\"\"Stop performance monitoring\"\"\" self.is_monitoring = False if self.monitoring_thread: self.monitoring_thread.join(timeout=5) def _monitoring_loop(self): \"\"\"Main monitoring loop\"\"\" while self.is_monitoring: try: # Collect metrics metrics = self._collect_metrics() # Check thresholds alerts = self._check_thresholds(metrics) if alerts: self.alerts.extend(alerts) # Store metrics self.metrics_queue.put(metrics) # Sleep for monitoring interval time.sleep(self.config.get('monitoring_interval', 60)) except Exception as e: print(f\"Monitoring error: {e}\") time.sleep(5) def _collect_metrics(self) -> Dict: \"\"\"Collect performance metrics\"\"\" # In a real implementation, this would collect from actual systems return { 'timestamp': time.time(), 'latency': np.random.normal(100, 10), # Mock latency in ms 'throughput': np.random.normal(1000, 50), # Mock requests per second 'accuracy': np.random.normal(0.95, 0.02), # Mock accuracy 'error_rate': np.random.normal(0.02, 0.005) # Mock error rate } def _check_thresholds(self, metrics: Dict) -> List[Dict]: \"\"\"Check if metrics violate thresholds\"\"\" alerts = [] thresholds = self.config.get('thresholds', {}) for metric_name, value in metrics.items(): if metric_name in thresholds: threshold_config = thresholds[metric_name] if 'max' in threshold_config and value > threshold_config['max']: alerts.append({ 'timestamp': metrics['timestamp'], 'metric': metric_name, 'value': value, 'threshold': threshold_config['max'], 'type': 'MAX_THRESHOLD_VIOLATION', 'severity': threshold_config.get('severity', 'MEDIUM') }) if 'min' in threshold_config and value < threshold_config['min']: alerts.append({ 'timestamp':
metrics['timestamp'], 'metric': metric_name, 'value': value, 'threshold': threshold_config['min'], 'type': 'MIN_THRESHOLD_VIOLATION', 'severity': threshold_config.get('severity', 'MEDIUM') }) return alerts def get_recent_metrics(self, hours: int = 1) -> List[Dict]: \"\"\"Get recent metrics\"\"\" cutoff_time = time.time() - (hours * 3600) recent_metrics = [] # Convert queue to list for processing metrics_list = list(self.metrics_queue.queue) for metric in metrics_list: if metric['timestamp'] > cutoff_time: recent_metrics.append(metric) return recent_metrics def get_active_alerts(self) -> List[Dict]: \"\"\"Get currently active alerts\"\"\" return self.alerts[-10:] # Return last 10 alerts class BiasMonitor: \"\"\"Monitor for bias drift in AI systems\"\"\" def __init__(self, config: Dict): self.config = config self.baseline_bias_metrics = {} self.drift_threshold = config.get('drift_threshold', 0.1) def establish_baseline(self, validation_data: pd.DataFrame, sensitive_features: List[str]): \"\"\"Establish baseline bias metrics\"\"\" for feature in sensitive_features: groups = validation_data[feature].unique() for group in groups: mask = validation_data[feature] == group group_data = validation_data[mask] # Calculate baseline metrics (simplified) baseline_metrics = { 'mean_prediction': group_data['prediction'].mean(), 'group_size': len(group_data), 'timestamp': time.time() } self.baseline_bias_metrics[f\"{feature}_{group}\"] = baseline_metrics def detect_bias_drift(self, current_data: pd.DataFrame, sensitive_features: List[str]) -> List[Dict]: \"\"\"Detect bias drift compared to baseline\"\"\" drift_alerts = [] for feature in sensitive_features: groups = current_data[feature].unique() for group in groups: mask = current_data[feature] == group group_data = current_data[mask] baseline_key = f\"{feature}_{group}\" if baseline_key in self.baseline_bias_metrics: baseline = self.baseline_bias_metrics[baseline_key] # Calculate current metrics current_mean = group_data['prediction'].mean() # Check for drift drift = abs(current_mean - baseline['mean_prediction']) if drift > self.drift_threshold: drift_alerts.append({ 'timestamp': time.time(), 'feature': feature, 'group': group, 'baseline_mean': baseline['mean_prediction'], 'current_mean': current_mean, 'drift': drift, 'threshold': self.drift_threshold, 'severity': 'HIGH' if drift > self.drift_threshold * 2 else 'MEDIUM' }) return drift_alerts class DriftDetector: \"\"\"Detect concept drift in AI model performance\"\"\" def __init__(self, config: Dict): self.config = config self.performance_history = [] self.drift_detection_method = config.get('drift_method', 'statistical') def update_performance_history(self, metrics: Dict): \"\"\"Update performance history\"\"\" self.performance_history.append(metrics) # Keep only recent history max_history = self.config.get('max_history', 1000) if len(self.performance_history) > max_history: self.performance_history = self.performance_history[-max_history:] def detect_concept_drift(self) -> Dict: \"\"\"Detect concept drift using statistical methods\"\"\" if len(self.performance_history) < 50: # Need minimum history return {'drift_detected': False, 'confidence': 0.0} # Simple statistical drift detection recent_metrics = self.performance_history[-20:] # Last 20 measurements older_metrics = self.performance_history[-100:-20] # Previous 80 measurements # Compare distributions (simplified) recent_accuracy = np.mean([m.get('accuracy', 0) for m in recent_metrics]) older_accuracy = np.mean([m.get('accuracy', 0) for m in 
older_metrics]) accuracy_drop = older_accuracy - recent_accuracy # Statistical significance check (simplified) drift_threshold = self.config.get('drift_threshold', 0.05) drift_detected = accuracy_drop > drift_threshold return { 'drift_detected': drift_detected, 'accuracy_drop': accuracy_drop, 'threshold': drift_threshold, 'confidence': min(1.0, accuracy_drop / (drift_threshold * 2)), 'recommendation': 'Retraining recommended' if drift_detected else 'No action needed' } # Usage Example monitoring_config = { 'monitoring_interval': 30, # seconds 'thresholds': { 'latency': {'max': 200, 'severity': 'HIGH'}, 'error_rate': {'max': 0.05, 'severity': 'HIGH'}, 'accuracy': {'min': 0.90, 'severity': 'CRITICAL'} } } # Initialize monitoring components performance_monitor = PerformanceMonitor(monitoring_config) bias_monitor = BiasMonitor({'drift_threshold': 0.1}) drift_detector = DriftDetector({'drift_method': 'statistical', 'drift_threshold': 0.05}) # Start monitoring performance_monitor.start_monitoring() # Establish bias baseline validation_data = pd.DataFrame({ 'feature1': np.random.randn(1000), 'feature2': np.random.randn(1000), 'gender': np.random.choice(['M', 'F'], 1000), 'prediction': np.random.rand(1000) }) bias_monitor.establish_baseline(validation_data, ['gender']) # Simulate monitoring loop for i in range(10): # Simulate new data new_data = pd.DataFrame({ 'feature1': np.random.randn(100), 'feature2': np.random.randn(100), 'gender': np.random.choice(['M', 'F'], 100), 'prediction': np.random.rand(100) }) # Check for bias drift bias_alerts = bias_monitor.detect_bias_drift(new_data, ['gender']) # Update performance history mock_metrics = { 'accuracy': 0.92 + np.random.normal(0, 0.02), 'timestamp': time.time() } drift_detector.update_performance_history(mock_metrics) # Check for concept drift drift_result = drift_detector.detect_concept_drift() print(f\"Iteration {i+1}:\") print(f\" Bias alerts: {len(bias_alerts)}\") print(f\" Concept drift: {drift_result['drift_detected']}\") time.sleep(1) # Get monitoring results recent_metrics = performance_monitor.get_recent_metrics(hours=1) active_alerts = performance_monitor.get_active_alerts() print(f\"\\nMonitoring Summary:\") print(f\"Recent metrics: {len(recent_metrics)}\") print(f\"Active alerts: {len(active_alerts)}\") # Stop monitoring performance_monitor.stop_monitoring()" } ] } ] }
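The file above is an ordinary JSON document, so it can be inspected or consumed by tooling outside the MCP server itself. Below is a minimal sketch for loading the backup and checking that every pattern entry carries the fields the records above rely on. The local file path and the list of required keys are assumptions drawn from the content shown here, not part of the server's own API.

import json

# Assumed local path to the file shown above; adjust as needed.
PATTERNS_FILE = "ai-governance-ethics-patterns.json.backup"

# Keys that every pattern entry above provides.
REQUIRED_KEYS = {
    "id", "name", "category", "description", "problem", "solution",
    "when_to_use", "benefits", "drawbacks", "use_cases",
    "related_patterns", "complexity", "tags", "code_examples",
}

with open(PATTERNS_FILE, encoding="utf-8") as fh:
    document = json.load(fh)

for pattern in document["patterns"]:
    # Report any missing top-level fields for this pattern.
    missing = REQUIRED_KEYS - pattern.keys()
    if missing:
        print(f"{pattern.get('id', '<no id>')}: missing {sorted(missing)}")
    # Each code example is an object with "language" and "code" strings.
    for example in pattern.get("code_examples", []):
        assert {"language", "code"} <= example.keys()
    print(f"{pattern['id']}: {len(pattern.get('code_examples', []))} code example(s), complexity {pattern.get('complexity', 'n/a')}")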
