mcp-flowise

MIT License
18
''' Low-Level Server for the Flowise MCP. This server dynamically registers tools based on the provided chatflows retrieved from the Flowise API. Tool names are normalized for safety and consistency, and potential conflicts are logged. Descriptions for tools are prioritized from FLOWISE_CHATFLOW_DESCRIPTIONS, falling back to the chatflow names when not provided. Conflicts in tool names after normalization are handled gracefully by skipping those chatflows. ''' import os import sys import asyncio import json from typing import List, Dict, Any from mcp import types from mcp.server.lowlevel import Server from mcp.server.models import InitializationOptions from mcp.server.stdio import stdio_server from mcp_flowise.utils import ( flowise_predict, fetch_chatflows, normalize_tool_name, setup_logging, ) # Configure logging DEBUG = os.getenv("DEBUG", "").lower() in ("true", "1", "yes") logger = setup_logging(debug=DEBUG) # Global tool mapping: tool name to chatflow ID NAME_TO_ID_MAPPING: Dict[str, str] = {} tools: List[types.Tool] = [] # Initialize the Low-Level MCP Server mcp = Server("FlowiseMCP-with-EnvAuth") def get_chatflow_descriptions() -> Dict[str, str]: """ Parse the FLOWISE_CHATFLOW_DESCRIPTIONS environment variable for descriptions. Returns: dict: A dictionary mapping chatflow IDs to descriptions. """ descriptions_env = os.getenv("FLOWISE_CHATFLOW_DESCRIPTIONS", "") if not descriptions_env: logger.debug("No FLOWISE_CHATFLOW_DESCRIPTIONS provided.") return {} logger.debug("Retrieved FLOWISE_CHATFLOW_DESCRIPTIONS: %s", descriptions_env) descriptions = {} for pair in descriptions_env.split(","): if ":" not in pair: logger.warning("Invalid format in FLOWISE_CHATFLOW_DESCRIPTIONS: %s", pair) continue chatflow_id, description = map(str.strip, pair.split(":", 1)) if chatflow_id and description: descriptions[chatflow_id] = description logger.debug("Parsed FLOWISE_CHATFLOW_DESCRIPTIONS: %s", descriptions) return descriptions async def dispatcher_handler(request: types.CallToolRequest) -> types.ServerResult: """ Dispatcher handler that routes CallToolRequest to the appropriate tool handler based on the tool name. """ try: tool_name = request.params.name logger.debug("Dispatcher received CallToolRequest for tool: %s", tool_name) if tool_name not in NAME_TO_ID_MAPPING: logger.error("Unknown tool requested: %s", tool_name) return types.ServerResult( root=types.CallToolResult( content=[types.TextContent(type="text", text="Unknown tool requested")] ) ) chatflow_id = NAME_TO_ID_MAPPING[tool_name] question = request.params.arguments.get("question", "") if not question: logger.error("Missing 'question' argument for tool: %s", tool_name) return types.ServerResult( root=types.CallToolResult( content=[types.TextContent(type="text", text="Missing 'question' argument.")] ) ) logger.debug("Dispatching prediction for chatflow_id: %s with question: %s", chatflow_id, question) # Call the prediction function try: result = flowise_predict(chatflow_id, question) logger.debug("Prediction result: %s", result) except Exception as pred_err: logger.error("Error during prediction: %s", pred_err, exc_info=True) result = json.dumps({"error": "Error occurred during prediction."}) # Pass the raw JSON response or error JSON back to the client return types.ServerResult( root=types.CallToolResult( content=[types.TextContent(type="text", text=result)] ) ) except Exception as e: logger.error("Unhandled exception in dispatcher_handler: %s", e, exc_info=True) return types.ServerResult( root=types.CallToolResult( content=[types.TextContent(type="text", text=json.dumps({"error": "Internal server error."}))] # Ensure JSON is returned ) ) async def list_tools(request: types.ListToolsRequest) -> types.ServerResult: """ Handler for ListToolsRequest to list all registered tools. Args: request (types.ListToolsRequest): The request to list tools. Returns: types.ServerResult: The result containing the list of tools. """ logger.debug("Handling list_tools request.") return types.ServerResult(root=types.ListToolsResult(tools=tools)) def register_tools(chatflows: List[Dict[str, Any]], chatflow_descriptions: Dict[str, str]) -> List[types.Tool]: """ Register tools dynamically based on the provided chatflows. Args: chatflows (List[Dict[str, Any]]): List of chatflows retrieved from the Flowise API. chatflow_descriptions (Dict[str, str]): Dictionary mapping chatflow IDs to descriptions. Returns: List[types.Tool]: List of registered tools. """ global tools tools = [] # Clear existing tools before re-registration for chatflow in chatflows: try: normalized_name = normalize_tool_name(chatflow["name"]) if normalized_name in NAME_TO_ID_MAPPING: logger.warning( "Tool name conflict: '%s' already exists. Skipping chatflow '%s' (ID: '%s').", normalized_name, chatflow["name"], chatflow["id"], ) continue NAME_TO_ID_MAPPING[normalized_name] = chatflow["id"] description = chatflow_descriptions.get(chatflow["id"], chatflow["name"]) tool = types.Tool( name=normalized_name, description=description, inputSchema={ "type": "object", "required": ["question"], "properties": {"question": {"type": "string"}}, }, ) tools.append(tool) logger.debug("Registered tool: %s (ID: %s)", tool.name, chatflow["id"]) except Exception as e: logger.error("Error registering chatflow '%s' (ID: '%s'): %s", chatflow["name"], chatflow["id"], e) return tools async def start_server(): """ Start the Low-Level MCP server. """ logger.debug("Starting Low-Level MCP server...") try: async with stdio_server() as (read_stream, write_stream): await mcp.run( read_stream, write_stream, initialization_options=InitializationOptions( server_name="FlowiseMCP-with-EnvAuth", server_version="0.1.0", capabilities=types.ServerCapabilities(), ), ) except Exception as e: logger.critical("Unhandled exception in MCP server: %s", e) sys.exit(1) def run_server(): """ Run the Low-Level Flowise server by registering tools dynamically. """ try: chatflows = fetch_chatflows() if not chatflows: raise ValueError("No chatflows retrieved from the Flowise API.") except Exception as e: logger.critical("Failed to start server: %s", e) sys.exit(1) chatflow_descriptions = get_chatflow_descriptions() register_tools(chatflows, chatflow_descriptions) if not tools: logger.critical("No valid tools registered. Shutting down the server.") sys.exit(1) mcp.request_handlers[types.CallToolRequest] = dispatcher_handler logger.debug("Registered dispatcher_handler for CallToolRequest.") mcp.request_handlers[types.ListToolsRequest] = list_tools logger.debug("Registered list_tools handler.") try: asyncio.run(start_server()) except KeyboardInterrupt: logger.debug("MCP server shutdown initiated by user.") except Exception as e: logger.critical("Failed to start MCP server: %s", e) sys.exit(1) if __name__ == "__main__": run_server()