def get_instance_monitoring_data(self, instance_id: str, hours: int = 1) -> Dict:
    """Fetch recent CloudWatch metrics for a single EC2 instance.

    Queries the ``AWS/EC2`` namespace for ``CPUUtilization``, ``NetworkIn``,
    ``NetworkOut``, ``DiskReadOps`` and ``DiskWriteOps`` over the trailing
    ``hours`` window, requesting the ``Average`` and ``Maximum`` statistics.
    Each metric is fetched independently, so one failing query does not
    discard the results of the others.

    The sampling period is 300 seconds (5 minutes) for windows of up to 120
    hours. Longer windows use a proportionally coarser period so a single
    request never asks for more than 1,440 datapoints, which CloudWatch
    would reject.

    Args:
        instance_id: EC2 instance ID, used as the ``InstanceId`` dimension.
        hours: Length of the look-back window in hours, ending now (UTC).

    Returns:
        On success, a dict with the keys ``provider``, ``instance_id``,
        ``time_range`` and ``metrics``. ``metrics`` maps each metric name to
        ``{'datapoints': <count>, 'data': [...]}`` with ``data`` ordered by
        ``Timestamp`` ascending, or to ``{'error': <message>,
        'datapoints': 0}`` when that metric's query failed.

        When the service is unavailable, or the request could not be
        prepared (for example ``hours`` is not a number), a dict with only
        ``error`` and ``provider``.
    """
    # Function-scope imports keep this method self-contained; they are
    # harmless no-ops if the module already imports these names.
    import math
    from datetime import timezone

    if not self.available:
        return {
            'error': f'AWS服务不可用: {getattr(self, "error", "未知错误")}',
            'provider': 'aws'
        }

    metric_names = ('CPUUtilization', 'NetworkIn', 'NetworkOut', 'DiskReadOps', 'DiskWriteOps')
    base_period = 300        # 5 minutes
    max_datapoints = 1440    # CloudWatch per-request datapoint limit
    hourly_after = 63 * 24 * 3600  # older data needs a whole-hour period

    try:
        # Timezone-aware "now": datetime.utcnow() is deprecated and naive.
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(hours=hours)

        # Widen the period in 5-minute steps until the window fits into a
        # single request; short (or non-positive) windows keep 300 seconds.
        window_seconds = (end_time - start_time).total_seconds()
        steps = math.ceil(window_seconds / (base_period * max_datapoints))
        period = base_period * max(1, steps)
        if window_seconds > hourly_after:
            # CloudWatch requires a multiple of 3600 seconds when the start
            # time is more than 63 days in the past.
            period = 3600 * math.ceil(period / 3600)

        monitoring_data = {}
        for metric in metric_names:
            try:
                response = self.cloudwatch.get_metric_statistics(
                    Namespace='AWS/EC2',
                    MetricName=metric,
                    Dimensions=[
                        {
                            'Name': 'InstanceId',
                            'Value': instance_id
                        }
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=period,
                    Statistics=['Average', 'Maximum']
                )
                datapoints = response['Datapoints']
                try:
                    # CloudWatch does not return datapoints in time order.
                    datapoints = sorted(datapoints, key=lambda point: point['Timestamp'])
                except (KeyError, TypeError):
                    # Entries without a comparable timestamp: keep the
                    # order exactly as it was received.
                    pass
                monitoring_data[metric] = {
                    'datapoints': len(datapoints),
                    'data': datapoints
                }
            except Exception as e:
                # Best effort per metric: record the failure and carry on.
                monitoring_data[metric] = {
                    'error': str(e),
                    'datapoints': 0
                }

        return {
            'provider': 'aws',
            'instance_id': instance_id,
            'time_range': f'{hours}小时',
            'metrics': monitoring_data
        }
    except Exception as e:
        # API errors are already handled per metric above, so only failures
        # while preparing the request (e.g. an invalid ``hours``) land here.
        return {
            'error': f'获取监控数据时发生错误: {str(e)}',
            'provider': 'aws'
        }