"""
๋ค์ค ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ด๋ฆฌ ๋ชจ๋
MySQL, PostgreSQL, Oracle ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ๊ณผ ์ฟผ๋ฆฌ ์คํ์ ๊ด๋ฆฌํฉ๋๋ค.
"""
import logging
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import pymysql
from sqlalchemy import create_engine, text, MetaData, Table, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from config import config
logger = logging.getLogger(__name__)
class DatabaseProvider(ABC):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค Provider ์ถ์ ํด๋์ค"""
def __init__(self):
self.engine = None
self.session_factory = None
self.metadata = MetaData()
@abstractmethod
def _initialize_connection(self):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์ด๊ธฐํํฉ๋๋ค."""
pass
@abstractmethod
def _setup_connection(self, conn):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค๋ณ ์ฐ๊ฒฐ ์ค์ ์ ์ํํฉ๋๋ค."""
pass
@abstractmethod
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""ํ
์ด๋ธ ์คํค๋ง ์ ๋ณด๋ฅผ ๋ฐํํฉ๋๋ค."""
pass
@abstractmethod
def get_table_list(self, database_name: str = None) -> List[Dict[str, str]]:
"""๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ชจ๋ ํ
์ด๋ธ ๋ชฉ๋ก์ ๋ฐํํฉ๋๋ค."""
pass
@abstractmethod
def get_database_info(self) -> Dict[str, Any]:
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด๋ฅผ ๋ฐํํฉ๋๋ค."""
pass
def constructor(self):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์ด๊ธฐํํฉ๋๋ค."""
self._initialize_connection()
def is_connected(self) -> bool:
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ํ๋ฅผ ํ์ธํฉ๋๋ค."""
return self.engine is not None
def execute_query(self, query: str) -> List[Dict[str, Any]]:
"""SQL ์ฟผ๋ฆฌ๋ฅผ ์คํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํฉ๋๋ค."""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
with self.engine.connect() as conn:
result = conn.execute(text(query))
# ๊ฒฐ๊ณผ๋ฅผ ๋์
๋๋ฆฌ ๋ฆฌ์คํธ๋ก ๋ณํ
columns = result.keys()
rows = []
for row in result.fetchall():
# ๊ฐ ํ์ ๋ฐ์ดํฐ๋ฅผ UTF-8๋ก ์ ๋ฆฌ
cleaned_row = {}
for col, value in zip(columns, row):
cleaned_row[col] = self._clean_value(value)
rows.append(cleaned_row)
# 1~100๋ฒ์งธ ํ๋ง ์ถ๋ ฅ, 101๋ฒ์งธ๋ '...' ์ถ๋ ฅ
logger.debug("์ฟผ๋ฆฌ ์คํ ๊ฒฐ๊ณผ: \n")
max_log_rows = 100
for idx, row in enumerate(rows):
if idx < max_log_rows:
if(idx < len(rows) - 1):
logger.debug(f"[{idx+1:03}] {row}")
else:
logger.debug(f"[{idx+1:03}] {row}\n")
elif idx == max_log_rows:
logger.debug(f"[{idx+1:03}] ...(์ดํ ์๋ต)\n")
break
logger.info(f"์ฟผ๋ฆฌ ์คํ ์ฑ๊ณต: {len(rows)}๊ฐ ํ ๋ฐํ")
return rows
except Exception as e:
logger.error(f"์ฟผ๋ฆฌ ์คํ ์คํจ: {e}")
raise Exception(f"์ฟผ๋ฆฌ ์คํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def _clean_value(self, value):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฐ์์ UTF-8 ์ธ์ฝ๋ฉ ๋ฌธ์ ์ ๋ค์ํ ๋ฐ์ดํฐ ํ์
์ ํด๊ฒฐํฉ๋๋ค."""
if value is None:
return None
try:
# ๋ ์ง/์๊ฐ ํ์
์ ๋ฌธ์์ด๋ก ๋ณํ (JSON ์ง๋ ฌํ๋ฅผ ์ํด)
import datetime
if isinstance(value, (datetime.date, datetime.datetime)):
return value.isoformat()
# Decimal ํ์
์ float๋ก ๋ณํ
from decimal import Decimal
if isinstance(value, Decimal):
return float(value)
# ๋ฐ์ด๋๋ฆฌ ๋ฐ์ดํฐ๋ฅผ 16์ง์ ๋ฌธ์์ด๋ก ๋ณํ
if isinstance(value, bytes):
return value.hex()
# MySQL/PostgreSQL/Oracle ํน์ ํ์
์ฒ๋ฆฌ
# UUID ํ์
์ ๋ฌธ์์ด๋ก ๋ณํ
if hasattr(value, '__class__') and 'uuid' in str(value.__class__).lower():
return str(value)
# JSON ํ์
์ ๋์
๋๋ฆฌ๋ก ๋ณํ (PostgreSQL JSONB ๋ฑ)
if hasattr(value, '__class__') and 'json' in str(value.__class__).lower():
try:
return value if isinstance(value, (dict, list)) else str(value)
except:
return str(value)
# Oracle ํน์ ํ์
์ฒ๋ฆฌ
if hasattr(value, '__class__') and 'oracle' in str(value.__class__).lower():
return str(value)
# MySQL ํน์ ํ์
์ฒ๋ฆฌ
if hasattr(value, '__class__') and 'mysql' in str(value.__class__).lower():
return str(value)
# ๋ฌธ์์ด์์ ๋ฌธ์ ์๋ ๋ฌธ์ ์ ๊ฑฐ
if isinstance(value, str):
cleaned = value.encode('utf-8', errors='ignore').decode('utf-8')
# ์ ์ด ๋ฌธ์ ์ ๊ฑฐ
import re
cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', cleaned)
return cleaned
# ๋ค๋ฅธ ํ์
์ ๊ทธ๋๋ก ๋ฐํ (์ซ์, ๋ฆฌ์คํธ, ๋์
๋๋ฆฌ ๋ฑ)
return value
except Exception as e:
logger.warning(f"๊ฐ ์ ๋ฆฌ ์ค ์ค๋ฅ: {e}, ์๋ณธ ๊ฐ: {value}")
# ์ค๋ฅ ๋ฐ์ ์ ์์ ํ ๋ฌธ์์ด๋ก ๋ณํ
try:
return str(value).encode('ascii', errors='ignore').decode('ascii')
except:
return "[์ธ์ฝ๋ฉ ์ค๋ฅ]"
def execute_non_query(self, query: str) -> int:
"""INSERT, UPDATE, DELETE ๋ฑ์ ์ฟผ๋ฆฌ๋ฅผ ์คํํฉ๋๋ค."""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
with self.engine.connect() as conn:
result = conn.execute(text(query))
conn.commit()
affected_rows = result.rowcount
logger.info(f"์ฟผ๋ฆฌ ์คํ ์ฑ๊ณต: {affected_rows}๊ฐ ํ ์ํฅ")
return affected_rows
except Exception as e:
logger.error(f"์ฟผ๋ฆฌ ์คํ ์คํจ: {e}")
raise Exception(f"์ฟผ๋ฆฌ ์คํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def validate_query(self, query: str) -> bool:
"""SQL ์ฟผ๋ฆฌ์ ์ ํจ์ฑ์ ๊ฒ์ฌํฉ๋๋ค."""
if not self.is_connected():
return False
try:
# ์ฟผ๋ฆฌ ๊ตฌ๋ฌธ ๊ฒ์ฌ (์ค์ ์คํํ์ง ์์)
with self.engine.connect() as conn:
if query.strip().upper().startswith('SELECT'):
self._explain_query(conn, query)
else:
conn.execute(text(query))
return True
except Exception as e:
# ๋ ์์ธํ ์ค๋ฅ ์ ๋ณด ๋ก๊น
error_msg = str(e)
logger.warning(f"์ฟผ๋ฆฌ ์ ํจ์ฑ ๊ฒ์ฌ ์คํจ: {error_msg}")
# ํ
์ด๋ธ๋ช
๊ด๋ จ ์ค๋ฅ์ธ์ง ํ์ธ
if "syntax" in error_msg.lower() and "'" in query:
# ํ
์ด๋ธ๋ช
์ ์์๋ฐ์ดํ๊ฐ ์๋ชป ์ฌ์ฉ๋ ๊ฒฝ์ฐ
logger.warning(f"ํ
์ด๋ธ๋ช
์ ์์๋ฐ์ดํ๊ฐ ์๋ชป ์ฌ์ฉ๋จ: {query}")
return False
@abstractmethod
def _explain_query(self, conn, query: str):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค๋ณ EXPLAIN ์ฟผ๋ฆฌ๋ฅผ ์คํํฉ๋๋ค."""
pass
def close_connection(self):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์์ ํ๊ฒ ์ข
๋ฃํฉ๋๋ค."""
try:
if self.engine:
# ๋ชจ๋ ์ฐ๊ฒฐ ํ์ ์ฐ๊ฒฐ์ ์ ๋ฆฌ
self.engine.dispose()
logger.info("๋ฐ์ดํฐ๋ฒ ์ด์ค ์์ง์ด ์ ๋ฆฌ๋์์ต๋๋ค.")
if self.session_factory:
self.session_factory.close_all()
logger.info("์ธ์
ํฉํ ๋ฆฌ๊ฐ ์ ๋ฆฌ๋์์ต๋๋ค.")
self.engine = None
self.session_factory = None
logger.info("๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ด ์์ ํ ์ข
๋ฃ๋์์ต๋๋ค.")
except Exception as e:
logger.error(f"๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ข
๋ฃ ์ค ์ค๋ฅ ๋ฐ์: {e}")
class MySQLProvider(DatabaseProvider):
"""MySQL ๋ฐ์ดํฐ๋ฒ ์ด์ค Provider"""
def __init__(self):
super().__init__()
self.db_type = "mysql"
def _initialize_connection(self):
"""MySQL ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์ด๊ธฐํํฉ๋๋ค."""
try:
# SQLAlchemy ์์ง ์์ฑ
self.engine = create_engine(
config.get_mysql_url(),
echo=False, # SQL ๋ก๊ทธ ๋นํ์ฑํ
pool_pre_ping=True, # ์ฐ๊ฒฐ ์ํ ํ์ธ
pool_recycle=3600 # 1์๊ฐ๋ง๋ค ์ฐ๊ฒฐ ์ฌ์์ฑ
)
# ์ธ์
ํฉํ ๋ฆฌ ์์ฑ
self.session_factory = sessionmaker(bind=self.engine)
# ์ฐ๊ฒฐ ํ
์คํธ ๋ฐ ๋ฌธ์ ์ธ์ฝ๋ฉ ์ค์
with self.engine.connect() as conn:
self._setup_connection(conn)
# ์ฐ๊ฒฐ ํ
์คํธ
conn.execute(text("SELECT 1"))
logger.info(f"\n๐จ===== ๋ฐ์ดํฐ๋ฒ ์ด์ค[{self.db_type.upper()}] ์ฐ๊ฒฐ์ด ์ฑ๊ณต์ ์ผ๋ก ์ด๊ธฐํ๋์์ต๋๋ค.")
except Exception as e:
logger.error(f"{self.db_type.upper()} ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ด๊ธฐํ ์คํจ: {e}")
self.engine = None
self.session_factory = None
def _setup_connection(self, conn):
"""MySQL ์ฐ๊ฒฐ ์ค์ """
conn.execute(text("SET NAMES utf8mb4"))
conn.execute(text("SET CHARACTER SET utf8mb4"))
conn.execute(text("SET character_set_connection=utf8mb4"))
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""MySQL ํ
์ด๋ธ ์คํค๋ง ์กฐํ"""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
# ํ
์ด๋ธ์ COMMENT(์ค๋ช
) ์ ๋ณด๋ฅผ ์กฐํ
table_comment_query = f"""
SELECT TABLE_COMMENT
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{config.MYSQL_DATABASE}'
AND TABLE_NAME = '{table_name}'
"""
table_comment_result = self.execute_query(table_comment_query)
if table_comment_result and isinstance(table_comment_result, list):
table_comment = table_comment_result[0].get("TABLE_COMMENT", "")
else:
table_comment = ""
# ์ปฌ๋ผ ์ ๋ณด ์กฐํ
query = f"""
SELECT
COLUMN_NAME,
DATA_TYPE,
IS_NULLABLE,
COLUMN_DEFAULT,
COLUMN_KEY,
COLUMN_COMMENT
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '{config.MYSQL_DATABASE}'
AND TABLE_NAME = '{table_name}'
ORDER BY ORDINAL_POSITION
"""
columns = self.execute_query(query)
return {
"TABLE_NAME": table_name,
"TABLE_COMMENT": table_comment,
"COLUMNS": columns
}
except Exception as e:
logger.error(f"MySQL ํ
์ด๋ธ ์คํค๋ง ์กฐํ ์คํจ: {e}")
raise Exception(f"ํ
์ด๋ธ ์คํค๋ง ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def get_table_list(self, database_name: str = None) -> List[Dict[str, str]]:
"""MySQL ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ"""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
if database_name is None:
database_name = config.MYSQL_DATABASE
logger.debug(f"๋ฐ์ดํฐ๋ฒ ์ด์ค ์ด๋ฆ: {database_name}")
query = f"""
SELECT
TABLE_NAME,
TABLE_COMMENT
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{database_name}'
"""
result = self.execute_query(query)
table_list = []
for row in result:
table_list.append({
"TABLE_NAME": row.get("TABLE_NAME", ""),
"TABLE_COMMENT": row.get("TABLE_COMMENT", "")
})
logger.info(f"MySQL ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์ฑ๊ณต: {len(table_list)}๊ฐ ํ
์ด๋ธ")
return table_list
except Exception as e:
logger.error(f"MySQL ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์คํจ: {e}")
raise Exception(f"ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def get_database_info(self) -> Dict[str, Any]:
"""MySQL ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด๋ฅผ ๋ฐํํฉ๋๋ค."""
if not self.is_connected():
return {"error": "๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค."}
try:
info = {
"database_type": "MySQL",
"database_name": config.MYSQL_DATABASE,
"host": config.MYSQL_HOST,
"port": config.MYSQL_PORT,
"user": config.MYSQL_USER,
"tables": self.get_table_list(config.MYSQL_DATABASE),
"connection_status": "connected"
}
return info
except Exception as e:
logger.error(f"MySQL ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด ์กฐํ ์คํจ: {e}")
return {"error": f"๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}"}
def _explain_query(self, conn, query: str):
"""MySQL EXPLAIN ์ฟผ๋ฆฌ ์คํ"""
conn.execute(text(f"EXPLAIN {query}"))
class PostgreSQLProvider(DatabaseProvider):
"""PostgreSQL ๋ฐ์ดํฐ๋ฒ ์ด์ค Provider"""
def __init__(self):
super().__init__()
self.db_type = "postgresql"
def _initialize_connection(self):
"""PostgreSQL ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์ด๊ธฐํํฉ๋๋ค."""
try:
# SQLAlchemy ์์ง ์์ฑ
self.engine = create_engine(
config.get_postgresql_url(),
echo=False, # SQL ๋ก๊ทธ ๋นํ์ฑํ
pool_pre_ping=True, # ์ฐ๊ฒฐ ์ํ ํ์ธ
pool_recycle=3600 # 1์๊ฐ๋ง๋ค ์ฐ๊ฒฐ ์ฌ์์ฑ
)
# ์ธ์
ํฉํ ๋ฆฌ ์์ฑ
self.session_factory = sessionmaker(bind=self.engine)
# ์ฐ๊ฒฐ ํ
์คํธ
with self.engine.connect() as conn:
self._setup_connection(conn)
# ์ฐ๊ฒฐ ํ
์คํธ
conn.execute(text("SELECT 1"))
logger.info(f"\n๐จ===== ๋ฐ์ดํฐ๋ฒ ์ด์ค[{self.db_type.upper()}] ์ฐ๊ฒฐ์ด ์ฑ๊ณต์ ์ผ๋ก ์ด๊ธฐํ๋์์ต๋๋ค.")
except Exception as e:
logger.error(f"{self.db_type.upper()} ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ด๊ธฐํ ์คํจ: {e}")
self.engine = None
self.session_factory = None
def _setup_connection(self, conn):
"""PostgreSQL ์ฐ๊ฒฐ ์ค์ """
# PostgreSQL์ ๊ธฐ๋ณธ์ ์ผ๋ก UTF-8์ ์ง์ํ๋ฏ๋ก ์ถ๊ฐ ์ค์ ๋ถํ์
pass
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""PostgreSQL ํ
์ด๋ธ ์คํค๋ง ์กฐํ"""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
# ํ
์ด๋ธ ์ค๋ช
์ ๋ณด ์กฐํ
table_comment_query = f"""
SELECT obj_description(c.oid) as table_comment
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = '{table_name}' AND n.nspname = 'public'
"""
table_comment_result = self.execute_query(table_comment_query)
if table_comment_result and isinstance(table_comment_result, list):
table_comment = table_comment_result[0].get("table_comment", "")
else:
table_comment = ""
# ์ปฌ๋ผ ์ ๋ณด ์กฐํ
query = f"""
SELECT
cols.column_name,
cols.data_type,
cols.is_nullable,
cols.column_default,
CASE
WHEN pk.column_name IS NOT NULL THEN 'PRI'
ELSE ''
END as column_key,
col_description(c.oid, cols.ordinal_position) as column_comment
FROM information_schema.columns cols
JOIN pg_class c ON c.relname = cols.table_name
LEFT JOIN (
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name
WHERE tc.constraint_type = 'PRIMARY KEY' AND tc.table_name = '{table_name}'
) pk ON cols.column_name = pk.column_name
WHERE cols.table_name = '{table_name}'
ORDER BY cols.ordinal_position
"""
columns = self.execute_query(query)
return {
"TABLE_NAME": table_name,
"TABLE_COMMENT": table_comment,
"COLUMNS": columns
}
except Exception as e:
logger.error(f"PostgreSQL ํ
์ด๋ธ ์คํค๋ง ์กฐํ ์คํจ: {e}")
raise Exception(f"ํ
์ด๋ธ ์คํค๋ง ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def get_table_list(self, database_name: str = None) -> List[Dict[str, str]]:
"""PostgreSQL ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ"""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
logger.debug(f"PostgreSQL ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ")
query = """
SELECT
tablename as table_name,
obj_description(c.oid) as table_comment
FROM pg_tables t
JOIN pg_class c ON c.relname = t.tablename
WHERE schemaname = 'public'
"""
result = self.execute_query(query)
table_list = []
for row in result:
table_list.append({
"TABLE_NAME": row.get("table_name", ""),
"TABLE_COMMENT": row.get("table_comment", "")
})
logger.info(f"PostgreSQL ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์ฑ๊ณต: {len(table_list)}๊ฐ ํ
์ด๋ธ")
return table_list
except Exception as e:
logger.error(f"PostgreSQL ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์คํจ: {e}")
raise Exception(f"ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def get_database_info(self) -> Dict[str, Any]:
"""PostgreSQL ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด๋ฅผ ๋ฐํํฉ๋๋ค."""
if not self.is_connected():
return {"error": "๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค."}
try:
info = {
"database_type": "PostgreSQL",
"database_name": config.POSTGRESQL_DATABASE,
"host": config.POSTGRESQL_HOST,
"port": config.POSTGRESQL_PORT,
"user": config.POSTGRESQL_USER,
"tables": self.get_table_list(),
"connection_status": "connected"
}
return info
except Exception as e:
logger.error(f"PostgreSQL ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด ์กฐํ ์คํจ: {e}")
return {"error": f"๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}"}
def _explain_query(self, conn, query: str):
"""PostgreSQL EXPLAIN ์ฟผ๋ฆฌ ์คํ"""
conn.execute(text(f"EXPLAIN {query}"))
class OracleProvider(DatabaseProvider):
"""Oracle ๋ฐ์ดํฐ๋ฒ ์ด์ค Provider"""
def __init__(self):
super().__init__()
self.db_type = "oracle"
def _initialize_connection(self):
"""Oracle ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์ด๊ธฐํํฉ๋๋ค."""
try:
# SQLAlchemy ์์ง ์์ฑ
self.engine = create_engine(
config.get_oracle_url(),
echo=False, # SQL ๋ก๊ทธ ๋นํ์ฑํ
pool_pre_ping=True, # ์ฐ๊ฒฐ ์ํ ํ์ธ
pool_recycle=3600 # 1์๊ฐ๋ง๋ค ์ฐ๊ฒฐ ์ฌ์์ฑ
)
# ์ธ์
ํฉํ ๋ฆฌ ์์ฑ
self.session_factory = sessionmaker(bind=self.engine)
# ์ฐ๊ฒฐ ํ
์คํธ
with self.engine.connect() as conn:
self._setup_connection(conn)
# ์ฐ๊ฒฐ ํ
์คํธ
conn.execute(text("SELECT 1 FROM DUAL"))
logger.info(f"\n๐จ===== ๋ฐ์ดํฐ๋ฒ ์ด์ค[{self.db_type.upper()}] ์ฐ๊ฒฐ์ด ์ฑ๊ณต์ ์ผ๋ก ์ด๊ธฐํ๋์์ต๋๋ค.")
except Exception as e:
logger.error(f"{self.db_type.upper()} ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ด๊ธฐํ ์คํจ: {e}")
self.engine = None
self.session_factory = None
def _setup_connection(self, conn):
"""Oracle ์ฐ๊ฒฐ ์ค์ """
# Oracle์ ๊ธฐ๋ณธ์ ์ผ๋ก UTF-8์ ์ง์ํ๋ฏ๋ก ์ถ๊ฐ ์ค์ ๋ถํ์
pass
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""Oracle ํ
์ด๋ธ ์คํค๋ง ์กฐํ"""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
# ํ
์ด๋ธ ์ค๋ช
์ ๋ณด ์กฐํ
table_comment_query = f"""
SELECT comments as table_comment
FROM user_tab_comments
WHERE table_name = '{table_name.upper()}'
"""
table_comment_result = self.execute_query(table_comment_query)
if table_comment_result and isinstance(table_comment_result, list):
table_comment = table_comment_result[0].get("table_comment", "")
else:
table_comment = ""
# ์ปฌ๋ผ ์ ๋ณด ์กฐํ
query = f"""
SELECT
column_name,
data_type,
nullable as is_nullable,
data_default as column_default,
CASE
WHEN pk.column_name IS NOT NULL THEN 'PRI'
ELSE ''
END as column_key,
comments as column_comment
FROM user_tab_columns cols
LEFT JOIN user_col_comments col_comments ON cols.table_name = col_comments.table_name AND cols.column_name = col_comments.column_name
LEFT JOIN (
SELECT cols.column_name
FROM user_constraints cons
JOIN user_cons_columns cols ON cons.constraint_name = cols.constraint_name
WHERE cons.constraint_type = 'P' AND cons.table_name = '{table_name.upper()}'
) pk ON cols.column_name = pk.column_name
WHERE cols.table_name = '{table_name.upper()}'
ORDER BY cols.column_id
"""
columns = self.execute_query(query)
return {
"TABLE_NAME": table_name,
"TABLE_COMMENT": table_comment,
"COLUMNS": columns
}
except Exception as e:
logger.error(f"Oracle ํ
์ด๋ธ ์คํค๋ง ์กฐํ ์คํจ: {e}")
raise Exception(f"ํ
์ด๋ธ ์คํค๋ง ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def get_table_list(self, database_name: str = None) -> List[Dict[str, str]]:
"""Oracle ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ"""
if not self.is_connected():
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค.")
try:
logger.debug(f"Oracle ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ")
query = """
SELECT
table_name,
comments as table_comment
FROM user_tab_comments
WHERE table_type = 'TABLE'
"""
result = self.execute_query(query)
table_list = []
for row in result:
table_list.append({
"TABLE_NAME": row.get("table_name", ""),
"TABLE_COMMENT": row.get("table_comment", "")
})
logger.info(f"Oracle ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์ฑ๊ณต: {len(table_list)}๊ฐ ํ
์ด๋ธ")
return table_list
except Exception as e:
logger.error(f"Oracle ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์คํจ: {e}")
raise Exception(f"ํ
์ด๋ธ ๋ชฉ๋ก ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
def get_database_info(self) -> Dict[str, Any]:
"""Oracle ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด๋ฅผ ๋ฐํํฉ๋๋ค."""
if not self.is_connected():
return {"error": "๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค."}
try:
info = {
"database_type": "Oracle",
"database_name": config.ORACLE_SERVICE_NAME or config.ORACLE_SID,
"host": config.ORACLE_HOST,
"port": config.ORACLE_PORT,
"user": config.ORACLE_USER,
"tables": self.get_table_list(),
"connection_status": "connected"
}
return info
except Exception as e:
logger.error(f"Oracle ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด ์กฐํ ์คํจ: {e}")
return {"error": f"๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด ์กฐํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}"}
def _explain_query(self, conn, query: str):
"""Oracle EXPLAIN PLAN ์ฟผ๋ฆฌ ์คํ"""
conn.execute(text(f"EXPLAIN PLAN FOR {query}"))
class DatabaseManager:
"""๋ค์ค ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ด๋ฆฌ์"""
def __init__(self):
self.provider = None
# ์์ฑ์์์ ์๋ ์ด๊ธฐํํ์ง ์์
def constructor(self):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์ด๊ธฐํํฉ๋๋ค. (๊ธฐ์กด ํธํ์ฑ์ ์ํด ์ ์ง)"""
self._initialize_provider()
def _initialize_provider(self):
"""ํ๊ฒฝ๋ณ์์ ๋ฐ๋ผ ์ ์ ํ ๋ฐ์ดํฐ๋ฒ ์ด์ค Provider๋ฅผ ์ด๊ธฐํํฉ๋๋ค."""
try:
db_type = config.DATABASE_TYPE.lower()
if db_type == "mysql":
self.provider = MySQLProvider()
elif db_type == "postgresql":
self.provider = PostgreSQLProvider()
elif db_type == "oracle":
self.provider = OracleProvider()
else:
raise ValueError(f"์ง์ํ์ง ์๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ์
: {db_type}")
# Provider ์ด๊ธฐํ
self.provider.constructor()
logger.info(f"๋ฐ์ดํฐ๋ฒ ์ด์ค Provider [{db_type.upper()}]๊ฐ ์ด๊ธฐํ๋์์ต๋๋ค.")
except Exception as e:
logger.error(f"๋ฐ์ดํฐ๋ฒ ์ด์ค Provider ์ด๊ธฐํ ์คํจ: {e}")
self.provider = None
def is_connected(self) -> bool:
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ํ๋ฅผ ํ์ธํฉ๋๋ค."""
return self.provider is not None and self.provider.is_connected()
def execute_query(self, query: str) -> List[Dict[str, Any]]:
"""SQL ์ฟผ๋ฆฌ๋ฅผ ์คํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํฉ๋๋ค."""
if not self.provider:
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค Provider๊ฐ ์ด๊ธฐํ๋์ง ์์์ต๋๋ค.")
return self.provider.execute_query(query)
def execute_non_query(self, query: str) -> int:
"""INSERT, UPDATE, DELETE ๋ฑ์ ์ฟผ๋ฆฌ๋ฅผ ์คํํฉ๋๋ค."""
if not self.provider:
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค Provider๊ฐ ์ด๊ธฐํ๋์ง ์์์ต๋๋ค.")
return self.provider.execute_non_query(query)
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""ํ
์ด๋ธ ์คํค๋ง ์ ๋ณด๋ฅผ ๋ฐํํฉ๋๋ค."""
if not self.provider:
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค Provider๊ฐ ์ด๊ธฐํ๋์ง ์์์ต๋๋ค.")
return self.provider.get_table_schema(table_name)
def get_table_list(self, database_name: str = None) -> List[Dict[str, str]]:
"""๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ชจ๋ ํ
์ด๋ธ ๋ชฉ๋ก์ ๋ฐํํฉ๋๋ค."""
if not self.provider:
raise Exception("๋ฐ์ดํฐ๋ฒ ์ด์ค Provider๊ฐ ์ด๊ธฐํ๋์ง ์์์ต๋๋ค.")
return self.provider.get_table_list(database_name)
def validate_query(self, query: str) -> bool:
"""SQL ์ฟผ๋ฆฌ์ ์ ํจ์ฑ์ ๊ฒ์ฌํฉ๋๋ค."""
if not self.provider:
return False
return self.provider.validate_query(query)
def get_database_info(self) -> Dict[str, Any]:
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ณด๋ฅผ ๋ฐํํฉ๋๋ค."""
if not self.provider:
return {"error": "๋ฐ์ดํฐ๋ฒ ์ด์ค Provider๊ฐ ์ด๊ธฐํ๋์ง ์์์ต๋๋ค."}
return self.provider.get_database_info()
def close_connection(self):
"""๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์์ ํ๊ฒ ์ข
๋ฃํฉ๋๋ค."""
if self.provider:
self.provider.close_connection()
# ์ ์ญ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ด๋ฆฌ์ ์ธ์คํด์ค
db_manager = DatabaseManager()