# -*- coding: utf-8 -*-
"""
SQL адаптер для работы с реляционными базами данных (SQLite, MySQL, PostgreSQL).
"""
import sys
import sqlite3
import psycopg2
import mysql.connector
from typing import List, Dict, Any
from .base_adapter import DBAdapter
class SQLAdapter(DBAdapter):
def __init__(self, db_config: Dict[str, Any], db_instance=None):
self.db_type = db_config.get('type', 'sqlite')
self.config = db_config
self.connection = None
self.db = db_instance
def connect(self):
"""Устанавливает соединение с базой данных"""
if self.db is not None:
return
try:
if self.db_type == 'sqlite':
db_name = self.config.get('db_name', 'database.db')
self.connection = sqlite3.connect(db_name)
self.connection.row_factory = sqlite3.Row
elif self.db_type == 'mysql':
# Для MySQL подключаемся без указания database если её нет
connect_params = {
'host': self.config.get('host', 'localhost'),
'port': self.config.get('port', 3306),
'user': self.config.get('user'),
'password': self.config.get('password'),
}
# Добавляем database только если она указана
if self.config.get('database'):
connect_params['database'] = self.config.get('database')
self.connection = mysql.connector.connect(**connect_params)
elif self.db_type == 'postgresql':
self.connection = psycopg2.connect(
host=self.config.get('host', 'localhost'),
port=self.config.get('port', 5432),
user=self.config.get('user'),
password=self.config.get('password'),
dbname=self.config.get('dbname')
)
self.connection.set_session(autocommit=True)
else:
raise ValueError(f"Неподдерживаемый тип базы данных: {self.db_type}")
except Exception as e:
raise
def disconnect(self):
"""Закрывает соединение с базой данных"""
if self.connection:
try:
# Проверяем, что соединение все еще активно
if self.connection:
self.connection.close()
except Exception as e:
# Логируем ошибку при закрытии соединения
print(f"Ошибка при закрытии соединения с {self.db_type}: {str(e)}", file=sys.stderr)
finally:
self.connection = None
def _execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
"""Выполняет запрос и возвращает результат"""
cursor = self.connection.cursor()
try:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
# Если запрос возвращает данные
if cursor.description:
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return [dict(zip(columns, row)) for row in rows]
else:
# Для INSERT, UPDATE, DELETE
self.connection.commit()
return [{"affected_rows": cursor.rowcount}]
except Exception as e:
self.connection.rollback()
raise
finally:
cursor.close()
def select(self, table: str, columns: List[str] = None, condition: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""Выполняет SELECT запрос"""
# Формируем колонки
if not columns or '*' in columns:
columns_str = '*'
else:
columns_str = ', '.join(columns)
# Формируем условие WHERE
where_clause = ""
params = []
if condition:
where_parts = []
for key, value in condition.items():
where_parts.append(f"{key} = %s")
params.append(value)
where_clause = f"WHERE {' AND '.join(where_parts)}"
query = f"SELECT {columns_str} FROM {table} {where_clause}".strip()
# Для SQLite используем ? вместо %s
if self.db_type == 'sqlite':
query = query.replace('%s', '?')
params = [p for p in params] # Копируем список
return self._execute_query(query, tuple(params) if params else None)
def insert(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Выполняет INSERT запрос"""
columns = list(data.keys())
values = list(data.values())
placeholders = ', '.join(['%s'] * len(values))
columns_str = ', '.join(columns)
query = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})"
# Для SQLite используем ? вместо %s
if self.db_type == 'sqlite':
query = query.replace('%s', '?')
result = self._execute_query(query, tuple(values))
return result[0] if result else {"inserted": True}
def update(self, table: str, data: Dict[str, Any], condition: Dict[str, Any]) -> Dict[str, Any]:
"""Выполняет UPDATE запрос"""
# Формируем SET часть
set_parts = []
set_values = []
for key, value in data.items():
set_parts.append(f"{key} = %s")
set_values.append(value)
set_clause = ', '.join(set_parts)
# Формируем WHERE часть
where_parts = []
where_values = []
for key, value in condition.items():
where_parts.append(f"{key} = %s")
where_values.append(value)
where_clause = f"WHERE {' AND '.join(where_parts)}" if where_parts else ""
query = f"UPDATE {table} SET {set_clause} {where_clause}".strip()
all_params = set_values + where_values
# Для SQLite используем ? вместо %s
if self.db_type == 'sqlite':
query = query.replace('%s', '?')
result = self._execute_query(query, tuple(all_params))
return result[0] if result else {"updated": True}
def delete(self, table: str, condition: Dict[str, Any]) -> Dict[str, Any]:
"""Выполняет DELETE запрос"""
# Формируем WHERE часть
where_parts = []
where_values = []
for key, value in condition.items():
where_parts.append(f"{key} = %s")
where_values.append(value)
where_clause = f"WHERE {' AND '.join(where_parts)}" if where_parts else ""
query = f"DELETE FROM {table} {where_clause}".strip()
# Для SQLite используем ? вместо %s
if self.db_type == 'sqlite':
query = query.replace('%s', '?')
result = self._execute_query(query, tuple(where_values))
return result[0] if result else {"deleted": True}
def alter(self, table: str, instruction: Dict[str, Any]) -> Dict[str, Any]:
"""Выполняет ALTER TABLE запрос"""
action = instruction.get('action', '').upper()
if action == 'ADD_COLUMN':
column_name = instruction.get('column_name')
column_type = instruction.get('column_type', 'TEXT')
if not column_name:
raise ValueError("Не указано имя столбца")
query = f"ALTER TABLE {table} ADD COLUMN {column_name} {column_type}"
elif action == 'DROP_COLUMN':
column_name = instruction.get('column_name')
if not column_name:
raise ValueError("Не указано имя столбца")
# DROP COLUMN поддерживается не во всех версиях SQLite
if self.db_type == 'sqlite':
raise NotImplementedError("DROP COLUMN не поддерживается в SQLite")
query = f"ALTER TABLE {table} DROP COLUMN {column_name}"
else:
raise ValueError(f"Неизвестное действие ALTER: {action}")
result = self._execute_query(query)
return result[0] if result else {"altered": True}
def create_table(self, table_name: str, schema: Dict[str, Any] = None) -> Dict[str, Any]:
"""Создает таблицу в базе данных"""
if schema:
# Если схема предоставлена, создаем таблицу с указанными колонками
columns = schema.get('columns', {})
if not columns:
raise ValueError("Схема должна содержать определения колонок")
column_defs = []
for col_name, col_type in columns.items():
if isinstance(col_type, dict):
# Поддержка дополнительных параметров колонки
col_def = f"{col_name} {col_type.get('type', 'TEXT')}"
if col_type.get('primary_key'):
col_def += " PRIMARY KEY"
if col_type.get('not_null'):
col_def += " NOT NULL"
if col_type.get('unique'):
col_def += " UNIQUE"
if col_type.get('default'):
col_def += f" DEFAULT {col_type.get('default')}"
else:
# Простое определение типа
col_def = f"{col_name} {col_type}"
column_defs.append(col_def)
columns_str = ', '.join(column_defs)
query = f"CREATE TABLE {table_name} ({columns_str})"
else:
# Создаем пустую таблицу с одной колонкой по умолчанию
query = f"CREATE TABLE {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)"
result = self._execute_query(query)
return {"table_name": table_name, "created": True, "query": query}
def drop_table(self, table_name: str) -> Dict[str, Any]:
"""Удаляет таблицу из базы данных"""
query = f"DROP TABLE IF EXISTS {table_name}"
result = self._execute_query(query)
return {"table_name": table_name, "dropped": True, "query": query}
def execute_raw(self, query: str):
"""Выполняет сырой SQL запрос"""
result = self._execute_query(query)
# Для CREATE, DROP, ALTER возвращаем статус успешности
query_lower = query.strip().upper()
if any(query_lower.startswith(cmd) for cmd in ['CREATE', 'DROP', 'ALTER', 'PRAGMA', 'USE']):
# Используем явный UTF-8 текст для статуса
success_message = "Запрос выполнен успешно"
return {"status": "success", "message": success_message, "query": query}
# Для SELECT возвращаем результат
return result
def create_database(self, database_name: str) -> Dict[str, Any]:
"""Создает новую базу данных"""
if self.db_type == 'sqlite':
# Для SQLite база данных - это файл, создаём новое подключение
db_path = f"{database_name}.db"
new_conn = sqlite3.connect(db_path)
new_conn.close()
return {"status": "success", "database": database_name, "message": f"База данных '{database_name}' создана (файл: {db_path})"}
elif self.db_type == 'mysql':
# Для MySQL выполняем CREATE DATABASE
query = f"CREATE DATABASE IF NOT EXISTS `{database_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
self._execute_query(query)
return {"status": "success", "database": database_name, "message": f"База данных '{database_name}' создана"}
elif self.db_type == 'postgresql':
# Для PostgreSQL выполняем CREATE DATABASE
query = f'CREATE DATABASE "{database_name}" ENCODING \'UTF8\''
self._execute_query(query)
return {"status": "success", "database": database_name, "message": f"База данных '{database_name}' создана"}
else:
raise ValueError(f"Создание БД не поддерживается для: {self.db_type}")
def use_database(self, database_name: str) -> Dict[str, Any]:
"""Переключается на указанную базу данных"""
if self.db_type == 'mysql':
query = f"USE `{database_name}`"
self._execute_query(query)
self.config['database'] = database_name
return {"status": "success", "database": database_name, "message": f"Переключено на базу данных '{database_name}'"}
elif self.db_type == 'sqlite':
# Для SQLite переподключаемся к другому файлу
self.disconnect()
self.config['db_name'] = f"{database_name}.db"
self.connect()
return {"status": "success", "database": database_name, "message": f"Переключено на базу данных '{database_name}'"}
elif self.db_type == 'postgresql':
# PostgreSQL не поддерживает USE, нужно переподключиться
self.disconnect()
self.config['dbname'] = database_name
self.connect()
return {"status": "success", "database": database_name, "message": f"Переключено на базу данных '{database_name}'"}
else:
raise ValueError(f"Переключение БД не поддерживается для: {self.db_type}")