"""Единый сервис: MCP сервер + Bitrix24 Бот.
Объединяет функциональность:
1. MCP Сервер (SSE): для внешних клиентов (Yandex Cloud).
2. REST API (/webhook): для входящих сообщений от Bitrix24.
3. Bot Logic: обработка сообщений с использованием YandexGPT и инструментов.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Any, Dict, List
import httpx
import uvicorn
from fastapi import FastAPI, Request, BackgroundTasks, Response
from fastapi.responses import JSONResponse
from sse_starlette.sse import EventSourceResponse
# MCP Imports
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from src.config import SettingsManager
from src.infrastructure.logging.logger import logger
from src.infrastructure.mcp.handlers import (
register_contact_handlers,
register_deal_handlers,
register_user_handlers,
)
# Импортируем наш server wrapper, но будем использовать его инструменты
from src.presentation.mcp import create_mcp_server
# =============================================================================
# Настройки
# =============================================================================
settings = SettingsManager.init()
# =============================================================================
# YandexGPT Client
# =============================================================================
class YandexGPTClient:
def __init__(self, folder_id: str, api_key: str):
self.url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
self.folder_id = folder_id
self.api_key = api_key
self.model = "yandexgpt-lite"
async def generate(self, messages: List[Dict[str, str]]) -> str:
if not self.folder_id or not self.api_key:
return "⚠️ YandexGPT не настроен (нет ключей)."
payload = {
"modelUri": f"gpt://{self.folder_id}/{self.model}",
"completionOptions": {"stream": False, "temperature": 0.3, "maxTokens": 1000},
"messages": messages
}
headers = {"Authorization": f"Api-Key {self.api_key}"}
async with httpx.AsyncClient() as client:
try:
response = await client.post(self.url, json=payload, headers=headers, timeout=30.0)
if response.status_code == 200:
result = response.json()
return result.get("result", {}).get("alternatives", [{}])[0].get("message", {}).get("text", "")
return f"Error YandexGPT: {response.status_code} {response.text}"
except Exception as e:
return f"Exception YandexGPT: {str(e)}"
gpt_client = YandexGPTClient(settings.YANDEX_FOLDER_ID, settings.YANDEX_API_KEY)
# =============================================================================
# MCP Server & Tools
# =============================================================================
# Мы используем FastMCP экземпляр из create_mcp_server чтобы получить доступ к инструментам
# FastMCP - это удобная обертка, мы будем дергать ее методы
fast_mcp = create_mcp_server()
async def call_local_tool(name: str, arguments: dict) -> str:
"""Вызов инструмента локально (для бота)."""
try:
# FastMCP имеет метод call_tool
result = await fast_mcp.call_tool(name, arguments)
# Результат это список Content объектов. Преобразуем в строку.
text_parts = []
for content in result:
if hasattr(content, "text"):
text_parts.append(content.text)
else:
text_parts.append(str(content))
return "\n".join(text_parts)
except Exception as e:
logger.error("Tool execution failed", tool=name, error=str(e))
return f"Error executing {name}: {e}"
# =============================================================================
# FastAPI App
# =============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Service started")
yield
# Shutdown
logger.info("Service stopped")
app = FastAPI(lifespan=lifespan)
# --- MCP SSE Endpoints ---
# Для Yandex Cloud мы должны реализовать SSE протокол.
# FastMCP умеет сам себя сервить, но мы хотим интегрировать это в FastAPI.
# Простой способ: используем SSE Starlette и подключаем FastMCP.
# ВАЖНО: FastMCP (текущая версия) инкапсулирует Starlette.
# Чтобы не усложнять, мы пойдем на хитрость:
# Мы создадим /sse и /messages эндпоинты, и будем проксировать их...
# ИЛИ (лучше): Мы просто используем FastMCP как "реестр инструментов" для бота,
# а для внешки (Yandex) мы поднимем стандартный MCP SSE Transport.
# Инициализируем "ванильный" MCP Server для SSE транспорта
mcp_sse_server = Server("Bitrix24-Unified")
sse_transport = SseServerTransport("/messages")
# Копируем инструменты из FastMCP в ванильный Server (или регистрируем заново)
# Это сложно сделать автоматически, поэтому проще зарегистрировать handlers заново
# Но у нас handlers привязаны к FastMCP.
# РЕШЕНИЕ:
# Мы просто запустим FastMCP внутри FastAPI как "Sub Application" или используем его asgi_app?
# FastMCP наследует от чего-то? Нет.
# НО! Мы можем просто эмулировать SSE endpoint, дергая FastMCP.
# ЛАДНО, САМЫЙ ПРОСТОЙ ВАРИАНТ ДЛЯ ПОЛЬЗОВАТЕЛЯ:
# Бот (Webhook) и MCP работают в одном процессе.
# Чтобы Yandex видел MCP, мы реализуем /sse сами.
@app.get("/sse")
async def handle_sse(request: Request):
"""SSE Endpoint для MCP клиента."""
async with sse_transport.connect_sse(request.scope, request.receive, request._send) as streams:
await mcp_sse_server.run(
streams[0], streams[1], mcp_sse_server.create_initialization_options()
)
@app.post("/messages")
async def handle_messages(request: Request):
"""Messages Endpoint для MCP клиента."""
await sse_transport.handle_post_message(request.scope, request.receive, request._send)
# Регистрируем инструменты в ванильном сервере
# Нам нужно достать функции из handlers.py и зарегистрировать их
# Чтобы не дублировать код, импортируем функции регистрации, но модифицируем их
# чтобы они принимали любой сервер с интерфейсом .tool()
# FastMCP.tool() и mcp.server.Server.list_tools() отличаются.
# ВРЕМЕННОЕ РЕШЕНИЕ:
# Мы объявим инструменты "прокси" в ванильном сервере, которые вызывают FastMCP.
@mcp_sse_server.list_tools()
async def list_tools() -> list[Tool]:
tools = await fast_mcp.list_tools() # FastMCP возвращает список Tool
return tools
@mcp_sse_server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent | EmbeddedResource]:
# Делегируем вызов в FastMCP
return await fast_mcp.call_tool(name, arguments)
# --- Bot Webhook ---
SYSTEM_PROMPT = """Ты — ассистент CRM Bitrix24.
Инструменты: search_contacts, get_contact, list_deals, get_deal.
Если нужно действие - ответь JSON: {"tool": "name", "arguments": {...}}
Иначе - текст.
"""
async def process_user_message(dialog_id: str, message: str):
"""Логика бота."""
logger.info("Bot msg", text=message)
# 1. GPT
messages = [{"role": "system", "text": SYSTEM_PROMPT}, {"role": "user", "text": message}]
gpt_response = await gpt_client.generate(messages)
# 2. Tool Check
final_response = gpt_response
try:
start = gpt_response.find("{")
end = gpt_response.rfind("}")
if start != -1 and end != -1:
cmd = json.loads(gpt_response[start:end+1])
if "tool" in cmd:
t_name = cmd["tool"]
t_args = cmd.get("arguments", {})
await send_bitrix(dialog_id, f"⚙️ Выполняю {t_name}...")
# Вызов локального инструмента
tool_res = await call_local_tool(t_name, t_args)
# GPT format
messages.append({"role": "assistant", "text": gpt_response})
messages.append({"role": "user", "text": f"Результат: {tool_res}\nОтветь пользователю."})
final_response = await gpt_client.generate(messages)
except:
pass
# 3. Response
await send_bitrix(dialog_id, final_response)
async def send_bitrix(dialog_id: str, text: str):
if not settings.BITRIX_WEBHOOK_URL:
return
url = f"{settings.BITRIX_WEBHOOK_URL}im.message.add"
async with httpx.AsyncClient() as c:
await c.post(url, json={"DIALOG_ID": dialog_id, "MESSAGE": text, "SYSTEM": "N"})
@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks):
try:
form = await request.form()
data = dict(form)
# Упрощенный парсинг для Битрикса
# Обычно параметры приходят как data[PARAMS][MESSAGE]
msg = data.get("data[PARAMS][MESSAGE]")
did = data.get("data[PARAMS][DIALOG_ID]")
if msg and did:
background_tasks.add_task(process_user_message, did, msg)
return {"status": "ok"}
except Exception as e:
logger.error("Webhook error", error=str(e))
return {"status": "error"}
@app.get("/health")
def health():
return {"status": "ok", "mode": "unified"}
def run():
"""Точка входа."""
config = uvicorn.Config(
app,
host=settings.MCP_HOST,
port=settings.MCP_PORT,
log_level=settings.LOG_LEVEL.lower()
)
server = uvicorn.Server(config)
asyncio.run(server.serve())
if __name__ == "__main__":
run()