#!/usr/bin/env python3
"""
Модуль для работы с Deepseek API.
Содержит функции для взаимодействия с Deepseek и вызова инструментов для получения погоды и работы с файлами.
"""
import json
import logging
import os
from typing import Optional, Any, List, Dict
import httpx
import psycopg2
from psycopg2.extras import RealDictCursor
from modules.config import Config
from modules.weather_api import get_current_weather, get_forecast
from modules.file_api import (
read_file,
create_file,
update_file,
append_to_file,
delete_file,
list_files,
ALLOWED_BASE_DIR,
)
logger = logging.getLogger("DeepseekAPI")
# API URL для работы с БД (берётся из конфига)
ACCRUALS_API_URL = Config.ACCRUALS_API_URL
# Параметры БД
DB_URL = Config.DATABASE_URL # postgresql://user:pass@host:port/db
async def query_accruals_db(query: str) -> str:
"""
Прямое подключение к PostgreSQL для выполнения запросов.
Args:
query: SQL запрос
Returns:
JSON ответ в виде строки или сообщение об ошибке
"""
try:
# Парсим DATABASE_URL
# postgresql://asx:asxAdmin1@192.168.0.111:5432/mydb
from urllib.parse import urlparse
parsed = urlparse(DB_URL)
conn = psycopg2.connect(
host=parsed.hostname,
port=parsed.port or 5432,
database=parsed.path.lstrip('/'),
user=parsed.username,
password=parsed.password,
connect_timeout=10
)
cur = conn.cursor(cursor_factory=RealDictCursor)
cur.execute(query)
result = cur.fetchall()
cur.close()
conn.close()
if not result:
return "Нет данных"
# Преобразуем результат в читаемый формат
if len(result) == 1:
# Для одной строки (например, статистика)
row = dict(result[0])
return json.dumps(row, ensure_ascii=False, indent=2, default=str)
else:
# Для списка записей
result_list = [dict(row) for row in result]
return json.dumps(result_list, ensure_ascii=False, indent=2, default=str)
except psycopg2.OperationalError as e:
error_msg = f"❌ Ошибка подключения к БД: {str(e)}"
logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"❌ Ошибка при запросе к БД: {str(e)}"
logger.error(error_msg)
return error_msg
# ============================================================================
# Вспомогательные функции для работы с API БД
# ============================================================================
async def query_accruals_api(endpoint: str, params: dict = None) -> str:
"""
Вспомогательная функция для вызова API БД (modules/api.py).
Args:
endpoint: Путь к endpoint (например, "/accruals" или "/stats/summary")
params: Query параметры
Returns:
JSON ответ в виде строки или сообщение об ошибке
"""
try:
url = f"{ACCRUALS_API_URL}{endpoint}"
logger.debug(f"Запрос к API БД: {url} с параметрами {params}")
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
# Преобразуем ответ в читаемую строку
if isinstance(data, list):
if not data:
return "Нет данных"
# Для списков показываем первые 5 элементов
result = f"Найдено {len(data)} записей:\n"
for i, item in enumerate(data[:5]):
result += f"\n{i+1}. {item}\n"
if len(data) > 5:
result += f"\n... и ещё {len(data) - 5} записей"
return result
else:
return json.dumps(data, ensure_ascii=False, indent=2)
except httpx.ConnectError as e:
error_msg = f"❌ Ошибка подключения к API БД ({ACCRUALS_API_URL}): {str(e)}"
logger.error(error_msg)
return error_msg
except httpx.HTTPStatusError as e:
error_msg = f"❌ HTTP ошибка {e.response.status_code}: {e.response.text}"
logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"❌ Ошибка при запросе к API БД: {str(e)}"
logger.error(error_msg)
return error_msg
def get_weather_tools() -> List[Dict[str, Any]]:
"""
Возвращает определение инструментов для Deepseek (function calling).
Инструменты позволяют Deepseek запрашивать реальные данные о погоде и работать с файлами.
Returns:
Список с определением инструментов (JSON schema)
"""
return [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Получить текущую погоду для города",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "Название города (на русском или английском)"
},
"units": {
"type": "string",
"enum": ["metric", "imperial"],
"description": "Система измерения (по умолчанию metric для Цельсия)"
}
},
"required": ["city"]
}
}
},
{
"type": "function",
"function": {
"name": "get_forecast",
"description": "Получить прогноз погоды на несколько дней",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "Название города"
},
"days": {
"type": "integer",
"description": "Количество дней для прогноза (1-5, по умолчанию 3)"
},
"units": {
"type": "string",
"enum": ["metric", "imperial"],
"description": "Система измерения (по умолчанию metric)"
}
},
"required": ["city"]
}
}
},
{
"type": "function",
"function": {
"name": "read_file",
"description": "Читает содержимое файла",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Путь к файлу. Может быть абсолютным или относительным."
},
"base_dir": {
"type": "string",
"description": "(необязательно) Базовая директория для относительных путей. По умолчанию используется безопасная директория на хосте."
}
},
"required": ["file_path"]
}
}
},
{
"type": "function",
"function": {
"name": "create_file",
"description": "Создаёт новый файл с содержимым",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Путь к файлу. Может быть абсолютным или относительным."
},
"base_dir": {
"type": "string",
"description": "(необязательно) Базовая директория для относительных путей."
},
"content": {
"type": "string",
"description": "Содержимое файла"
},
"overwrite": {
"type": "boolean",
"description": "Перезаписать ли файл, если он существует (по умолчанию false)"
}
},
"required": ["file_path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "update_file",
"description": "Обновляет содержимое существующего файла",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Путь к файлу. Может быть абсолютным или относительным."
},
"base_dir": {
"type": "string",
"description": "(необязательно) Базовая директория для относительных путей."
},
"content": {
"type": "string",
"description": "Новое содержимое файла"
}
},
"required": ["file_path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "append_to_file",
"description": "Добавляет содержимое в конец файла",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Путь к файлу. Может быть абсолютным или относительным."
},
"base_dir": {
"type": "string",
"description": "(необязательно) Базовая директория для относительных путей."
},
"content": {
"type": "string",
"description": "Содержимое для добавления"
}
},
"required": ["file_path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "delete_file",
"description": "Удаляет файл",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Путь к файлу. Может быть абсолютным или относительным."
},
"base_dir": {
"type": "string",
"description": "(необязательно) Базовая директория для относительных путей."
}
},
"required": ["file_path"]
}
}
},
{
"type": "function",
"function": {
"name": "list_files",
"description": "Выводит список файлов в директории",
"parameters": {
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "Путь к директории (относительно базовой директории, по умолчанию .)"
},
"base_dir": {
"type": "string",
"description": "(необязательно) Базовая директория для относительных путей."
}
},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "query_accruals",
"description": "Запросить данные о начислениях из PostgreSQL БД (Ozon отчёты, ноябрь 2025)",
"parameters": {
"type": "object",
"properties": {
"query_type": {
"type": "string",
"enum": ["list", "stats", "summary"],
"description": "Тип запроса: 'list' - список начислений, 'stats' - статистика с фильтрацией, 'summary' - общая сводка"
},
"service_group": {
"type": "string",
"description": "(опционально) Фильтр по группе услуг (например, 'Услуги доставки', 'Продажи', 'Вознаграждение Ozon')"
},
"sales_platform": {
"type": "string",
"description": "(опционально) Фильтр по платформе продажи (например, 'Ozon', 'Wildberries')"
},
"accrual_type": {
"type": "string",
"description": "(опционально) Фильтр по типу начисления (например, 'Комиссия', 'Реклама')"
},
"limit": {
"type": "integer",
"description": "(опционально) Максимум записей для возврата (по умолчанию 100)"
},
"skip": {
"type": "integer",
"description": "(опционально) Количество записей для пропуска (для пагинации)"
}
},
"required": ["query_type"]
}
}
}
,
{
"type": "function",
"function": {
"name": "create_accrual",
"description": "Создать новое начисление в БД",
"parameters": {
"type": "object",
"properties": {
"id_accrual": {"type": "string"},
"accrual_date": {"type": "string", "description": "Дата в формате YYYY-MM-DD"},
"service_group": {"type": "string"},
"accrual_type": {"type": "string"},
"article": {"type": "string"},
"sku": {"type": "string"},
"product_name": {"type": "string"},
"quantity": {"type": "integer"},
"seller_price": {"type": "number"},
"order_received_date": {"type": "string"},
"sales_platform": {"type": "string"},
"work_scheme": {"type": "string"},
"ozon_fee_pct": {"type": "number"},
"localization_index_pct": {"type": "number"},
"avg_delivery_hours": {"type": "number"},
"total_amount_rub": {"type": "number"}
},
"required": ["id_accrual", "accrual_date", "service_group", "accrual_type", "sales_platform", "total_amount_rub"]
}
}
},
{
"type": "function",
"function": {
"name": "update_accrual",
"description": "Обновить существующее начисление по id_accrual",
"parameters": {
"type": "object",
"properties": {
"id_accrual": {"type": "string"},
"accrual_date": {"type": "string"},
"service_group": {"type": "string"},
"accrual_type": {"type": "string"},
"article": {"type": "string"},
"sku": {"type": "string"},
"product_name": {"type": "string"},
"quantity": {"type": "integer"},
"seller_price": {"type": "number"},
"order_received_date": {"type": "string"},
"sales_platform": {"type": "string"},
"work_scheme": {"type": "string"},
"ozon_fee_pct": {"type": "number"},
"localization_index_pct": {"type": "number"},
"avg_delivery_hours": {"type": "number"},
"total_amount_rub": {"type": "number"}
},
"required": ["id_accrual"]
}
}
},
{
"type": "function",
"function": {
"name": "delete_accrual",
"description": "Удалить начисление по id_accrual",
"parameters": {
"type": "object",
"properties": {
"id_accrual": {"type": "string"}
},
"required": ["id_accrual"]
}
}
}
]
async def call_deepseek_with_tools(user_query: str) -> str:
"""
Вызывает Deepseek API с поддержкой инструментов (function calling).
Deepseek может запросить реальные данные о погоде через инструменты.
Args:
user_query: Запрос пользователя о погоде
Returns:
Ответ от Deepseek, дополненный реальными данными о погоде
"""
logger.info(f"Отправка запроса в Deepseek: '{user_query}'")
tools = get_weather_tools()
messages = [
{
"role": "user",
"content": user_query
}
]
headers = {
"Authorization": f"Bearer {Config.DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": "deepseek-chat",
"messages": messages,
"tools": tools,
"temperature": 0.7,
"max_tokens": 2000,
"stream": False
}
try:
async with httpx.AsyncClient() as client:
logger.debug(f"Отправка запроса с инструментами к Deepseek")
response = await client.post(
Config.DEEPSEEK_BASE_URL,
json=payload,
headers=headers,
timeout=Config.API_TIMEOUT
)
response.raise_for_status()
result = response.json()
logger.debug(f"Получен ответ от Deepseek")
# Обрабатываем ответ и вызовы инструментов
return await process_deepseek_response(result, messages)
except httpx.HTTPStatusError as e:
error_msg = f"HTTP ошибка {e.response.status_code}"
logger.error(error_msg)
return f"❌ Ошибка при обращении к Deepseek API: {error_msg}"
except Exception as e:
error_msg = f"Ошибка при работе с Deepseek: {str(e)}"
logger.error(error_msg)
return f"❌ Внутренняя ошибка: {error_msg}"
async def process_deepseek_response(response: dict, messages: list) -> str:
"""
Обрабатывает ответ от Deepseek, включая вызовы инструментов (function calling).
Если Deepseek вызывает инструмент, мы выполняем его и отправляем результаты обратно.
Args:
response: JSON ответ от Deepseek API
messages: История сообщений для контекста
Returns:
Финальный ответ от Deepseek с результатами инструментов
"""
while True:
choice = response.get("choices", [{}])[0]
message = choice.get("message", {})
# Если нет вызовов инструментов, возвращаем текстовый ответ
if "tool_calls" not in message or not message["tool_calls"]:
text_response = message.get("content", "❌ Пустой ответ от Deepseek")
logger.info(f"Финальный ответ от Deepseek: {text_response[:100]}...")
return text_response
# Обрабатываем вызовы инструментов
tool_results = []
for tool_call in message.get("tool_calls", []):
tool_name = tool_call.get("function", {}).get("name")
tool_args = json.loads(tool_call.get("function", {}).get("arguments", "{}"))
logger.info(f"Deepseek вызвал инструмент: {tool_name}({tool_args})")
# Выполняем инструмент
if tool_name == "get_current_weather":
result = await get_current_weather(
tool_args.get("city"),
tool_args.get("units", "metric")
)
elif tool_name == "get_forecast":
result = await get_forecast(
tool_args.get("city"),
tool_args.get("days", 3),
tool_args.get("units", "metric")
)
elif tool_name == "read_file":
# allow caller to specify base_dir (for absolute/relative control)
result = await read_file(
tool_args.get("file_path", ""),
tool_args.get("base_dir", ALLOWED_BASE_DIR),
)
elif tool_name == "create_file":
result = await create_file(
tool_args.get("file_path", ""),
tool_args.get("content", ""),
base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR),
overwrite=tool_args.get("overwrite", False)
)
elif tool_name == "update_file":
result = await update_file(
tool_args.get("file_path", ""),
tool_args.get("content", ""),
base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR),
)
elif tool_name == "append_to_file":
result = await append_to_file(
tool_args.get("file_path", ""),
tool_args.get("content", ""),
base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR),
)
elif tool_name == "delete_file":
result = await delete_file(
tool_args.get("file_path", ""),
base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR),
)
elif tool_name == "list_files":
result = await list_files(
tool_args.get("directory", "."),
base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR),
)
elif tool_name == "query_accruals":
query_type = tool_args.get("query_type", "summary")
params = {}
# Добавляем все опциональные параметры, если они предоставлены
if tool_args.get("service_group"):
params["service_group"] = tool_args["service_group"]
if tool_args.get("sales_platform"):
params["sales_platform"] = tool_args["sales_platform"]
if tool_args.get("accrual_type"):
params["accrual_type"] = tool_args["accrual_type"]
if tool_args.get("limit"):
params["limit"] = tool_args["limit"]
if tool_args.get("skip"):
params["skip"] = tool_args["skip"]
# Строим SQL запрос в зависимости от типа
if query_type == "summary":
# Общая статистика по всем начислениям ноября
sql_query = """
SELECT
COUNT(*) as total_count,
SUM(total_amount_rub) as total_amount,
AVG(total_amount_rub) as avg_amount,
MIN(total_amount_rub) as min_amount,
MAX(total_amount_rub) as max_amount,
COUNT(DISTINCT sales_platform) as platforms_count,
COUNT(DISTINCT accrual_type) as types_count
FROM accruals
"""
result = await query_accruals_db(sql_query)
elif query_type == "list":
# Список начислений с фильтрацией и пагинацией
limit = params.get("limit", 50)
skip = params.get("skip", 0)
where_clauses = []
if "service_group" in params:
where_clauses.append(f"service_group = '{params['service_group']}'")
if "sales_platform" in params:
where_clauses.append(f"sales_platform = '{params['sales_platform']}'")
if "accrual_type" in params:
where_clauses.append(f"accrual_type = '{params['accrual_type']}'")
where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"
sql_query = f"""
SELECT
id, id_accrual, accrual_date, service_group, sales_platform,
accrual_type, total_amount_rub, quantity, unit_price,
created_at, updated_at
FROM accruals
WHERE {where_clause}
ORDER BY accrual_date DESC, id DESC
LIMIT {limit} OFFSET {skip}
"""
result = await query_accruals_db(sql_query)
elif query_type in ["stats", "accruals"]:
# Статистика с фильтрацией
where_clauses = ["1=1"]
if "service_group" in params:
where_clauses.append(f"service_group = '{params['service_group']}'")
if "sales_platform" in params:
where_clauses.append(f"sales_platform = '{params['sales_platform']}'")
if "accrual_type" in params:
where_clauses.append(f"accrual_type = '{params['accrual_type']}'")
where_clause = " AND ".join(where_clauses)
sql_query = f"""
SELECT
sales_platform,
accrual_type,
COUNT(*) as count,
SUM(total_amount_rub) as total_amount,
AVG(total_amount_rub) as avg_amount
FROM accruals
WHERE {where_clause}
GROUP BY sales_platform, accrual_type
ORDER BY total_amount DESC
"""
result = await query_accruals_db(sql_query)
elif query_type == "aggregate":
# Агрегирование данных по выбранному полю
aggregation = tool_args.get("aggregation", "sum") # sum, count, avg, min, max
field = tool_args.get("field", "total_amount_rub")
# Строим WHERE условия для фильтрации
where_clauses = ["1=1"]
if "service_group" in params:
where_clauses.append(f"service_group = '{params['service_group']}'")
if "sales_platform" in params:
where_clauses.append(f"sales_platform = '{params['sales_platform']}'")
if "accrual_type" in params:
where_clauses.append(f"accrual_type = '{params['accrual_type']}'")
where_clause = " AND ".join(where_clauses)
# Маппинг функций агрегирования
agg_func_map = {
"sum": f"SUM({field})",
"count": "COUNT(*)",
"avg": f"AVG({field})",
"min": f"MIN({field})",
"max": f"MAX({field})"
}
agg_func = agg_func_map.get(aggregation.lower(), f"SUM({field})")
sql_query = f"""
SELECT
{agg_func} as result
FROM accruals
WHERE {where_clause}
"""
result = await query_accruals_db(sql_query)
else:
result = f"❌ Неизвестный тип запроса: {query_type}"
elif tool_name == "create_accrual":
# Создать новое начисление (POST /accruals/)
try:
async with httpx.AsyncClient() as client:
resp = await client.post(f"{ACCRUALS_API_URL}/accruals/", json=tool_args, timeout=30)
if resp.status_code in (200, 201):
result = resp.json()
else:
result = f"❌ HTTP {resp.status_code}: {resp.text}"
except Exception as e:
logger.error(f"Ошибка при create_accrual: {e}")
result = f"❌ Ошибка при создании начисления: {e}"
elif tool_name == "update_accrual":
# Обновить начисление по id_accrual (PUT /accruals/{id_accrual})
id_accrual = tool_args.get("id_accrual")
if not id_accrual:
result = "❌ Отсутствует id_accrual для обновления"
else:
body = {k: v for k, v in tool_args.items() if k != "id_accrual"}
try:
async with httpx.AsyncClient() as client:
resp = await client.put(f"{ACCRUALS_API_URL}/accruals/{id_accrual}", json=body, timeout=30)
if resp.status_code == 200:
result = resp.json()
else:
result = f"❌ HTTP {resp.status_code}: {resp.text}"
except Exception as e:
logger.error(f"Ошибка при update_accrual: {e}")
result = f"❌ Ошибка при обновлении начисления: {e}"
elif tool_name == "delete_accrual":
# Удалить начисление по id_accrual (DELETE /accruals/{id_accrual})
id_accrual = tool_args.get("id_accrual")
if not id_accrual:
result = "❌ Отсутствует id_accrual для удаления"
else:
try:
async with httpx.AsyncClient() as client:
resp = await client.delete(f"{ACCRUALS_API_URL}/accruals/{id_accrual}", timeout=30)
if resp.status_code in (200, 204):
result = f"✅ Удалено {id_accrual}"
else:
result = f"❌ HTTP {resp.status_code}: {resp.text}"
except Exception as e:
logger.error(f"Ошибка при delete_accrual: {e}")
result = f"❌ Ошибка при удалении начисления: {e}"
else:
result = f"❌ Неизвестный инструмент: {tool_name}"
tool_results.append({
"tool_call_id": tool_call.get("id"),
"content": result
})
# Добавляем результаты инструментов в историю сообщений
messages.append({
"role": "assistant",
"content": message.get("content", ""),
"tool_calls": message.get("tool_calls")
})
for tool_result in tool_results:
messages.append({
"role": "tool",
"tool_call_id": tool_result["tool_call_id"],
"content": tool_result["content"]
})
# Отправляем обновленный запрос в Deepseek для получения финального ответа
headers = {
"Authorization": f"Bearer {Config.DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": "deepseek-chat",
"messages": messages,
"temperature": 0.7,
"max_tokens": 2000,
"stream": False
}
try:
async with httpx.AsyncClient() as client:
logger.debug(f"Отправка результатов инструментов обратно в Deepseek")
response_obj = await client.post(
Config.DEEPSEEK_BASE_URL,
json=payload,
headers=headers,
timeout=Config.API_TIMEOUT
)
response_obj.raise_for_status()
response = response_obj.json()
except Exception as e:
logger.error(f"Ошибка при повторном запросе к Deepseek: {e}")
return "❌ Ошибка при обработке ответа"
async def get_weather_response(user_query: str) -> str:
"""
Получает ответ о погоде, используя Deepseek с реальными данными.
Главная функция для взаимодействия с пользователем.
Args:
user_query: Запрос пользователя
Returns:
Ответ с информацией о погоде
"""
logger.info(f"Обработка запроса пользователя: '{user_query}'")
return await call_deepseek_with_tools(user_query)