database.py•3.96 kB
"""데이터베이스 연결 및 관리"""
import asyncpg
import logging
from typing import Dict, Any, List, Optional
from src.exceptions import DatabaseConnectionError
class DatabaseManager:
"""TimescaleDB 데이터베이스 관리자"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.pool: Optional[asyncpg.Pool] = None
self.logger = logging.getLogger(__name__)
@property
def connection_url(self) -> str:
"""데이터베이스 연결 URL 생성"""
return (f"postgresql://{self.config['user']}:{self.config['password']}"
f"@{self.config['host']}:{self.config['port']}/{self.config['name']}")
@property
def is_connected(self) -> bool:
"""연결 상태 확인"""
return self.pool is not None and not self.pool._closed
async def connect(self) -> None:
"""데이터베이스 연결"""
try:
self.pool = await asyncpg.create_pool(
host=self.config['host'],
port=self.config['port'],
database=self.config['name'],
user=self.config['user'],
password=self.config['password'],
min_size=1,
max_size=10,
command_timeout=60
)
self.logger.info("데이터베이스 연결 성공")
except Exception as e:
self.logger.error(f"데이터베이스 연결 실패: {e}")
raise DatabaseConnectionError(f"데이터베이스 연결 실패: {e}")
async def disconnect(self) -> None:
"""데이터베이스 연결 해제"""
if self.pool:
await self.pool.close()
self.pool = None
self.logger.info("데이터베이스 연결 해제")
async def fetch_all(self, query: str, *params) -> List[Dict[str, Any]]:
"""쿼리 실행 및 모든 결과 반환"""
if not self.is_connected:
raise DatabaseConnectionError("데이터베이스에 연결되지 않음")
async with self.pool.acquire() as connection:
try:
rows = await connection.fetch(query, *params)
return [dict(row) for row in rows]
except Exception as e:
self.logger.error(f"쿼리 실행 실패: {e}")
raise
async def fetch_one(self, query: str, *params) -> Optional[Dict[str, Any]]:
"""쿼리 실행 및 단일 결과 반환"""
if not self.is_connected:
raise DatabaseConnectionError("데이터베이스에 연결되지 않음")
async with self.pool.acquire() as connection:
try:
row = await connection.fetchrow(query, *params)
return dict(row) if row else None
except Exception as e:
self.logger.error(f"쿼리 실행 실패: {e}")
raise
async def execute(self, query: str, *params) -> str:
"""쿼리 실행 (INSERT, UPDATE, DELETE 등)"""
if not self.is_connected:
raise DatabaseConnectionError("데이터베이스에 연결되지 않음")
async with self.pool.acquire() as connection:
try:
result = await connection.execute(query, *params)
return result
except Exception as e:
self.logger.error(f"쿼리 실행 실패: {e}")
raise
async def health_check(self) -> bool:
"""데이터베이스 상태 확인"""
try:
if not self.is_connected:
return False
async with self.pool.acquire() as connection:
result = await connection.fetchval("SELECT 1")
return result == 1
except Exception as e:
self.logger.error(f"헬스체크 실패: {e}")
return False