# by OpenAdaptAI
# omnimcp/omniparser/
"""Deployment module for OmniParser on AWS EC2 with on-demand startup and ALARM-BASED auto-shutdown."""

import datetime
import os
import subprocess
import time
import json
import io
import zipfile
from typing import Tuple  # Added for type hinting consistency

from botocore.exceptions import ClientError
from loguru import logger
import boto3
import fire
import paramiko

# Assuming config is imported correctly from omnimcp.config
from omnimcp.config import config

# Constants for AWS resource names
LAMBDA_FUNCTION_NAME = f"{config.PROJECT_NAME}-auto-shutdown"
IAM_ROLE_NAME = (
    f"{config.PROJECT_NAME}-lambda-role"  # Role for the auto-shutdown Lambda
)

CLEANUP_ON_FAILURE = False  # Set to True to attempt cleanup even if start fails


def create_key_pair(
    key_name: str = config.AWS_EC2_KEY_NAME, key_path: str = config.AWS_EC2_KEY_PATH
) -> str | None:
    """Create an EC2 key pair and save its private key locally.

    If AWS reports that a key pair with this name already exists, the existing
    pair is deleted and creation is retried exactly once.

    Args:
        key_name: Name of the key pair to create in AWS.
        key_path: Local file path where the private key material is written
            (the file is made read-only for the owner).

    Returns:
        The key pair name on success, or None if the key pair could not be
        created in AWS or the private key could not be written to disk.
    """
    ec2_client = boto3.client("ec2", region_name=config.AWS_REGION)

    # One initial attempt plus a single retry after deleting a duplicate.
    # Bounding the retries avoids unbounded recursion if AWS keeps reporting
    # the name as taken.
    for attempt in range(2):
        try:
            logger.info(f"Attempting to create key pair: {key_name}")
            key_pair = ec2_client.create_key_pair(KeyName=key_name)
        except ClientError as e:
            if e.response["Error"]["Code"] != "InvalidKeyPair.Duplicate":
                logger.error(f"Error creating key pair {key_name}: {e}")
                return None
            if attempt > 0:
                logger.error(
                    f"Key pair '{key_name}' still reported as duplicate after deletion: {e}"
                )
                return None
            logger.warning(
                f"Key pair '{key_name}' already exists in AWS. Attempting to delete and recreate."
            )
            try:
                ec2_client.delete_key_pair(KeyName=key_name)
                logger.info(f"Deleted existing key pair '{key_name}' from AWS.")
            except ClientError as e_del:
                logger.error(
                    f"Failed to delete existing key pair '{key_name}': {e_del}"
                )
                return None
            continue  # Retry creation

        private_key = key_pair["KeyMaterial"]
        try:
            # Only create a directory when key_path actually has one;
            # os.makedirs("") raises FileNotFoundError.
            key_dir = os.path.dirname(key_path)
            if key_dir:
                os.makedirs(key_dir, exist_ok=True)
            # A key file left by a previous run is read-only (0o400) and
            # cannot be reopened for writing, so remove it first.
            if os.path.exists(key_path):
                os.remove(key_path)
            with open(key_path, "w") as key_file:
                key_file.write(private_key)
            os.chmod(key_path, 0o400)  # Set read-only permissions
        except OSError as e_io:
            logger.error(
                f"Key pair {key_name} was created in AWS but could not be saved to {key_path}: {e_io}"
            )
            return None

        logger.info(f"Key pair {key_name} created and saved to {key_path}")
        return key_name

    return None
def get_or_create_security_group_id(ports: list[int] | None = None) -> str | None:
    """Get the project's security group, creating it if it does not exist.

    For an existing group, any of the requested TCP ports that are not yet
    open to all IPv4 addresses are added. For a new group, all requested ports
    are opened right after creation.

    Args:
        ports: TCP ports to open for inbound traffic. Defaults to
            ``[22, config.PORT]`` when omitted.

    Returns:
        The security group ID, or None if the group could not be described or
        created.
    """
    # Resolve the default here rather than in the signature so the list is
    # not shared between calls.
    if ports is None:
        ports = [22, config.PORT]

    ec2_client = boto3.client("ec2", region_name=config.AWS_REGION)
    sg_name = config.AWS_EC2_SECURITY_GROUP
    open_cidr = "0.0.0.0/0"  # Allows access from any IP, adjust if needed
    ip_permissions = [
        {
            "IpProtocol": "tcp",
            "FromPort": port,
            "ToPort": port,
            "IpRanges": [{"CidrIp": open_cidr}],
        }
        for port in ports
    ]

    try:
        response = ec2_client.describe_security_groups(GroupNames=[sg_name])
    except ClientError as e:
        if e.response["Error"]["Code"] != "InvalidGroup.NotFound":
            logger.error(f"Error describing security group '{sg_name}': {e}")
            return None

        logger.info(f"Security group '{sg_name}' not found. Creating...")
        try:
            response = ec2_client.create_security_group(
                GroupName=sg_name,
                Description=f"Security group for {config.PROJECT_NAME} deployment",
                TagSpecifications=[
                    {
                        "ResourceType": "security-group",
                        "Tags": [{"Key": "Name", "Value": config.PROJECT_NAME}],
                    }
                ],
            )
            security_group_id = response["GroupId"]
            logger.info(
                f"Created security group '{sg_name}' with ID: {security_group_id}"
            )
            # Add rules after creation
            time.sleep(5)  # Brief wait for SG propagation
            ec2_client.authorize_security_group_ingress(
                GroupId=security_group_id, IpPermissions=ip_permissions
            )
            logger.info(f"Added inbound rules for ports {ports}")
            return security_group_id
        except ClientError as e_create:
            logger.error(f"Error creating security group '{sg_name}': {e_create}")
            return None

    group = response["SecurityGroups"][0]
    security_group_id = group["GroupId"]
    logger.info(f"Security group '{sg_name}' already exists: {security_group_id}")

    # Ensure desired rules exist (idempotent check). Compare on the CidrIp
    # value only: AWS may return extra keys (e.g. "Description") per range.
    current_ports_open = {
        perm.get("FromPort")
        for perm in group.get("IpPermissions", [])
        if perm.get("IpProtocol") == "tcp"
        and any(
            ip_range.get("CidrIp") == open_cidr
            for ip_range in perm.get("IpRanges", [])
        )
    }

    for required_perm in ip_permissions:
        port_to_open = required_perm["FromPort"]
        if port_to_open in current_ports_open:
            logger.info(f"Rule for port {port_to_open} already exists.")
            continue
        try:
            logger.info(f"Attempting to add inbound rule for port {port_to_open}...")
            ec2_client.authorize_security_group_ingress(
                GroupId=security_group_id, IpPermissions=[required_perm]
            )
            logger.info(f"Added inbound rule for port {port_to_open}")
        except ClientError as e_auth:
            # Handle race condition or other errors; a missing rule is logged
            # but does not prevent returning the existing group ID.
            if e_auth.response["Error"]["Code"] == "InvalidPermission.Duplicate":
                logger.info(
                    f"Rule for port {port_to_open} likely added concurrently or already exists."
                )
            else:
                logger.error(f"Error adding rule for port {port_to_open}: {e_auth}")

    return security_group_id
