Skip to main content
Glama
aws-samples

CFM Tips - Cost Optimization MCP Server

by aws-samples
governance_analyzer.py43.2 kB
""" S3 Governance Analyzer This analyzer performs S3 governance compliance checking to ensure buckets have appropriate lifecycle policies and comply with organizational standards. Analyzes: - Buckets missing lifecycle policies - Versioned buckets without lifecycle policies for non-current versions - Buckets missing multipart upload cleanup rules - Compliance with organizational governance standards """ import asyncio import logging from datetime import datetime, timedelta from typing import Dict, List, Any, Optional from ..base_analyzer import BaseAnalyzer logger = logging.getLogger(__name__) class GovernanceAnalyzer(BaseAnalyzer): """ Analyzer for S3 governance compliance checking. Validates bucket configurations against governance standards and identifies compliance violations with specific remediation steps. """ def __init__(self, s3_service=None, pricing_service=None, storage_lens_service=None): """ Initialize GovernanceAnalyzer. Args: s3_service: S3Service instance for AWS S3 operations pricing_service: S3Pricing instance for cost calculations storage_lens_service: StorageLensService instance for Storage Lens data """ super().__init__(s3_service, pricing_service, storage_lens_service) self.analysis_type = "governance" # Default governance standards self.default_standards = { "require_lifecycle_policies": True, "require_versioning_lifecycle": True, "require_multipart_cleanup": True, "max_multipart_age_days": 7, "require_encryption": False, # Optional standard "require_public_access_block": False, # Optional standard "allowed_storage_classes": [ "STANDARD", "STANDARD_IA", "ONEZONE_IA", "GLACIER", "DEEP_ARCHIVE", "INTELLIGENT_TIERING" ] } async def analyze(self, **kwargs) -> Dict[str, Any]: """ Execute S3 governance compliance analysis. Args: **kwargs: Analysis parameters including: - region: AWS region to analyze - bucket_names: Optional list of specific buckets to analyze - governance_standards: Optional custom governance standards - include_remediation: Whether to include detailed remediation steps Returns: Dictionary containing governance compliance analysis results """ context = self.prepare_analysis_context(**kwargs) try: self.logger.info(f"Starting S3 governance compliance analysis for region: {context.get('region', 'all')}") # Initialize results structure analysis_results = { "status": "success", "analysis_type": self.analysis_type, "context": context, "data": { "governance_violations": [], "compliant_buckets": [], "violation_summary": {}, "compliance_score": 0, "total_buckets_analyzed": 0, "standards_applied": {} }, "data_sources": [], "execution_time": 0, "timestamp": datetime.now().isoformat() } start_time = datetime.now() # Get governance standards (custom or default) governance_standards = kwargs.get('governance_standards', self.default_standards) analysis_results["data"]["standards_applied"] = governance_standards # Get list of buckets to analyze buckets_to_analyze = await self._get_buckets_to_analyze(context) if not buckets_to_analyze: self.logger.warning("No buckets found to analyze") analysis_results["data"]["total_buckets_analyzed"] = 0 return analysis_results analysis_results["data"]["total_buckets_analyzed"] = len(buckets_to_analyze) analysis_results["data_sources"].append("s3_api") # Execute governance checks in parallel governance_tasks = [ self._check_bucket_governance(bucket_name, governance_standards) for bucket_name in buckets_to_analyze ] # Execute all governance checks bucket_results = await asyncio.gather(*governance_tasks, return_exceptions=True) # Process results violations = [] compliant_buckets = [] violation_counts = {} for i, result in enumerate(bucket_results): bucket_name = buckets_to_analyze[i] if isinstance(result, Exception): self.logger.warning(f"Governance check failed for bucket {bucket_name}: {result}") violations.append({ "bucket_name": bucket_name, "violation_type": "analysis_error", "severity": "high", "description": f"Failed to analyze bucket: {str(result)}", "remediation_steps": [ "Check bucket permissions and access", "Verify bucket exists and is accessible", "Review IAM permissions for governance checks" ] }) continue if result.get("status") == "success": bucket_violations = result.get("violations", []) if bucket_violations: violations.extend(bucket_violations) # Count violation types for violation in bucket_violations: violation_type = violation.get("violation_type", "unknown") violation_counts[violation_type] = violation_counts.get(violation_type, 0) + 1 else: compliant_buckets.append({ "bucket_name": bucket_name, "compliance_checks_passed": result.get("checks_passed", []), "last_checked": datetime.now().isoformat() }) # Store results analysis_results["data"]["governance_violations"] = violations analysis_results["data"]["compliant_buckets"] = compliant_buckets analysis_results["data"]["violation_summary"] = violation_counts # Calculate compliance score total_buckets = len(buckets_to_analyze) compliant_count = len(compliant_buckets) compliance_score = (compliant_count / total_buckets * 100) if total_buckets > 0 else 100 analysis_results["data"]["compliance_score"] = round(compliance_score, 2) # Calculate execution time end_time = datetime.now() analysis_results["execution_time"] = (end_time - start_time).total_seconds() self.logger.info( f"Completed S3 governance analysis in {analysis_results['execution_time']:.2f} seconds. " f"Compliance score: {compliance_score:.1f}% ({compliant_count}/{total_buckets} buckets compliant)" ) return analysis_results except Exception as e: self.logger.error(f"Error in governance analysis: {str(e)}") return self.handle_analysis_error(e, context) async def _get_buckets_to_analyze(self, context: Dict[str, Any]) -> List[str]: """ Get list of buckets to analyze. Args: context: Analysis context Returns: List of bucket names to analyze """ try: # If specific buckets are requested, use those if context.get('bucket_names'): self.logger.info(f"Analyzing specific buckets: {context['bucket_names']}") return context['bucket_names'] # Otherwise, get all buckets if not self.s3_service: self.logger.error("S3Service not available for bucket discovery") return [] buckets_result = await self.s3_service.list_buckets() if buckets_result.get("status") == "success": # Fix: S3Service returns buckets under data.Buckets, not buckets buckets_data = buckets_result.get("data", {}).get("Buckets", []) bucket_names = [bucket["Name"] for bucket in buckets_data] self.logger.info(f"Found {len(bucket_names)} buckets to analyze") return bucket_names else: self.logger.error(f"Failed to list buckets: {buckets_result.get('message', 'Unknown error')}") return [] except Exception as e: self.logger.error(f"Error getting buckets to analyze: {str(e)}") return [] async def _check_bucket_governance(self, bucket_name: str, standards: Dict[str, Any]) -> Dict[str, Any]: """ Check governance compliance for a single bucket. Args: bucket_name: Name of the bucket to check standards: Governance standards to apply Returns: Dictionary containing governance check results """ try: self.logger.debug(f"Checking governance compliance for bucket: {bucket_name}") violations = [] checks_passed = [] # Execute all governance checks in parallel tasks = [ self._check_lifecycle_policy(bucket_name, standards), self._check_versioning_lifecycle(bucket_name, standards), self._check_multipart_cleanup(bucket_name, standards) ] # Add optional checks if enabled if standards.get("require_encryption"): tasks.append(self._check_encryption(bucket_name, standards)) if standards.get("require_public_access_block"): tasks.append(self._check_public_access_block(bucket_name, standards)) # Execute all checks check_results = await asyncio.gather(*tasks, return_exceptions=True) # Process check results for result in check_results: if isinstance(result, Exception): self.logger.warning(f"Governance check failed for {bucket_name}: {result}") violations.append({ "bucket_name": bucket_name, "violation_type": "check_error", "severity": "medium", "description": f"Governance check failed: {str(result)}", "remediation_steps": [ "Verify bucket permissions", "Check IAM policies for governance operations" ] }) continue if result.get("compliant"): checks_passed.append(result.get("check_type")) else: violations.append(result.get("violation")) return { "status": "success", "bucket_name": bucket_name, "violations": violations, "checks_passed": checks_passed, "total_checks": len(tasks), "compliant": len(violations) == 0 } except Exception as e: self.logger.error(f"Error checking governance for bucket {bucket_name}: {str(e)}") return { "status": "error", "bucket_name": bucket_name, "error": str(e) } async def _check_lifecycle_policy(self, bucket_name: str, standards: Dict[str, Any]) -> Dict[str, Any]: """ Check if bucket has lifecycle policy (Requirement 8.1). Args: bucket_name: Name of the bucket to check standards: Governance standards Returns: Dictionary containing lifecycle policy check result """ try: if not standards.get("require_lifecycle_policies", True): return {"compliant": True, "check_type": "lifecycle_policy", "skipped": True} if not self.s3_service: raise Exception("S3Service not available") lifecycle_result = await self.s3_service.get_bucket_lifecycle(bucket_name) if lifecycle_result.get("status") == "success": lifecycle_config = lifecycle_result.get("lifecycle_configuration") if lifecycle_config and lifecycle_config.get("Rules"): # Bucket has lifecycle policy return { "compliant": True, "check_type": "lifecycle_policy", "details": { "rules_count": len(lifecycle_config["Rules"]), "has_lifecycle_policy": True } } else: # Bucket missing lifecycle policy return { "compliant": False, "check_type": "lifecycle_policy", "violation": { "bucket_name": bucket_name, "violation_type": "missing_lifecycle_policy", "severity": "medium", "description": f"Bucket '{bucket_name}' does not have a lifecycle policy configured", "remediation_steps": [ f"Configure lifecycle policy for bucket '{bucket_name}'", "Define rules for transitioning objects to cheaper storage classes", "Set expiration rules for temporary or log data", "Consider intelligent tiering for unpredictable access patterns" ], "aws_cli_example": f"aws s3api put-bucket-lifecycle-configuration --bucket {bucket_name} --lifecycle-configuration file://lifecycle-policy.json" } } else: # Error getting lifecycle configuration error_msg = lifecycle_result.get("message", "Unknown error") if "NoSuchLifecycleConfiguration" in error_msg: # No lifecycle policy configured return { "compliant": False, "check_type": "lifecycle_policy", "violation": { "bucket_name": bucket_name, "violation_type": "missing_lifecycle_policy", "severity": "medium", "description": f"Bucket '{bucket_name}' does not have a lifecycle policy configured", "remediation_steps": [ f"Configure lifecycle policy for bucket '{bucket_name}'", "Define rules for transitioning objects to cheaper storage classes", "Set expiration rules for temporary or log data" ] } } else: raise Exception(f"Failed to get lifecycle configuration: {error_msg}") except Exception as e: self.logger.error(f"Error checking lifecycle policy for {bucket_name}: {str(e)}") return { "compliant": False, "check_type": "lifecycle_policy", "violation": { "bucket_name": bucket_name, "violation_type": "lifecycle_check_error", "severity": "low", "description": f"Could not verify lifecycle policy for bucket '{bucket_name}': {str(e)}", "remediation_steps": [ "Check IAM permissions for s3:GetLifecycleConfiguration", "Verify bucket exists and is accessible" ] } } async def _check_versioning_lifecycle(self, bucket_name: str, standards: Dict[str, Any]) -> Dict[str, Any]: """ Check if versioned buckets have lifecycle policies for non-current versions (Requirement 8.2). Args: bucket_name: Name of the bucket to check standards: Governance standards Returns: Dictionary containing versioning lifecycle check result """ try: if not standards.get("require_versioning_lifecycle", True): return {"compliant": True, "check_type": "versioning_lifecycle", "skipped": True} if not self.s3_service: raise Exception("S3Service not available") # Check if versioning is enabled versioning_result = await self.s3_service.get_bucket_versioning(bucket_name) if versioning_result.get("status") != "success": error_msg = versioning_result.get("message", "Unknown error") raise Exception(f"Failed to get versioning configuration: {error_msg}") versioning_config = versioning_result.get("versioning_configuration", {}) versioning_status = versioning_config.get("Status", "Disabled") if versioning_status not in ["Enabled", "Suspended"]: # Versioning not enabled, check passes return { "compliant": True, "check_type": "versioning_lifecycle", "details": { "versioning_enabled": False, "check_not_applicable": True } } # Versioning is enabled, check for lifecycle policy with non-current version rules lifecycle_result = await self.s3_service.get_bucket_lifecycle(bucket_name) if lifecycle_result.get("status") == "success": lifecycle_config = lifecycle_result.get("lifecycle_configuration") if lifecycle_config and lifecycle_config.get("Rules"): # Check if any rule handles non-current versions has_noncurrent_rules = False for rule in lifecycle_config["Rules"]: if (rule.get("NoncurrentVersionTransitions") or rule.get("NoncurrentVersionExpiration")): has_noncurrent_rules = True break if has_noncurrent_rules: return { "compliant": True, "check_type": "versioning_lifecycle", "details": { "versioning_enabled": True, "has_noncurrent_version_rules": True } } else: return { "compliant": False, "check_type": "versioning_lifecycle", "violation": { "bucket_name": bucket_name, "violation_type": "missing_versioning_lifecycle", "severity": "high", "description": f"Versioned bucket '{bucket_name}' lacks lifecycle rules for non-current versions", "remediation_steps": [ f"Add non-current version lifecycle rules to bucket '{bucket_name}'", "Configure NoncurrentVersionTransition to move old versions to cheaper storage", "Set NoncurrentVersionExpiration to delete very old versions", "Consider keeping 2-5 recent versions and transitioning older ones" ], "cost_impact": "High - Non-current versions accumulate storage costs without cleanup" } } else: # No lifecycle policy at all return { "compliant": False, "check_type": "versioning_lifecycle", "violation": { "bucket_name": bucket_name, "violation_type": "missing_versioning_lifecycle", "severity": "high", "description": f"Versioned bucket '{bucket_name}' has no lifecycle policy for version management", "remediation_steps": [ f"Create lifecycle policy for versioned bucket '{bucket_name}'", "Add rules for non-current version transitions and expiration", "Prevent unlimited accumulation of object versions" ], "cost_impact": "High - Unlimited version accumulation can cause significant cost increases" } } else: # Error or no lifecycle configuration error_msg = lifecycle_result.get("message", "Unknown error") if "NoSuchLifecycleConfiguration" in error_msg: return { "compliant": False, "check_type": "versioning_lifecycle", "violation": { "bucket_name": bucket_name, "violation_type": "missing_versioning_lifecycle", "severity": "high", "description": f"Versioned bucket '{bucket_name}' has no lifecycle policy", "remediation_steps": [ f"Create lifecycle policy for versioned bucket '{bucket_name}'", "Add rules for non-current version management" ] } } else: raise Exception(f"Failed to get lifecycle configuration: {error_msg}") except Exception as e: self.logger.error(f"Error checking versioning lifecycle for {bucket_name}: {str(e)}") return { "compliant": False, "check_type": "versioning_lifecycle", "violation": { "bucket_name": bucket_name, "violation_type": "versioning_lifecycle_check_error", "severity": "low", "description": f"Could not verify versioning lifecycle for bucket '{bucket_name}': {str(e)}", "remediation_steps": [ "Check IAM permissions for s3:GetBucketVersioning and s3:GetLifecycleConfiguration", "Verify bucket exists and is accessible" ] } } async def _check_multipart_cleanup(self, bucket_name: str, standards: Dict[str, Any]) -> Dict[str, Any]: """ Check if bucket has lifecycle rules for incomplete multipart uploads (Requirement 8.3). Args: bucket_name: Name of the bucket to check standards: Governance standards Returns: Dictionary containing multipart cleanup check result """ try: if not standards.get("require_multipart_cleanup", True): return {"compliant": True, "check_type": "multipart_cleanup", "skipped": True} if not self.s3_service: raise Exception("S3Service not available") # Check for lifecycle policy with multipart upload cleanup lifecycle_result = await self.s3_service.get_bucket_lifecycle(bucket_name) if lifecycle_result.get("status") == "success": lifecycle_config = lifecycle_result.get("lifecycle_configuration") if lifecycle_config and lifecycle_config.get("Rules"): # Check if any rule handles incomplete multipart uploads has_multipart_rules = False max_age_days = standards.get("max_multipart_age_days", 7) for rule in lifecycle_config["Rules"]: abort_config = rule.get("AbortIncompleteMultipartUpload") if abort_config and abort_config.get("DaysAfterInitiation"): days_after = abort_config["DaysAfterInitiation"] if days_after <= max_age_days: has_multipart_rules = True break if has_multipart_rules: return { "compliant": True, "check_type": "multipart_cleanup", "details": { "has_multipart_cleanup_rules": True, "max_age_compliant": True } } else: # Check if there are any incomplete multipart uploads multipart_result = await self.s3_service.get_multipart_uploads(bucket_name, max_results=1) has_incomplete_uploads = False if (multipart_result.get("status") == "success" and multipart_result.get("uploads")): has_incomplete_uploads = len(multipart_result["uploads"]) > 0 severity = "high" if has_incomplete_uploads else "medium" description = ( f"Bucket '{bucket_name}' lacks multipart upload cleanup rules" + (" and has incomplete uploads" if has_incomplete_uploads else "") ) return { "compliant": False, "check_type": "multipart_cleanup", "violation": { "bucket_name": bucket_name, "violation_type": "missing_multipart_cleanup", "severity": severity, "description": description, "remediation_steps": [ f"Add multipart upload cleanup rule to bucket '{bucket_name}' lifecycle policy", f"Set AbortIncompleteMultipartUpload to {max_age_days} days or less", "Prevent accumulation of incomplete upload storage costs", "Consider setting cleanup to 1-3 days for most use cases" ], "has_incomplete_uploads": has_incomplete_uploads, "aws_cli_example": f"aws s3api put-bucket-lifecycle-configuration --bucket {bucket_name} --lifecycle-configuration file://multipart-cleanup-policy.json" } } else: # No lifecycle policy at all return { "compliant": False, "check_type": "multipart_cleanup", "violation": { "bucket_name": bucket_name, "violation_type": "missing_multipart_cleanup", "severity": "medium", "description": f"Bucket '{bucket_name}' has no lifecycle policy for multipart upload cleanup", "remediation_steps": [ f"Create lifecycle policy for bucket '{bucket_name}'", "Add AbortIncompleteMultipartUpload rule", "Set cleanup period to 7 days or less" ] } } else: # Error or no lifecycle configuration error_msg = lifecycle_result.get("message", "Unknown error") if "NoSuchLifecycleConfiguration" in error_msg: return { "compliant": False, "check_type": "multipart_cleanup", "violation": { "bucket_name": bucket_name, "violation_type": "missing_multipart_cleanup", "severity": "medium", "description": f"Bucket '{bucket_name}' has no lifecycle policy for multipart cleanup", "remediation_steps": [ f"Create lifecycle policy for bucket '{bucket_name}'", "Add multipart upload cleanup rules" ] } } else: raise Exception(f"Failed to get lifecycle configuration: {error_msg}") except Exception as e: self.logger.error(f"Error checking multipart cleanup for {bucket_name}: {str(e)}") return { "compliant": False, "check_type": "multipart_cleanup", "violation": { "bucket_name": bucket_name, "violation_type": "multipart_cleanup_check_error", "severity": "low", "description": f"Could not verify multipart cleanup for bucket '{bucket_name}': {str(e)}", "remediation_steps": [ "Check IAM permissions for s3:GetLifecycleConfiguration and s3:ListMultipartUploads", "Verify bucket exists and is accessible" ] } } async def _check_encryption(self, bucket_name: str, standards: Dict[str, Any]) -> Dict[str, Any]: """ Check if bucket has encryption enabled (optional governance check). Args: bucket_name: Name of the bucket to check standards: Governance standards Returns: Dictionary containing encryption check result """ try: if not self.s3_service: raise Exception("S3Service not available") # This would require implementing get_bucket_encryption in S3Service # For now, return a placeholder return { "compliant": True, "check_type": "encryption", "details": { "check_implemented": False, "note": "Encryption check not yet implemented" } } except Exception as e: self.logger.error(f"Error checking encryption for {bucket_name}: {str(e)}") return { "compliant": True, "check_type": "encryption", "details": {"error": str(e)} } async def _check_public_access_block(self, bucket_name: str, standards: Dict[str, Any]) -> Dict[str, Any]: """ Check if bucket has public access block enabled (optional governance check). Args: bucket_name: Name of the bucket to check standards: Governance standards Returns: Dictionary containing public access block check result """ try: if not self.s3_service: raise Exception("S3Service not available") # This would require implementing get_public_access_block in S3Service # For now, return a placeholder return { "compliant": True, "check_type": "public_access_block", "details": { "check_implemented": False, "note": "Public access block check not yet implemented" } } except Exception as e: self.logger.error(f"Error checking public access block for {bucket_name}: {str(e)}") return { "compliant": True, "check_type": "public_access_block", "details": {"error": str(e)} } def get_recommendations(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]: """ Generate governance recommendations from analysis results. Args: analysis_results: Results from the analyze method Returns: List of governance recommendation dictionaries """ recommendations = [] try: data = analysis_results.get("data", {}) violations = data.get("governance_violations", []) violation_summary = data.get("violation_summary", {}) compliance_score = data.get("compliance_score", 100) # High-level compliance recommendation if compliance_score < 100: recommendations.append( self.create_recommendation( rec_type="governance_compliance", priority="high" if compliance_score < 50 else "medium", title=f"Improve S3 Governance Compliance (Current: {compliance_score:.1f}%)", description=( f"Your S3 infrastructure has a governance compliance score of {compliance_score:.1f}%. " f"Address {len(violations)} governance violations to improve cost control and operational efficiency." ), implementation_effort="medium", action_items=[ "Review and address all governance violations", "Implement standardized lifecycle policies across buckets", "Establish governance monitoring and alerting", "Create governance policy templates for new buckets" ] ) ) # Specific recommendations by violation type for violation_type, count in violation_summary.items(): if violation_type == "missing_lifecycle_policy": recommendations.append( self.create_recommendation( rec_type="lifecycle_policy", priority="medium", title=f"Implement Lifecycle Policies ({count} buckets affected)", description=( f"{count} buckets are missing lifecycle policies, which can lead to " "unnecessary storage costs and poor data management." ), implementation_effort="low", action_items=[ "Create standardized lifecycle policy templates", "Apply lifecycle policies to all buckets without them", "Set up automated policy application for new buckets", "Monitor lifecycle policy effectiveness" ] ) ) elif violation_type == "missing_versioning_lifecycle": recommendations.append( self.create_recommendation( rec_type="versioning_lifecycle", priority="high", title=f"Add Version Management Rules ({count} buckets affected)", description=( f"{count} versioned buckets lack proper version management, " "which can cause exponential storage cost growth." ), potential_savings=count * 50.0, # Estimate $50/month per bucket implementation_effort="low", action_items=[ "Add NoncurrentVersionTransition rules to move old versions to cheaper storage", "Set NoncurrentVersionExpiration to delete very old versions", "Consider keeping only 2-5 recent versions", "Monitor version accumulation and costs" ] ) ) elif violation_type == "missing_multipart_cleanup": recommendations.append( self.create_recommendation( rec_type="multipart_cleanup", priority="medium", title=f"Implement Multipart Upload Cleanup ({count} buckets affected)", description=( f"{count} buckets lack multipart upload cleanup rules, " "which can accumulate storage costs from incomplete uploads." ), potential_savings=count * 10.0, # Estimate $10/month per bucket implementation_effort="low", action_items=[ "Add AbortIncompleteMultipartUpload rules to lifecycle policies", "Set cleanup period to 7 days or less", "Monitor incomplete multipart uploads", "Educate teams on proper multipart upload practices" ] ) ) # Bucket-specific recommendations for high-severity violations high_severity_buckets = [ v for v in violations if v.get("severity") == "high" ] if high_severity_buckets: bucket_names = list(set(v.get("bucket_name") for v in high_severity_buckets)) recommendations.append( self.create_recommendation( rec_type="urgent_governance", priority="high", title=f"Urgent Governance Issues ({len(bucket_names)} buckets)", description=( f"High-severity governance violations detected in buckets: {', '.join(bucket_names[:5])}" + (f" and {len(bucket_names) - 5} more" if len(bucket_names) > 5 else "") ), affected_resources=bucket_names, implementation_effort="medium", action_items=[ "Immediately review high-severity violations", "Prioritize versioned buckets without version management", "Implement emergency lifecycle policies where needed", "Set up monitoring for future violations" ] ) ) # Best practices recommendation if compliance_score > 80: recommendations.append( self.create_recommendation( rec_type="governance_best_practices", priority="low", title="Enhance Governance Best Practices", description=( "Your governance compliance is good. Consider implementing " "advanced governance features for even better cost control." ), implementation_effort="medium", action_items=[ "Implement automated governance monitoring", "Set up cost allocation tags for better tracking", "Consider AWS Config rules for continuous compliance", "Establish governance metrics and reporting" ] ) ) except Exception as e: self.logger.error(f"Error generating governance recommendations: {str(e)}") recommendations.append( self.create_recommendation( rec_type="error_resolution", priority="medium", title="Governance Analysis Error", description=f"Error generating recommendations: {str(e)}", implementation_effort="low", action_items=[ "Review governance analysis logs", "Check AWS permissions and connectivity", "Retry governance analysis" ] ) ) return recommendations

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server