def get_instance_storage_info(self, instance_id: str) -> Dict:
    """
    Return details of the EBS volumes attached to an EC2 instance.

    The instance is looked up with ``describe_instances``. Every EBS volume
    referenced by its block device mappings is then resolved through a single
    batched ``describe_volumes`` call rather than one call per volume.

    Args:
        instance_id (str): EC2 instance ID.

    Returns:
        Dict: On success, a dict with ``provider``, ``instance_id``,
        ``storage_devices`` (one entry per resolved EBS volume, in block
        device mapping order) and ``total_devices``. On failure, a dict with
        ``error`` and ``provider``. Failures are reported through the return
        value; exceptions raised inside the lookup are caught here.
    """
    if not self.available:
        return {
            'error': f'AWS服务不可用: {getattr(self, "error", "未知错误")}',
            'provider': 'aws'
        }
    try:
        # Look up the instance itself.
        instance_response = self.ec2.describe_instances(InstanceIds=[instance_id])
        reservations = instance_response['Reservations']
        # A reservation without any instance is reported as "not found"
        # instead of surfacing an opaque IndexError message.
        if not reservations or not reservations[0].get('Instances'):
            return {
                'error': f'未找到ID为 {instance_id} 的EC2实例',
                'provider': 'aws'
            }
        instance = reservations[0]['Instances'][0]

        # Collect (device name, volume id) pairs for EBS-backed mappings only;
        # mappings without an Ebs.VolumeId are skipped.
        ebs_devices = []
        for block_device in instance.get('BlockDeviceMappings', []):
            volume_id = block_device.get('Ebs', {}).get('VolumeId')
            if volume_id:
                ebs_devices.append((block_device.get('DeviceName'), volume_id))

        # Resolve all volumes with one API call. The call is skipped when
        # there is nothing to resolve: an empty VolumeIds filter would ask
        # for every volume in the account rather than none.
        volumes_by_id = {}
        if ebs_devices:
            unique_ids = list(dict.fromkeys(vid for _, vid in ebs_devices))
            volume_response = self.ec2.describe_volumes(VolumeIds=unique_ids)
            volumes_by_id = {
                volume.get('VolumeId'): volume
                for volume in volume_response['Volumes']
            }

        storage_info = []
        for device_name, volume_id in ebs_devices:
            volume = volumes_by_id.get(volume_id)
            if not volume:
                # Volume was not returned by the API; omit it from the report.
                continue
            created = volume.get('CreateTime')
            storage_info.append({
                'device_name': device_name,
                'volume_id': volume_id,
                'volume_type': volume.get('VolumeType'),
                'size': volume.get('Size'),
                # IOPS / throughput are absent from some volume records;
                # report 'N/A' in that case.
                'iops': volume.get('Iops', 'N/A'),
                'throughput': volume.get('Throughput', 'N/A'),
                'encrypted': volume.get('Encrypted', False),
                'state': volume.get('State'),
                'created_time': created.isoformat() if created else None
            })

        return {
            'provider': 'aws',
            'instance_id': instance_id,
            'storage_devices': storage_info,
            'total_devices': len(storage_info)
        }
    except ClientError as e:
        return {
            'error': f'AWS API调用失败: {str(e)}',
            'provider': 'aws'
        }
    except Exception as e:
        return {
            'error': f'获取存储信息时发生错误: {str(e)}',
            'provider': 'aws'
        }