import sys
from pymongo import MongoClient
from .base_adapter import DBAdapter
class MongoAdapter(DBAdapter):
def __init__(self, db_config, db_instance=None):
self.uri = db_config.get("uri") if db_config else None
self.db_name = db_config.get("database_name") if db_config else None
self.client = None
self.db = db_instance
def connect(self):
if self.db is not None:
# Уже есть экземпляр базы данных, не нужно создавать новое соединение
return
try:
self.client = MongoClient(self.uri)
self.db = self.client[self.db_name]
# Проверка соединения
self.client.admin.command('ping')
except Exception as e:
raise
def disconnect(self):
if self.client:
try:
# Проверяем, что клиент все еще активен
if self.client:
self.client.close()
except Exception as e:
# Логируем ошибку при закрытии соединения
print(f"Ошибка при закрытии MongoDB соединения: {str(e)}", file=sys.stderr)
def select(self, table, columns, condition):
collection = self.db[table]
# Для MongoDB 'columns' - это проекция
projection = {col: 1 for col in columns} if columns and "*" not in columns else None
if projection and '_id' not in columns:
projection['_id'] = 0
# Поддержка дополнительных MongoDB операторов в condition
# Преобразование condition для поддержки MongoDB операторов
if condition is None:
condition = {}
# Поддержка сортировки, лимита и пропуска через condition
sort_by = condition.pop('sort', None)
limit_val = condition.pop('limit', None)
skip_val = condition.pop('skip', None)
cursor = collection.find(condition, projection)
if sort_by:
cursor = cursor.sort(sort_by)
if skip_val:
cursor = cursor.skip(skip_val)
if limit_val:
cursor = cursor.limit(limit_val)
return list(cursor)
def insert(self, table, data):
collection = self.db[table]
if isinstance(data, list):
result = collection.insert_many(data)
return {"inserted_ids": result.inserted_ids, "inserted_count": len(result.inserted_ids)}
else:
result = collection.insert_one(data)
return {"inserted_id": result.inserted_id, "inserted_count": 1}
def update(self, table, data, condition):
collection = self.db[table]
if condition is None:
condition = {}
# Поддержка различных MongoDB операторов обновления
if not any(key.startswith('$') for key in data.keys()):
# Если не указаны MongoDB операторы, используем $set по умолчанию
data = {"$set": data}
# Поддержка опций обновления
upsert = condition.pop('upsert', False)
multi = condition.pop('multi', True)
if multi:
result = collection.update_many(condition, data, upsert=upsert)
else:
result = collection.update_one(condition, data, upsert=upsert)
return {
"matched_count": result.matched_count,
"modified_count": result.modified_count,
"upserted_id": result.upserted_id if result.upserted_id else None
}
def delete(self, table, condition):
collection = self.db[table]
if condition is None:
condition = {}
# Поддержка опций удаления
limit = condition.pop('limit', 0) # 0 означает удаление всех
if limit == 1:
result = collection.delete_one(condition)
deleted_count = result.deleted_count
else:
result = collection.delete_many(condition)
deleted_count = result.deleted_count
return {"deleted_count": deleted_count}
def create_table(self, table_name, schema=None):
"""Создает коллекцию в MongoDB"""
# В MongoDB коллекции создаются автоматически при первом использовании
# Но мы можем явно создать коллекцию с определенными параметрами
if schema:
# Поддержка создания коллекции с валидацией
validation_options = schema.get('validation', {})
self.db.create_collection(
table_name,
validator=validation_options.get('validator'),
validation_level=validation_options.get('validationLevel', 'strict'),
validation_action=validation_options.get('validationAction', 'error')
)
else:
# Простое создание коллекции
self.db.create_collection(table_name)
return {"collection_name": table_name, "created": True}
def drop_table(self, table_name):
"""Удаляет коллекцию из MongoDB"""
self.db.drop_collection(table_name)
return {"collection_name": table_name, "dropped": True}
def create_database(self, database_name: str):
"""Создает новую базу данных в MongoDB"""
# В MongoDB база данных создается автоматически при первом использовании
# Но мы можем переключиться на неё и создать placeholder коллекцию
new_db = self.client[database_name]
# Создаём системную коллекцию чтобы БД появилась в списке
new_db.create_collection('_system_placeholder')
return {
"status": "success",
"database": database_name,
"message": f"База данных '{database_name}' создана"
}
def use_database(self, database_name: str):
"""Переключается на указанную базу данных MongoDB"""
self.db = self.client[database_name]
self.config['database_name'] = database_name
return {
"status": "success",
"database": database_name,
"message": f"Переключено на базу данных '{database_name}'"
}
def alter(self, table, instruction):
"""Поддержка операций изменения структуры коллекции в MongoDB"""
collection = self.db[table]
if isinstance(instruction, dict):
operation = instruction.get('operation')
if operation == 'create_index':
# Создание индекса
keys = instruction.get('keys', [])
options = instruction.get('options', {})
result = collection.create_index(keys, **options)
return {"operation": "create_index", "result": result}
elif operation == 'drop_index':
# Удаление индекса
index_name = instruction.get('index_name')
result = collection.drop_index(index_name)
return {"operation": "drop_index", "result": result}
elif operation == 'add_field':
# Добавление поля ко всем документам
field_updates = instruction.get('updates', {})
result = collection.update_many({}, {"$set": field_updates})
return {
"operation": "add_field",
"matched_count": result.matched_count,
"modified_count": result.modified_count
}
elif operation == 'rename_field':
# Переименование поля
old_name = instruction.get('old_name')
new_name = instruction.get('new_name')
result = collection.update_many(
{old_name: {"$exists": True}},
{"$rename": {old_name: new_name}}
)
return {
"operation": "rename_field",
"matched_count": result.matched_count,
"modified_count": result.modified_count
}
elif operation == 'remove_field':
# Удаление поля из всех документов
field_name = instruction.get('field_name')
result = collection.update_many(
{field_name: {"$exists": True}},
{"$unset": {field_name: ""}}
)
return {
"operation": "remove_field",
"matched_count": result.matched_count,
"modified_count": result.modified_count
}
# Если instruction не является словарем или операция не поддерживается
raise NotImplementedError(f"ALTER operation not supported: {instruction}")
def _parse_mongo_params(self, params_str: str):
"""
Парсит параметры MongoDB команды.
Поддерживает JSON-подобный синтаксис с $-операторами.
"""
import json
import re
from bson import ObjectId
if not params_str or not params_str.strip():
return None
params_str = params_str.strip()
# Список MongoDB операторов
mongo_ops = ['$set', '$unset', '$inc', '$push', '$pull', '$addToSet',
'$pop', '$rename', '$min', '$max', '$mul', '$currentDate',
'$gt', '$gte', '$lt', '$lte', '$ne', '$in', '$nin', '$regex',
'$exists', '$type', '$mod', '$text', '$where', '$all', '$elemMatch',
'$size', '$or', '$and', '$not', '$nor']
temp_str = params_str
# Заменяем "$op" -> "__MONGO_OP__op__" (с учетом кавычек)
# и $op -> "__MONGO_OP__op__" (без кавычек)
for op in mongo_ops:
op_name = op[1:] # без $
# Сначала заменяем варианты с кавычками
temp_str = temp_str.replace(f'"{op}"', f'"__MONGO_OP__{op_name}__"')
temp_str = temp_str.replace(f"'{op}'", f'"__MONGO_OP__{op_name}__"')
# Затем без кавычек (для случая {$set: ...})
temp_str = re.sub(rf'(?<!["\w])\{op}(?!["\w])', f'"__MONGO_OP__{op_name}__"', temp_str)
# Обрабатываем $oid для ObjectId (Extended JSON формат)
temp_str = temp_str.replace('"$oid"', '"__MONGO_EXT__oid__"')
temp_str = re.sub(r'(?<!["\w])\$oid(?!["\w])', '"__MONGO_EXT__oid__"', temp_str)
# Заменяем одинарные кавычки на двойные для JSON
temp_str = re.sub(r"'([^']*)'", r'"\1"', temp_str)
# Убираем trailing запятые перед } или ]
temp_str = re.sub(r',\s*([}\]])', r'\1', temp_str)
try:
# Пробуем парсить как JSON
parsed = json.loads(f'[{temp_str}]')
# Восстанавливаем MongoDB операторы и конвертируем Extended JSON
def restore_ops(obj):
if isinstance(obj, dict):
# Проверяем на Extended JSON типы
if '__MONGO_EXT__oid__' in obj:
# Это ObjectId в Extended JSON формате: {"$oid": "..."}
return ObjectId(obj['__MONGO_EXT__oid__'])
new_dict = {}
for k, v in obj.items():
# Восстанавливаем ключи
if k.startswith('__MONGO_OP__') and k.endswith('__'):
new_key = '$' + k[12:-2]
elif k.startswith('__MONGO_EXT__') and k.endswith('__'):
new_key = '$' + k[13:-2]
else:
new_key = k
new_dict[new_key] = restore_ops(v)
return new_dict
elif isinstance(obj, list):
return [restore_ops(item) for item in obj]
elif isinstance(obj, str) and obj.startswith('__MONGO_OP__') and obj.endswith('__'):
return '$' + obj[12:-2]
else:
return obj
result = restore_ops(parsed)
return result if len(result) > 1 else (result[0] if result else None)
except json.JSONDecodeError:
# Fallback к ast.literal_eval
import ast
try:
return ast.literal_eval(f'({params_str})')
except:
return None
def execute_raw(self, query: str):
"""Выполняет сырые MongoDB команды"""
try:
# Разбор строки команды для определения типа операции
query = query.strip()
if query.startswith('db.'):
# Это команда в формате MongoDB shell
# Парсим команду вида: db.collection.operation(...)
# Поддержка Unicode (кириллицы) в названиях коллекций
import re
# Паттерн с поддержкой Unicode для названий коллекций
match = re.match(r'db\.([^\.\(\)]+)\.(\w+)\((.*)\)', query, re.DOTALL)
if match:
collection_name = match.group(1)
operation = match.group(2)
params_str = match.group(3).strip()
collection = self.db[collection_name]
if operation == 'find':
# Обработка find команды
if params_str:
params = self._parse_mongo_params(params_str)
if params is None:
return list(collection.find())
elif isinstance(params, dict):
return list(collection.find(params))
elif isinstance(params, (list, tuple)) and len(params) >= 1:
condition = params[0] if params[0] else {}
projection = params[1] if len(params) > 1 else None
return list(collection.find(condition, projection))
else:
return list(collection.find())
else:
return list(collection.find())
elif operation == 'insertOne':
params = self._parse_mongo_params(params_str)
if params:
doc = params if isinstance(params, dict) else params[0]
result = collection.insert_one(doc)
return {"inserted_id": str(result.inserted_id), "acknowledged": result.acknowledged}
raise ValueError(f"Invalid document format in insertOne: {params_str}")
elif operation == 'insertMany':
params = self._parse_mongo_params(params_str)
if params:
docs = params if isinstance(params, list) else [params]
result = collection.insert_many(docs)
return {"inserted_ids": [str(id) for id in result.inserted_ids], "acknowledged": result.acknowledged}
raise ValueError(f"Invalid documents format in insertMany: {params_str}")
elif operation in ('updateOne', 'updateMany'):
params = self._parse_mongo_params(params_str)
if params and isinstance(params, (list, tuple)) and len(params) >= 2:
filter_doc, update_doc = params[0], params[1]
# Если update_doc не содержит $-операторов, оборачиваем в $set
if not any(k.startswith('$') for k in update_doc.keys()):
update_doc = {'$set': update_doc}
if operation == 'updateOne':
result = collection.update_one(filter_doc, update_doc)
else:
result = collection.update_many(filter_doc, update_doc)
return {
"matched_count": result.matched_count,
"modified_count": result.modified_count,
"acknowledged": result.acknowledged
}
raise ValueError(f"Invalid parameters for {operation}: {params_str}")
elif operation == 'findOneAndUpdate':
params = self._parse_mongo_params(params_str)
if params and isinstance(params, (list, tuple)) and len(params) >= 2:
filter_doc, update_doc = params[0], params[1]
options = params[2] if len(params) > 2 else {}
# Если update_doc не содержит $-операторов, оборачиваем в $set
if not any(k.startswith('$') for k in update_doc.keys()):
update_doc = {'$set': update_doc}
# Опция returnNewDocument в PyMongo называется return_document
from pymongo import ReturnDocument
return_new = options.get('returnNewDocument', False) or options.get('returnDocument', False)
result = collection.find_one_and_update(
filter_doc,
update_doc,
return_document=ReturnDocument.AFTER if return_new else ReturnDocument.BEFORE
)
return result
raise ValueError(f"Invalid parameters for findOneAndUpdate: {params_str}")
elif operation == 'findOneAndReplace':
params = self._parse_mongo_params(params_str)
if params and isinstance(params, (list, tuple)) and len(params) >= 2:
filter_doc, replacement = params[0], params[1]
options = params[2] if len(params) > 2 else {}
from pymongo import ReturnDocument
return_new = options.get('returnNewDocument', False) or options.get('returnDocument', False)
result = collection.find_one_and_replace(
filter_doc,
replacement,
return_document=ReturnDocument.AFTER if return_new else ReturnDocument.BEFORE
)
return result
raise ValueError(f"Invalid parameters for findOneAndReplace: {params_str}")
elif operation == 'findOneAndDelete':
params = self._parse_mongo_params(params_str)
if params:
filter_doc = params if isinstance(params, dict) else params[0]
result = collection.find_one_and_delete(filter_doc)
return result
raise ValueError(f"Invalid parameters for findOneAndDelete: {params_str}")
elif operation in ('deleteOne', 'deleteMany'):
params = self._parse_mongo_params(params_str)
if params:
filter_doc = params if isinstance(params, dict) else params[0]
if operation == 'deleteOne':
result = collection.delete_one(filter_doc)
else:
result = collection.delete_many(filter_doc)
return {
"deleted_count": result.deleted_count,
"acknowledged": result.acknowledged
}
raise ValueError(f"Invalid filter format for {operation}: {params_str}")
elif operation == 'findOne':
params = self._parse_mongo_params(params_str)
if params is None:
return collection.find_one()
elif isinstance(params, dict):
return collection.find_one(params)
elif isinstance(params, (list, tuple)):
filter_doc = params[0] if params else {}
projection = params[1] if len(params) > 1 else None
return collection.find_one(filter_doc, projection)
return collection.find_one()
elif operation == 'countDocuments':
params = self._parse_mongo_params(params_str)
filter_doc = params if isinstance(params, dict) else {}
return {"count": collection.count_documents(filter_doc)}
elif operation == 'distinct':
params = self._parse_mongo_params(params_str)
if params and isinstance(params, (list, tuple)) and len(params) >= 1:
field = params[0]
filter_doc = params[1] if len(params) > 1 else {}
return collection.distinct(field, filter_doc)
raise ValueError(f"Invalid parameters for distinct: {params_str}")
elif operation == 'aggregate':
params = self._parse_mongo_params(params_str)
if params:
pipeline = params if isinstance(params, list) else [params]
return list(collection.aggregate(pipeline))
return list(collection.aggregate([]))
elif operation == 'replaceOne':
params = self._parse_mongo_params(params_str)
if params and isinstance(params, (list, tuple)) and len(params) >= 2:
filter_doc, replacement = params[0], params[1]
result = collection.replace_one(filter_doc, replacement)
return {
"matched_count": result.matched_count,
"modified_count": result.modified_count,
"acknowledged": result.acknowledged
}
raise ValueError(f"Invalid parameters for replaceOne: {params_str}")
else:
# Для других операций
raise NotImplementedError(f"Operation '{operation}' not supported in raw query: {query}")
else:
raise ValueError(f"Invalid MongoDB shell command format: {query}")
elif query.strip().upper().startswith('SHOW COLLECTIONS') or query.strip().upper().startswith('LIST COLLECTIONS'):
# Команда для получения списка коллекций
return {"collections": self.db.list_collection_names()}
elif query.strip().upper().startswith('DB.'):
# Другие команды базы данных
lower_query = query.lower()
if 'getcollectionnames' in lower_query or 'listcollectionnames' in lower_query:
return {"collections": self.db.list_collection_names()}
elif 'stats' in lower_query:
return self.db.command('dbStats')
elif 'collectionstats' in lower_query:
import re
match = re.search(r'db\.(\w+)\.stats\(\)', query)
if match:
collection_name = match.group(1)
return self.db.command('collStats', collection_name)
else:
# Для других типов команд, возможно, это MongoDB команды в JSON формате
# или команды агрегации
import json
try:
# Попробуем распарсить как JSON команду агрегации
cmd = json.loads(query)
if 'aggregate' in cmd:
collection_name = cmd['aggregate']
pipeline = cmd.get('pipeline', [])
collection = self.db[collection_name]
result = list(collection.aggregate(pipeline))
return {"result": result}
elif 'command' in cmd:
# Прямая MongoDB команда
return self.db.command(cmd['command'])
except:
pass
raise ValueError(f"Unsupported raw query format: {query}")
except Exception as e:
raise RuntimeError(f"Error executing raw MongoDB query '{query}': {str(e)}")