class DBService:
    """Async context manager that connects to an RDS instance and runs SQL.

    On entry the instance's engine type and a reachable endpoint are looked
    up, credentials are chosen, and a ``DBConn`` is opened:

    * If ``get_rds_account()`` returns both a name and a password, those
      credentials are used as-is and no account is created or deleted.
    * Otherwise a temporary ``mcp_<random>`` account is created. When
      ``database`` is given, the account is granted ``ReadOnly`` on it for
      MySQL / PostgreSQL engines and ``DBOwner`` for any other engine.

    On exit the connection is closed and the temporary account (if one was
    created) is deleted. The same cleanup runs when entry itself fails, so a
    half-initialised service does not leave an account behind.

    Attributes:
        instance_id: RDS instance identifier passed to every API request.
        database: Optional database name the temporary account is granted on.
        region_id: Region used to build the RDS API client.
        db_type: Lower-cased engine name; set during ``__aenter__``.
        host, port: Endpoint chosen during ``__aenter__``.
        account_name, account_password: Credentials used for the connection.
    """

    def __init__(self,
                 region_id,
                 instance_id,
                 database=None):
        self.instance_id = instance_id
        self.database = database
        self.region_id = region_id

        # Filled in by _get_db_instance_info(); declared here so the
        # attributes always exist, even before the context is entered.
        self.db_type = None
        self.host = None
        self.port = None

        # Credentials actually used for the connection (see user/password).
        self.account_name = None
        self.account_password = None

        # Pre-configured credentials; falsy values mean "create a temp one".
        self.__account_name, self.__account_password = get_rds_account()
        # True only while an account created by this object exists.
        self.__temp_account_created = False

        self.__client = get_rds_client(region_id)
        self.__db_conn = None

    async def __aenter__(self):
        """Resolve the endpoint, prepare credentials and open the connection.

        Returns:
            This ``DBService`` instance.

        Raises:
            Whatever the RDS client, ``test_connect`` or ``DBConn`` raise.
            Resources acquired before the failure are released first.
        """
        try:
            await asyncio.to_thread(self._get_db_instance_info)
            if not self.__account_name or not self.__account_password:
                await asyncio.to_thread(self._create_temp_account)
                if self.database:
                    await asyncio.to_thread(self._grant_privilege)
            else:
                self.account_name = self.__account_name
                self.account_password = self.__account_password
            self.__db_conn = DBConn(self)
            await asyncio.to_thread(self.__db_conn.connect)
        except BaseException:
            # ``async with`` never calls __aexit__ when __aenter__ raises, so
            # the temporary account / connection must be undone here.
            try:
                await self._release()
            except Exception:
                # Keep the original failure as the error the caller sees;
                # the cleanup problem is recorded instead of masking it.
                import logging
                logging.getLogger(__name__).exception(
                    "Cleanup after failed DBService.__aenter__ failed")
            finally:
                self.__db_conn = None
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connection, delete the temporary account, drop the client."""
        try:
            await self._release()
        finally:
            self.__client = None

    async def _release(self):
        """Close the connection and delete the temporary account, if any.

        The account deletion runs even when closing the connection raises.
        """
        try:
            if self.__db_conn is not None:
                await asyncio.to_thread(self.__db_conn.close)
        finally:
            if self.__temp_account_created:
                await asyncio.to_thread(self._delete_account)
                self.__temp_account_created = False

    def _get_db_instance_info(self):
        """Load the engine type and choose a reachable host/port.

        The private (VPC) endpoint is preferred; the public endpoint is used
        only when the private one is missing or fails ``test_connect``.

        Raises:
            Exception: If neither endpoint passes ``test_connect``.
        """
        attr_req = rds_20140815_models.DescribeDBInstanceAttributeRequest(
            dbinstance_id=self.instance_id,
        )
        attr_resp = self.__client.describe_dbinstance_attribute(attr_req)
        self.db_type = attr_resp.body.items.dbinstance_attribute[0].engine.lower()

        net_req = rds_20140815_models.DescribeDBInstanceNetInfoRequest(
            dbinstance_id=self.instance_id,
        )
        net_resp = self.__client.describe_dbinstance_net_info(net_req)

        # Collect the candidate endpoints reported for the instance.
        vpc_host, vpc_port, public_host, public_port = None, None, None, None
        for item in net_resp.body.dbinstance_net_infos.dbinstance_net_info:
            if 'Private' == item.iptype:
                vpc_host = item.connection_string
                vpc_port = int(item.port)
            elif 'Public' in item.iptype:
                public_host = item.connection_string
                public_port = int(item.port)

        if vpc_host and test_connect(vpc_host, vpc_port):
            self.host = vpc_host
            self.port = vpc_port
        elif public_host and test_connect(public_host, public_port):
            self.host = public_host
            self.port = public_port
        else:
            raise Exception('connection db failed.')

    def _create_temp_account(self):
        """Create a randomly named ``mcp_`` account with a random password."""
        self.account_name = 'mcp_' + random_str(10)
        self.account_password = random_password(32)
        request = rds_20140815_models.CreateAccountRequest(
            dbinstance_id=self.instance_id,
            account_name=self.account_name,
            account_password=self.account_password,
            account_description="Created by mcp for execute sql."
        )
        self.__client.create_account(request)
        # Only mark the account for deletion once the API call has succeeded.
        self.__temp_account_created = True

    def _grant_privilege(self):
        """Grant the current account access to ``self.database``.

        ``ReadOnly`` is requested for MySQL and PostgreSQL; every other
        engine gets ``DBOwner``.
        """
        req = rds_20140815_models.GrantAccountPrivilegeRequest(
            dbinstance_id=self.instance_id,
            account_name=self.account_name,
            dbname=self.database,
            account_privilege="ReadOnly" if self.db_type.lower() in ('mysql', 'postgresql') else "DBOwner"
        )
        self.__client.grant_account_privilege(req)

    def _delete_account(self):
        """Delete ``self.account_name`` from the instance; no-op if unset."""
        if not self.account_name:
            return
        req = rds_20140815_models.DeleteAccountRequest(
            dbinstance_id=self.instance_id,
            account_name=self.account_name
        )
        self.__client.delete_account(req)

    async def execute_sql(self, sql):
        """Run ``sql`` on the open connection in a worker thread.

        Raises:
            RuntimeError: If the service has not been entered via
                ``async with`` (there is no connection yet).
        """
        if self.__db_conn is None:
            raise RuntimeError(
                "DBService is not connected; use it via 'async with'.")
        return await asyncio.to_thread(self.__db_conn.execute_sql, sql)

    @property
    def user(self):
        """Account name used for the connection."""
        return self.account_name

    @property
    def password(self):
        """Password used for the connection."""
        return self.account_password