Skip to main content
Glama
ASXRND

MCP Weather & Accruals Server

by ASXRND
json_loader.py (7.06 kB)
#!/usr/bin/env python3
"""
Load accrual data from a JSON export into PostgreSQL.

Uses the async `databases` package for fast loading.
Usage: python3 json_loader.py <json_file>
"""
import os
import sys
import json
import asyncio
import logging
from pathlib import Path
from datetime import datetime

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("JSONLoader")

# Connection string. Overridable via the DATABASE_URL environment variable so
# credentials do not have to live in source control; the literal below is the
# original deployment's fallback.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://asx:asxAdmin1@192.168.0.111:5432/mydb",
)

# UPSERT: on a duplicate id_accrual the new amount is ADDED to the stored one,
# so re-loading the same record accumulates rather than overwrites.
# Hoisted to module level — it is loop-invariant.
INSERT_QUERY = """
    INSERT INTO accruals (
        id_accrual, accrual_date, service_group, accrual_type, article,
        sku, product_name, quantity, seller_price, order_received_date,
        sales_platform, work_scheme, ozon_fee_pct, localization_index_pct,
        avg_delivery_hours, total_amount_rub
    ) VALUES (:id_accrual, :accrual_date, :service_group, :accrual_type,
              :article, :sku, :product_name, :quantity, :seller_price,
              :order_received_date, :sales_platform, :work_scheme,
              :ozon_fee_pct, :localization_index_pct, :avg_delivery_hours,
              :total_amount_rub)
    ON CONFLICT (id_accrual)
    DO UPDATE SET total_amount_rub = accruals.total_amount_rub + EXCLUDED.total_amount_rub
"""


def normalize_accrual_dates(accrual):
    """Convert ISO-8601 date strings in *accrual* to datetime objects, in place.

    Only 'accrual_date' and 'order_received_date' are touched; values that are
    missing, empty, or already datetime objects are left unchanged.

    Args:
        accrual: a mutable record dict as found in the JSON "data" array.

    Returns:
        The same dict, for call-chaining convenience.
    """
    for key in ("accrual_date", "order_received_date"):
        value = accrual.get(key)
        # Empty strings are skipped (fromisoformat('') would raise).
        if value and isinstance(value, str):
            accrual[key] = datetime.fromisoformat(value)
    return accrual


async def load_json_to_database(json_path):
    """Load records from a JSON file into the PostgreSQL `accruals` table.

    Args:
        json_path: path to a JSON file shaped {"data": [...], "metadata": {...}}.

    Returns:
        A summary dict: status, message, total, successful, failed and, on
        success, db_count / db_sum reflecting the final table state.
        Never raises — all errors are reported via the returned dict.
    """
    try:
        # Imported lazily so the module can be imported (e.g. for tests)
        # without the third-party `databases` package installed.
        import databases

        # 1. Read the JSON payload
        logger.info(f"📂 Загрузка JSON: {Path(json_path).name}")
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        accruals = data.get("data", [])
        metadata = data.get("metadata", {})
        total_count = len(accruals)
        logger.info(f"📊 Данные из JSON: {metadata}")

        if total_count == 0:
            logger.error("❌ Нет данных в JSON файле")
            return {
                "status": "error",
                "message": "No data in JSON",
                "total": 0,
                "successful": 0,
                "failed": 0,
            }

        # 2. Connect to the database
        database = databases.Database(DATABASE_URL)
        await database.connect()
        # BUGFIX: previously this logged a hard-coded host (192.168.0.134)
        # that did not match DATABASE_URL; derive host:port/db from the
        # actual connection string instead.
        logger.info(f"🔌 Подключение к БД: {DATABASE_URL.rsplit('@', 1)[-1]}")

        successful = 0
        failed = 0
        try:
            # 3. Insert records in batches of 100, one UPSERT per row so a
            # single bad record does not abort the rest of its batch.
            batch_size = 100
            for start in range(0, total_count, batch_size):
                batch = accruals[start:start + batch_size]
                for accrual in batch:
                    try:
                        normalize_accrual_dates(accrual)
                        await database.execute(query=INSERT_QUERY, values=accrual)
                        successful += 1
                    except Exception as e:
                        logger.warning(f"⚠️ Ошибка при вставке {accrual.get('id_accrual')}: {e}")
                        failed += 1

                # Progress report after each batch
                progress = min(start + batch_size, total_count)
                pct = (progress / total_count) * 100
                logger.info(f"📊 Прогресс: {progress}/{total_count} ({pct:.1f}%) - успешно: {successful}, ошибок: {failed}")

            # 4. Verify the final table state
            result = await database.fetch_one(
                "SELECT COUNT(*) as cnt, SUM(total_amount_rub) as sum FROM accruals"
            )
            final_count = result['cnt']
            final_sum = result['sum']
        finally:
            # BUGFIX: the old code skipped disconnect() whenever anything
            # failed after connect(), leaking the connection.
            await database.disconnect()

        logger.info(f"✅ Загрузка завершена!")
        logger.info(f" Обработано: {total_count}")
        logger.info(f" Успешно загружено: {successful}")
        logger.info(f" Ошибок: {failed}")
        logger.info(f" В БД сейчас: {final_count} записей")
        logger.info(f" Сумма всех начислений: {final_sum}")

        return {
            "status": "success",
            "message": "Загрузка завершена",
            "total": total_count,
            "successful": successful,
            "failed": failed,
            "db_count": final_count,
            # SUM() is NULL on an empty table; normalize to 0.
            "db_sum": float(final_sum) if final_sum else 0,
        }

    except Exception as e:
        # Top-level boundary: log with traceback, report failure to caller.
        logger.error(f"❌ Критическая ошибка: {e}", exc_info=True)
        return {
            "status": "error",
            "message": str(e),
            "total": 0,
            "successful": 0,
            "failed": 0,
        }


async def main():
    """CLI entry point: validate argv, run the loader, print a summary."""
    if len(sys.argv) < 2:
        print("Использование: python3 json_loader.py <json_file>")
        sys.exit(1)

    json_file = sys.argv[1]
    if not Path(json_file).exists():
        print(f"❌ Файл не найден: {json_file}")
        sys.exit(1)

    result = await load_json_to_database(json_file)

    print("\n" + "=" * 60)
    print("Загрузка завершена")
    print("=" * 60)
    print(f"Статус: {result['status']}")
    print(f"Обработано: {result['total']}")
    print(f"Успешно загружено: {result['successful']}")
    print(f"Ошибок: {result['failed']}")
    print(f"В БД сейчас: {result.get('db_count', 'N/A')}")
    print(f"Сумма начислений: {result.get('db_sum', 'N/A')}")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ASXRND/MCP_deepseek'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.