import psycopg2
import psycopg2.extras
from .base import BaseDB
class PostgresDB(BaseDB):
def _connect(self):
return psycopg2.connect(
host=self.conn_params['server'],
dbname=self.conn_params['database'],
user=self.conn_params['user'],
password=self.conn_params['password']
)
def execute_query(self, query: str):
conn = self._connect()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cursor.execute(query)
# Commit se não for SELECT
if not query.strip().lower().startswith('select'):
conn.commit()
try:
rows = cursor.fetchall()
result = [dict(row) for row in rows]
except Exception:
result = []
conn.close()
return result
def insert_record(self, table: str, data: dict):
cols = ",".join(data.keys())
vals = ",".join(["%s"] * len(data))
query = f"INSERT INTO {table} ({cols}) VALUES ({vals})"
conn = self._connect()
cursor = conn.cursor()
cursor.execute(query, tuple(data.values()))
conn.commit()
conn.close()
def list_tables(self):
conn = self._connect()
cursor = conn.cursor()
cursor.execute("SELECT tablename FROM pg_tables WHERE schemaname='public'")
tables = [row[0] for row in cursor.fetchall()]
conn.close()
return tables
def get_schema(self):
conn = self._connect()
cursor = conn.cursor()
cursor.execute("""
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
""")
rows = cursor.fetchall()
conn.close()
return "\n".join([f"{r[0]}.{r[1]} ({r[2]})" for r in rows])