## db.py: PostgreSQL connection and query helpers
##? Import Libraries
import asyncpg
from pydantic import Field
## Finally, import project-local modules
from utils.config import config
## DEFn_: Function to return a connection to the database
async def get_db_connection():
    """
    Objective:
    Open and return a new asyncpg connection to the PostgreSQL database.
    Host, database, user, password and port are read from the
    "PostgreSQL" section of the project config.
    Returns:
    The connection object produced by asyncpg.connect. This function does
    not close it; the caller is expected to close it when done.
    Raises:
    Any exception raised while reading the config or while connecting
    propagates to the caller unchanged.
    """
    pg_settings = config["PostgreSQL"]
    ## NOTE: Here we can have sentry alert sitting to capture a connection failure and report.
    ## Errors are deliberately not caught here: catching only to re-raise adds nothing.
    return await asyncpg.connect(
        host=pg_settings["HOST"],
        database=pg_settings["DB"],
        user=pg_settings["USER"],
        password=pg_settings["PASSWORD"],
        port=pg_settings["PORT"],
    )
## DEFn_: Function to receive a query and execute on the database
async def execute_sql_query(query:str = Field(...,description="Query to be executed by SQL connection",min_length=2))->list[dict]:
    """
    Objective:
    Function to receive a SQL query and execute it on the database.
    Only intended to read by SELECT/EXPLAIN kind of queries; this intent
    is not enforced here, the query string is passed to the connection as is.
    Args:
    query (str): The query to execute on the database
    Returns:
    list[dict]: One dict per returned record, in the order the records
    were fetched
    Raises:
    Any exception raised while connecting or while running the query
    propagates to the caller unchanged.
    """
    ## NOTE(review): the Field(...) default only carries metadata; nothing in this
    ## function checks min_length. Confirm whether a caller/framework validates it.
    conn = await get_db_connection()
    try:
        records = await conn.fetch(query)
        return [dict(record) for record in records]
        ## NOTE: Here we can have sentry alert sitting to capture a query failure and report.
    finally:
        ## The connection always closes, whether the query succeeded or raised
        await conn.close()