import pymysql
import logging
from typing import Optional, Dict, Any, List
from contextlib import contextmanager
from pymysql.connections import Connection
from pymysql.cursors import DictCursor
# Module-level logger named after this module; used by DatabaseManager below.
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Open short-lived MySQL connections via pymysql and run queries on them.

    Each ``execute_*`` call opens a fresh connection, runs one statement and
    closes the connection again; no connection is cached on the instance.
    Connections are created with ``autocommit`` off and ``DictCursor`` as the
    cursor class.
    """

    def __init__(self, config=None):
        """Build the keyword arguments later passed to ``pymysql.connect``.

        Args:
            config: Optional object exposing ``MYSQL_HOST``, ``MYSQL_PORT``,
                ``MYSQL_USER``, ``MYSQL_PASSWORD`` and ``MYSQL_DATABASE``
                attributes. When ``None``, the same settings are read from
                environment variables after loading a ``.env`` file.
        """
        if config is None:
            # Imported lazily so python-dotenv is only needed when no explicit
            # config object is supplied.
            import os
            from dotenv import load_dotenv

            load_dotenv()
            settings = {
                'host': os.getenv('MYSQL_HOST', 'localhost'),
                'port': int(os.getenv('MYSQL_PORT', 3306)),
                'user': os.getenv('MYSQL_USER', 'root'),
                'password': os.getenv('MYSQL_PASSWORD', ''),
                'database': os.getenv('MYSQL_DATABASE', 'test'),
            }
        else:
            settings = {
                'host': config.MYSQL_HOST,
                'port': config.MYSQL_PORT,
                'user': config.MYSQL_USER,
                'password': config.MYSQL_PASSWORD,
                'database': config.MYSQL_DATABASE,
            }

        # Options shared by both configuration sources.
        self.connection_config = {
            **settings,
            'charset': 'utf8mb4',
            'autocommit': False,
            'cursorclass': DictCursor,
        }

    def get_connection(self) -> "Connection":
        """Open and return a new database connection.

        The caller owns the returned connection and must close it.

        Raises:
            Exception: Whatever ``pymysql.connect`` raises; the error is
                logged and re-raised unchanged.
        """
        try:
            connection = pymysql.connect(**self.connection_config)
            logger.info("Database connection established successfully")
            return connection
        except Exception as e:
            logger.error(f"Failed to connect to database: {str(e)}")
            raise

    @contextmanager
    def get_connection_context(self):
        """Yield a new connection, rolling back on error and always closing it.

        Committing is left to the code inside the ``with`` block. Any
        ``Exception`` raised while connecting or inside the block is logged
        and re-raised after a rollback attempt.
        """
        connection = None
        try:
            connection = self.get_connection()
            yield connection
        except Exception as e:
            if connection is not None:
                # A failing rollback (e.g. on a dropped connection) must not
                # replace the error that actually caused the failure.
                try:
                    connection.rollback()
                except Exception as rollback_error:
                    logger.error(f"Rollback failed: {str(rollback_error)}")
            logger.error(f"Database operation failed: {str(e)}")
            raise
        finally:
            if connection is not None:
                connection.close()

    def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Execute a SELECT query and return all fetched rows as a list.

        Args:
            query: SQL text, with placeholders for ``params`` if any.
            params: Optional parameters forwarded to ``cursor.execute``.
        """
        with self.get_connection_context() as connection:
            with connection.cursor() as cursor:
                cursor.execute(query, params)
                # Normalise to a list to match the annotation; the driver may
                # hand back a tuple when the result set is empty.
                return list(cursor.fetchall())

    def execute_update(self, query: str, params: Optional[tuple] = None) -> int:
        """Execute an INSERT/UPDATE/DELETE, commit, and return affected rows.

        Args:
            query: SQL text, with placeholders for ``params`` if any.
            params: Optional parameters forwarded to ``cursor.execute``.
        """
        with self.get_connection_context() as connection:
            with connection.cursor() as cursor:
                affected_rows = cursor.execute(query, params)
                connection.commit()
                return affected_rows

    def execute_many(self, query: str, params_list: List[tuple]) -> int:
        """Execute ``query`` once per parameter set, commit, and return the row count.

        Args:
            query: SQL text with placeholders.
            params_list: One parameter tuple per execution. When empty,
                nothing is executed, no connection is opened and 0 is
                returned.
        """
        if not params_list:
            # Nothing to run; return 0 so the result is always an int.
            return 0
        with self.get_connection_context() as connection:
            with connection.cursor() as cursor:
                affected_rows = cursor.executemany(query, params_list)
                connection.commit()
                return affected_rows

    def test_connection(self) -> bool:
        """Return True if ``SELECT 1`` runs on a new connection, else False.

        Failures are logged rather than raised.
        """
        try:
            with self.get_connection_context() as connection:
                with connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    return True
        except Exception as e:
            logger.error(f"Connection test failed: {str(e)}")
            return False
# Module-level DatabaseManager instance shared by callers. Starts as None and
# is created lazily by get_db_manager() on its first call.
db_manager = None
def get_db_manager():
    """Return the shared DatabaseManager, building it on the first call."""
    global db_manager
    if db_manager is not None:
        return db_manager
    # First use: construct with default (environment-based) configuration.
    db_manager = DatabaseManager()
    return db_manager