#!/usr/bin/env python3
"""
Тестирование новых endpoints FastAPI для начисления
Этот скрипт помогает проверить:
1. Подключение к удалённому FastAPI
2. Создание записей о начислении
3. Получение списка начисления
4. Получение статистики
"""
import httpx
import json
from datetime import date, datetime
from decimal import Decimal
import sys
# Конфигурация
ACCRUALS_API_URL = "http://192.168.0.137:8000" # Измените на ваш URL
TIMEOUT = 10
async def test_health():
"""Проверка health endpoint"""
print("=" * 60)
print("1️⃣ Проверка здоровья API")
print("=" * 60)
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
response = await client.get(f"{ACCRUALS_API_URL}/health")
print(f"Status: {response.status_code}")
print(f"Response: {response.json()}")
return response.status_code == 200
except Exception as e:
print(f"❌ Ошибка: {e}")
return False
async def test_create_accrual():
"""Тестирование создания новой записи о начислении"""
print("\n" + "=" * 60)
print("2️⃣ Создание новой записи о начислении")
print("=" * 60)
accrual_data = {
"id_accrual": f"TEST_{datetime.now().timestamp()}",
"accrual_date": date.today().isoformat(),
"service_group": "Доставка",
"accrual_type": "Комиссия",
"article": "ABC123",
"sku": "SKU-001",
"product_name": "Тестовый товар",
"quantity": 5,
"seller_price": 1500.00,
"order_received_date": date.today().isoformat(),
"sales_platform": "Ozon",
"work_scheme": "FBS",
"ozon_fee_pct": 15.0,
"localization_index_pct": 100.0,
"avg_delivery_hours": 48.0,
"total_amount_rub": "1234.56"
}
print(f"Отправка данных:")
print(json.dumps(accrual_data, indent=2, ensure_ascii=False, default=str))
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
response = await client.post(
f"{ACCRUALS_API_URL}/accruals/",
json=accrual_data
)
print(f"\nStatus: {response.status_code}")
print(f"Response:")
print(json.dumps(response.json(), indent=2, ensure_ascii=False, default=str))
return response.status_code in [200, 201]
except Exception as e:
print(f"❌ Ошибка: {e}")
return False
async def test_get_accruals():
"""Тестирование получения списка начисления"""
print("\n" + "=" * 60)
print("3️⃣ Получение списка начисления")
print("=" * 60)
params = {
"skip": 0,
"limit": 10,
"sales_platform": "Ozon"
}
print(f"Параметры запроса: {params}")
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
response = await client.get(
f"{ACCRUALS_API_URL}/accruals/",
params=params
)
print(f"\nStatus: {response.status_code}")
data = response.json()
print(f"Найдено записей: {len(data)}")
if data:
print(f"Первая запись:")
print(json.dumps(data[0], indent=2, ensure_ascii=False, default=str))
return response.status_code == 200
except Exception as e:
print(f"❌ Ошибка: {e}")
return False
async def test_get_stats_summary():
"""Тестирование получения общей статистики"""
print("\n" + "=" * 60)
print("4️⃣ Получение общей статистики")
print("=" * 60)
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
response = await client.get(f"{ACCRUALS_API_URL}/stats/summary")
print(f"Status: {response.status_code}")
print(f"Response:")
print(json.dumps(response.json(), indent=2, ensure_ascii=False, default=str))
return response.status_code == 200
except Exception as e:
print(f"❌ Ошибка: {e}")
return False
async def test_get_filtered_stats():
"""Тестирование получения статистики с фильтрацией"""
print("\n" + "=" * 60)
print("5️⃣ Получение статистики с фильтрацией")
print("=" * 60)
params = {
"sales_platform": "Ozon",
"service_group": "Доставка"
}
print(f"Параметры фильтра: {params}")
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
response = await client.get(
f"{ACCRUALS_API_URL}/stats/accruals",
params=params
)
print(f"\nStatus: {response.status_code}")
print(f"Response:")
print(json.dumps(response.json(), indent=2, ensure_ascii=False, default=str))
return response.status_code == 200
except Exception as e:
print(f"❌ Ошибка: {e}")
return False
async def main():
"""Запуск всех тестов"""
print("\n")
print("🧪 ТЕСТИРОВАНИЕ FastAPI ENDPOINTS ДЛЯ НАЧИСЛЕНИЯ")
print(f"API URL: {ACCRUALS_API_URL}")
print("=" * 60)
tests = [
("Health Check", test_health),
("Создание начисления", test_create_accrual),
("Получение списка", test_get_accruals),
("Общая статистика", test_get_stats_summary),
("Фильтрованная статистика", test_get_filtered_stats),
]
results = {}
for test_name, test_func in tests:
try:
result = await test_func()
results[test_name] = "✅ ПРОЙДЕН" if result else "❌ ПРОВАЛЕН"
except Exception as e:
print(f"❌ Исключение в тесте: {e}")
results[test_name] = f"❌ ОШИБКА: {e}"
# Итоговый отчёт
print("\n" + "=" * 60)
print("📊 ИТОГОВЫЙ ОТЧЁТ")
print("=" * 60)
for test_name, result in results.items():
print(f"{test_name}: {result}")
passed = sum(1 for r in results.values() if "✅" in r)
total = len(results)
print(f"\nПройдено: {passed}/{total}")
if passed == total:
print("\n🎉 Все тесты пройдены успешно!")
return 0
else:
print(f"\n⚠️ {total - passed} тестов провалено")
return 1
if __name__ == "__main__":
import asyncio
exit_code = asyncio.run(main())
sys.exit(exit_code)