mcp-flowise

MIT License
18
""" Utility functions for mcp_flowise, including logging setup, chatflow filtering, and Flowise API interactions. This module centralizes shared functionality such as: 1. Logging configuration for consistent log output across the application. 2. Safe redaction of sensitive data like API keys in logs. 3. Low-level interactions with the Flowise API for predictions and chatflow management. 4. Flexible filtering of chatflows based on whitelist/blacklist criteria. """ import os import sys import logging import requests import re import json from dotenv import load_dotenv # Load environment variables from .env if present load_dotenv() # Flowise API configuration FLOWISE_API_KEY = os.getenv("FLOWISE_API_KEY", "") FLOWISE_API_ENDPOINT = os.getenv("FLOWISE_API_ENDPOINT", "http://localhost:3000") def setup_logging(debug: bool = False, log_dir: str = None, log_file: str = "debug-mcp-flowise.log") -> logging.Logger: """ Sets up logging for the application, including outputting CRITICAL and ERROR logs to stdout. Args: debug (bool): If True, set log level to DEBUG; otherwise, INFO. log_dir (str): Directory where log files will be stored. Ignored if `FLOWISE_LOGFILE_PATH` is set. log_file (str): Name of the log file. Ignored if `FLOWISE_LOGFILE_PATH` is set. Returns: logging.Logger: Configured logger instance. """ log_path = os.getenv("FLOWISE_LOGFILE_PATH") if not log_path: if log_dir is None: log_dir = os.path.join(os.path.expanduser("~"), "mcp_logs") try: os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, log_file) except PermissionError as e: # Fallback to stdout logging if directory creation fails log_path = None print(f"[ERROR] Failed to create log directory: {e}", file=sys.stderr) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG if debug else logging.INFO) logger.propagate = False # Prevent log messages from propagating to the root logger # Remove all existing handlers to prevent accumulation for handler in logger.handlers[:]: logger.removeHandler(handler) handlers = [] if log_path: try: file_handler = logging.FileHandler(log_path, mode="a") file_handler.setLevel(logging.DEBUG if debug else logging.INFO) formatter = logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s") file_handler.setFormatter(formatter) handlers.append(file_handler) except Exception as e: print(f"[ERROR] Failed to create log file handler: {e}", file=sys.stderr) # Attempt to create StreamHandler for ERROR level logs try: stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setLevel(logging.ERROR) formatter = logging.Formatter("[%(levelname)s] %(message)s") stdout_handler.setFormatter(formatter) handlers.append(stdout_handler) except Exception as e: print(f"[ERROR] Failed to create stdout log handler: {e}", file=sys.stderr) # Add all handlers to the logger for handler in handlers: logger.addHandler(handler) if log_path: logger.debug(f"Logging initialized. Writing logs to {log_path}") else: logger.debug("Logging initialized. Logs will only appear in stdout.") return logger def redact_api_key(key: str) -> str: """ Redacts the Flowise API key for safe logging output. Args: key (str): The API key to redact. Returns: str: The redacted API key or '<not set>' if the key is invalid. """ if not key or len(key) <= 4: return "<not set>" return f"{key[:2]}{'*' * (len(key) - 4)}{key[-2:]}" def normalize_tool_name(name: str) -> str: """ Normalize tool names by converting to lowercase and replacing non-alphanumeric characters with underscores. Args: name (str): Original tool name. Returns: str: Normalized tool name. Returns 'unknown_tool' if the input is invalid. """ logger = logging.getLogger(__name__) if not name or not isinstance(name, str): logger.warning("Invalid tool name input: %s. Using default 'unknown_tool'.", name) return "unknown_tool" normalized = re.sub(r"[^a-zA-Z0-9]", "_", name).lower() logger.debug("Normalized tool name from '%s' to '%s'", name, normalized) return normalized or "unknown_tool" def filter_chatflows(chatflows: list[dict]) -> list[dict]: """ Filters chatflows based on whitelist and blacklist criteria. Whitelist takes precedence over blacklist. Args: chatflows (list[dict]): A list of chatflow dictionaries. Returns: list[dict]: Filtered list of chatflows. """ logger = logging.getLogger(__name__) # Dynamically fetch filtering criteria whitelist_ids = set(filter(bool, os.getenv("FLOWISE_WHITELIST_ID", "").split(","))) blacklist_ids = set(filter(bool, os.getenv("FLOWISE_BLACKLIST_ID", "").split(","))) whitelist_name_regex = os.getenv("FLOWISE_WHITELIST_NAME_REGEX", "") blacklist_name_regex = os.getenv("FLOWISE_BLACKLIST_NAME_REGEX", "") filtered_chatflows = [] for chatflow in chatflows: chatflow_id = chatflow.get("id", "") chatflow_name = chatflow.get("name", "") # Flags to determine inclusion is_whitelisted = False # Check Whitelist if whitelist_ids or whitelist_name_regex: if whitelist_ids and chatflow_id in whitelist_ids: is_whitelisted = True if whitelist_name_regex and re.search(whitelist_name_regex, chatflow_name): is_whitelisted = True if is_whitelisted: # If whitelisted, include regardless of blacklist logger.debug("Including whitelisted chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id) filtered_chatflows.append(chatflow) continue # Skip blacklist checks else: # If not whitelisted, exclude regardless of blacklist logger.debug("Excluding non-whitelisted chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id) continue else: # If no whitelist, apply blacklist directly if blacklist_ids and chatflow_id in blacklist_ids: logger.debug("Skipping chatflow '%s' (ID: '%s') - In blacklist.", chatflow_name, chatflow_id) continue # Exclude blacklisted by ID if blacklist_name_regex and re.search(blacklist_name_regex, chatflow_name): logger.debug("Skipping chatflow '%s' (ID: '%s') - Name matches blacklist regex.", chatflow_name, chatflow_id) continue # Exclude blacklisted by name # Include the chatflow if it passes all filters logger.debug("Including chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id) filtered_chatflows.append(chatflow) logger.debug("Filtered chatflows: %d out of %d", len(filtered_chatflows), len(chatflows)) return filtered_chatflows def flowise_predict(chatflow_id: str, question: str) -> str: """ Sends a question to a specific chatflow ID via the Flowise API and returns the response JSON text. Args: chatflow_id (str): The ID of the Flowise chatflow to be used. question (str): The question or prompt to send to the chatflow. Returns: str: The raw JSON response text from the Flowise API, or an error message if something goes wrong. """ logger = logging.getLogger(__name__) # Construct the Flowise API URL for predictions url = f"{FLOWISE_API_ENDPOINT.rstrip('/')}/api/v1/prediction/{chatflow_id}" headers = { "Content-Type": "application/json", } if FLOWISE_API_KEY: headers["Authorization"] = f"Bearer {FLOWISE_API_KEY}" payload = {"question": question} logger.debug(f"Sending prediction request to {url} with payload: {payload}") try: # Send POST request to the Flowise API response = requests.post(url, json=payload, headers=headers, timeout=30) logger.debug(f"Prediction response code: HTTP {response.status_code}") # response.raise_for_status() # Log the raw response text for debugging logger.debug(f"Raw prediction response: {response.text}") # Return the raw JSON response text return response.text #except requests.exceptions.RequestException as e: except Exception as e: # Log and return an error message logger.error(f"Error during prediction: {e}") return json.dumps({"error": str(e)}) def fetch_chatflows() -> list[dict]: """ Fetch a list of all chatflows from the Flowise API. Returns: list of dict: Each dict contains the 'id' and 'name' of a chatflow. Returns an empty list if there's an error. """ logger = logging.getLogger(__name__) # Construct the Flowise API URL for fetching chatflows url = f"{FLOWISE_API_ENDPOINT.rstrip('/')}/api/v1/chatflows" headers = {} if FLOWISE_API_KEY: headers["Authorization"] = f"Bearer {FLOWISE_API_KEY}" logger.debug(f"Fetching chatflows from {url}") try: # Send GET request to the Flowise API response = requests.get(url, headers=headers, timeout=30) response.raise_for_status() # Parse and simplify the response data chatflows_data = response.json() simplified_chatflows = [{"id": cf["id"], "name": cf["name"]} for cf in chatflows_data] logger.debug(f"Fetched chatflows: {simplified_chatflows}") return filter_chatflows(simplified_chatflows) #except requests.exceptions.RequestException as e: except Exception as e: # Log and return an empty list on error logger.error(f"Error fetching chatflows: {e}") return [] # Set up logging before obtaining the logger DEBUG = os.getenv("DEBUG", "").lower() in ("true", "1", "yes") logger = setup_logging(debug=DEBUG) # Log key environment variable values logger.debug(f"Flowise API Key (redacted): {redact_api_key(FLOWISE_API_KEY)}") logger.debug(f"Flowise API Endpoint: {FLOWISE_API_ENDPOINT}")