def _wait_for_running_ip(ec2_client, instance, delay: int, max_attempts: int) -> str:
    """Wait until *instance* is running and return its public IP.

    Raises RuntimeError if the instance is running but has no public IP;
    waiter errors (including timeouts) propagate to the caller.
    """
    waiter = ec2_client.get_waiter("instance_running")
    waiter.wait(
        InstanceIds=[instance.id],
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
    instance.reload()
    public_ip = instance.public_ip_address
    if not public_ip:
        raise RuntimeError(
            f"Instance {instance.id} reached running state but has no public IP."
        )
    return public_ip


def _reuse_existing_instance(
    ec2, ec2_client, project_name: str, key_path: str
) -> Tuple[str | None, str | None]:
    """Find the newest pending/running/stopped tagged instance and make it usable.

    Returns (instance_id, public_ip) when an instance could be reused, or
    (None, None) when there is none, the local SSH key is missing, or
    starting/waiting for the instance failed.
    """
    logger.info(
        f"Checking for existing usable EC2 instance tagged: Name={project_name}"
    )
    # Filter for states we can potentially reuse or wait for
    instances = ec2.instances.filter(
        Filters=[
            {"Name": "tag:Name", "Values": [project_name]},
            {
                "Name": "instance-state-name",
                "Values": ["pending", "running", "stopped"],
            },
        ]
    )
    # Most recently launched instance in a usable state
    candidate = max(instances, key=lambda i: i.launch_time, default=None)
    if candidate is None:
        return None, None

    instance_id = candidate.id
    state = candidate.state["Name"]
    logger.info(
        f"Found most recent potentially usable instance {instance_id} in state: {state}"
    )

    # Without the local private key the instance cannot be reached over SSH.
    if not os.path.exists(key_path):
        logger.error(
            f"Local SSH key file {key_path} not found for existing instance {instance_id}."
        )
        logger.error(
            "Cannot proceed with existing instance without the key. Will attempt to create a new instance."
        )
        return None, None

    try:
        if state == "running":
            instance_ip = candidate.public_ip_address
            if instance_ip:
                logger.info(
                    f"Reusing running instance: ID={instance_id}, IP={instance_ip}"
                )
                return instance_id, instance_ip
            logger.warning(
                f"Instance {instance_id} is running but has no public IP. Waiting briefly..."
            )
            # Short wait (up to 30s), maybe IP assignment is delayed
            instance_ip = _wait_for_running_ip(
                ec2_client, candidate, delay=5, max_attempts=6
            )
            logger.info(
                f"Successfully obtained Public IP for running instance: {instance_ip}"
            )
        elif state == "stopped":
            logger.info(
                f"Attempting to start existing stopped instance: ID={instance_id}"
            )
            ec2_client.start_instances(InstanceIds=[instance_id])
            logger.info("Waiting for instance to reach 'running' state...")
            instance_ip = _wait_for_running_ip(
                ec2_client, candidate, delay=15, max_attempts=40
            )
            logger.info(
                f"Instance started successfully: ID={instance_id}, IP={instance_ip}"
            )
        else:  # "pending" is the only remaining state allowed by the filter
            logger.info(f"Instance {instance_id} is pending. Waiting until running...")
            instance_ip = _wait_for_running_ip(
                ec2_client, candidate, delay=15, max_attempts=40
            )
            logger.info(f"Instance now running: ID={instance_id}, IP={instance_ip}")
    except Exception as e_prepare:
        # Any failure here falls through to creating a new instance.
        logger.error(
            f"Failed to prepare existing instance {instance_id} (state: {state}): {e_prepare}"
        )
        return None, None

    return instance_id, instance_ip


def _launch_new_instance(
    ec2,
    ec2_client,
    ami: str,
    instance_type: str,
    project_name: str,
    key_name: str,
    disk_size: int,
    security_group_id: str,
) -> Tuple[str | None, str | None]:
    """Launch a new tagged instance and wait for it to run with a public IP.

    Returns (instance_id, public_ip), or (None, None) on failure. If the
    instance was created but a later step failed, a terminate request is
    issued for it.
    """
    instance_id = None
    try:
        ebs_config = {
            "DeviceName": "/dev/sda1",
            "Ebs": {
                "VolumeSize": disk_size,
                "VolumeType": "gp3",
                "DeleteOnTermination": True,
                "Iops": 3000,
                "Throughput": 125,
            },
        }
        logger.info(
            f"Launching new EC2 instance (AMI: {ami}, Type: {instance_type})..."
        )
        new_instance = ec2.create_instances(
            ImageId=ami,
            MinCount=1,
            MaxCount=1,
            InstanceType=instance_type,
            KeyName=key_name,
            SecurityGroupIds=[security_group_id],
            BlockDeviceMappings=[ebs_config],
            TagSpecifications=[
                {
                    "ResourceType": "instance",
                    "Tags": [{"Key": "Name", "Value": project_name}],
                },
                {
                    "ResourceType": "volume",
                    "Tags": [{"Key": "Name", "Value": f"{project_name}-root-vol"}],
                },
            ],
        )[0]
        instance_id = new_instance.id
        logger.info(f"New instance {instance_id} created. Waiting until running...")
        new_instance.wait_until_running(
            WaiterConfig={"Delay": 15, "MaxAttempts": 40}
        )
        new_instance.reload()
        instance_ip = new_instance.public_ip_address
        if not instance_ip:
            raise RuntimeError(
                f"Instance {instance_id} started but has no public IP."
            )
        logger.info(f"New instance running: ID={instance_id}, IP={instance_ip}")
        return instance_id, instance_ip
    except Exception as e:
        logger.error(f"Failed to create or wait for new EC2 instance: {e}")
        if instance_id:  # If instance was created but failed later
            try:
                logger.warning(
                    f"Attempting to terminate partially created/failed instance {instance_id}"
                )
                ec2_client.terminate_instances(InstanceIds=[instance_id])
                logger.info(f"Issued terminate for {instance_id}")
            except Exception as term_e:
                logger.error(
                    f"Failed to terminate failed instance {instance_id}: {term_e}"
                )
        return None, None


def deploy_ec2_instance(
    ami: str = config.AWS_EC2_AMI,
    instance_type: str = config.AWS_EC2_INSTANCE_TYPE,
    project_name: str = config.PROJECT_NAME,
    key_name: str = config.AWS_EC2_KEY_NAME,
    disk_size: int = config.AWS_EC2_DISK_SIZE,
) -> Tuple[str | None, str | None]:
    """
    Deploy a new EC2 instance or start/return an existing usable one.
    Ignores instances that are shutting-down or terminated.

    When no existing instance can be reused, the local key file and the AWS
    key pair named ``key_name`` are deleted and recreated before launching.

    Args:
        ami: AMI ID to use for the instance.
        instance_type: EC2 instance type.
        project_name: Name tag for the instance.
        key_name: Name of the key pair to use.
        disk_size: Size of the root volume in GB.

    Returns:
        Tuple[str | None, str | None]: Instance ID and public IP if successful,
        otherwise (None, None).
    """
    ec2 = boto3.resource("ec2", region_name=config.AWS_REGION)
    ec2_client = boto3.client("ec2", region_name=config.AWS_REGION)
    key_path = config.AWS_EC2_KEY_PATH  # Local path for the key

    try:
        # --- If usable instance found and prepared, return its details ---
        instance_id, instance_ip = _reuse_existing_instance(
            ec2, ec2_client, project_name, key_path
        )
        if instance_id and instance_ip:
            logger.info(f"Using existing/started instance {instance_id}")
            return instance_id, instance_ip

        # --- No usable existing instance found, proceed to create a new one ---
        logger.info(
            "No usable existing instance found or prepared. Creating a new instance..."
        )
        security_group_id = get_or_create_security_group_id()
        if not security_group_id:
            logger.error("Unable to get/create security group ID. Aborting deployment.")
            return None, None

        # Create new key pair (delete old local file and AWS key pair first)
        try:
            if os.path.exists(key_path):
                logger.info(f"Removing existing local key file {key_path}")
                os.remove(key_path)
            try:
                logger.info(
                    f"Attempting to delete key pair '{key_name}' from AWS (if exists)..."
                )
                ec2_client.delete_key_pair(KeyName=key_name)
                logger.info(f"Deleted existing key pair '{key_name}' from AWS.")
            except ClientError as e:
                # Ignore if key not found, log other errors
                if e.response["Error"]["Code"] != "InvalidKeyPair.NotFound":
                    logger.warning(
                        f"Could not delete key pair '{key_name}' from AWS: {e}"
                    )
                else:
                    logger.info(f"Key pair '{key_name}' not found in AWS.")
            # Create the new key pair
            if not create_key_pair(key_name, key_path):
                raise RuntimeError("Failed to create new key pair")
        except Exception as e:
            logger.error(f"Error managing key pair: {e}")
            return None, None

        return _launch_new_instance(
            ec2,
            ec2_client,
            ami,
            instance_type,
            project_name,
            key_name,
            disk_size,
            security_group_id,
        )
    except Exception as outer_e:
        # Catch any unexpected errors in the overall logic. loguru has no
        # exc_info option (extra kwargs make it str.format() the message),
        # so attach the traceback via opt(exception=True).
        logger.opt(exception=True).error(
            f"Unexpected error during instance deployment/discovery: {outer_e}"
        )
        return None, None
# TODO: Wait for Unattended Upgrades: Add an explicit wait or a loop checking
# for the lock file (/var/lib/dpkg/lock-frontend) before running apt-get
# install. E.g., while sudo fuser /var/lib/dpkg/lock-frontend >/dev/null 2>&1;
# do echo 'Waiting for apt lock...'; sleep 10; done. This is more robust.
def configure_ec2_instance(
    instance_id: str,
    instance_ip: str,
    max_ssh_retries: int = 20,
    ssh_retry_delay: int = 20,
    max_cmd_retries: int = 20,
    cmd_retry_delay: int = 20,
) -> bool:
    """Configure the specified EC2 instance (install Docker, etc.).

    Connects over SSH using the key at ``config.AWS_EC2_KEY_PATH`` and runs the
    apt/Docker setup commands through ``execute_command``.

    Args:
        instance_id: ID of the instance (used for logging only).
        instance_ip: Public IP address to connect to.
        max_ssh_retries: Maximum number of SSH connection attempts.
        ssh_retry_delay: Seconds to wait between SSH connection attempts.
        max_cmd_retries: Retry budget passed to ``execute_command`` per command.
        cmd_retry_delay: Retry delay passed to ``execute_command`` per command.

    Returns:
        True if every setup command completed, False on any failure (missing
        or unreadable key, SSH connection failure, or a failing command).
    """
    logger.info(f"Starting configuration for instance {instance_id} at {instance_ip}")

    # Resolved outside the try so the error message below can always use it.
    key_path = config.AWS_EC2_KEY_PATH
    if not os.path.exists(key_path):
        logger.error(f"Key file not found at {key_path}. Cannot configure instance.")
        return False
    try:
        key = paramiko.RSAKey.from_private_key_file(key_path)
    except Exception as e:
        logger.error(f"Failed to load SSH key {key_path}: {e}")
        return False

    ssh_client = None  # Initialize to None
    try:
        ssh_client = paramiko.SSHClient()
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # --- SSH Connection Logic ---
        logger.info("Attempting SSH connection...")
        ssh_retries = 0
        while ssh_retries < max_ssh_retries:
            try:
                ssh_client.connect(
                    hostname=instance_ip,
                    username=config.AWS_EC2_USER,
                    pkey=key,
                    timeout=20,
                )
                logger.success("SSH connection established.")
                break  # Exit loop on success
            except Exception as e:
                ssh_retries += 1
                logger.warning(
                    f"SSH connection attempt {ssh_retries}/{max_ssh_retries} failed: {e}"
                )
                if ssh_retries < max_ssh_retries:
                    logger.info(
                        f"Retrying SSH connection in {ssh_retry_delay} seconds..."
                    )
                    time.sleep(ssh_retry_delay)
                else:
                    logger.error(
                        "Maximum SSH connection attempts reached. Configuration aborted."
                    )
                    return False  # Return failure

        # --- Instance Setup Commands ---
        # NOTE(review): the Docker repository URLs below assume an Ubuntu
        # AMI - confirm against config.AWS_EC2_AMI.
        commands = [
            "sudo apt-get update -y",
            # Ensure https transport
            "sudo apt-get install -y ca-certificates curl gnupg apt-transport-https",
            "sudo install -m 0755 -d /etc/apt/keyrings",
            # Use non-deprecated method for adding Docker GPG key with non-interactive flags
            "curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor --batch --yes -o /etc/apt/keyrings/docker.gpg",
            "sudo chmod a+r /etc/apt/keyrings/docker.gpg",
            (  # Use lsb_release for codename reliably
                'echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] '
                'https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | '
                "sudo tee /etc/apt/sources.list.d/docker.list > /dev/null"
            ),
            "sudo apt-get update -y",
            # Install specific components needed
            "sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin",
            "sudo systemctl start docker",
            "sudo systemctl enable docker",
            # Add user to docker group - requires new login/session to take effect
            # for user directly, but sudo works
            f"sudo usermod -aG docker {config.AWS_EC2_USER}",
        ]

        for command in commands:
            # execute_command already logs the command and raises on failure,
            # which is reported by the except clause below.
            execute_command(
                ssh_client,
                command,
                max_retries=max_cmd_retries,
                retry_delay=cmd_retry_delay,
            )

        logger.success("Instance OS configuration commands completed.")
        return True  # Configuration successful

    except Exception as e:
        # loguru has no exc_info option; opt(exception=True) attaches the traceback.
        logger.opt(exception=True).error(
            f"Failed during instance configuration: {e}"
        )
        return False  # Configuration failed
    finally:
        if ssh_client:
            ssh_client.close()
            logger.info("SSH connection closed during configure_ec2_instance.")
def execute_command(
    ssh_client: paramiko.SSHClient,
    command: str,
    max_retries: int = 20,
    retry_delay: int = 10,
    timeout: int = config.COMMAND_TIMEOUT,  # Use timeout from config
) -> Tuple[int, str, str]:  # Return status, stdout, stderr
    """Execute a command via SSH with retries for specific errors.

    Retries happen only when (a) running the command raised an exception
    (e.g. a timeout or transport error) or (b) the command failed because the
    dpkg/apt lock was held. Any other non-zero exit status fails immediately.

    Args:
        ssh_client: Connected SSH client used to run the command.
        command: Shell command to execute on the remote host.
        max_retries: Maximum number of attempts.
        retry_delay: Seconds to sleep between attempts.
        timeout: Timeout passed to ``exec_command``.

    Returns:
        Tuple of (exit status, stripped stdout, stripped stderr); the exit
        status is always 0 because failures raise.

    Raises:
        RuntimeError: The command exited non-zero, or retries were exhausted
            on the dpkg lock.
        Exception: The last exception raised while running the command, once
            retries are exhausted.
    """
    logger.info(
        f"Executing SSH command: {command[:100]}{'...' if len(command) > 100 else ''}"
    )

    for attempt in range(1, max_retries + 1):
        # Only the remote call and stream reads are guarded. Evaluating the
        # exit status outside the try keeps the deliberate RuntimeErrors below
        # from being caught and retried as if they were transport errors.
        try:
            _stdin, stdout, stderr = ssh_client.exec_command(
                command,
                timeout=timeout,
                get_pty=False,  # Try without PTY first
            )
            # NOTE(review): waiting for the exit status before draining the
            # streams may stall on very large outputs - confirm against the
            # paramiko recv_exit_status documentation.
            exit_status = stdout.channel.recv_exit_status()
            stdout_output = stdout.read().decode("utf-8", errors="replace").strip()
            stderr_output = stderr.read().decode("utf-8", errors="replace").strip()
        except Exception as e:  # Catch potential errors like timeouts
            logger.error(f"Exception during command execution (attempt {attempt}): {e}")
            if attempt >= max_retries:
                logger.error(
                    f"Command failed after {max_retries} attempts due to exception: {command}"
                )
                raise  # Reraise the last exception
            logger.info(f"Retrying command after exception in {retry_delay}s...")
            time.sleep(retry_delay)
            continue

        if stdout_output:
            logger.debug(f"STDOUT:\n{stdout_output}")
        if stderr_output:
            if exit_status == 0:
                logger.warning(f"STDERR (Exit Status 0):\n{stderr_output}")
            else:
                logger.error(f"STDERR (Exit Status {exit_status}):\n{stderr_output}")

        if exit_status == 0:
            logger.success(
                f"Command successful (attempt {attempt}): {command[:50]}..."
            )
            return exit_status, stdout_output, stderr_output  # Success

        # Specific Retry Condition: dpkg lock
        lock_held = (
            "Could not get lock" in stderr_output
            or "dpkg frontend is locked" in stderr_output
        )
        if not lock_held:
            # Other non-zero exit status, fail immediately
            error_msg = f"Command failed with exit status {exit_status} (attempt {attempt}): {command}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        logger.warning(
            f"Command failed due to dpkg lock (attempt {attempt}/{max_retries}). Retrying in {retry_delay}s..."
        )
        if attempt >= max_retries:
            error_msg = f"Command failed after {max_retries} attempts due to dpkg lock: {command}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)  # Final failure after retries
        time.sleep(retry_delay)

    # Reached only when max_retries < 1 (the loop body never ran).
    raise RuntimeError(f"Command failed after exhausting retries: {command}")
