Skip to main content
Glama
ASXRND

MCP Weather & Accruals Server

by ASXRND
deepseek_api.py38.1 kB
#!/usr/bin/env python3 """ Модуль для работы с Deepseek API. Содержит функции для взаимодействия с Deepseek и вызова инструментов для получения погоды и работы с файлами. """ import json import logging import os from typing import Optional, Any, List, Dict import httpx import psycopg2 from psycopg2.extras import RealDictCursor from modules.config import Config from modules.weather_api import get_current_weather, get_forecast from modules.file_api import ( read_file, create_file, update_file, append_to_file, delete_file, list_files, ALLOWED_BASE_DIR, ) logger = logging.getLogger("DeepseekAPI") # API URL для работы с БД (берётся из конфига) ACCRUALS_API_URL = Config.ACCRUALS_API_URL # Параметры БД DB_URL = Config.DATABASE_URL # postgresql://user:pass@host:port/db async def query_accruals_db(query: str) -> str: """ Прямое подключение к PostgreSQL для выполнения запросов. Args: query: SQL запрос Returns: JSON ответ в виде строки или сообщение об ошибке """ try: # Парсим DATABASE_URL # postgresql://asx:asxAdmin1@192.168.0.111:5432/mydb from urllib.parse import urlparse parsed = urlparse(DB_URL) conn = psycopg2.connect( host=parsed.hostname, port=parsed.port or 5432, database=parsed.path.lstrip('/'), user=parsed.username, password=parsed.password, connect_timeout=10 ) cur = conn.cursor(cursor_factory=RealDictCursor) cur.execute(query) result = cur.fetchall() cur.close() conn.close() if not result: return "Нет данных" # Преобразуем результат в читаемый формат if len(result) == 1: # Для одной строки (например, статистика) row = dict(result[0]) return json.dumps(row, ensure_ascii=False, indent=2, default=str) else: # Для списка записей result_list = [dict(row) for row in result] return json.dumps(result_list, ensure_ascii=False, indent=2, default=str) except psycopg2.OperationalError as e: error_msg = f"❌ Ошибка подключения к БД: {str(e)}" logger.error(error_msg) return error_msg except Exception as e: error_msg = f"❌ Ошибка при запросе к БД: {str(e)}" logger.error(error_msg) return error_msg # ============================================================================ # Вспомогательные функции для работы с API БД # ============================================================================ async def query_accruals_api(endpoint: str, params: dict = None) -> str: """ Вспомогательная функция для вызова API БД (modules/api.py). Args: endpoint: Путь к endpoint (например, "/accruals" или "/stats/summary") params: Query параметры Returns: JSON ответ в виде строки или сообщение об ошибке """ try: url = f"{ACCRUALS_API_URL}{endpoint}" logger.debug(f"Запрос к API БД: {url} с параметрами {params}") async with httpx.AsyncClient() as client: response = await client.get(url, params=params, timeout=30) response.raise_for_status() data = response.json() # Преобразуем ответ в читаемую строку if isinstance(data, list): if not data: return "Нет данных" # Для списков показываем первые 5 элементов result = f"Найдено {len(data)} записей:\n" for i, item in enumerate(data[:5]): result += f"\n{i+1}. {item}\n" if len(data) > 5: result += f"\n... и ещё {len(data) - 5} записей" return result else: return json.dumps(data, ensure_ascii=False, indent=2) except httpx.ConnectError as e: error_msg = f"❌ Ошибка подключения к API БД ({ACCRUALS_API_URL}): {str(e)}" logger.error(error_msg) return error_msg except httpx.HTTPStatusError as e: error_msg = f"❌ HTTP ошибка {e.response.status_code}: {e.response.text}" logger.error(error_msg) return error_msg except Exception as e: error_msg = f"❌ Ошибка при запросе к API БД: {str(e)}" logger.error(error_msg) return error_msg def get_weather_tools() -> List[Dict[str, Any]]: """ Возвращает определение инструментов для Deepseek (function calling). Инструменты позволяют Deepseek запрашивать реальные данные о погоде и работать с файлами. Returns: Список с определением инструментов (JSON schema) """ return [ { "type": "function", "function": { "name": "get_current_weather", "description": "Получить текущую погоду для города", "parameters": { "type": "object", "properties": { "city": { "type": "string", "description": "Название города (на русском или английском)" }, "units": { "type": "string", "enum": ["metric", "imperial"], "description": "Система измерения (по умолчанию metric для Цельсия)" } }, "required": ["city"] } } }, { "type": "function", "function": { "name": "get_forecast", "description": "Получить прогноз погоды на несколько дней", "parameters": { "type": "object", "properties": { "city": { "type": "string", "description": "Название города" }, "days": { "type": "integer", "description": "Количество дней для прогноза (1-5, по умолчанию 3)" }, "units": { "type": "string", "enum": ["metric", "imperial"], "description": "Система измерения (по умолчанию metric)" } }, "required": ["city"] } } }, { "type": "function", "function": { "name": "read_file", "description": "Читает содержимое файла", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Путь к файлу. Может быть абсолютным или относительным." }, "base_dir": { "type": "string", "description": "(необязательно) Базовая директория для относительных путей. По умолчанию используется безопасная директория на хосте." } }, "required": ["file_path"] } } }, { "type": "function", "function": { "name": "create_file", "description": "Создаёт новый файл с содержимым", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Путь к файлу. Может быть абсолютным или относительным." }, "base_dir": { "type": "string", "description": "(необязательно) Базовая директория для относительных путей." }, "content": { "type": "string", "description": "Содержимое файла" }, "overwrite": { "type": "boolean", "description": "Перезаписать ли файл, если он существует (по умолчанию false)" } }, "required": ["file_path", "content"] } } }, { "type": "function", "function": { "name": "update_file", "description": "Обновляет содержимое существующего файла", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Путь к файлу. Может быть абсолютным или относительным." }, "base_dir": { "type": "string", "description": "(необязательно) Базовая директория для относительных путей." }, "content": { "type": "string", "description": "Новое содержимое файла" } }, "required": ["file_path", "content"] } } }, { "type": "function", "function": { "name": "append_to_file", "description": "Добавляет содержимое в конец файла", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Путь к файлу. Может быть абсолютным или относительным." }, "base_dir": { "type": "string", "description": "(необязательно) Базовая директория для относительных путей." }, "content": { "type": "string", "description": "Содержимое для добавления" } }, "required": ["file_path", "content"] } } }, { "type": "function", "function": { "name": "delete_file", "description": "Удаляет файл", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Путь к файлу. Может быть абсолютным или относительным." }, "base_dir": { "type": "string", "description": "(необязательно) Базовая директория для относительных путей." } }, "required": ["file_path"] } } }, { "type": "function", "function": { "name": "list_files", "description": "Выводит список файлов в директории", "parameters": { "type": "object", "properties": { "directory": { "type": "string", "description": "Путь к директории (относительно базовой директории, по умолчанию .)" }, "base_dir": { "type": "string", "description": "(необязательно) Базовая директория для относительных путей." } }, "required": [] } } }, { "type": "function", "function": { "name": "query_accruals", "description": "Запросить данные о начислениях из PostgreSQL БД (Ozon отчёты, ноябрь 2025)", "parameters": { "type": "object", "properties": { "query_type": { "type": "string", "enum": ["list", "stats", "summary"], "description": "Тип запроса: 'list' - список начислений, 'stats' - статистика с фильтрацией, 'summary' - общая сводка" }, "service_group": { "type": "string", "description": "(опционально) Фильтр по группе услуг (например, 'Услуги доставки', 'Продажи', 'Вознаграждение Ozon')" }, "sales_platform": { "type": "string", "description": "(опционально) Фильтр по платформе продажи (например, 'Ozon', 'Wildberries')" }, "accrual_type": { "type": "string", "description": "(опционально) Фильтр по типу начисления (например, 'Комиссия', 'Реклама')" }, "limit": { "type": "integer", "description": "(опционально) Максимум записей для возврата (по умолчанию 100)" }, "skip": { "type": "integer", "description": "(опционально) Количество записей для пропуска (для пагинации)" } }, "required": ["query_type"] } } } , { "type": "function", "function": { "name": "create_accrual", "description": "Создать новое начисление в БД", "parameters": { "type": "object", "properties": { "id_accrual": {"type": "string"}, "accrual_date": {"type": "string", "description": "Дата в формате YYYY-MM-DD"}, "service_group": {"type": "string"}, "accrual_type": {"type": "string"}, "article": {"type": "string"}, "sku": {"type": "string"}, "product_name": {"type": "string"}, "quantity": {"type": "integer"}, "seller_price": {"type": "number"}, "order_received_date": {"type": "string"}, "sales_platform": {"type": "string"}, "work_scheme": {"type": "string"}, "ozon_fee_pct": {"type": "number"}, "localization_index_pct": {"type": "number"}, "avg_delivery_hours": {"type": "number"}, "total_amount_rub": {"type": "number"} }, "required": ["id_accrual", "accrual_date", "service_group", "accrual_type", "sales_platform", "total_amount_rub"] } } }, { "type": "function", "function": { "name": "update_accrual", "description": "Обновить существующее начисление по id_accrual", "parameters": { "type": "object", "properties": { "id_accrual": {"type": "string"}, "accrual_date": {"type": "string"}, "service_group": {"type": "string"}, "accrual_type": {"type": "string"}, "article": {"type": "string"}, "sku": {"type": "string"}, "product_name": {"type": "string"}, "quantity": {"type": "integer"}, "seller_price": {"type": "number"}, "order_received_date": {"type": "string"}, "sales_platform": {"type": "string"}, "work_scheme": {"type": "string"}, "ozon_fee_pct": {"type": "number"}, "localization_index_pct": {"type": "number"}, "avg_delivery_hours": {"type": "number"}, "total_amount_rub": {"type": "number"} }, "required": ["id_accrual"] } } }, { "type": "function", "function": { "name": "delete_accrual", "description": "Удалить начисление по id_accrual", "parameters": { "type": "object", "properties": { "id_accrual": {"type": "string"} }, "required": ["id_accrual"] } } } ] async def call_deepseek_with_tools(user_query: str) -> str: """ Вызывает Deepseek API с поддержкой инструментов (function calling). Deepseek может запросить реальные данные о погоде через инструменты. Args: user_query: Запрос пользователя о погоде Returns: Ответ от Deepseek, дополненный реальными данными о погоде """ logger.info(f"Отправка запроса в Deepseek: '{user_query}'") tools = get_weather_tools() messages = [ { "role": "user", "content": user_query } ] headers = { "Authorization": f"Bearer {Config.DEEPSEEK_API_KEY}", "Content-Type": "application/json" } payload = { "model": "deepseek-chat", "messages": messages, "tools": tools, "temperature": 0.7, "max_tokens": 2000, "stream": False } try: async with httpx.AsyncClient() as client: logger.debug(f"Отправка запроса с инструментами к Deepseek") response = await client.post( Config.DEEPSEEK_BASE_URL, json=payload, headers=headers, timeout=Config.API_TIMEOUT ) response.raise_for_status() result = response.json() logger.debug(f"Получен ответ от Deepseek") # Обрабатываем ответ и вызовы инструментов return await process_deepseek_response(result, messages) except httpx.HTTPStatusError as e: error_msg = f"HTTP ошибка {e.response.status_code}" logger.error(error_msg) return f"❌ Ошибка при обращении к Deepseek API: {error_msg}" except Exception as e: error_msg = f"Ошибка при работе с Deepseek: {str(e)}" logger.error(error_msg) return f"❌ Внутренняя ошибка: {error_msg}" async def process_deepseek_response(response: dict, messages: list) -> str: """ Обрабатывает ответ от Deepseek, включая вызовы инструментов (function calling). Если Deepseek вызывает инструмент, мы выполняем его и отправляем результаты обратно. Args: response: JSON ответ от Deepseek API messages: История сообщений для контекста Returns: Финальный ответ от Deepseek с результатами инструментов """ while True: choice = response.get("choices", [{}])[0] message = choice.get("message", {}) # Если нет вызовов инструментов, возвращаем текстовый ответ if "tool_calls" not in message or not message["tool_calls"]: text_response = message.get("content", "❌ Пустой ответ от Deepseek") logger.info(f"Финальный ответ от Deepseek: {text_response[:100]}...") return text_response # Обрабатываем вызовы инструментов tool_results = [] for tool_call in message.get("tool_calls", []): tool_name = tool_call.get("function", {}).get("name") tool_args = json.loads(tool_call.get("function", {}).get("arguments", "{}")) logger.info(f"Deepseek вызвал инструмент: {tool_name}({tool_args})") # Выполняем инструмент if tool_name == "get_current_weather": result = await get_current_weather( tool_args.get("city"), tool_args.get("units", "metric") ) elif tool_name == "get_forecast": result = await get_forecast( tool_args.get("city"), tool_args.get("days", 3), tool_args.get("units", "metric") ) elif tool_name == "read_file": # allow caller to specify base_dir (for absolute/relative control) result = await read_file( tool_args.get("file_path", ""), tool_args.get("base_dir", ALLOWED_BASE_DIR), ) elif tool_name == "create_file": result = await create_file( tool_args.get("file_path", ""), tool_args.get("content", ""), base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR), overwrite=tool_args.get("overwrite", False) ) elif tool_name == "update_file": result = await update_file( tool_args.get("file_path", ""), tool_args.get("content", ""), base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR), ) elif tool_name == "append_to_file": result = await append_to_file( tool_args.get("file_path", ""), tool_args.get("content", ""), base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR), ) elif tool_name == "delete_file": result = await delete_file( tool_args.get("file_path", ""), base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR), ) elif tool_name == "list_files": result = await list_files( tool_args.get("directory", "."), base_dir=tool_args.get("base_dir", ALLOWED_BASE_DIR), ) elif tool_name == "query_accruals": query_type = tool_args.get("query_type", "summary") params = {} # Добавляем все опциональные параметры, если они предоставлены if tool_args.get("service_group"): params["service_group"] = tool_args["service_group"] if tool_args.get("sales_platform"): params["sales_platform"] = tool_args["sales_platform"] if tool_args.get("accrual_type"): params["accrual_type"] = tool_args["accrual_type"] if tool_args.get("limit"): params["limit"] = tool_args["limit"] if tool_args.get("skip"): params["skip"] = tool_args["skip"] # Строим SQL запрос в зависимости от типа if query_type == "summary": # Общая статистика по всем начислениям ноября sql_query = """ SELECT COUNT(*) as total_count, SUM(total_amount_rub) as total_amount, AVG(total_amount_rub) as avg_amount, MIN(total_amount_rub) as min_amount, MAX(total_amount_rub) as max_amount, COUNT(DISTINCT sales_platform) as platforms_count, COUNT(DISTINCT accrual_type) as types_count FROM accruals """ result = await query_accruals_db(sql_query) elif query_type == "list": # Список начислений с фильтрацией и пагинацией limit = params.get("limit", 50) skip = params.get("skip", 0) where_clauses = [] if "service_group" in params: where_clauses.append(f"service_group = '{params['service_group']}'") if "sales_platform" in params: where_clauses.append(f"sales_platform = '{params['sales_platform']}'") if "accrual_type" in params: where_clauses.append(f"accrual_type = '{params['accrual_type']}'") where_clause = " AND ".join(where_clauses) if where_clauses else "1=1" sql_query = f""" SELECT id, id_accrual, accrual_date, service_group, sales_platform, accrual_type, total_amount_rub, quantity, unit_price, created_at, updated_at FROM accruals WHERE {where_clause} ORDER BY accrual_date DESC, id DESC LIMIT {limit} OFFSET {skip} """ result = await query_accruals_db(sql_query) elif query_type in ["stats", "accruals"]: # Статистика с фильтрацией where_clauses = ["1=1"] if "service_group" in params: where_clauses.append(f"service_group = '{params['service_group']}'") if "sales_platform" in params: where_clauses.append(f"sales_platform = '{params['sales_platform']}'") if "accrual_type" in params: where_clauses.append(f"accrual_type = '{params['accrual_type']}'") where_clause = " AND ".join(where_clauses) sql_query = f""" SELECT sales_platform, accrual_type, COUNT(*) as count, SUM(total_amount_rub) as total_amount, AVG(total_amount_rub) as avg_amount FROM accruals WHERE {where_clause} GROUP BY sales_platform, accrual_type ORDER BY total_amount DESC """ result = await query_accruals_db(sql_query) elif query_type == "aggregate": # Агрегирование данных по выбранному полю aggregation = tool_args.get("aggregation", "sum") # sum, count, avg, min, max field = tool_args.get("field", "total_amount_rub") # Строим WHERE условия для фильтрации where_clauses = ["1=1"] if "service_group" in params: where_clauses.append(f"service_group = '{params['service_group']}'") if "sales_platform" in params: where_clauses.append(f"sales_platform = '{params['sales_platform']}'") if "accrual_type" in params: where_clauses.append(f"accrual_type = '{params['accrual_type']}'") where_clause = " AND ".join(where_clauses) # Маппинг функций агрегирования agg_func_map = { "sum": f"SUM({field})", "count": "COUNT(*)", "avg": f"AVG({field})", "min": f"MIN({field})", "max": f"MAX({field})" } agg_func = agg_func_map.get(aggregation.lower(), f"SUM({field})") sql_query = f""" SELECT {agg_func} as result FROM accruals WHERE {where_clause} """ result = await query_accruals_db(sql_query) else: result = f"❌ Неизвестный тип запроса: {query_type}" elif tool_name == "create_accrual": # Создать новое начисление (POST /accruals/) try: async with httpx.AsyncClient() as client: resp = await client.post(f"{ACCRUALS_API_URL}/accruals/", json=tool_args, timeout=30) if resp.status_code in (200, 201): result = resp.json() else: result = f"❌ HTTP {resp.status_code}: {resp.text}" except Exception as e: logger.error(f"Ошибка при create_accrual: {e}") result = f"❌ Ошибка при создании начисления: {e}" elif tool_name == "update_accrual": # Обновить начисление по id_accrual (PUT /accruals/{id_accrual}) id_accrual = tool_args.get("id_accrual") if not id_accrual: result = "❌ Отсутствует id_accrual для обновления" else: body = {k: v for k, v in tool_args.items() if k != "id_accrual"} try: async with httpx.AsyncClient() as client: resp = await client.put(f"{ACCRUALS_API_URL}/accruals/{id_accrual}", json=body, timeout=30) if resp.status_code == 200: result = resp.json() else: result = f"❌ HTTP {resp.status_code}: {resp.text}" except Exception as e: logger.error(f"Ошибка при update_accrual: {e}") result = f"❌ Ошибка при обновлении начисления: {e}" elif tool_name == "delete_accrual": # Удалить начисление по id_accrual (DELETE /accruals/{id_accrual}) id_accrual = tool_args.get("id_accrual") if not id_accrual: result = "❌ Отсутствует id_accrual для удаления" else: try: async with httpx.AsyncClient() as client: resp = await client.delete(f"{ACCRUALS_API_URL}/accruals/{id_accrual}", timeout=30) if resp.status_code in (200, 204): result = f"✅ Удалено {id_accrual}" else: result = f"❌ HTTP {resp.status_code}: {resp.text}" except Exception as e: logger.error(f"Ошибка при delete_accrual: {e}") result = f"❌ Ошибка при удалении начисления: {e}" else: result = f"❌ Неизвестный инструмент: {tool_name}" tool_results.append({ "tool_call_id": tool_call.get("id"), "content": result }) # Добавляем результаты инструментов в историю сообщений messages.append({ "role": "assistant", "content": message.get("content", ""), "tool_calls": message.get("tool_calls") }) for tool_result in tool_results: messages.append({ "role": "tool", "tool_call_id": tool_result["tool_call_id"], "content": tool_result["content"] }) # Отправляем обновленный запрос в Deepseek для получения финального ответа headers = { "Authorization": f"Bearer {Config.DEEPSEEK_API_KEY}", "Content-Type": "application/json" } payload = { "model": "deepseek-chat", "messages": messages, "temperature": 0.7, "max_tokens": 2000, "stream": False } try: async with httpx.AsyncClient() as client: logger.debug(f"Отправка результатов инструментов обратно в Deepseek") response_obj = await client.post( Config.DEEPSEEK_BASE_URL, json=payload, headers=headers, timeout=Config.API_TIMEOUT ) response_obj.raise_for_status() response = response_obj.json() except Exception as e: logger.error(f"Ошибка при повторном запросе к Deepseek: {e}") return "❌ Ошибка при обработке ответа" async def get_weather_response(user_query: str) -> str: """ Получает ответ о погоде, используя Deepseek с реальными данными. Главная функция для взаимодействия с пользователем. Args: user_query: Запрос пользователя Returns: Ответ с информацией о погоде """ logger.info(f"Обработка запроса пользователя: '{user_query}'") return await call_deepseek_with_tools(user_query)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ASXRND/MCP_deepseek'

If you have feedback or need assistance with the MCP directory API, please join our Discord server