Skip to main content
Glama
ASXRND

MCP Weather & Accruals Server

by ASXRND
test_accruals_api.py7.17 kB
#!/usr/bin/env python3 """ Тестирование новых endpoints FastAPI для начисления Этот скрипт помогает проверить: 1. Подключение к удалённому FastAPI 2. Создание записей о начислении 3. Получение списка начисления 4. Получение статистики """ import httpx import json from datetime import date, datetime from decimal import Decimal import sys # Конфигурация ACCRUALS_API_URL = "http://192.168.0.137:8000" # Измените на ваш URL TIMEOUT = 10 async def test_health(): """Проверка health endpoint""" print("=" * 60) print("1️⃣ Проверка здоровья API") print("=" * 60) try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.get(f"{ACCRUALS_API_URL}/health") print(f"Status: {response.status_code}") print(f"Response: {response.json()}") return response.status_code == 200 except Exception as e: print(f"❌ Ошибка: {e}") return False async def test_create_accrual(): """Тестирование создания новой записи о начислении""" print("\n" + "=" * 60) print("2️⃣ Создание новой записи о начислении") print("=" * 60) accrual_data = { "id_accrual": f"TEST_{datetime.now().timestamp()}", "accrual_date": date.today().isoformat(), "service_group": "Доставка", "accrual_type": "Комиссия", "article": "ABC123", "sku": "SKU-001", "product_name": "Тестовый товар", "quantity": 5, "seller_price": 1500.00, "order_received_date": date.today().isoformat(), "sales_platform": "Ozon", "work_scheme": "FBS", "ozon_fee_pct": 15.0, "localization_index_pct": 100.0, "avg_delivery_hours": 48.0, "total_amount_rub": "1234.56" } print(f"Отправка данных:") print(json.dumps(accrual_data, indent=2, ensure_ascii=False, default=str)) try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.post( f"{ACCRUALS_API_URL}/accruals/", json=accrual_data ) print(f"\nStatus: {response.status_code}") print(f"Response:") print(json.dumps(response.json(), indent=2, ensure_ascii=False, default=str)) return response.status_code in [200, 201] except Exception as e: print(f"❌ Ошибка: {e}") return False async def test_get_accruals(): """Тестирование получения списка начисления""" print("\n" + "=" * 60) print("3️⃣ Получение списка начисления") print("=" * 60) params = { "skip": 0, "limit": 10, "sales_platform": "Ozon" } print(f"Параметры запроса: {params}") try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.get( f"{ACCRUALS_API_URL}/accruals/", params=params ) print(f"\nStatus: {response.status_code}") data = response.json() print(f"Найдено записей: {len(data)}") if data: print(f"Первая запись:") print(json.dumps(data[0], indent=2, ensure_ascii=False, default=str)) return response.status_code == 200 except Exception as e: print(f"❌ Ошибка: {e}") return False async def test_get_stats_summary(): """Тестирование получения общей статистики""" print("\n" + "=" * 60) print("4️⃣ Получение общей статистики") print("=" * 60) try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.get(f"{ACCRUALS_API_URL}/stats/summary") print(f"Status: {response.status_code}") print(f"Response:") print(json.dumps(response.json(), indent=2, ensure_ascii=False, default=str)) return response.status_code == 200 except Exception as e: print(f"❌ Ошибка: {e}") return False async def test_get_filtered_stats(): """Тестирование получения статистики с фильтрацией""" print("\n" + "=" * 60) print("5️⃣ Получение статистики с фильтрацией") print("=" * 60) params = { "sales_platform": "Ozon", "service_group": "Доставка" } print(f"Параметры фильтра: {params}") try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.get( f"{ACCRUALS_API_URL}/stats/accruals", params=params ) print(f"\nStatus: {response.status_code}") print(f"Response:") print(json.dumps(response.json(), indent=2, ensure_ascii=False, default=str)) return response.status_code == 200 except Exception as e: print(f"❌ Ошибка: {e}") return False async def main(): """Запуск всех тестов""" print("\n") print("🧪 ТЕСТИРОВАНИЕ FastAPI ENDPOINTS ДЛЯ НАЧИСЛЕНИЯ") print(f"API URL: {ACCRUALS_API_URL}") print("=" * 60) tests = [ ("Health Check", test_health), ("Создание начисления", test_create_accrual), ("Получение списка", test_get_accruals), ("Общая статистика", test_get_stats_summary), ("Фильтрованная статистика", test_get_filtered_stats), ] results = {} for test_name, test_func in tests: try: result = await test_func() results[test_name] = "✅ ПРОЙДЕН" if result else "❌ ПРОВАЛЕН" except Exception as e: print(f"❌ Исключение в тесте: {e}") results[test_name] = f"❌ ОШИБКА: {e}" # Итоговый отчёт print("\n" + "=" * 60) print("📊 ИТОГОВЫЙ ОТЧЁТ") print("=" * 60) for test_name, result in results.items(): print(f"{test_name}: {result}") passed = sum(1 for r in results.values() if "✅" in r) total = len(results) print(f"\nПройдено: {passed}/{total}") if passed == total: print("\n🎉 Все тесты пройдены успешно!") return 0 else: print(f"\n⚠️ {total - passed} тестов провалено") return 1 if __name__ == "__main__": import asyncio exit_code = asyncio.run(main()) sys.exit(exit_code)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ASXRND/MCP_deepseek'

If you have feedback or need assistance with the MCP directory API, please join our Discord server