# Updated create_auto_shutdown_infrastructure function
def create_auto_shutdown_infrastructure(instance_id: str) -> None:
    """Create CloudWatch Alarm and Lambda function for CPU inactivity based auto-shutdown.

    Steps: create (or look up) the Lambda IAM role, create or update the
    Lambda function that stops ``instance_id``, remove the legacy CloudWatch
    Events rule/permission, create the low-CPU alarm with the Lambda as its
    alarm action, and allow CloudWatch alarms to invoke the Lambda.

    Errors are logged and never raised, so a failure here does not abort the
    surrounding deployment.

    Args:
        instance_id: ID of the EC2 instance to monitor and stop.
    """
    lambda_client = boto3.client("lambda", region_name=config.AWS_REGION)
    iam_client = boto3.client("iam", region_name=config.AWS_REGION)
    cloudwatch_client = boto3.client("cloudwatch", region_name=config.AWS_REGION)

    role_name = IAM_ROLE_NAME  # Use constant
    lambda_function_name = LAMBDA_FUNCTION_NAME
    alarm_name = (
        f"{config.PROJECT_NAME}-CPU-Low-Alarm-{instance_id}"  # Unique alarm name
    )

    logger.info("Setting up auto-shutdown infrastructure (Alarm-based)...")

    # --- Create or Get IAM Role ---
    role_arn = None
    try:
        assume_role_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "lambda.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
        logger.info(f"Attempting to create/get IAM role: {role_name}")
        response = iam_client.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy)
        )
        role_arn = response["Role"]["Arn"]
        logger.info(f"Created IAM role {role_name}. Attaching policies...")
        # Attach policies needed by Lambda
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        )
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonEC2ReadOnlyAccess",
        )
        iam_client.attach_role_policy(
            RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/AmazonEC2FullAccess"
        )  # Consider reducing scope later
        logger.info(f"Attached policies to IAM role {role_name}")
        logger.info("Waiting for IAM role propagation...")
        time.sleep(15)  # Increased wait time for IAM propagation
    except ClientError as e:
        if e.response["Error"]["Code"] == "EntityAlreadyExists":
            logger.info(f"IAM role {role_name} already exists, retrieving ARN...")
            try:
                response = iam_client.get_role(RoleName=role_name)
                role_arn = response["Role"]["Arn"]
                # Optional: Verify/attach policies if needed, though typically done at creation
            except ClientError as get_e:
                logger.error(f"Failed to get existing IAM role {role_name}: {get_e}")
                logger.error(
                    "Cannot proceed with auto-shutdown setup without IAM role ARN."
                )
                return  # Stop setup
        else:
            logger.error(f"Error creating/getting IAM role {role_name}: {e}")
            logger.error("Cannot proceed with auto-shutdown setup without IAM role.")
            return  # Stop setup

    if not role_arn:
        logger.error("Failed to obtain IAM role ARN. Aborting auto-shutdown setup.")
        return

    # Source of the Lambda handler. It is a plain (non-f) string, so the
    # braces inside belong to the Lambda's own f-strings. The instance to stop
    # is supplied through the INSTANCE_ID environment variable.
    lambda_code = """
import json
import os

import boto3

INSTANCE_ID = os.environ.get('INSTANCE_ID')

print(f"Lambda invoked. Checking instance: {INSTANCE_ID}")


def lambda_handler(event, context):
    if not INSTANCE_ID:
        print("Error: INSTANCE_ID environment variable not set.")
        return {'statusCode': 500, 'body': json.dumps('Configuration error')}

    # boto3 automatically uses the Lambda execution region if not specified
    ec2 = boto3.client('ec2')
    print(f"Inactivity Alarm triggered for instance: {INSTANCE_ID}. Checking state...")

    try:
        response = ec2.describe_instances(InstanceIds=[INSTANCE_ID])
        reservations = response.get('Reservations', [])
        if not reservations or not reservations[0].get('Instances'):
            print(f"Instance {INSTANCE_ID} not found.")
            return {'statusCode': 404, 'body': json.dumps('Instance not found')}

        state = reservations[0]['Instances'][0]['State']['Name']
        if state != 'running':
            print(f"Instance {INSTANCE_ID} is in state '{state}'. Nothing to do.")
            return {'statusCode': 200, 'body': json.dumps(f'Instance is {state}')}

        print(f"Stopping instance {INSTANCE_ID} due to inactivity.")
        ec2.stop_instances(InstanceIds=[INSTANCE_ID])
        return {'statusCode': 200, 'body': json.dumps('Instance stop initiated')}
    except Exception as e:
        print(f"Error interacting with EC2 for instance {INSTANCE_ID}: {str(e)}")
        return {'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}')}
"""

    # --- Create or Update Lambda Function ---
    lambda_arn = None  # Initialize
    try:
        logger.info(f"Preparing Lambda function code for {lambda_function_name}...")
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(
            zip_buffer, "w", zipfile.ZIP_DEFLATED
        ) as zip_file:  # Use 'w' for new zip
            # File name must match the module part of the Handler below.
            zip_file.writestr("lambda_function.py", lambda_code.encode("utf-8"))
        zip_content = zip_buffer.getvalue()

        env_vars = {"Variables": {"INSTANCE_ID": instance_id}}

        try:
            logger.info(
                f"Checking for existing Lambda function: {lambda_function_name}"
            )
            func_config = lambda_client.get_function_configuration(
                FunctionName=lambda_function_name
            )
            lambda_arn = func_config["FunctionArn"]  # Get ARN if exists
            logger.info("Found existing Lambda. Updating code and configuration...")
            lambda_client.update_function_code(
                FunctionName=lambda_function_name, ZipFile=zip_content
            )
            # The code update is asynchronous; a configuration update issued
            # while it is still in progress is rejected, so wait for it first.
            lambda_client.get_waiter("function_updated").wait(
                FunctionName=lambda_function_name
            )
            lambda_client.update_function_configuration(
                FunctionName=lambda_function_name,
                Role=role_arn,
                Environment=env_vars,
                Timeout=30,
                MemorySize=128,
            )
            logger.info(f"Updated Lambda function: {lambda_function_name}")
        except ClientError as e:
            if e.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.info(
                    f"Lambda function {lambda_function_name} not found. Creating..."
                )
                response = lambda_client.create_function(
                    FunctionName=lambda_function_name,
                    Runtime="python3.9",  # Ensure this runtime is supported/desired
                    Role=role_arn,
                    Handler="lambda_function.lambda_handler",
                    Code={"ZipFile": zip_content},
                    Timeout=30,
                    MemorySize=128,
                    Description=f"Auto-shutdown function for {config.PROJECT_NAME} instance {instance_id}",
                    Environment=env_vars,
                    Tags={
                        "Project": config.PROJECT_NAME
                    },  # Tag for easier identification
                )
                lambda_arn = response["FunctionArn"]
                logger.info(f"Created Lambda function: {lambda_arn}")
                # Need to wait for function to be fully active before creating alarm/permissions
                logger.info("Waiting for Lambda function to become active...")
                waiter = lambda_client.get_waiter("function_active_v2")
                waiter.wait(FunctionName=lambda_function_name)
                logger.info("Lambda function is active.")
            else:
                raise  # Reraise other ClientErrors during get/update/create

        if not lambda_arn:
            raise RuntimeError("Failed to get Lambda Function ARN after create/update.")

        # --- Remove Old CloudWatch Events Rule and Permissions (Idempotent) ---
        # (Keep this cleanup from previous fix)
        try:
            events_client = boto3.client("events", region_name=config.AWS_REGION)
            rule_name = f"{config.PROJECT_NAME}-inactivity-monitor"
            logger.info(
                f"Attempting to cleanup old Event rule/targets for: {rule_name}"
            )
            try:
                events_client.remove_targets(Rule=rule_name, Ids=["1"], Force=True)
            except ClientError as e_rem:
                logger.debug(f"Ignoring error removing targets: {e_rem}")
            try:
                events_client.delete_rule(Name=rule_name)
            except ClientError as e_del:
                logger.debug(f"Ignoring error deleting rule: {e_del}")
            logger.info(
                f"Cleaned up old CloudWatch Events rule: {rule_name} (if it existed)"
            )
        except Exception as e_ev_clean:
            logger.warning(f"Issue during old Event rule cleanup: {e_ev_clean}")

        try:
            logger.info(
                "Attempting to remove old CloudWatch Events Lambda permission..."
            )
            lambda_client.remove_permission(
                FunctionName=lambda_function_name,
                StatementId=f"{config.PROJECT_NAME}-cloudwatch-trigger",
            )
            logger.info("Removed old CloudWatch Events permission from Lambda.")
        except ClientError as e_perm:
            if e_perm.response["Error"]["Code"] != "ResourceNotFoundException":
                logger.warning(f"Could not remove old Lambda permission: {e_perm}")
            else:
                logger.info("Old Lambda permission not found.")

        # --- Create New CloudWatch Alarm ---
        # The alarm period is 5 minutes, so the timeout is rounded down to a
        # whole number of periods (at least one).
        evaluation_periods = max(1, config.INACTIVITY_TIMEOUT_MINUTES // 5)
        threshold_cpu = 5.0
        logger.info(
            f"Setting up CloudWatch alarm '{alarm_name}' for CPU < {threshold_cpu}% over {evaluation_periods * 5} minutes."
        )

        alarm_created = False
        try:
            # Delete existing alarm first for idempotency
            try:
                cloudwatch_client.delete_alarms(AlarmNames=[alarm_name])
                logger.info(
                    f"Deleted potentially existing CloudWatch alarm: {alarm_name}"
                )
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    logger.warning(
                        f"Could not delete existing alarm {alarm_name} before creation: {e}"
                    )

            cloudwatch_client.put_metric_alarm(
                AlarmName=alarm_name,
                AlarmDescription=f"Stop EC2 instance {instance_id} if avg CPU < {threshold_cpu}% for {evaluation_periods * 5} mins",
                ActionsEnabled=True,
                AlarmActions=[lambda_arn],  # Trigger Lambda function
                MetricName="CPUUtilization",
                Namespace="AWS/EC2",
                Statistic="Average",
                Dimensions=[{"Name": "InstanceId", "Value": instance_id}],
                Period=300,  # 5 minutes period
                EvaluationPeriods=evaluation_periods,
                Threshold=threshold_cpu,
                ComparisonOperator="LessThanThreshold",
                TreatMissingData="breaching",
                Tags=[{"Key": "Project", "Value": config.PROJECT_NAME}],
            )
            alarm_created = True
            logger.info(
                f"Created/Updated CloudWatch Alarm '{alarm_name}' triggering Lambda on low CPU."
            )
        except Exception as e:
            logger.error(
                f"Failed to create/update CloudWatch alarm '{alarm_name}': {e}"
            )
            # Decide if this failure should stop the deployment

        # --- Allow CloudWatch alarms to invoke the Lambda ---
        # An alarm action targeting a Lambda needs a resource-based policy
        # statement on the function for the CloudWatch alarms principal.
        if alarm_created:
            try:
                permission_kwargs = {
                    "FunctionName": lambda_function_name,
                    "StatementId": f"alarm-invoke-{instance_id}",
                    "Action": "lambda:InvokeFunction",
                    "Principal": "lambda.alarms.cloudwatch.amazonaws.com",
                }
                described = cloudwatch_client.describe_alarms(AlarmNames=[alarm_name])
                metric_alarms = described.get("MetricAlarms", [])
                if metric_alarms:
                    # Scope the permission to this alarm when its ARN is known.
                    permission_kwargs["SourceArn"] = metric_alarms[0]["AlarmArn"]
                lambda_client.add_permission(**permission_kwargs)
                logger.info("Granted CloudWatch alarms permission to invoke Lambda.")
            except ClientError as e_add:
                if e_add.response["Error"]["Code"] == "ResourceConflictException":
                    logger.debug("Alarm invoke permission already present on Lambda.")
                else:
                    logger.warning(
                        f"Could not grant alarm invoke permission on Lambda: {e_add}"
                    )

        logger.success(
            f"Auto-shutdown infrastructure setup attempted for {instance_id=}"
        )

    except Exception as e:
        # Do not raise here, allow deployment to continue but log the failure.
        # loguru has no exc_info option; opt(exception=True) attaches the traceback.
        logger.opt(exception=True).error(
            f"Error setting up auto-shutdown infrastructure: {e}"
        )
) except Exception as e: logger.error( f"Failed to create/update CloudWatch alarm '{alarm_name}': {e}" ) # Decide if this failure should stop the deployment logger.success( f"Auto-shutdown infrastructure setup attempted for {instance_id=}" ) except Exception as e: logger.error( f"Error setting up auto-shutdown infrastructure: {e}", exc_info=True ) # Do not raise here, allow deployment to continue but log the failure class Deploy: """Class handling deployment operations for OmniParser.""" @staticmethod def start() -> Tuple[str | None, str | None]: # Added return type hint """ Start or configure EC2 instance, setup auto-shutdown, deploy OmniParser container. Returns the public IP and instance ID on success, or (None, None) on failure. """ instance_id = None instance_ip = None ssh_client = None key_path = config.AWS_EC2_KEY_PATH try: # 1. Deploy or find/start EC2 instance"Step 1: Deploying/Starting EC2 Instance...") instance_id, instance_ip = deploy_ec2_instance() if not instance_id or not instance_ip: # deploy_ec2_instance already logs the error raise RuntimeError("Failed to deploy or start EC2 instance") logger.success(f"EC2 instance ready: ID={instance_id}, IP={instance_ip}") # 2. Configure EC2 Instance (Docker etc.)"Step 2: Configuring EC2 Instance (Docker, etc.)...") if not os.path.exists(key_path): logger.error( f"SSH Key not found at {key_path}. Cannot proceed with configuration." ) raise RuntimeError(f"SSH Key missing: {key_path}") config_success = configure_ec2_instance(instance_id, instance_ip) if not config_success: # configure_ec2_instance already logs the error raise RuntimeError("Failed to configure EC2 instance") logger.success("EC2 instance configuration complete.") # 3. 
Set up Auto-Shutdown Infrastructure (Alarm-based)"Step 3: Setting up Auto-Shutdown Infrastructure...") # This function now handles errors internally and logs them but doesn't stop deployment create_auto_shutdown_infrastructure(instance_id) # Success/failure logged within the function # 4. Trigger Driver Installation via Non-Interactive SSH Login "Step 4: Triggering potential driver install via SSH login (might cause temporary disconnect)..." ) try: Deploy.ssh(non_interactive=True) logger.success("Non-interactive SSH login trigger completed.") except Exception as ssh_e: logger.warning(f"Non-interactive SSH step failed or timed out: {ssh_e}") logger.warning( "Proceeding with Docker deployment, assuming instance is accessible." ) # 5. Copy Dockerfile, .dockerignore"Step 5: Copying Docker related files...") current_dir = os.path.dirname(os.path.abspath(__file__)) files_to_copy = { "Dockerfile": os.path.join(current_dir, "Dockerfile"), ".dockerignore": os.path.join(current_dir, ".dockerignore"), } for filename, filepath in files_to_copy.items(): if os.path.exists(filepath):"Copying {filename} to instance {instance_ip}...") scp_command = [ "scp", "-i", key_path, "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-o", "ConnectTimeout=30", filepath, f"{config.AWS_EC2_USER}@{instance_ip}:~/{filename}", ] result = scp_command, check=False, capture_output=True, text=True, timeout=60, ) if result.returncode != 0: logger.error( f"Failed to copy {filename}: {result.stderr or result.stdout}" ) # Allow continuing even if copy fails? Or raise error? Let's allow for now. else:"Successfully copied {filename}.") else: logger.warning( f"Required file not found: {filepath}. Skipping copy." ) # 6. Connect SSH and Run Setup/Docker Commands "Step 6: Connecting via SSH to run setup and Docker commands..." 
) key = paramiko.RSAKey.from_private_key_file(key_path) ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try:"Attempting final SSH connection to {instance_ip}...") ssh_client.connect( hostname=instance_ip, username=config.AWS_EC2_USER, pkey=key, timeout=30, ) logger.success("SSH connected for Docker setup.") setup_commands = [ # Ensure commands are safe and idempotent if possible "rm -rf OmniParser", f"git clone --depth 1 {config.REPO_URL}", "if [ -f ~/Dockerfile ]; then cp ~/Dockerfile ~/OmniParser/; else echo 'Warning: Dockerfile not found in home dir'; fi", "if [ -f ~/.dockerignore ]; then cp ~/.dockerignore ~/OmniParser/; else echo 'Warning: .dockerignore not found in home dir'; fi", ] for command in setup_commands: execute_command(ssh_client, command) docker_commands = [ f"sudo docker rm -f {config.CONTAINER_NAME} || true", f"sudo docker rmi {config.PROJECT_NAME} || true", ( f"cd OmniParser && sudo docker build --progress=plain " f"--no-cache -t {config.PROJECT_NAME} ." ), ( f"sudo docker run -d -p {config.PORT}:{config.PORT} --gpus all --name " f"{config.CONTAINER_NAME} {config.PROJECT_NAME}" ), ] for command in docker_commands: execute_command(ssh_client, command) logger.success("Docker build and run commands executed.") # 7. Wait for Container/Server to Become Responsive "Step 7: Waiting for server inside container to become responsive..." ) max_retries = 30 retry_delay = 10 server_ready = False check_command = ( f"curl -s --fail http://localhost:{config.PORT}/probe/ || exit 1" ) for attempt in range(max_retries): f"Checking server readiness via internal curl (attempt {attempt + 1}/{max_retries})..." 
) try: execute_command(ssh_client, check_command, max_retries=1) logger.success("Server is responsive inside instance!") server_ready = True break except Exception as e: logger.warning(f"Server not ready yet (internal check): {e}") if attempt < max_retries - 1: try:"Checking Docker container status...") execute_command( ssh_client, f"sudo docker ps -f name={config.CONTAINER_NAME}", max_retries=1, ) except Exception as ps_e: logger.error(f"Container check failed: {ps_e}")"Waiting {retry_delay} seconds...") time.sleep(retry_delay) if not server_ready: try: logger.error( "Server failed to become responsive. Getting container logs..." ) execute_command( ssh_client, f"sudo docker logs {config.CONTAINER_NAME}" ) except Exception as log_e: logger.error(f"Could not retrieve container logs: {log_e}") raise RuntimeError( f"Server at localhost:{config.PORT} did not become responsive." ) # Final check execute_command( ssh_client, f"sudo docker ps --filter name={config.CONTAINER_NAME}" ) finally: if ssh_client: ssh_client.close()"SSH connection for Docker setup closed.") # 8. Deployment Successful server_url = f"http://{instance_ip}:{config.PORT}" logger.success(f"Deployment complete! Server running at: {server_url}") f"Auto-shutdown configured for inactivity (approx {config.INACTIVITY_TIMEOUT_MINUTES} minutes of low CPU)." ) # Optional: Verify external access try: import requests"Verifying external access to {server_url}/probe/ ...") response = requests.get(f"{server_url}/probe/", timeout=20) response.raise_for_status() logger.success( "Successfully verified external access to /probe/ endpoint." 
) except Exception as e: logger.warning(f"Could not verify external access to server: {e}") # Return IP and ID on success return instance_ip, instance_id except Exception as e: logger.error(f"Deployment failed: {e}", exc_info=True) if CLEANUP_ON_FAILURE and instance_id: logger.warning("Attempting cleanup due to deployment failure...") try: Deploy.stop(project_name=config.PROJECT_NAME) except Exception as cleanup_error: logger.error(f"Cleanup after failure also failed: {cleanup_error}") # Return None on failure return None, None @staticmethod def stop( project_name: str = config.PROJECT_NAME, security_group_name: str = config.AWS_EC2_SECURITY_GROUP, ) -> None: """ Initiates termination of EC2 instance(s) and deletion of associated resources (SG, Auto-Shutdown Lambda, CW Alarm, IAM Role). Returns before termination completes. Excludes Discovery API components cleanup. Args: project_name (str): The project name used to tag the instance. security_group_name (str): The name of the security group to delete. """ # 1. Initialize clients ec2_resource = boto3.resource("ec2", region_name=config.AWS_REGION) ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) lambda_client = boto3.client("lambda", region_name=config.AWS_REGION) cloudwatch_client = boto3.client("cloudwatch", region_name=config.AWS_REGION) iam_client = boto3.client("iam", region_name=config.AWS_REGION)"Starting cleanup initiation...") # 2. Initiate EC2 instance termination instances_to_terminate = [] try: instances = ec2_resource.instances.filter( Filters=[ {"Name": "tag:Name", "Values": [project_name]}, { "Name": "instance-state-name", "Values": [ "pending", "running", "shutting-down", # Include shutting-down just in case "stopped", "stopping", ], }, ] ) instance_list = list(instances) if not instance_list: f"No instances found with tag Name={project_name} to terminate." ) else: f"Found {len(instance_list)} instance(s). Initiating termination..." 
) for instance in instance_list: f"Initiating termination for instance: ID - {}" ) instances_to_terminate.append( try: instance.terminate() except ClientError as term_error: # Log error but don't stop overall cleanup logger.warning( f"Could not issue terminate for {}: {term_error}" ) if instances_to_terminate: f"Termination initiated for instance(s): {instances_to_terminate}. AWS will complete this in the background." ) # --- REMOVED WAITER BLOCK --- #"Waiting for instance(s) {instances_terminated} to terminate...") # try: # waiter = ec2_client.get_waiter('instance_terminated') # waiter.wait(...) #"Instance(s) {instances_terminated} confirmed terminated.") # except Exception as wait_error: # logger.warning(f"Error or timeout waiting for instance termination: {wait_error}") # logger.warning("Proceeding with cleanup...") except Exception as e: logger.error(f"Error during instance discovery/termination initiation: {e}") # Continue cleanup attempt anyway # 3. Delete CloudWatch Alarms try: alarm_prefix = f"{config.PROJECT_NAME}-CPU-Low-Alarm-" paginator = cloudwatch_client.get_paginator("describe_alarms") alarms_to_delete = []"Searching for CloudWatch alarms with prefix: {alarm_prefix}") for page in paginator.paginate(AlarmNamePrefix=alarm_prefix): for alarm in page.get("MetricAlarms", []): alarms_to_delete.append(alarm["AlarmName"]) alarms_to_delete = list(set(alarms_to_delete)) if alarms_to_delete:"Deleting CloudWatch alarms: {alarms_to_delete}") for i in range(0, len(alarms_to_delete), 100): chunk = alarms_to_delete[i : i + 100] try: cloudwatch_client.delete_alarms(AlarmNames=chunk)"Deleted alarm chunk: {chunk}") except ClientError as delete_alarm_err: logger.error( f"Failed to delete alarm chunk {chunk}: {delete_alarm_err}" ) else:"No matching CloudWatch alarms found to delete.") except Exception as e: logger.error(f"Error searching/deleting CloudWatch alarms: {e}") # 4. 
Delete Lambda function lambda_function_name = LAMBDA_FUNCTION_NAME try:"Attempting to delete Lambda function: {lambda_function_name}") lambda_client.delete_function(FunctionName=lambda_function_name)"Deleted Lambda function: {lambda_function_name}") except ClientError as e: if e.response["Error"]["Code"] == "ResourceNotFoundException":"Lambda function {lambda_function_name} does not exist.") else: logger.error( f"Error deleting Lambda function {lambda_function_name}: {e}" ) # 5. Delete IAM Role role_name = IAM_ROLE_NAME try:"Attempting to delete IAM role: {role_name}") attached_policies = iam_client.list_attached_role_policies( RoleName=role_name ).get("AttachedPolicies", []) if attached_policies: f"Detaching {len(attached_policies)} managed policies from role {role_name}..." ) for policy in attached_policies: try: iam_client.detach_role_policy( RoleName=role_name, PolicyArn=policy["PolicyArn"] ) logger.debug(f"Detached policy {policy['PolicyArn']}") except ClientError as detach_err: logger.warning( f"Could not detach policy {policy['PolicyArn']}: {detach_err}" ) inline_policies = iam_client.list_role_policies(RoleName=role_name).get( "PolicyNames", [] ) if inline_policies: f"Deleting {len(inline_policies)} inline policies from role {role_name}..." ) for policy_name in inline_policies: try: iam_client.delete_role_policy( RoleName=role_name, PolicyName=policy_name ) logger.debug(f"Deleted inline policy {policy_name}") except ClientError as inline_err: logger.warning( f"Could not delete inline policy {policy_name}: {inline_err}" ) iam_client.delete_role(RoleName=role_name)"Deleted IAM role: {role_name}") except ClientError as e: if e.response["Error"]["Code"] == "NoSuchEntity":"IAM role {role_name} does not exist.") elif e.response["Error"]["Code"] == "DeleteConflict": logger.error( f"Cannot delete IAM role {role_name} due to dependencies: {e}" ) else: logger.error(f"Error deleting IAM role {role_name}: {e}") # 6. 
Delete Security Group # Might still fail if instance termination hasn't fully released ENIs, # but we don't wait for termination anymore. Manual cleanup might be needed sometimes. sg_delete_wait = 5 # Shorter wait now, as we aren't waiting for termination f"Waiting {sg_delete_wait} seconds before attempting security group deletion..." ) time.sleep(sg_delete_wait) try:"Attempting to delete security group: {security_group_name}") ec2_client.delete_security_group(GroupName=security_group_name)"Deleted security group: {security_group_name}") except ClientError as e: if e.response["Error"]["Code"] == "InvalidGroup.NotFound":"Security group {security_group_name} not found.") elif e.response["Error"]["Code"] == "DependencyViolation": logger.warning( f"Could not delete security group {security_group_name} due to existing dependencies (likely ENI from terminating instance). AWS will clean it up later, or run stop again after a few minutes. Error: {e}" ) else: logger.error( f"Error deleting security group {security_group_name}: {e}" ) "Cleanup initiation finished. Instance termination proceeds in background." 
) @staticmethod def status() -> None: """Check the status of deployed instances.""" ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) instances = ec2.instances.filter( Filters=[{"Name": "tag:Name", "Values": [config.PROJECT_NAME]}] ) for instance in instances: public_ip = instance.public_ip_address if public_ip: server_url = f"http://{public_ip}:{config.PORT}" f"Instance ID: {}, State: {instance.state['Name']}, " f"URL: {server_url}" ) else: f"Instance ID: {}, State: {instance.state['Name']}, " f"URL: Not available (no public IP)" ) # Check auto-shutdown infrastructure lambda_client = boto3.client("lambda", region_name=config.AWS_REGION) try: lambda_response = lambda_client.get_function( FunctionName=LAMBDA_FUNCTION_NAME )"Auto-shutdown Lambda: {LAMBDA_FUNCTION_NAME} (Active)") logger.debug(f"{lambda_response=}") except ClientError:"Auto-shutdown Lambda: Not configured") @staticmethod def ssh(non_interactive: bool = False) -> None: # Get instance IP ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) instances = ec2.instances.filter( Filters=[ {"Name": "tag:Name", "Values": [config.PROJECT_NAME]}, {"Name": "instance-state-name", "Values": ["running"]}, ] ) instance = next(iter(instances), None) if not instance: logger.error("No running instance found") return ip = instance.public_ip_address if not ip: logger.error("Instance has no public IP") return # Check if key file exists if not os.path.exists(config.AWS_EC2_KEY_PATH): logger.error(f"Key file not found: {config.AWS_EC2_KEY_PATH}") return if non_interactive: # Trigger driver installation (this might cause reboot) ssh_command = [ "ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-i", config.AWS_EC2_KEY_PATH, f"{config.AWS_EC2_USER}@{ip}", "-t", "-tt", "bash --login -c 'exit'", ] try:, check=True)"Initial SSH login completed successfully") except subprocess.CalledProcessError as e: logger.warning(f"Initial SSH connection closed: {e}") # Wait for potential reboot to 
complete "Waiting for instance to be fully available after potential reboot..." ) max_attempts = 20 attempt = 0 while attempt < max_attempts: attempt += 1"SSH connection attempt {attempt}/{max_attempts}") try: # Check if we can make a new SSH connection test_ssh_cmd = [ "ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=5", "-o", "UserKnownHostsFile=/dev/null", "-i", config.AWS_EC2_KEY_PATH, f"{config.AWS_EC2_USER}@{ip}", "echo 'SSH connection successful'", ] result = test_ssh_cmd, capture_output=True, text=True ) if result.returncode == 0:"Instance is ready for SSH connections") return except Exception: pass time.sleep(10) # Wait 10 seconds between attempts logger.error("Failed to reconnect to instance after potential reboot") else: # Interactive SSH session ssh_command = f"ssh -i {config.AWS_EC2_KEY_PATH} -o StrictHostKeyChecking=no {config.AWS_EC2_USER}@{ip}""Connecting with: {ssh_command}") os.system(ssh_command) return @staticmethod def stop_instance(instance_id: str) -> None: """Stop a specific EC2 instance.""" ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) try: ec2_client.stop_instances(InstanceIds=[instance_id])"Stopped instance {instance_id}") except ClientError as e: logger.error(f"Error stopping instance {instance_id}: {e}") @staticmethod def start_instance(instance_id: str) -> str: """Start a specific EC2 instance and return its public IP.""" ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) ec2_resource = boto3.resource("ec2", region_name=config.AWS_REGION) try: ec2_client.start_instances(InstanceIds=[instance_id])"Starting instance {instance_id}...") instance = ec2_resource.Instance(instance_id) instance.wait_until_running() instance.reload() f"Instance {instance_id} started, IP: {instance.public_ip_address}" ) return instance.public_ip_address except ClientError as e: logger.error(f"Error starting instance {instance_id}: {e}") return None @staticmethod def history(days: int = 7) -> None: """Display 
deployment and auto-shutdown history. Args: days: Number of days of history to retrieve (default: 7) """"Retrieving {days} days of deployment history...") # Calculate time range end_time = start_time = end_time - datetime.timedelta(days=days) # Initialize AWS clients cloudwatch_logs = boto3.client("logs", region_name=config.AWS_REGION) ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) # Get instance information instances = [] try: response = ec2_client.describe_instances( Filters=[{"Name": "tag:Name", "Values": [config.PROJECT_NAME]}] ) for reservation in response["Reservations"]: instances.extend(reservation["Instances"]) f"Found {len(instances)} instances with name tag '{config.PROJECT_NAME}'" ) except Exception as e: logger.error(f"Error retrieving instances: {e}") # Display instance state transition history"\n=== Instance State History ===") for instance in instances: instance_id = instance["InstanceId"] try: # Get instance state transition history response = ec2_client.describe_instance_status( InstanceIds=[instance_id], IncludeAllInstances=True ) state = instance["State"]["Name"] launch_time = instance.get("LaunchTime", "Unknown") f"Instance {instance_id}: Current state={state}, Launch time={launch_time}" ) # Get instance console output if available try: console = ec2_client.get_console_output(InstanceId=instance_id) if "Output" in console and console["Output"]:"Last console output (truncated):") # Show last few lines of console output lines = console["Output"].strip().split("\n") for line in lines[-10:]:" {line}") except Exception as e:"Console output not available: {e}") except Exception as e: logger.error(f"Error retrieving status for instance {instance_id}: {e}") # Check for Lambda auto-shutdown logs"\n=== Auto-shutdown Lambda Logs ===") try: # Check if log group exists log_group_name = f"/aws/lambda/{LAMBDA_FUNCTION_NAME}" log_streams = cloudwatch_logs.describe_log_streams( logGroupName=log_group_name, orderBy="LastEventTime", 
descending=True, limit=5, ) if not log_streams.get("logStreams"):"No log streams found for auto-shutdown Lambda") else: # Process the most recent log streams for stream in log_streams.get("logStreams", [])[:5]: stream_name = stream["logStreamName"]"Log stream: {stream_name}") logs = cloudwatch_logs.get_log_events( logGroupName=log_group_name, logStreamName=stream_name, startTime=int(start_time.timestamp() * 1000), endTime=int(end_time.timestamp() * 1000), limit=100, ) if not logs.get("events"):" No events in this stream") continue for event in logs.get("events", []): timestamp = datetime.datetime.fromtimestamp( event["timestamp"] / 1000 ) message = event["message"]" {timestamp}: {message}") except cloudwatch_logs.exceptions.ResourceNotFoundException: "No logs found for auto-shutdown Lambda. It may not have been triggered yet." ) except Exception as e: logger.error(f"Error retrieving Lambda logs: {e}")"\nHistory retrieval complete.") @staticmethod def discover() -> dict: """Discover instances by tag and optionally start them if stopped. Returns: dict: Information about the discovered instance including status and connection details """ ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) # Find instance with project tag instances = list( ec2.instances.filter( Filters=[ {"Name": "tag:Name", "Values": [config.PROJECT_NAME]}, { "Name": "instance-state-name", "Values": ["pending", "running", "stopped"], }, ] ) ) if not instances:"No instances found") return {"status": "not_found"} instance = instances[0] # Get the first matching instance"Found instance {} in state {instance.state['Name']}") # If instance is stopped, start it if instance.state["Name"] == "stopped":"Starting stopped instance {}") instance.start() return { "instance_id":, "status": "starting", "message": "Instance is starting. 
Please try again in a few minutes.", } # Return info for running instance if instance.state["Name"] == "running": return { "instance_id":, "public_ip": instance.public_ip_address, "status": instance.state["Name"], "api_url": f"http://{instance.public_ip_address}:{config.PORT}", } # Instance is in another state (e.g., pending) return { "instance_id":, "status": instance.state["Name"], "message": f"Instance is {instance.state['Name']}. Please try again shortly.", } if __name__ == "__main__": # Ensure boto3 clients use the region from config if set # Note: Boto3 usually picks region from env vars or ~/.aws/config first if config.AWS_REGION: boto3.setup_default_session(region_name=config.AWS_REGION) fire.Fire(Deploy)