
CFM Tips - Cost Optimization MCP Server

by aws-samples
service_orchestrator.py (16.6 kB)
""" Service Orchestrator for CFM Tips MCP Server Coordinates parallel execution of AWS service calls with session storage. """ import logging import time from typing import Dict, List, Any, Optional, Callable from datetime import datetime from .session_manager import get_session_manager from .parallel_executor import get_parallel_executor, create_task, TaskResult logger = logging.getLogger(__name__) class ServiceOrchestrator: """Orchestrates AWS service calls with parallel execution and session storage.""" def __init__(self, session_id: Optional[str] = None): self.session_manager = get_session_manager() self.parallel_executor = get_parallel_executor() self.session_id = session_id or self.session_manager.create_session() logger.info(f"ServiceOrchestrator initialized with session {self.session_id}") def execute_parallel_analysis(self, service_calls: List[Dict[str, Any]], store_results: bool = True, timeout: float = 60.0) -> Dict[str, Any]: """ Execute multiple AWS service calls in parallel and optionally store results. Args: service_calls: List of service call definitions store_results: Whether to store results in session database timeout: Maximum time to wait for all tasks Returns: Dictionary containing execution results and summary """ start_time = time.time() # Create parallel tasks tasks = [] for i, call_def in enumerate(service_calls): task_id = f"{call_def['service']}_{call_def['operation']}_{i}_{int(time.time())}" task = create_task( task_id=task_id, service=call_def['service'], operation=call_def['operation'], function=call_def['function'], args=call_def.get('args', ()), kwargs=call_def.get('kwargs', {}), timeout=call_def.get('timeout', 30.0), priority=call_def.get('priority', 1) ) tasks.append(task) # Submit tasks for parallel execution task_ids = self.parallel_executor.submit_batch(tasks) # Wait for completion results = self.parallel_executor.wait_for_tasks(task_ids, timeout) # Process results execution_summary = { 'total_tasks': len(task_ids), 'successful': 0, 'failed': 0, 'timeout': 0, 'total_execution_time': time.time() - start_time, 'results': {} } stored_tables = [] for task_id, result in results.items(): execution_summary['results'][task_id] = { 'service': result.service, 'operation': result.operation, 'status': result.status, 'execution_time': result.execution_time, 'error': result.error } # Update counters if result.status == 'success': execution_summary['successful'] += 1 # Store successful results in session database if store_results and result.data: # Create a safe table name by removing special characters safe_service = ''.join(c for c in result.service if c.isalnum() or c == '_') safe_operation = ''.join(c for c in result.operation if c.isalnum() or c == '_') table_name = f"{safe_service}_{safe_operation}_{int(result.timestamp.timestamp())}" # Convert result data to list of dictionaries if needed data_to_store = self._prepare_data_for_storage(result.data, result.service, result.operation) if data_to_store: success = self.session_manager.store_data( self.session_id, table_name, data_to_store ) if success: stored_tables.append(table_name) execution_summary['results'][task_id]['stored_table'] = table_name else: logger.warning(f"Failed to store data for task {task_id}") elif result.status == 'timeout': execution_summary['timeout'] += 1 else: execution_summary['failed'] += 1 execution_summary['stored_tables'] = stored_tables logger.info(f"Parallel analysis completed: {execution_summary['successful']}/{execution_summary['total_tasks']} successful") return execution_summary 
def _prepare_data_for_storage(self, data: Any, service: str, operation: str) -> List[Dict[str, Any]]: """Prepare data for storage in session database with consistent 'value' column format.""" try: import json # Always store data in a consistent format with 'value' column containing JSON if isinstance(data, dict): if 'data' in data and isinstance(data['data'], dict): # Check if data contains lists of items (like underutilized_instances) data_dict = data['data'] result = [] # Look for common list fields that should be stored as individual records list_fields = ['underutilized_instances', 'unused_volumes', 'idle_instances', 'underutilized_functions'] for field in list_fields: if field in data_dict and isinstance(data_dict[field], list): # Store each item in the list as a separate record for item in data_dict[field]: result.append({ 'value': json.dumps(item), 'service': service, 'operation': operation, 'item_type': field }) break # Only process the first matching field # If no list fields found, store the entire data dict if not result: result.append({ 'value': json.dumps(data_dict), 'service': service, 'operation': operation }) return result elif 'data' in data and isinstance(data['data'], list): # Standard service response format - store each item as JSON in value column result = [] for item in data['data']: result.append({ 'value': json.dumps(item), 'service': service, 'operation': operation }) return result else: # Store entire dictionary as JSON return [{ 'value': json.dumps(data), 'service': service, 'operation': operation }] elif isinstance(data, list): # Store each list item as JSON in value column result = [] for i, item in enumerate(data): result.append({ 'value': json.dumps(item), 'service': service, 'operation': operation, 'index': i }) return result else: # Single value - store as JSON return [{ 'value': json.dumps(data), 'service': service, 'operation': operation }] except Exception as e: logger.error(f"Error preparing data for storage: {e}") return [] def query_session_data(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]: """Execute SQL query on session data.""" try: return self.session_manager.execute_query(self.session_id, query, params) except Exception as e: logger.error(f"Error executing session query: {e}") return [] def get_stored_tables(self) -> List[str]: """Get list of tables stored in the session.""" try: results = self.query_session_data( "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" ) return [row['name'] for row in results] except Exception as e: logger.error(f"Error getting stored tables: {e}") return [] def _fix_wildcard_query(self, query_sql: str) -> str: """Fix wildcard table names in SQL queries by replacing with actual table names.""" try: # Get all table names in the session stored_tables = self.get_stored_tables() # Common wildcard patterns to replace wildcard_patterns = [ 'ec2_underutilized_instances_*', 'ebs_underutilized_volumes_*', 'ebs_unused_volumes_*', 'rds_underutilized_instances_*', 'rds_idle_instances_*', 'lambda_underutilized_functions_*', 'lambda_unused_functions_*' ] fixed_query = query_sql for pattern in wildcard_patterns: if pattern in fixed_query: # Find matching table names prefix = pattern.replace('_*', '_') matching_tables = [table for table in stored_tables if table.startswith(prefix)] if matching_tables: # Use the first matching table (most recent) actual_table = matching_tables[0] fixed_query = fixed_query.replace(pattern, f'"{actual_table}"') logger.debug(f"Replaced {pattern} 
with {actual_table}") else: # If no matching table, create a dummy query that returns empty results logger.warning(f"No matching table found for pattern {pattern}") # Replace the entire FROM clause with a dummy table fixed_query = fixed_query.replace( f"FROM {pattern}", "FROM (SELECT NULL as value WHERE 1=0)" ) return fixed_query except Exception as e: logger.error(f"Error fixing wildcard query: {e}") return query_sql def aggregate_results(self, aggregation_queries: List[Dict[str, str]]) -> Dict[str, Any]: """Execute aggregation queries across stored data.""" aggregated_results = {} for query_def in aggregation_queries: query_name = query_def['name'] query_sql = query_def['query'] try: # Fix wildcard patterns in the query fixed_query = self._fix_wildcard_query(query_sql) results = self.query_session_data(fixed_query) aggregated_results[query_name] = { 'status': 'success', 'data': results, 'row_count': len(results) } except Exception as e: aggregated_results[query_name] = { 'status': 'error', 'error': str(e), 'data': [] } logger.error(f"Error executing aggregation query '{query_name}': {e}") return aggregated_results def create_comprehensive_report(self, service_calls: List[Dict[str, Any]], aggregation_queries: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]: """Create a comprehensive report with parallel execution and aggregation.""" # Execute parallel analysis execution_results = self.execute_parallel_analysis(service_calls) # Execute aggregation queries if provided aggregated_data = {} if aggregation_queries: aggregated_data = self.aggregate_results(aggregation_queries) # Create comprehensive report report = { 'report_metadata': { 'session_id': self.session_id, 'generated_at': datetime.now().isoformat(), 'execution_summary': execution_results, 'stored_tables': self.get_stored_tables() }, 'service_results': execution_results['results'], 'aggregated_analysis': aggregated_data, 'recommendations': self._generate_recommendations(execution_results, aggregated_data) } return report def _generate_recommendations(self, execution_results: Dict[str, Any], aggregated_data: Dict[str, Any]) -> List[Dict[str, Any]]: """Generate recommendations based on analysis results.""" recommendations = [] # Analyze execution results for patterns successful_services = [] failed_services = [] for task_id, result in execution_results['results'].items(): if result['status'] == 'success': successful_services.append(result['service']) else: failed_services.append(result['service']) # Generate recommendations based on successful analyses if successful_services: recommendations.append({ 'type': 'analysis_success', 'priority': 'info', 'title': 'Successful Service Analysis', 'description': f"Successfully analyzed {len(set(successful_services))} AWS services", 'services': list(set(successful_services)) }) # Generate recommendations for failed analyses if failed_services: recommendations.append({ 'type': 'analysis_failure', 'priority': 'warning', 'title': 'Failed Service Analysis', 'description': f"Failed to analyze {len(set(failed_services))} AWS services", 'services': list(set(failed_services)), 'action': 'Review service permissions and configuration' }) # Add aggregation-based recommendations for query_name, query_result in aggregated_data.items(): if query_result['status'] == 'success' and query_result['row_count'] > 0: recommendations.append({ 'type': 'data_insight', 'priority': 'medium', 'title': f'Data Available: {query_name}', 'description': f"Found {query_result['row_count']} records for analysis", 'query': 
query_name }) return recommendations def cleanup_session(self): """Clean up the current session.""" try: self.session_manager.close_session(self.session_id) logger.info(f"Cleaned up session {self.session_id}") except Exception as e: logger.error(f"Error cleaning up session {self.session_id}: {e}") def get_session_info(self) -> Dict[str, Any]: """Get information about the current session.""" return self.session_manager.get_session_info(self.session_id)
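
For orientation, here is a minimal usage sketch of how a caller might drive this orchestrator end to end. It is an assumption-laden illustration, not part of the file above: the import path, the describe_idle_ec2_instances helper, the region, and the example SQL are placeholders, and only the ServiceOrchestrator methods and the wildcard table pattern come from the code itself.

# Hypothetical usage sketch -- adjust the import to the actual package layout;
# the analysis helper below is a placeholder, not defined in this repo file.
from service_orchestrator import ServiceOrchestrator


def describe_idle_ec2_instances(region: str) -> dict:
    """Placeholder analysis callable; a real one would query CloudWatch/boto3."""
    return {
        'data': {
            'underutilized_instances': [
                {'instance_id': 'i-0123456789abcdef0', 'avg_cpu_percent': 2.1}
            ]
        }
    }


orchestrator = ServiceOrchestrator()

# Each entry supplies the callable plus the metadata used for task IDs and table names.
service_calls = [
    {
        'service': 'ec2',
        'operation': 'underutilized_instances',
        'function': describe_idle_ec2_instances,
        'kwargs': {'region': 'us-east-1'},
        'timeout': 30.0,
        'priority': 1,
    }
]

# The wildcard table name is rewritten by _fix_wildcard_query to the table
# actually stored for this session (ec2_underutilized_instances_<timestamp>).
aggregation_queries = [
    {
        'name': 'idle_ec2_count',
        'query': 'SELECT COUNT(*) AS idle_count FROM ec2_underutilized_instances_*',
    }
]

report = orchestrator.create_comprehensive_report(service_calls, aggregation_queries)
print(report['report_metadata']['execution_summary']['successful'])
orchestrator.cleanup_session()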
