#!/usr/bin/env python3
"""
Загрузчик данных из JSON в PostgreSQL
Использует async для быстрой загрузки
"""
import asyncio
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path

import databases

# Logging setup: INFO level, each record prefixed with timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("JSONLoader")

# Connection string for the target PostgreSQL database.
# The DATABASE_URL environment variable takes precedence so credentials do not
# have to live in the source tree; the literal below is kept only as a
# backward-compatible fallback for existing deployments.
DATABASE_URL = os.environ.get("DATABASE_URL") or (
    "postgresql://asx:asxAdmin1@192.168.0.111:5432/mydb"
)


# Upsert statement for a single accrual row; the bound parameter names are the
# keys expected in every JSON record.
# NOTE(review): on an id_accrual conflict the incoming amount is ADDED to the
# stored total, so loading the same file twice doubles the totals — confirm
# that accumulation (rather than overwrite) is intended.
_UPSERT_ACCRUAL_QUERY = """
    INSERT INTO accruals (
        id_accrual,
        accrual_date,
        service_group,
        accrual_type,
        article,
        sku,
        product_name,
        quantity,
        seller_price,
        order_received_date,
        sales_platform,
        work_scheme,
        ozon_fee_pct,
        localization_index_pct,
        avg_delivery_hours,
        total_amount_rub
    ) VALUES (
        :id_accrual, :accrual_date, :service_group, :accrual_type, :article,
        :sku, :product_name, :quantity, :seller_price, :order_received_date,
        :sales_platform, :work_scheme, :ozon_fee_pct, :localization_index_pct,
        :avg_delivery_hours, :total_amount_rub
    )
    ON CONFLICT (id_accrual) DO UPDATE
    SET total_amount_rub = accruals.total_amount_rub + EXCLUDED.total_amount_rub
"""

# Record fields that may arrive as ISO-8601 strings and are converted to
# datetime objects before being bound to the query.
_DATE_FIELDS = ("accrual_date", "order_received_date")

# Number of records processed between two progress log lines.
_BATCH_SIZE = 100


def _error_result(message):
    """Build the result dict returned when nothing was loaded."""
    return {
        "status": "error",
        "message": message,
        "total": 0,
        "successful": 0,
        "failed": 0,
    }


def _prepare_accrual(accrual):
    """Return a copy of one JSON record with ISO date strings parsed to datetime."""
    if not isinstance(accrual, dict):
        raise TypeError(f"expected a JSON object, got {type(accrual).__name__}")
    values = dict(accrual)
    for field in _DATE_FIELDS:
        raw = values.get(field)
        # Empty / missing values and already-parsed values are left untouched.
        if raw and isinstance(raw, str):
            values[field] = datetime.fromisoformat(raw)
    return values


async def load_json_to_database(json_path):
    """
    Load accrual records from a JSON file into the PostgreSQL ``accruals`` table.

    The file must contain an object with a ``"data"`` list of records and an
    optional ``"metadata"`` object (only logged). Each record is upserted
    individually; a record that fails is logged and counted, and loading
    continues with the next one.

    Args:
        json_path: Path to the JSON file (anything accepted by ``open``).

    Returns:
        dict: On success ``status`` is ``"success"`` and the dict carries
        ``message``, ``total``, ``successful``, ``failed``, ``db_count`` and
        ``db_sum`` (row count and amount total of the whole table after the
        load). On an empty file or any unexpected error ``status`` is
        ``"error"`` with ``message`` and zeroed ``total``/``successful``/``failed``.
        Exceptions are never propagated to the caller.
    """
    from urllib.parse import urlsplit

    try:
        # 1. Read the JSON file.
        logger.info(f"📂 Загрузка JSON: {Path(json_path).name}")
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        accruals = data.get("data", [])
        metadata = data.get("metadata", {})
        total_count = len(accruals)
        logger.info(f"📊 Данные из JSON: {metadata}")

        if total_count == 0:
            logger.error("❌ Нет данных в JSON файле")
            return _error_result("No data in JSON")

        # 2. Connect to the database.
        database = databases.Database(DATABASE_URL)
        await database.connect()
        try:
            # Report the host actually used (never the credentials).
            target = urlsplit(str(DATABASE_URL))
            logger.info(f"🔌 Подключение к БД: {target.hostname}:{target.port or 5432}")

            successful = 0
            failed = 0

            # 3. Upsert the records; batches only pace the progress log.
            for start in range(0, total_count, _BATCH_SIZE):
                for accrual in accruals[start:start + _BATCH_SIZE]:
                    try:
                        await database.execute(
                            query=_UPSERT_ACCRUAL_QUERY,
                            values=_prepare_accrual(accrual),
                        )
                    except Exception as e:
                        # Best effort per record: log, count, keep going.
                        record_id = accrual.get("id_accrual") if isinstance(accrual, dict) else None
                        logger.warning(f"⚠️ Ошибка при вставке {record_id}: {e}")
                        failed += 1
                    else:
                        successful += 1

                progress = min(start + _BATCH_SIZE, total_count)
                pct = (progress / total_count) * 100
                logger.info(f"📊 Прогресс: {progress}/{total_count} ({pct:.1f}%) - успешно: {successful}, ошибок: {failed}")

            # 4. Read back the final table state.
            result = await database.fetch_one("SELECT COUNT(*) as cnt, SUM(total_amount_rub) as sum FROM accruals")
            final_count = result['cnt']
            final_sum = result['sum']

            logger.info("✅ Загрузка завершена!")
            logger.info(f" Обработано: {total_count}")
            logger.info(f" Успешно загружено: {successful}")
            logger.info(f" Ошибок: {failed}")
            logger.info(f" В БД сейчас: {final_count} записей")
            logger.info(f" Сумма всех начислений: {final_sum}")
        finally:
            # Always release the connection, even when a step above failed.
            await database.disconnect()

        return {
            "status": "success",
            "message": "Загрузка завершена",
            "total": total_count,
            "successful": successful,
            "failed": failed,
            "db_count": final_count,
            # SUM() is NULL for an empty table; report 0 in that case.
            "db_sum": float(final_sum) if final_sum else 0,
        }
    except Exception as e:
        # Top-level boundary: report the failure through the result dict.
        logger.error(f"❌ Критическая ошибка: {e}", exc_info=True)
        return _error_result(str(e))


async def main():
    """
    Command-line entry point: load the JSON file named by the first argument.

    Prints a usage message and exits with status 1 when the argument is
    missing or the path does not exist. Otherwise runs the loader, prints a
    summary of its result, and exits with status 1 if the loader reported
    ``status != "success"`` so that shell callers can detect a failed load.
    """
    if len(sys.argv) < 2:
        print("Использование: python3 json_loader.py <json_file>")
        sys.exit(1)

    json_file = sys.argv[1]
    if not Path(json_file).exists():
        print(f"❌ Файл не найден: {json_file}")
        sys.exit(1)

    result = await load_json_to_database(json_file)

    separator = "=" * 60
    print("\n" + separator)
    print("Загрузка завершена")
    print(separator)
    print(f"Статус: {result['status']}")
    print(f"Обработано: {result['total']}")
    print(f"Успешно загружено: {result['successful']}")
    print(f"Ошибок: {result['failed']}")
    # db_count / db_sum are only present in a successful result.
    print(f"В БД сейчас: {result.get('db_count', 'N/A')}")
    print(f"Сумма начислений: {result.get('db_sum', 'N/A')}")
    print(separator)

    # Propagate a failed load through the process exit status.
    if result["status"] != "success":
        sys.exit(1)


# Run the loader only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())