Dify as MCP Server
by Yevanchen
Verified
# Dify Endpoint 中实现 MCP 服务器的架构设计
## 总体 Roadmap
### 阶段 1: 基础 MCP 功能实现
- 设计端点架构,支持 GET 和 POST 请求
- 实现核心 JSON-RPC 处理
- 适配 Werkzeug 与 MCP SDK (如可能)
### 阶段 2: SSE 支持与用户体验优化
- 在端点限制下实现基础 SSE 功能
- 添加进度通知和心跳机制
- 优化错误处理和重试机制
### 阶段 3: 高级功能与生产化
- 完善工具定义和文档
- 添加配置选项和自定义功能
- 提高安全性和性能
## 架构设计
由于 Dify 不支持 `ANY` 类型的端点,需要创建两个独立端点来处理不同类型的请求:
```
/difyapp_as_mcp_server (GET) -> 处理 SSE 连接和说明页面
/difyapp_as_mcp_server (POST) -> 处理 JSON-RPC 请求
```
### 端点结构定义
```yaml
# GET 端点配置
path: "/difyapp_as_mcp_server"
method: "GET"
extra:
python:
source: "endpoints/difyapp_as_mcp_server.py"
# POST 端点配置
path: "/difyapp_as_mcp_server"
method: "POST"
extra:
python:
source: "endpoints/difyapp_as_mcp_server.py"
```
## MCP SDK 与 Werkzeug 适配方案
### 挑战分析
1. **MCP SDK 设计为独立服务器**:
- 通常由 SDK 控制整个 HTTP 服务器
- 假设可以完全控制请求/响应生命周期
2. **Dify Endpoint 限制**:
- 只是 HTTP 处理器,不是完整服务器
- 没有控制服务器生命周期的权限
- 无法直接使用 SDK 的 `run()` 方法
### 适配策略
采用**核心功能提取**策略:从 MCP SDK 中提取关键功能,而不是直接使用其服务器组件。
1. **引入 SDK 作为工具库**:
```python
from mcp.server.fastmcp import FastMCP
from mcp.server.tool_registry import ToolRegistry
```
2. **使用 SDK 注册工具**,但自行处理请求:
```python
# 在类外创建全局工具注册表
tool_registry = ToolRegistry()
class DifyappAsMcpServerEndpoint(Endpoint):
def __init__(self):
super().__init__()
# 工具定义,使用 SDK 的工具注册功能
self._setup_tools(tool_registry)
def _setup_tools(self, registry):
@registry.tool
async def dify_workflow(title: str, language: str = "English") -> str:
"""执行 Dify workflow 并返回结果"""
# 实现...
```
3. **手动路由并调用 SDK 功能**:
```python
def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
if r.method == "GET":
return self._handle_get(r, settings)
elif r.method == "POST" and r.is_json:
data = r.get_json()
if "jsonrpc" in data:
return self._handle_jsonrpc(r, data, settings)
else:
return self._handle_direct_call(r, data, settings)
```
## SSE 实现方案
在 Dify Endpoint 中实现 SSE 确实存在挑战,但可以通过以下策略实现基础 SSE 功能:
### 1. 生成器响应模式
```python
def _handle_get(self, r: Request, settings: Mapping) -> Response:
"""处理 GET 请求 - 支持 SSE 和普通页面"""
if r.headers.get("Accept") == "text/event-stream":
return self._handle_sse_connection(r, settings)
else:
return self._serve_html_page(r, settings)
def _handle_sse_connection(self, r: Request, settings: Mapping) -> Response:
"""处理 SSE 连接请求"""
connection_id = str(uuid.uuid4())
def generate():
# 1. 连接确认
yield f"data: {{\"type\": \"connection\", \"id\": \"{connection_id}\"}}\n\n"
# 2. 心跳机制 (有限次数,因为无法无限运行)
for _ in range(12): # 最多运行3分钟
time.sleep(15)
yield "data: {\"type\": \"ping\"}\n\n"
return Response(
generate(),
status=200,
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
```
### 2. 有限心跳模式
由于 Dify Endpoint 请求处理有时间限制,无法永久保持连接。采用有限心跳策略:
- 限制心跳次数,例如最多发送12次(约3分钟)
- 客户端检测到连接关闭后会自动重连
- 每次重连获得新的连接 ID
### 3. 状态管理权衡
- 使用连接级别变量而不是全局变量
- 放弃支持全局连接表和长期状态管理
- 专注于提供"最小可行"的 SSE 体验
## 客户端配置方案
客户端应配置为使用 GET 端点:
```json
{
"mcpServers": {
"dify-workflow": {
"url": "https://your-dify-instance.com/difyapp_as_mcp_server"
}
}
}
```
当指定 URL 时,MCP 客户端会自动:
- 向该 URL 发送 GET 请求建立 SSE 连接
- 向同一 URL 发送 POST 请求进行 JSON-RPC 调用
## 核心代码实现结构
以下是完整的架构设计实现方案:
```python
from typing import Mapping, Dict, Any, Optional
from werkzeug import Request, Response
from dify_plugin import Endpoint
import json
import uuid
import time
import asyncio
import inspect
from functools import wraps
# 全局工具注册
class ToolDef:
def __init__(self, func, name=None, description=None):
self.func = func
self.name = name or func.__name__
self.description = description or func.__doc__
self.signature = inspect.signature(func)
def get_schema(self):
"""生成工具的 JSON Schema"""
properties = {}
required = []
for name, param in self.signature.parameters.items():
if name == 'self':
continue
param_type = "string"
if param.annotation != inspect.Parameter.empty:
if param.annotation == int or param.annotation == float:
param_type = "number"
elif param.annotation == bool:
param_type = "boolean"
prop = {"type": param_type}
# 获取参数描述
if self.description:
for line in self.description.split('\n'):
if f"{name}:" in line or f"{name} :" in line:
prop["description"] = line.split(':', 1)[1].strip()
properties[name] = prop
# 如果参数没有默认值,则为必填
if param.default == inspect.Parameter.empty:
required.append(name)
return {
"name": self.name,
"description": self.description,
"input_schema": {
"type": "object",
"properties": properties,
"required": required
},
"output_schema": {
"type": "object",
"properties": {
"output": {
"type": "string",
"description": "The result of the function call"
}
}
}
}
async def execute(self, args):
"""执行工具"""
try:
result = self.func(**args)
if inspect.iscoroutine(result):
result = await result
return {"output": result}
except Exception as e:
return {"output": f"Error: {str(e)}"}
class ToolRegistry:
def __init__(self):
self.tools = {}
def tool(self, func=None, name=None, description=None):
"""工具注册装饰器"""
def decorator(f):
tool_def = ToolDef(f, name=name, description=description)
self.tools[tool_def.name] = tool_def
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
if func is None:
return decorator
return decorator(func)
def get_tools(self):
"""获取所有工具定义"""
return [tool.get_schema() for tool in self.tools.values()]
async def execute_tool(self, name, args):
"""执行指定名称的工具"""
if name not in self.tools:
raise ValueError(f"Tool {name} not found")
return await self.tools[name].execute(args)
# 创建全局工具注册表
tool_registry = ToolRegistry()
class DifyappAsMcpServerEndpoint(Endpoint):
def __init__(self):
super().__init__()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# 注册工具
# 注意:这里使用全局 tool_registry,所以工具只会注册一次
if not tool_registry.tools:
self._register_tools()
def _register_tools(self):
"""注册 Dify workflow 工具"""
@tool_registry.tool
async def dify_workflow(title: str, language: str = "English") -> str:
"""执行 Dify workflow 并返回结果
Args:
title: 要处理的标题或主题
language: 输出使用的语言 (默认英文)
"""
try:
# 暂存 self 和 settings,稍后使用
app_id = getattr(self, 'current_app_id', "")
if not app_id:
return "Error: App ID not configured"
# 调用 Dify workflow
workflow_response = self.session.app.workflow.invoke(
app_id=app_id,
inputs={"title": title, "language": language},
response_mode="blocking"
)
# 从响应中提取输出
output = workflow_response.get("data", {}).get("outputs", {}).get("output", "")
if not output:
output = "Workflow completed but returned no output"
return output
except Exception as e:
return f"Error executing workflow: {str(e)}"
def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
"""请求处理入口点"""
# 保存当前应用 ID,供工具使用
self.current_app_id = settings.get('app_id', {}).get("app_id", "")
if r.method == "GET":
# 处理 GET 请求 (SSE 或 HTML)
if r.headers.get("Accept") == "text/event-stream":
return self._handle_sse_connection(r, settings)
else:
return self._serve_html_page(r, settings)
elif r.method == "POST" and r.is_json:
# 处理 POST 请求 (JSON-RPC 或直接调用)
data = r.get_json()
if "jsonrpc" in data:
return self._handle_jsonrpc(r, data, settings)
else:
return self._handle_direct_call(r, data, settings)
else:
# 不支持的请求
return Response(
"Unsupported request type",
status=400,
content_type="text/plain"
)
# GET 相关处理
def _serve_html_page(self, r: Request, settings: Mapping) -> Response:
"""返回 HTML 说明页面"""
server_name = settings.get("server_name", "Dify Workflow Server")
server_desc = settings.get("server_description", "Access Dify workflows via MCP")
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>{server_name}</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }}
h1 {{ color: #2c3e50; }}
code {{ background: #f8f8f8; padding: 2px 5px; border-radius: 3px; }}
pre {{ background: #f8f8f8; padding: 10px; border-radius: 5px; overflow-x: auto; }}
</style>
</head>
<body>
<h1>{server_name}</h1>
<p>{server_desc}</p>
<h2>MCP Server Information</h2>
<p>This endpoint implements the Model Context Protocol (MCP), allowing AI models to interact with Dify workflows.</p>
<h2>Available Tools</h2>
<p>This server exposes the following tools:</p>
<ul>
<li><strong>dify_workflow</strong>: Execute Dify workflows with custom inputs</li>
</ul>
<h2>Integration Instructions</h2>
<p>To connect an MCP client (like Claude Desktop):</p>
<ol>
<li>Add this URL to your MCP client configuration</li>
<li>The client will automatically discover available tools</li>
<li>Start using the tools through natural language commands</li>
</ol>
<h2>Example Configuration</h2>
<p>For Claude Desktop, add this to <code>claude_desktop_config.json</code>:</p>
<pre>{{
"mcpServers": {{
"dify-service": {{
"url": "{r.url_root}{r.path.lstrip('/')}"
}}
}}
}}</pre>
</body>
</html>
"""
return Response(
html,
status=200,
content_type="text/html"
)
def _handle_sse_connection(self, r: Request, settings: Mapping) -> Response:
"""处理 SSE 连接请求 - 有限心跳模式"""
connection_id = str(uuid.uuid4())
def generate():
# 1. 发送连接确认
yield f"data: {{\"type\": \"connection\", \"id\": \"{connection_id}\"}}\n\n"
# 可选:发送初始化和工具列表 (有些客户端期望这个)
tools_json = json.dumps({"type": "tools", "tools": tool_registry.get_tools()})
yield f"data: {tools_json}\n\n"
# 2. 心跳机制 (有限次数,因为无法无限运行)
# 假设 Dify Endpoint 请求最多可处理 5 分钟 = 300 秒
# 每 15 秒发送一次心跳,共 20 次
for _ in range(20):
time.sleep(15)
yield "data: {\"type\": \"ping\"}\n\n"
return Response(
generate(),
status=200,
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
# POST 相关处理
def _handle_jsonrpc(self, r: Request, data: Dict, settings: Mapping) -> Response:
"""处理 JSON-RPC 请求"""
method = data.get("method")
params = data.get("params", {})
req_id = data.get("id")
try:
if method == "initialize":
result = self._handle_initialize(params, settings)
elif method == "list_tools":
result = self._handle_list_tools(params, settings)
elif method == "call_tool":
result = self.loop.run_until_complete(self._handle_call_tool(params, settings))
else:
return self._jsonrpc_error(req_id, -32601, f"Method '{method}' not found")
return self._jsonrpc_success(req_id, result)
except Exception as e:
return self._jsonrpc_error(req_id, -32000, str(e))
def _handle_initialize(self, params: Dict, settings: Mapping) -> Dict:
"""处理 initialize 方法"""
server_name = settings.get("server_name", "Dify Workflow Server")
server_desc = settings.get("server_description", "Access Dify workflows via MCP")
return {
"name": server_name,
"description": server_desc,
"schema_version": "mcp-0.7.0",
"protocol_version": "0.7.0",
"server_source": "dify-plugin"
}
def _handle_list_tools(self, params: Dict, settings: Mapping) -> Dict:
"""处理 list_tools 方法"""
app_id = settings.get('app_id', {}).get("app_id", "")
if not app_id:
raise ValueError("App ID not configured")
# 使用工具注册表获取工具列表
tools = tool_registry.get_tools()
return {"tools": tools}
async def _handle_call_tool(self, params: Dict, settings: Mapping) -> Dict:
"""处理 call_tool 方法"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if not tool_name:
raise ValueError("Missing tool name")
# 使用工具注册表执行工具
return await tool_registry.execute_tool(tool_name, arguments)
def _handle_direct_call(self, r: Request, data: Dict, settings: Mapping) -> Response:
"""处理直接 workflow 调用(保持兼容)"""
try:
# 提取请求数据并准备工作流输入
workflow_inputs = {}
for key, value in data.get("responseValues", {}).items():
workflow_inputs[key] = value.get("value")
app_id = settings.get('app_id', {}).get("app_id", "")
# 调用 Dify 工作流
workflow_response = self.session.app.workflow.invoke(
app_id=app_id,
inputs=workflow_inputs,
response_mode="blocking",
)
return Response(
json.dumps({
"status": "success",
"workflow_response": workflow_response
}),
status=200,
content_type="application/json"
)
except Exception as e:
return Response(
json.dumps({
"error": str(e)
}),
status=500,
content_type="application/json"
)
# 工具函数
def _jsonrpc_success(self, req_id: Any, result: Any) -> Response:
"""构建 JSON-RPC 成功响应"""
return Response(
json.dumps({
"jsonrpc": "2.0",
"id": req_id,
"result": result
}),
status=200,
content_type="application/json"
)
def _jsonrpc_error(self, req_id: Any, code: int, message: str) -> Response:
"""构建 JSON-RPC 错误响应"""
return Response(
json.dumps({
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": code,
"message": message
}
}),
status=200, # JSON-RPC 总是返回 200,错误在响应内容中
content_type="application/json"
)
```
## 设计权衡与考虑
### 1. MCP SDK 使用方式的权衡
**选择**: 提取 SDK 核心功能,不直接使用其服务器组件
**理由**:
- Dify Endpoint 无法运行完整 SDK 服务器
- SDK 的工具注册和模式生成仍然有价值
- 手动实现协议处理比修改 SDK 更简单可控
### 2. SSE 实现策略权衡
**选择**: 有限心跳模式 + 生成器响应
**理由**:
- 完全符合 HTTP/SSE 标准,客户端兼容性最佳
- 在 Dify Endpoint 限制下提供最佳用户体验
- 不需要维护服务器状态,每个请求独立处理
### 3. 工具注册与执行权衡
**选择**: 全局工具注册表 + 实例级执行
**理由**:
- 避免每次请求重新注册工具(效率)
- 保持工具执行需要的上下文(会话和应用 ID)
- 兼容 Dify Endpoint 实例化模型
### 4. 错误处理策略
**选择**: JSON-RPC 标准错误 + 简洁错误信息
**理由**:
- 遵循 MCP 协议规范,保持兼容性
- 提供有用但不过度详细的错误信息
- 错误状态码保持为 200,符合 JSON-RPC 标准
## 推荐实施路径
1. **核心功能先行**:先实现 GET/POST 端点和基本 JSON-RPC 处理
2. **添加有限 SSE**:实现基础 SSE 连接和心跳机制
3. **工具模板化**:完善工具定义和文档自动生成
4. **测试与优化**:确保与主流 MCP 客户端兼容
此架构设计在 Dify Endpoint 限制下提供了最佳 MCP 实现方案,平衡了功能完整性和实现复杂度,让 Dify Workflow 可以作为 MCP 工具被 Claude 等客户端使用。