"""Bitrix24 Bot Bridge Service.
Этот сервис является "Мозгом" системы. Он:
1. Принимает вебхуки от Bitrix24.
2. Отправляет текст пользователя в YandexGPT.
3. Если YandexGPT решает вызвать инструмент - отправляет запрос в локальный MCP сервер (через SSE).
4. Возвращает ответ пользователю в чат Bitrix24.
"""
import json
import os
import asyncio
from typing import Any, Dict, List, Optional
import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import uvicorn
from mcp.client.sse import sse_client
from mcp.client.session import ClientSession
# Настройки из переменных окружения
BITRIX_WEBHOOK_URL = os.getenv("BITRIX_WEBHOOK_URL", "")
MCP_SERVER_URL = os.getenv("MCP_SERVER_SSE_URL", "http://localhost:8000/sse")
YANDEX_FOLDER_ID = os.getenv("YANDEX_FOLDER_ID", "")
YANDEX_API_KEY = os.getenv("YANDEX_API_KEY", "")
YANDEX_GPT_MODEL = "yandexgpt-lite"
app = FastAPI()
# Базовый промпт для YandexGPT
SYSTEM_PROMPT = """Ты — умный ассистент CRM Bitrix24. Твоя цель — помогать менеджерам.
У тебя есть доступ к инструментам CRM через функцию `call_tool`.
Доступные инструменты:
- search_contacts(query): Поиск контакта по имени или телефону.
- get_contact(contact_id): Получить детали контакта.
- list_deals(active_only, limit): Список сделок.
- get_deal(deal_id): Детали сделки.
- update_deal_stage(deal_id, stage_id): Обновить стадию.
Алгоритм работы:
1. Если пользователь просит найти что-то или выполнить действие — верни JSON с вызовом функции:
{"tool": "имя_инструмента", "arguments": {параметры}}
Пример: {"tool": "search_contacts", "arguments": {"query": "Иванов"}}
2. Если это просто вопрос или приветствие — ответь текстом.
"""
class YandexGPTClient:
"""Клиент для работы с YandexGPT."""
def __init__(self, folder_id: str, api_key: str):
self.url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
self.folder_id = folder_id
self.api_key = api_key
async def generate(self, messages: List[Dict[str, str]]) -> str:
"""Генерация ответа."""
if not self.folder_id or not self.api_key:
return '{"error": "YandexGPT not configured"}'
payload = {
"modelUri": f"gpt://{self.folder_id}/{self.YANDEX_GPT_MODEL}",
"completionOptions": {"stream": False, "temperature": 0.3, "maxTokens": 1000},
"messages": messages
}
headers = {"Authorization": f"Api-Key {self.api_key}"}
async with httpx.AsyncClient() as client:
try:
response = await client.post(self.url, json=payload, headers=headers, timeout=30.0)
if response.status_code == 200:
result = response.json()
return result.get("result", {}).get("alternatives", [{}])[0].get("message", {}).get("text", "")
return f"Error {response.status_code}"
except Exception as e:
return f"Exception: {str(e)}"
# Глобальный клиент YandexGPT
gpt_client = YandexGPTClient(YANDEX_FOLDER_ID, YANDEX_API_KEY)
async def execute_mcp_tool(tool_name: str, arguments: Dict[str, Any]) -> str:
"""Выполнение инструмента через MCP SSE Client."""
# Подключаемся к локальному MCP серверу
async with sse_client(MCP_SERVER_URL) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Вызываем инструмент
try:
result = await session.call_tool(tool_name, arguments)
return json.dumps(result.content, ensure_ascii=False)
except Exception as e:
return f"Error executing tool {tool_name}: {str(e)}"
async def process_user_message(dialog_id: str, message: str):
"""Обработка сообщения асинхронно."""
print(f"Processing message: {message}")
# 1. Спрашиваем YandexGPT
messages = [
{"role": "system", "text": SYSTEM_PROMPT},
{"role": "user", "text": message}
]
gpt_response = await gpt_client.generate(messages)
# 2. Проверяем, есть ли вызов функции (JSON)
tool_result = ""
try:
# Пытаемся найти JSON-подобную структуру
start = gpt_response.find("{")
end = gpt_response.rfind("}")
if start != -1 and end != -1:
json_str = gpt_response[start:end+1]
command = json.loads(json_str)
if "tool" in command:
tool_name = command["tool"]
args = command.get("arguments", {})
# Сообщаем пользователю, что работаем
await send_bitrix_message(dialog_id, f"🔄 Выполняю {tool_name}...")
# 3. Вызываем MCP инструмент
tool_result = await execute_mcp_tool(tool_name, args)
# 4. Просим GPT отформатировать результат
messages.append({"role": "assistant", "text": gpt_response})
messages.append({"role": "user", "text": f"Результат выполнения: {tool_result}\n\nСформулируй ответ пользователю."})
final_response = await gpt_client.generate(messages)
else:
final_response = gpt_response
else:
final_response = gpt_response
except Exception as e:
print(f"Error parsing/executing: {e}")
final_response = gpt_response
# 5. Отправляем ответ в Bitrix
await send_bitrix_message(dialog_id, final_response)
async def send_bitrix_message(dialog_id: str, message: str):
"""Отправка сообщения в чат Bitrix24."""
if not BITRIX_WEBHOOK_URL:
print("Bitrix Webhook URL is missing")
return
url = f"{BITRIX_WEBHOOK_URL}im.message.add"
async with httpx.AsyncClient() as client:
await client.post(url, json={
"DIALOG_ID": dialog_id,
"MESSAGE": message,
"SYSTEM": "N"
})
@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks):
"""Эндпоинт для вебхука Bitrix24."""
try:
data = await request.form()
data_dict = dict(data)
# Для JSON body (если Bitrix шлет JSON)
if not data_dict:
try:
data_dict = await request.json()
except:
pass
event = data_dict.get("event", "")
if event == "ONIMBOTMESSAGEADD":
# Извлекаем вложенные параметры. Bitrix шлет их как array[key] в form-data
# Но FastAPI form parser может развернуть их плоско.
# Упрощенная логика извлечения:
msg_params = data_dict.get("data[PARAMS][MESSAGE]") or data_dict.get("data[PARAMS][MESSAGE]", "")
dialog_id = data_dict.get("data[PARAMS][DIALOG_ID]") or data_dict.get("data[PARAMS][DIALOG_ID]", "")
# Если не нашли в form-data, пробуем распарсить сырой input (Bitrix специфичен)
if not msg_params:
# В реальном проекте тут нужен более надежный парсер Form Data от Bitrix
# Для MVP возьмем основные поля
pass
# Запускаем обработку в фоне, чтобы быстро ответить Bitrix "OK"
if dialog_id and msg_params:
background_tasks.add_task(process_user_message, dialog_id, msg_params)
return {"status": "ok"}
except Exception as e:
print(f"Webhook Error: {e}")
return {"status": "error", "message": str(e)}
@app.get("/health")
def health():
return {"status": "healthy", "service": "bitrix-bot-bridge"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5001)