Skip to main content
Glama

Dify as MCP Server

by Yevanchen
roadmap.md20.4 kB
# Dify Endpoint 中实现 MCP 服务器的架构设计 ## 总体 Roadmap ### 阶段 1: 基础 MCP 功能实现 - 设计端点架构,支持 GET 和 POST 请求 - 实现核心 JSON-RPC 处理 - 适配 Werkzeug 与 MCP SDK (如可能) ### 阶段 2: SSE 支持与用户体验优化 - 在端点限制下实现基础 SSE 功能 - 添加进度通知和心跳机制 - 优化错误处理和重试机制 ### 阶段 3: 高级功能与生产化 - 完善工具定义和文档 - 添加配置选项和自定义功能 - 提高安全性和性能 ## 架构设计 由于 Dify 不支持 `ANY` 类型的端点,需要创建两个独立端点来处理不同类型的请求: ``` /difyapp_as_mcp_server (GET) -> 处理 SSE 连接和说明页面 /difyapp_as_mcp_server (POST) -> 处理 JSON-RPC 请求 ``` ### 端点结构定义 ```yaml # GET 端点配置 path: "/difyapp_as_mcp_server" method: "GET" extra: python: source: "endpoints/difyapp_as_mcp_server.py" # POST 端点配置 path: "/difyapp_as_mcp_server" method: "POST" extra: python: source: "endpoints/difyapp_as_mcp_server.py" ``` ## MCP SDK 与 Werkzeug 适配方案 ### 挑战分析 1. **MCP SDK 设计为独立服务器**: - 通常由 SDK 控制整个 HTTP 服务器 - 假设可以完全控制请求/响应生命周期 2. **Dify Endpoint 限制**: - 只是 HTTP 处理器,不是完整服务器 - 没有控制服务器生命周期的权限 - 无法直接使用 SDK 的 `run()` 方法 ### 适配策略 采用**核心功能提取**策略:从 MCP SDK 中提取关键功能,而不是直接使用其服务器组件。 1. **引入 SDK 作为工具库**: ```python from mcp.server.fastmcp import FastMCP from mcp.server.tool_registry import ToolRegistry ``` 2. **使用 SDK 注册工具**,但自行处理请求: ```python # 在类外创建全局工具注册表 tool_registry = ToolRegistry() class DifyappAsMcpServerEndpoint(Endpoint): def __init__(self): super().__init__() # 工具定义,使用 SDK 的工具注册功能 self._setup_tools(tool_registry) def _setup_tools(self, registry): @registry.tool async def dify_workflow(title: str, language: str = "English") -> str: """执行 Dify workflow 并返回结果""" # 实现... ``` 3. **手动路由并调用 SDK 功能**: ```python def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response: if r.method == "GET": return self._handle_get(r, settings) elif r.method == "POST" and r.is_json: data = r.get_json() if "jsonrpc" in data: return self._handle_jsonrpc(r, data, settings) else: return self._handle_direct_call(r, data, settings) ``` ## SSE 实现方案 在 Dify Endpoint 中实现 SSE 确实存在挑战,但可以通过以下策略实现基础 SSE 功能: ### 1. 生成器响应模式 ```python def _handle_get(self, r: Request, settings: Mapping) -> Response: """处理 GET 请求 - 支持 SSE 和普通页面""" if r.headers.get("Accept") == "text/event-stream": return self._handle_sse_connection(r, settings) else: return self._serve_html_page(r, settings) def _handle_sse_connection(self, r: Request, settings: Mapping) -> Response: """处理 SSE 连接请求""" connection_id = str(uuid.uuid4()) def generate(): # 1. 连接确认 yield f"data: {{\"type\": \"connection\", \"id\": \"{connection_id}\"}}\n\n" # 2. 心跳机制 (有限次数,因为无法无限运行) for _ in range(12): # 最多运行3分钟 time.sleep(15) yield "data: {\"type\": \"ping\"}\n\n" return Response( generate(), status=200, content_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } ) ``` ### 2. 有限心跳模式 由于 Dify Endpoint 请求处理有时间限制,无法永久保持连接。采用有限心跳策略: - 限制心跳次数,例如最多发送12次(约3分钟) - 客户端检测到连接关闭后会自动重连 - 每次重连获得新的连接 ID ### 3. 状态管理权衡 - 使用连接级别变量而不是全局变量 - 放弃支持全局连接表和长期状态管理 - 专注于提供"最小可行"的 SSE 体验 ## 客户端配置方案 客户端应配置为使用 GET 端点: ```json { "mcpServers": { "dify-workflow": { "url": "https://your-dify-instance.com/difyapp_as_mcp_server" } } } ``` 当指定 URL 时,MCP 客户端会自动: - 向该 URL 发送 GET 请求建立 SSE 连接 - 向同一 URL 发送 POST 请求进行 JSON-RPC 调用 ## 核心代码实现结构 以下是完整的架构设计实现方案: ```python from typing import Mapping, Dict, Any, Optional from werkzeug import Request, Response from dify_plugin import Endpoint import json import uuid import time import asyncio import inspect from functools import wraps # 全局工具注册 class ToolDef: def __init__(self, func, name=None, description=None): self.func = func self.name = name or func.__name__ self.description = description or func.__doc__ self.signature = inspect.signature(func) def get_schema(self): """生成工具的 JSON Schema""" properties = {} required = [] for name, param in self.signature.parameters.items(): if name == 'self': continue param_type = "string" if param.annotation != inspect.Parameter.empty: if param.annotation == int or param.annotation == float: param_type = "number" elif param.annotation == bool: param_type = "boolean" prop = {"type": param_type} # 获取参数描述 if self.description: for line in self.description.split('\n'): if f"{name}:" in line or f"{name} :" in line: prop["description"] = line.split(':', 1)[1].strip() properties[name] = prop # 如果参数没有默认值,则为必填 if param.default == inspect.Parameter.empty: required.append(name) return { "name": self.name, "description": self.description, "input_schema": { "type": "object", "properties": properties, "required": required }, "output_schema": { "type": "object", "properties": { "output": { "type": "string", "description": "The result of the function call" } } } } async def execute(self, args): """执行工具""" try: result = self.func(**args) if inspect.iscoroutine(result): result = await result return {"output": result} except Exception as e: return {"output": f"Error: {str(e)}"} class ToolRegistry: def __init__(self): self.tools = {} def tool(self, func=None, name=None, description=None): """工具注册装饰器""" def decorator(f): tool_def = ToolDef(f, name=name, description=description) self.tools[tool_def.name] = tool_def @wraps(f) def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper if func is None: return decorator return decorator(func) def get_tools(self): """获取所有工具定义""" return [tool.get_schema() for tool in self.tools.values()] async def execute_tool(self, name, args): """执行指定名称的工具""" if name not in self.tools: raise ValueError(f"Tool {name} not found") return await self.tools[name].execute(args) # 创建全局工具注册表 tool_registry = ToolRegistry() class DifyappAsMcpServerEndpoint(Endpoint): def __init__(self): super().__init__() self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) # 注册工具 # 注意:这里使用全局 tool_registry,所以工具只会注册一次 if not tool_registry.tools: self._register_tools() def _register_tools(self): """注册 Dify workflow 工具""" @tool_registry.tool async def dify_workflow(title: str, language: str = "English") -> str: """执行 Dify workflow 并返回结果 Args: title: 要处理的标题或主题 language: 输出使用的语言 (默认英文) """ try: # 暂存 self 和 settings,稍后使用 app_id = getattr(self, 'current_app_id', "") if not app_id: return "Error: App ID not configured" # 调用 Dify workflow workflow_response = self.session.app.workflow.invoke( app_id=app_id, inputs={"title": title, "language": language}, response_mode="blocking" ) # 从响应中提取输出 output = workflow_response.get("data", {}).get("outputs", {}).get("output", "") if not output: output = "Workflow completed but returned no output" return output except Exception as e: return f"Error executing workflow: {str(e)}" def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response: """请求处理入口点""" # 保存当前应用 ID,供工具使用 self.current_app_id = settings.get('app_id', {}).get("app_id", "") if r.method == "GET": # 处理 GET 请求 (SSE 或 HTML) if r.headers.get("Accept") == "text/event-stream": return self._handle_sse_connection(r, settings) else: return self._serve_html_page(r, settings) elif r.method == "POST" and r.is_json: # 处理 POST 请求 (JSON-RPC 或直接调用) data = r.get_json() if "jsonrpc" in data: return self._handle_jsonrpc(r, data, settings) else: return self._handle_direct_call(r, data, settings) else: # 不支持的请求 return Response( "Unsupported request type", status=400, content_type="text/plain" ) # GET 相关处理 def _serve_html_page(self, r: Request, settings: Mapping) -> Response: """返回 HTML 说明页面""" server_name = settings.get("server_name", "Dify Workflow Server") server_desc = settings.get("server_description", "Access Dify workflows via MCP") html = f""" <!DOCTYPE html> <html> <head> <title>{server_name}</title> <style> body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }} h1 {{ color: #2c3e50; }} code {{ background: #f8f8f8; padding: 2px 5px; border-radius: 3px; }} pre {{ background: #f8f8f8; padding: 10px; border-radius: 5px; overflow-x: auto; }} </style> </head> <body> <h1>{server_name}</h1> <p>{server_desc}</p> <h2>MCP Server Information</h2> <p>This endpoint implements the Model Context Protocol (MCP), allowing AI models to interact with Dify workflows.</p> <h2>Available Tools</h2> <p>This server exposes the following tools:</p> <ul> <li><strong>dify_workflow</strong>: Execute Dify workflows with custom inputs</li> </ul> <h2>Integration Instructions</h2> <p>To connect an MCP client (like Claude Desktop):</p> <ol> <li>Add this URL to your MCP client configuration</li> <li>The client will automatically discover available tools</li> <li>Start using the tools through natural language commands</li> </ol> <h2>Example Configuration</h2> <p>For Claude Desktop, add this to <code>claude_desktop_config.json</code>:</p> <pre>{{ "mcpServers": {{ "dify-service": {{ "url": "{r.url_root}{r.path.lstrip('/')}" }} }} }}</pre> </body> </html> """ return Response( html, status=200, content_type="text/html" ) def _handle_sse_connection(self, r: Request, settings: Mapping) -> Response: """处理 SSE 连接请求 - 有限心跳模式""" connection_id = str(uuid.uuid4()) def generate(): # 1. 发送连接确认 yield f"data: {{\"type\": \"connection\", \"id\": \"{connection_id}\"}}\n\n" # 可选:发送初始化和工具列表 (有些客户端期望这个) tools_json = json.dumps({"type": "tools", "tools": tool_registry.get_tools()}) yield f"data: {tools_json}\n\n" # 2. 心跳机制 (有限次数,因为无法无限运行) # 假设 Dify Endpoint 请求最多可处理 5 分钟 = 300 秒 # 每 15 秒发送一次心跳,共 20 次 for _ in range(20): time.sleep(15) yield "data: {\"type\": \"ping\"}\n\n" return Response( generate(), status=200, content_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } ) # POST 相关处理 def _handle_jsonrpc(self, r: Request, data: Dict, settings: Mapping) -> Response: """处理 JSON-RPC 请求""" method = data.get("method") params = data.get("params", {}) req_id = data.get("id") try: if method == "initialize": result = self._handle_initialize(params, settings) elif method == "list_tools": result = self._handle_list_tools(params, settings) elif method == "call_tool": result = self.loop.run_until_complete(self._handle_call_tool(params, settings)) else: return self._jsonrpc_error(req_id, -32601, f"Method '{method}' not found") return self._jsonrpc_success(req_id, result) except Exception as e: return self._jsonrpc_error(req_id, -32000, str(e)) def _handle_initialize(self, params: Dict, settings: Mapping) -> Dict: """处理 initialize 方法""" server_name = settings.get("server_name", "Dify Workflow Server") server_desc = settings.get("server_description", "Access Dify workflows via MCP") return { "name": server_name, "description": server_desc, "schema_version": "mcp-0.7.0", "protocol_version": "0.7.0", "server_source": "dify-plugin" } def _handle_list_tools(self, params: Dict, settings: Mapping) -> Dict: """处理 list_tools 方法""" app_id = settings.get('app_id', {}).get("app_id", "") if not app_id: raise ValueError("App ID not configured") # 使用工具注册表获取工具列表 tools = tool_registry.get_tools() return {"tools": tools} async def _handle_call_tool(self, params: Dict, settings: Mapping) -> Dict: """处理 call_tool 方法""" tool_name = params.get("name") arguments = params.get("arguments", {}) if not tool_name: raise ValueError("Missing tool name") # 使用工具注册表执行工具 return await tool_registry.execute_tool(tool_name, arguments) def _handle_direct_call(self, r: Request, data: Dict, settings: Mapping) -> Response: """处理直接 workflow 调用(保持兼容)""" try: # 提取请求数据并准备工作流输入 workflow_inputs = {} for key, value in data.get("responseValues", {}).items(): workflow_inputs[key] = value.get("value") app_id = settings.get('app_id', {}).get("app_id", "") # 调用 Dify 工作流 workflow_response = self.session.app.workflow.invoke( app_id=app_id, inputs=workflow_inputs, response_mode="blocking", ) return Response( json.dumps({ "status": "success", "workflow_response": workflow_response }), status=200, content_type="application/json" ) except Exception as e: return Response( json.dumps({ "error": str(e) }), status=500, content_type="application/json" ) # 工具函数 def _jsonrpc_success(self, req_id: Any, result: Any) -> Response: """构建 JSON-RPC 成功响应""" return Response( json.dumps({ "jsonrpc": "2.0", "id": req_id, "result": result }), status=200, content_type="application/json" ) def _jsonrpc_error(self, req_id: Any, code: int, message: str) -> Response: """构建 JSON-RPC 错误响应""" return Response( json.dumps({ "jsonrpc": "2.0", "id": req_id, "error": { "code": code, "message": message } }), status=200, # JSON-RPC 总是返回 200,错误在响应内容中 content_type="application/json" ) ``` ## 设计权衡与考虑 ### 1. MCP SDK 使用方式的权衡 **选择**: 提取 SDK 核心功能,不直接使用其服务器组件 **理由**: - Dify Endpoint 无法运行完整 SDK 服务器 - SDK 的工具注册和模式生成仍然有价值 - 手动实现协议处理比修改 SDK 更简单可控 ### 2. SSE 实现策略权衡 **选择**: 有限心跳模式 + 生成器响应 **理由**: - 完全符合 HTTP/SSE 标准,客户端兼容性最佳 - 在 Dify Endpoint 限制下提供最佳用户体验 - 不需要维护服务器状态,每个请求独立处理 ### 3. 工具注册与执行权衡 **选择**: 全局工具注册表 + 实例级执行 **理由**: - 避免每次请求重新注册工具(效率) - 保持工具执行需要的上下文(会话和应用 ID) - 兼容 Dify Endpoint 实例化模型 ### 4. 错误处理策略 **选择**: JSON-RPC 标准错误 + 简洁错误信息 **理由**: - 遵循 MCP 协议规范,保持兼容性 - 提供有用但不过度详细的错误信息 - 错误状态码保持为 200,符合 JSON-RPC 标准 ## 推荐实施路径 1. **核心功能先行**:先实现 GET/POST 端点和基本 JSON-RPC 处理 2. **添加有限 SSE**:实现基础 SSE 连接和心跳机制 3. **工具模板化**:完善工具定义和文档自动生成 4. **测试与优化**:确保与主流 MCP 客户端兼容 此架构设计在 Dify Endpoint 限制下提供了最佳 MCP 实现方案,平衡了功能完整性和实现复杂度,让 Dify Workflow 可以作为 MCP 工具被 Claude 等客户端使用。

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Yevanchen/difyapp_as_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server