from typing import Dict, Any, Optional
import mysql.connector
from contextlib import contextmanager
from ..interfaces.database_adapter import DatabaseAdapter
class MySQLAdapter(DatabaseAdapter):
    """DatabaseAdapter implementation backed by a MySQL Connector/Python pool."""

    def __init__(self, config: Dict[str, Any]):
        """Normalise the connection settings and create the connection pool.

        Args:
            config: Connection options. Recognised keys and their fallbacks:
                ``host`` ("localhost"), ``port`` (3306; also used when the
                value is missing or falsy), ``user`` ("root"), ``password``
                (""), ``database`` (None), ``pool_name`` ("custom_pool") and
                ``pool_size`` (5). The charset is always "utf8mb4".
        """
        self.config = {
            "host": config.get("host", "localhost"),
            # `or` also covers an explicit None / empty-string port.
            "port": int(config.get("port") or 3306),
            "user": config.get("user", "root"),
            "password": config.get("password", ""),
            "database": config.get("database", None),
            "charset": "utf8mb4",
        }
        # Pool settings are kept out of self.config so that dict stays a
        # plain set of connection arguments, exactly as before.
        self.pool = mysql.connector.pooling.MySQLConnectionPool(
            pool_name=config.get("pool_name") or "custom_pool",
            pool_size=int(config.get("pool_size") or 5),
            **self.config,
        )

    @contextmanager
    def get_connection(self):
        """Yield a connection taken from the pool and close it afterwards.

        The connection is closed even if the ``with`` body raises; per the
        Connector/Python pooling documentation, closing a pooled connection
        hands it back to the pool rather than disconnecting.
        """
        conn = self.pool.get_connection()
        try:
            yield conn
        finally:
            conn.close()

    @contextmanager
    def _cursor(self, conn, **cursor_kwargs):
        """Yield a cursor created on *conn* and always close it on exit."""
        cursor = conn.cursor(**cursor_kwargs)
        try:
            yield cursor
        finally:
            cursor.close()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a backtick-quoted MySQL identifier.

        Dots are treated as qualifier separators (``schema.table``), so each
        part is quoted on its own. Surrounding whitespace/backticks on a part
        are dropped and embedded backticks are doubled, which keeps the value
        from terminating the identifier and injecting SQL. A name that itself
        contains a literal dot is not supported.
        """
        quoted_parts = []
        for part in name.split("."):
            bare = part.strip().strip("`")
            quoted_parts.append("`" + bare.replace("`", "``") + "`")
        return ".".join(quoted_parts)

    def execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run *query* and return its result in a uniform dictionary.

        Args:
            query: SQL text; placeholders use the connector's ``%s`` or
                ``%(name)s`` style.
            params: Values for the placeholders, or None for no parameters.

        Returns:
            A dict with keys ``columns``, ``rows``, ``row_count`` and
            ``affected_rows``. When the statement yields a result set,
            ``columns`` holds the column names, ``rows`` the fetched rows as
            lists, ``row_count`` their number and ``affected_rows`` is 0.
            Otherwise the statement is committed and only ``affected_rows``
            (the cursor's ``rowcount``) is populated.
        """
        with self.get_connection() as conn:
            # Tuple rows (not dict rows) so column order matches "columns".
            with self._cursor(conn, dictionary=False) as cursor:
                cursor.execute(query, params or {})

                if cursor.description:
                    columns = [col[0] for col in cursor.description]
                    rows = cursor.fetchall()
                    return {
                        "columns": columns,
                        # Lists instead of the driver's tuples.
                        "rows": [list(row) for row in rows],
                        "row_count": len(rows),
                        "affected_rows": 0,
                    }

                # No result set: treat the statement as a write and persist it.
                conn.commit()
                return {
                    "columns": [],
                    "rows": [],
                    "row_count": 0,
                    "affected_rows": cursor.rowcount,
                }

    def list_tables(self) -> list[str]:
        """Return the first column of every row produced by ``SHOW TABLES``."""
        with self.get_connection() as conn:
            with self._cursor(conn) as cursor:
                cursor.execute("SHOW TABLES")
                return [row[0] for row in cursor.fetchall()]

    def describe_table(self, table_name: str) -> Dict[str, Any]:
        """Return the ``DESCRIBE`` output for *table_name*.

        Args:
            table_name: Table name, optionally qualified as ``schema.table``.
                It is quoted as an identifier before being placed in the
                statement because identifiers cannot be bound as parameters.

        Returns:
            ``{"columns": rows}`` where ``rows`` is the list of dictionary
            rows fetched from the ``DESCRIBE`` statement.
        """
        statement = f"DESCRIBE {self._quote_identifier(table_name)}"
        with self.get_connection() as conn:
            with self._cursor(conn, dictionary=True) as cursor:
                cursor.execute(statement)
                return {"columns": cursor.fetchall()}

    def get_version(self) -> str:
        """Return the server info string reported by a pooled connection."""
        with self.get_connection() as conn:
            return conn.get_server_info()