# Dify Server

"""MCP stdio server that exposes the configured Dify apps as tools."""

import asyncio
import json
import logging
import os
from abc import ABC
from contextlib import ExitStack, closing

import mcp.server.stdio
import mcp.types as types
import requests
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from omegaconf import OmegaConf

# stdout carries the MCP stdio protocol, so diagnostics must never be printed
# there; an unconfigured logger emits warnings on stderr instead.
logger = logging.getLogger(__name__)

# Dify describes each input by its form-control name; JSON Schema needs a
# primitive type.  NOTE(review): control names taken from the Dify parameters
# API as understood at review time -- confirm against the deployed version.
_DIFY_TO_JSON_SCHEMA_TYPE = {
    "text-input": "string",
    "paragraph": "string",
    "select": "string",
    "number": "number",
}


class DifyAPI(ABC):
    """Small HTTP client for the Dify app API.

    The configuration file (loaded with OmegaConf) must provide
    ``dify_base_url`` and ``dify_app_sks`` (one secret key per app).  On
    construction the info, parameters and meta documents of every configured
    app are fetched and cached on the instance.
    """

    def __init__(self, config_path, user="default_user"):
        """Load the configuration and prefetch metadata for every app.

        Args:
            config_path: Path of the OmegaConf-readable configuration file.
            user: User identifier stored on the instance as ``self.user``.

        Raises:
            ValueError: If ``config_path`` is empty or ``None``.
            requests.HTTPError: If any metadata request fails.
        """
        if not config_path:
            raise ValueError("config path not provided")
        self.config = OmegaConf.load(config_path)

        # dify configs
        self.dify_base_url = self.config.dify_base_url
        self.dify_app_sks = self.config.dify_app_sks
        self.user = user

        # dify app infos
        dify_app_infos = []
        dify_app_params = []
        dify_app_metas = []
        for key in self.dify_app_sks:
            dify_app_infos.append(self.get_app_info(key))
            dify_app_params.append(self.get_app_parameters(key))
            dify_app_metas.append(self.get_app_meta(key))
        self.dify_app_infos = dify_app_infos
        self.dify_app_params = dify_app_params
        self.dify_app_metas = dify_app_metas
        self.dify_app_names = [x['name'] for x in dify_app_infos]

    def chat_message(
        self,
        api_key,
        inputs=None,
        response_mode="streaming",
        conversation_id=None,
        user="default_user",
        files=None,
    ):
        """Run the app behind ``api_key`` via ``POST /workflows/run``.

        Args:
            api_key: Secret key of the app to run.
            inputs: Mapping of input variables; ``None`` means no inputs.
            response_mode: ``"streaming"`` to receive server-sent events,
                anything else for a single JSON response.
            conversation_id: Optional conversation id added to the payload.
            user: User identifier sent with the request.
            files: Optional list of dicts with ``path`` and
                ``transfer_method`` keys.  Only ``"local_file"`` entries are
                uploaded; ``"remote_url"`` entries are ignored.

        Returns:
            In streaming mode, a generator yielding one decoded JSON object
            per ``data:`` line.  Otherwise the decoded JSON response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        url = f"{self.dify_base_url}/workflows/run"
        streaming = response_mode == "streaming"
        auth_header = {"Authorization": f"Bearer {api_key}"}
        data = {
            # A fresh dict per call: a shared mutable default would leak
            # state between unrelated requests.
            "inputs": {} if inputs is None else inputs,
            "response_mode": response_mode,
            "user": user,
        }
        if conversation_id:
            data["conversation_id"] = conversation_id

        local_paths = [
            file_info.get('path')
            for file_info in (files or [])
            if file_info.get('transfer_method') == 'local_file'
        ]

        if local_paths:
            # The request body is fully sent before ``post`` returns, so the
            # handles can be closed as soon as the call completes.
            with ExitStack() as stack:
                files_data = [
                    ('file', stack.enter_context(open(path, 'rb')))
                    for path in local_paths
                ]
                # No explicit Content-Type here: requests has to generate the
                # multipart header itself so that it carries the boundary.
                # NOTE(review): ``data`` is form-encoded in this branch, so
                # the nested ``inputs`` mapping is not sent as JSON -- confirm
                # the shape the server expects for file uploads.
                response = requests.post(
                    url,
                    headers=auth_header,
                    data=data,
                    files=files_data,
                    stream=streaming,
                )
        else:
            response = requests.post(
                url,
                headers={**auth_header, "Content-Type": "application/json"},
                json=data,
                stream=streaming,
            )
        response.raise_for_status()

        if streaming:
            return self._iter_stream_events(response)
        return response.json()

    @staticmethod
    def _iter_stream_events(response):
        """Yield the JSON payload of every ``data:`` line in ``response``."""
        # Closing the response releases the connection even when the
        # consumer stops iterating early.
        with response:
            for line in response.iter_lines():
                if not line or not line.startswith(b'data:'):
                    continue
                try:
                    event = json.loads(line[5:].decode('utf-8'))
                except json.JSONDecodeError:
                    logger.warning("Error decoding JSON: %r", line)
                    continue
                yield event

    def upload_file(self, api_key, file_path, user="default_user"):
        """Upload ``file_path`` via ``POST /files/upload``; return the JSON reply."""
        url = f"{self.dify_base_url}/files/upload"
        headers = {"Authorization": f"Bearer {api_key}"}
        with open(file_path, "rb") as handle:
            response = requests.post(
                url,
                headers=headers,
                files={"file": handle},
                data={"user": user},
            )
        response.raise_for_status()
        return response.json()

    def stop_response(self, api_key, task_id, user="default_user"):
        """Ask the server to stop the task ``task_id``; return the JSON reply."""
        # NOTE(review): chat_message posts to /workflows/run while this uses
        # the chat-messages stop route -- confirm this is the right endpoint
        # for tasks started by chat_message.
        url = f"{self.dify_base_url}/chat-messages/{task_id}/stop"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        response = requests.post(url, headers=headers, json={"user": user})
        response.raise_for_status()
        return response.json()

    def _get_json(self, endpoint, api_key, user):
        """GET ``<base_url>/<endpoint>`` for ``user`` and decode the JSON body."""
        response = requests.get(
            f"{self.dify_base_url}/{endpoint}",
            headers={"Authorization": f"Bearer {api_key}"},
            params={"user": user},
        )
        response.raise_for_status()
        return response.json()

    def get_app_info(self, api_key, user="default_user"):
        """Return the JSON document served at ``/info`` for the app."""
        return self._get_json("info", api_key, user)

    def get_app_parameters(self, api_key, user="default_user"):
        """Return the JSON document served at ``/parameters`` for the app."""
        return self._get_json("parameters", api_key, user)

    def get_app_meta(self, api_key, user="default_user"):
        """Return the JSON document served at ``/meta`` for the app."""
        return self._get_json("meta", api_key, user)


config_path = os.getenv("CONFIG_PATH")
server = Server("dify_mcp_server")
dify_api = DifyAPI(config_path)


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """
    List available tools.
    Each tool specifies its arguments using JSON Schema validation.
    """
    tools = []
    for app_info, app_param in zip(
        dify_api.dify_app_infos, dify_api.dify_app_params
    ):
        inputSchema = dict(
            type="object",
            properties={},
            required=[],
        )
        for param in app_param.get('user_input_form') or []:
            # TODO: Add readme about strange dify user input param format
            # Each entry is a single-key dict: {<control type>: {<details>}}.
            param_type, param_info = next(iter(param.items()))
            property_name = param_info['variable']
            inputSchema["properties"][property_name] = dict(
                # Control names are not valid JSON Schema types; unknown
                # controls fall back to "string" (assumption, see mapping).
                type=_DIFY_TO_JSON_SCHEMA_TYPE.get(param_type, "string"),
                description=param_info['label'],
            )
            if param_info.get('required'):
                inputSchema['required'].append(property_name)
        tools.append(
            types.Tool(
                name=app_info['name'],
                description=app_info['description'],
                inputSchema=inputSchema,
            )
        )
    return tools


def _collect_workflow_outputs(tool_sk, arguments):
    """Run the app synchronously and return the final event's ``outputs``."""
    events = dify_api.chat_message(tool_sk, arguments or {})
    with closing(events):
        for event in events:
            if event.get('event') != 'workflow_finished':
                continue
            data = event.get('data') or {}
            outputs = data.get('outputs')
            if outputs is None:
                raise RuntimeError(
                    f"Workflow finished without outputs: {data.get('error')}"
                )
            return outputs
    raise RuntimeError("Stream ended without a workflow_finished event")


def _as_text(value):
    """Return ``value`` unchanged if it is a string, else its JSON encoding."""
    if isinstance(value, str):
        return value
    return json.dumps(value, ensure_ascii=False)


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Run the app registered under ``name`` and return its outputs as text.

    Raises:
        ValueError: If ``name`` is not one of the configured apps.
        RuntimeError: If the run ends without usable outputs.
    """
    tool_names = dify_api.dify_app_names
    if name not in tool_names:
        raise ValueError(f"Unknown tool: {name}")
    tool_sk = dify_api.dify_app_sks[tool_names.index(name)]

    # The HTTP client is synchronous; running it in a worker thread keeps the
    # event loop free while the request is in flight.
    outputs = await asyncio.to_thread(
        _collect_workflow_outputs, tool_sk, arguments
    )
    return [
        types.TextContent(type='text', text=_as_text(value))
        for value in outputs.values()
    ]


async def main():
    """Serve MCP requests over stdin/stdout until the streams close."""
    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="dify_mcp_server",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())