# server.py
"""WorkFlowy MCP server implementation using FastMCP.""" import sys from datetime import datetime # Also log to file to debug deployment/environment try: with open(r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\reconcile_debug.log", "a", encoding="utf-8") as f: f.write(f"[{datetime.now().isoformat()}] DEBUG: Workflowy MCP Server loaded from {__file__}\n") except Exception: pass print("DEBUG: Workflowy MCP Server loaded from " + __file__, file=sys.stderr) import asyncio import json import logging import os from contextlib import asynccontextmanager from datetime import datetime from typing import Literal, Any, Awaitable, Callable from fastmcp import FastMCP from .client import AdaptiveRateLimiter, WorkFlowyClient from .config import ServerConfig, setup_logging from .models import ( NodeCreateRequest, NodeListRequest, NodeUpdateRequest, WorkFlowyNode, ) class _ClientLogger: """Lightweight logger that delegates to _log / log_event. Used in place of logging.getLogger(...) so existing logger.info / logger.warning / logger.error calls feed into the DAGGER logger instead of Python's logging module. """ def __init__(self, component: str = "SERVER") -> None: self._component = component def _msg(self, msg: object) -> str: try: return str(msg) except Exception: return repr(msg) def info(self, msg: object, *args: object, **kwargs: object) -> None: _log(self._msg(msg), self._component) def warning(self, msg: object, *args: object, **kwargs: object) -> None: _log(f"WARNING: {self._msg(msg)}", self._component) def error(self, msg: object, *args: object, **kwargs: object) -> None: _log(f"ERROR: {self._msg(msg)}", self._component) def debug(self, msg: object, *args: object, **kwargs: object) -> None: _log(f"DEBUG: {self._msg(msg)}", self._component) def exception(self, msg: object, *args: object, **kwargs: object) -> None: _log(f"EXCEPTION: {self._msg(msg)}", self._component) logger = _ClientLogger("SERVER") # Global client instance _client: WorkFlowyClient | None = None _rate_limiter: AdaptiveRateLimiter | None = None # Global WebSocket connection for DOM cache _ws_connection = None _ws_server_task = None _ws_message_queue = None # asyncio.Queue for message routing # In-memory job registry for long-running operations (ETCH, NEXUS, etc.) _jobs: dict[str, dict[str, Any]] = {} _job_counter: int = 0 _job_lock: asyncio.Lock = asyncio.Lock() # AI Dagger root (top-level for MCP workflows) DAGGER_ROOT_ID = "b49affa1-3930-95e3-b2fe-ad9b881285e2" def get_client() -> WorkFlowyClient: """Get the global WorkFlowy client instance.""" global _client if _client is None: raise RuntimeError("WorkFlowy client not initialized. Server not started properly.") return _client def get_ws_connection(): """Get the current WebSocket connection and message queue (if any).""" global _ws_connection, _ws_message_queue return _ws_connection, _ws_message_queue def log_event(message: str, component: str = "SERVER") -> None: """Log an event to stderr with timestamp and consistent formatting.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 🗡️ prefix makes it easy to grep/spot in the console print(f"[{timestamp}] 🗡️ [{component}] {message}", file=sys.stderr, flush=True) def _log(message: str, component: str = "SERVER") -> None: """Unified log wrapper used throughout this server. 

def _log(message: str, component: str = "SERVER") -> None:
    """Unified log wrapper used throughout this server.

    Ensures all logging uses the DAGGER+DATETIME+TAG prefix and plain
    print(..., file=sys.stderr), which reliably surfaces in the MCP connector
    console (FastMCP tends to swallow standard logging output).
    """
    log_event(message, component)


async def _resolve_uuid_path_and_respond(target_uuid: str | None, websocket, format_mode: str = "f3") -> None:
    """Resolve full ancestor path for target_uuid and send result back to extension.

    Implementation note (v3):
    - Uses /nodes-export cache to build parent map (all nodes in one call)
    - Walks parent_id chain from leaf to root using cached data
    - O(1) API call + O(depth) traversal = fast and reliable
    - Decodes HTML entities (&lt; &gt; &amp;) in node names for display

    format_mode:
    - "f3" (default): Compact mode. One line per node.
    - "f2": Full/Verbose mode. Node name, then UUID on next line, then blank line.
    """
    import html

    client = get_client()
    target = (target_uuid or "").strip()
    if not target:
        await websocket.send(json.dumps({
            "action": "uuid_path_result",
            "success": False,
            "target_uuid": target_uuid,
            "error": "No target_uuid provided",
        }))
        return

    # Handle virtual UUIDs (generated by GLIMPSE for mirrored nodes)
    # Format: virtual_<actual-uuid>_<parent-uuid>_<hash>
    # For mirrors: show BOTH paths (actual node + mirror location context)
    is_mirrored = False
    mirror_parent_uuid = None
    if target.startswith("virtual_"):
        parts = target.split("_")
        # parts[0] = "virtual", parts[1] = actual UUID, parts[2] = parent UUID, parts[3+] = hash
        if len(parts) >= 3:
            actual_uuid = parts[1]
            mirror_parent_uuid = parts[2]
            is_mirrored = True
            log_event(f"Virtual UUID detected (mirrored node): actual={actual_uuid}, mirror_parent={mirror_parent_uuid}", "UUID_RES")
            target = actual_uuid
        elif len(parts) >= 2:
            actual_uuid = parts[1]
            log_event(f"Virtual UUID detected: {target} -> extracting actual UUID: {actual_uuid}", "UUID_RES")
            target = actual_uuid
        else:
            await websocket.send(json.dumps({
                "action": "uuid_path_result",
                "success": False,
                "target_uuid": target_uuid,
                "error": f"Malformed virtual UUID: {target}",
            }))
            return

    try:
        # Fetch full account export (uses cache if available)
        export_data = await client.export_nodes(node_id=None, use_cache=True)
        all_nodes = export_data.get("nodes", []) or []

        # Build node lookup by ID
        nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}

        # Check if target node's ancestor chain intersects dirty set.
        # If so, refresh cache before proceeding (lazy refresh on demand).
        dirty_ids = client._nodes_export_dirty_ids
        needs_refresh = False  # default so the check below is safe when dirty_ids is empty
        if dirty_ids and ("*" in dirty_ids or target in dirty_ids):
            # Quick check: is target itself dirty or global dirty flag set?
            needs_refresh = True
        elif dirty_ids:
            # Walk ancestor chain from target to see if any ancestor is dirty
            needs_refresh = False
            current = target
            visited_check: set[str] = set()
            while current and current not in visited_check:
                visited_check.add(current)
                if current in dirty_ids:
                    needs_refresh = True
                    break
                node_dict = nodes_by_id.get(current)
                if not node_dict:
                    break
                current = node_dict.get("parent_id") or node_dict.get("parentId")

        if needs_refresh:
            log_event(f"Path from {target} intersects dirty IDs; refreshing cache", "UUID_RES")
            export_data = await client.export_nodes(node_id=None, use_cache=False, force_refresh=True)
            all_nodes = export_data.get("nodes", []) or []
            nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}

        # DEBUG: Log cache stats
        log_event(f"Resolving path for target_uuid: {target} (cache: {len(nodes_by_id)} nodes)", "UUID_RES")

        # If node not found in cache, try refreshing once before giving up
        if target not in nodes_by_id:
            log_event(f"Node {target} not found in cache; attempting refresh", "UUID_RES")
            try:
                export_data = await client.export_nodes(node_id=None, use_cache=False, force_refresh=True)
                all_nodes = export_data.get("nodes", []) or []
                nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}
                log_event(f"Cache refreshed: {len(nodes_by_id)} nodes", "UUID_RES")
            except Exception as refresh_err:
                log_event(f"Cache refresh failed: {refresh_err}", "UUID_RES")

        # Check again after refresh
        if target not in nodes_by_id:
            await websocket.send(json.dumps({
                "action": "uuid_path_result",
                "success": False,
                "target_uuid": target_uuid,
                "error": f"Node {target} not found (tried cache + refresh)",
            }))
            return

        # Walk parent chain from target to root
        path_nodes = []
        visited: set[str] = set()
        current_id: str | None = target
        max_hops = 512
        hops = 0
        while current_id and current_id not in visited and hops < max_hops:
            visited.add(current_id)
            node_dict = nodes_by_id.get(current_id)
            if not node_dict:
                break
            # DEBUG: Log each node found
            parent_id = node_dict.get("parent_id") or node_dict.get("parentId")
            raw_name = node_dict.get("nm") or node_dict.get("name") or "Untitled"
            node_name = html.unescape(raw_name)  # Decode &lt; &gt; &amp; etc.
log_event(f"Found node: {current_id} (name: {node_name}), parent: {parent_id}", "UUID_RES") # Store decoded name in dict for later use node_dict["_decoded_name"] = node_name path_nodes.append(node_dict) current_id = parent_id hops += 1 path_nodes.reverse() # DEBUG: Log final path length log_event(f"Resolved path length: {len(path_nodes)}", "UUID_RES") if not path_nodes: log_event(f"Path resolution returned empty for {target_uuid}", "UUID_RES") await websocket.send(json.dumps({ "action": "uuid_path_result", "success": False, "target_uuid": target_uuid, "error": f"Could not build path for node {target_uuid} (empty path after traversal)", })) return lines: list[str] = [] # Determine which nodes are leaves vs ancestors for bullet selection leaf_index = len(path_nodes) - 1 if format_mode == "f2": # FULL / VERBOSE MODE (F2-style) # Render name, then UUID, then blank line for every node for depth, node_dict in enumerate(path_nodes): # Use decoded name (HTML entities already unescaped) name = node_dict.get("_decoded_name") or "Untitled" node_id = node_dict.get("id") or "" prefix = "#" * (depth + 1) # Bullet: ⦿ for ancestors, • for leaf bullet = "•" if depth == leaf_index else "⦿" lines.append(f"{prefix} {bullet} {name}") lines.append(f"`{node_id}`") lines.append("") # Ending block lines.append("→ Use Leaf UUID:") lines.append(f"`{target}`") else: # COMPACT MODE (F3-style / Default) for depth, node_dict in enumerate(path_nodes): # Use decoded name (HTML entities already unescaped) name = node_dict.get("_decoded_name") or "Untitled" prefix = "#" * (depth + 1) # Bullet: ⦿ for ancestors, • for leaf bullet = "•" if depth == leaf_index else "⦿" lines.append(f"{prefix} {bullet} {name}") lines.append("") lines.append(f"→ `{target}`") markdown = "\n".join(lines) # If this was a mirrored node, append the mirror location context if is_mirrored and mirror_parent_uuid: # Build path for mirror parent to show where the mirror appears mirror_path_nodes = [] current_id = mirror_parent_uuid visited_mirror: set[str] = set() hops_mirror = 0 while current_id and current_id not in visited_mirror and hops_mirror < 512: visited_mirror.add(current_id) node_dict = nodes_by_id.get(current_id) if not node_dict: break # Decode name and skip "Untitled" nodes (Workflowy mirror detritus) raw_name = node_dict.get("nm") or node_dict.get("name") or "Untitled" decoded_name = html.unescape(raw_name) if decoded_name.strip() != "Untitled": # Store decoded name for later use node_dict["_decoded_name"] = decoded_name mirror_path_nodes.append(node_dict) parent_id = node_dict.get("parent_id") or node_dict.get("parentId") current_id = parent_id hops_mirror += 1 mirror_path_nodes.reverse() if mirror_path_nodes: # Add separator and header markdown += "\n\n---\n\n" markdown += "**Mirror appears under:**\n\n" # Generate mirror path markdown (same format as main path) mirror_lines = [] leaf_index_mirror = len(mirror_path_nodes) - 1 if format_mode == "f2": for depth, node_dict in enumerate(mirror_path_nodes): name = node_dict.get("_decoded_name") or "Untitled" node_id = node_dict.get("id") or "" prefix = "#" * (depth + 1) bullet = "•" if depth == leaf_index_mirror else "⦿" mirror_lines.append(f"{prefix} {bullet} {name}") mirror_lines.append(f"`{node_id}`") mirror_lines.append("") else: for depth, node_dict in enumerate(mirror_path_nodes): name = node_dict.get("_decoded_name") or "Untitled" prefix = "#" * (depth + 1) bullet = "•" if depth == leaf_index_mirror else "⦿" mirror_lines.append(f"{prefix} {bullet} {name}") markdown += 
"\n".join(mirror_lines) # DEBUG: Log the complete markdown being sent (acts as historical log) log_event(f"Markdown output:\n{markdown}", "UUID_RES") # Also append to persistent UUID Explorer log file try: from datetime import datetime log_path = r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\uuid_and_glimpse_explorer\uuid_explorer.md" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with open(log_path, "a", encoding="utf-8") as f: f.write(f"## {timestamp}\n\n") f.write(f"**Target UUID:** `{target}`\n\n") f.write(markdown) f.write("\n\n---\n\n") except Exception as log_err: # Never let logging failures affect UUID resolution log_event(f"Failed to write UUID Explorer log: {log_err}", "UUID_RES") await websocket.send(json.dumps({ "action": "uuid_path_result", "success": True, "target_uuid": target_uuid, "markdown": markdown, "path": [ { "id": node_dict.get("id"), "name": node_dict.get("_decoded_name") or "Untitled", "parent_id": node_dict.get("parent_id") or node_dict.get("parentId"), } for node_dict in path_nodes ], })) except Exception as e: # noqa: BLE001 log_event(f"UUID path resolution error for {target_uuid}: {e}", "UUID_RES") await websocket.send(json.dumps({ "action": "uuid_path_result", "success": False, "target_uuid": target_uuid, "error": str(e), })) async def websocket_handler(websocket): """Handle WebSocket connections from Chrome extension. Uses message queue pattern to avoid recv() conflicts. Extension sends requests: {"action": "extract_dom", "node_id": "uuid"} Extension receives responses with DOM tree data. """ global _ws_connection, _ws_message_queue log_event(f"WebSocket client connected from {websocket.remote_address}", "WS_HANDLER") _ws_connection = websocket _ws_message_queue = asyncio.Queue() # Fresh queue for this connection try: # Keep connection alive - wait for messages indefinitely async for message in websocket: try: data = json.loads(message) action = data.get('action') log_event(f"WebSocket message received: {action}", "WS_HANDLER") # Handle ping to keep connection alive if action == 'ping': await websocket.send(json.dumps({"action": "pong"})) log_event("Sent pong response", "WS_HANDLER") continue # Handle UUID path resolution requests (UUID Navigator) if action == 'resolve_uuid_path': target_uuid = data.get('target_uuid') or data.get('uuid') format_mode = data.get('format', 'f3') # Default to compact f3 mode await _resolve_uuid_path_and_respond(target_uuid, websocket, format_mode) continue # Explicit cache refresh request from GLIMPSE client. if action == 'refresh_nodes_export_cache': try: client = get_client() result = await client.refresh_nodes_export_cache() await websocket.send(json.dumps({ "action": "refresh_nodes_export_cache_result", **result, })) log_event(f"Refreshed /nodes-export cache via WebSocket request: {result.get('node_count')} nodes", "WS_HANDLER") except Exception as e: log_event(f"Failed to refresh /nodes-export cache from WebSocket request: {e}", "WS_HANDLER") try: await websocket.send(json.dumps({ "action": "refresh_nodes_export_cache_result", "success": False, "error": str(e), })) except Exception: # Best-effort only; don't crash handler pass continue # Optional: mutation notifications from Workflowy desktop # (e.g., extension sends notify_node_mutated when a node changes). if action == 'notify_node_mutated': # Preferred payload (v3.9.2+): "nodes" = [{"id": "...", "name": "..."}, ...] # Legacy payloads: "node_ids" (list[str]) or "node_id" (str). 

async def websocket_handler(websocket):
    """Handle WebSocket connections from Chrome extension.

    Uses message queue pattern to avoid recv() conflicts.
    Extension sends requests: {"action": "extract_dom", "node_id": "uuid"}
    Extension receives responses with DOM tree data.
    """
    global _ws_connection, _ws_message_queue

    log_event(f"WebSocket client connected from {websocket.remote_address}", "WS_HANDLER")
    _ws_connection = websocket
    _ws_message_queue = asyncio.Queue()  # Fresh queue for this connection

    try:
        # Keep connection alive - wait for messages indefinitely
        async for message in websocket:
            try:
                data = json.loads(message)
                action = data.get('action')
                log_event(f"WebSocket message received: {action}", "WS_HANDLER")

                # Handle ping to keep connection alive
                if action == 'ping':
                    await websocket.send(json.dumps({"action": "pong"}))
                    log_event("Sent pong response", "WS_HANDLER")
                    continue

                # Handle UUID path resolution requests (UUID Navigator)
                if action == 'resolve_uuid_path':
                    target_uuid = data.get('target_uuid') or data.get('uuid')
                    format_mode = data.get('format', 'f3')  # Default to compact f3 mode
                    await _resolve_uuid_path_and_respond(target_uuid, websocket, format_mode)
                    continue

                # Explicit cache refresh request from GLIMPSE client.
                if action == 'refresh_nodes_export_cache':
                    try:
                        client = get_client()
                        result = await client.refresh_nodes_export_cache()
                        await websocket.send(json.dumps({
                            "action": "refresh_nodes_export_cache_result",
                            **result,
                        }))
                        log_event(f"Refreshed /nodes-export cache via WebSocket request: {result.get('node_count')} nodes", "WS_HANDLER")
                    except Exception as e:
                        log_event(f"Failed to refresh /nodes-export cache from WebSocket request: {e}", "WS_HANDLER")
                        try:
                            await websocket.send(json.dumps({
                                "action": "refresh_nodes_export_cache_result",
                                "success": False,
                                "error": str(e),
                            }))
                        except Exception:
                            # Best-effort only; don't crash handler
                            pass
                    continue

                # Optional: mutation notifications from Workflowy desktop
                # (e.g., extension sends notify_node_mutated when a node changes).
                if action == 'notify_node_mutated':
                    # Preferred payload (v3.9.2+): "nodes" = [{"id": "...", "name": "..."}, ...]
                    # Legacy payloads: "node_ids" (list[str]) or "node_id" (str).
                    nodes_payload = data.get('nodes') or []
                    node_ids = data.get('node_ids') or data.get('node_id') or []
                    if isinstance(node_ids, str):
                        node_ids = [node_ids]

                    named_entries: list[str] = []
                    # If structured nodes are present, trust them and derive both
                    # IDs and names from that payload for better logging.
                    if isinstance(nodes_payload, list) and nodes_payload:
                        node_ids = []
                        for entry in nodes_payload:
                            if not isinstance(entry, dict):
                                continue
                            nid = entry.get('id') or entry.get('uuid')
                            name = entry.get('name')
                            if not nid:
                                continue
                            node_ids.append(nid)
                            if name is not None:
                                named_entries.append(f"{nid} ({name!r})")
                            else:
                                named_entries.append(str(nid))
                    else:
                        # Fallback: no structured nodes payload; log bare IDs only.
                        named_entries = [str(nid) for nid in node_ids]

                    try:
                        client = get_client()
                        client._mark_nodes_export_dirty(node_ids)
                        log_event(
                            "Marked mutated nodes as dirty in /nodes-export cache: "
                            + ", ".join(named_entries),
                            "WS_HANDLER",
                        )
                        # NOTE: Cache refresh is LAZY - only triggered when UUID path is
                        # requested AND the path intersects a dirty ID. This prevents rate
                        # limiting from eager refreshes.
                    except Exception as e:
                        log_event(
                            f"Failed to mark /nodes-export cache dirty from WebSocket notification: {e}",
                            "WS_HANDLER",
                        )
                    continue

                # Put all other messages in queue for workflowy_glimpse() to consume
                await _ws_message_queue.put(data)
                log_event(f"Message queued for processing: {action}", "WS_HANDLER")
            except json.JSONDecodeError as e:
                log_event(f"Invalid JSON from WebSocket: {e}", "WS_HANDLER")
            except Exception as e:
                log_event(f"WebSocket message error: {e}", "WS_HANDLER")

        # If loop exits naturally, connection was closed by client
        log_event("WebSocket client disconnected (connection closed by client)", "WS_HANDLER")
    except Exception as e:
        log_event(f"WebSocket connection closed with error: {e}", "WS_HANDLER")
    finally:
        if _ws_connection == websocket:
            _ws_connection = None
            _ws_message_queue = None
        log_event("WebSocket client cleaned up", "WS_HANDLER")
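
# Illustrative only: the JSON messages this handler understands, as the Chrome
# extension might send them (UUIDs and names are placeholders):
#
#     {"action": "ping"}
#     {"action": "resolve_uuid_path", "target_uuid": "<uuid>", "format": "f2"}
#     {"action": "refresh_nodes_export_cache"}
#     {"action": "notify_node_mutated", "nodes": [{"id": "<uuid>", "name": "Title"}]}
#
# Anything else is queued on _ws_message_queue for workflowy_glimpse() to consume.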

async def start_websocket_server():
    """Start WebSocket server for Chrome extension communication."""
    try:
        import websockets
    except ImportError:
        log_event("websockets library not installed. WebSocket cache unavailable.", "WS_INIT")
        log_event("Install with: pip install websockets", "WS_INIT")
        return

    log_event("Starting WebSocket server on ws://localhost:8765", "WS_INIT")
    try:
        async with websockets.serve(websocket_handler, "localhost", 8765) as server:
            log_event("✅ WebSocket server listening on port 8765", "WS_INIT")
            log_event("WebSocket server will accept connections indefinitely...", "WS_INIT")
            # Keep server running forever
            await asyncio.Event().wait()
    except Exception as e:
        log_event(f"WebSocket server failed to start: {e}", "WS_INIT")
        log_event("GLIMPSE will fall back to API fetching", "WS_INIT")


@asynccontextmanager
async def lifespan(_app: FastMCP):  # type: ignore[no-untyped-def]
    """Manage server lifecycle."""
    global _client, _rate_limiter, _ws_server_task

    # Setup
    log_event("Starting WorkFlowy MCP server", "LIFESPAN")

    # Load configuration
    config = ServerConfig()  # type: ignore[call-arg]
    api_config = config.get_api_config()

    # Initialize rate limiter (default 10 req/s)
    _rate_limiter = AdaptiveRateLimiter(
        initial_rate=10.0,
        min_rate=1.0,
        max_rate=100.0,
    )

    # Initialize client
    _client = WorkFlowyClient(api_config)
    log_event(f"WorkFlowy client initialized with base URL: {api_config.base_url}", "LIFESPAN")

    # Start WebSocket server in background task
    _ws_server_task = asyncio.create_task(start_websocket_server())
    log_event("WebSocket server task created", "LIFESPAN")

    yield

    # Cleanup
    log_event("Shutting down WorkFlowy MCP server", "LIFESPAN")

    # Cancel WebSocket server
    if _ws_server_task:
        _ws_server_task.cancel()
        try:
            await _ws_server_task
        except asyncio.CancelledError:
            pass
        log_event("WebSocket server stopped", "LIFESPAN")

    if _client:
        await _client.close()
        _client = None
    _rate_limiter = None


# Initialize FastMCP server
mcp = FastMCP(
    "WorkFlowy MCP Server",
    version="0.1.0",
    instructions="MCP server for managing WorkFlowy outlines and nodes",
    lifespan=lifespan,
)


# In-memory job management for long-running operations (ETCH, NEXUS, etc.)
async def _start_background_job(
    kind: str,
    payload: dict[str, Any],
    coro_factory: Callable[[str], Awaitable[dict]],
) -> dict:
    """Start a background job and return a lightweight handle."""
    global _job_counter, _jobs

    async with _job_lock:
        _job_counter += 1
        job_id = f"{kind}-{_job_counter}"
        _jobs[job_id] = {
            "job_id": job_id,
            "kind": kind,
            "status": "pending",  # pending | running | completed | failed | cancelling
            "payload": payload,
            "result": None,
            "error": None,
            "started_at": datetime.utcnow().isoformat(),
            "finished_at": None,
            "_task": None,  # internal field, not exposed in status
        }

    async def runner() -> None:
        try:
            _jobs[job_id]["status"] = "running"
            result = await coro_factory(job_id)
            _jobs[job_id]["result"] = result
            _jobs[job_id]["status"] = "completed" if result.get("success", True) else "failed"
        except asyncio.CancelledError:
            # Explicit cancellation via mcp_cancel_job
            _jobs[job_id]["error"] = "CancelledError: Job was cancelled by user request"
            _jobs[job_id]["status"] = "failed"
        except Exception as e:  # noqa: BLE001
            _jobs[job_id]["error"] = f"{type(e).__name__}: {e}"
            _jobs[job_id]["status"] = "failed"
        finally:
            _jobs[job_id]["finished_at"] = datetime.utcnow().isoformat()

    task = asyncio.create_task(runner())
    _jobs[job_id]["_task"] = task

    return {
        "success": True,
        "job_id": job_id,
        "status": "started",
        "kind": kind,
    }
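
# Illustrative usage sketch (a hypothetical caller, not part of this module):
# a tool wanting async semantics wraps its work in a coroutine factory that
# receives the job_id, then returns the lightweight handle to the agent.
#
#     async def _do_etch(job_id: str) -> dict:
#         ...  # long-running work
#         return {"success": True}
#
#     handle = await _start_background_job("etch", {"parent_id": "..."}, _do_etch)
#     # handle == {"success": True, "job_id": "etch-1", "status": "started", "kind": "etch"}
#     # Poll later with mcp_job_status(job_id=handle["job_id"]).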

@mcp.tool(
    name="mcp_job_status",
    description="Get status/result for long-running MCP jobs (ETCH, NEXUS, etc.).",
)
async def mcp_job_status(job_id: str | None = None) -> dict:
    """Get status for background jobs (in-memory + detached WEAVE workers).

    Scans:
    - In-memory asyncio jobs (_jobs registry)
    - Detached WEAVE workers via:
      • Active PIDs (.weave.pid under nexus_runs/)
      • Persistent journals (enchanted_terrain.weave_journal.json / *.weave_journal.json)

    This allows status/error inspection even after the worker PID has exited.
    """
    from .client.api_client import scan_active_weaves
    from pathlib import Path
    import json as json_module

    # Determine nexus_runs directory
    # TODO: make this configurable or derive from client
    nexus_runs_base = r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\nexus_runs"

    def _scan_weave_journals(base_dir_str: str) -> list[dict[str, Any]]:
        """Scan nexus_runs/ for WEAVE journal files and summarize their status.

        Returns one entry per journal with:
        - job_id: weave-enchanted-<nexus_tag> or weave-direct-<stem>
        - mode: 'enchanted' | 'direct'
        - status: 'completed' | 'failed' | 'unknown'
        - detached: True
        - journal: path to *.weave_journal.json
        - log_file: associated .weave.log if present
        - last_run_* and phase fields from journal
        """
        base_dir = Path(base_dir_str)
        results: list[dict[str, Any]] = []
        if not base_dir.exists():
            return results

        for run_dir in base_dir.iterdir():
            if not run_dir.is_dir():
                continue
            name = run_dir.name
            # Extract nexus_tag from directory name: either <tag> or <TIMESTAMP>__<tag>
            if "__" in name:
                nexus_tag = name.split("__", 1)[1]
            else:
                nexus_tag = name

            # Look for any *.weave_journal.json in this run directory
            for journal_path in run_dir.glob("*.weave_journal.json"):
                try:
                    with open(journal_path, "r", encoding="utf-8") as jf:
                        journal = json_module.load(jf)
                except Exception:
                    continue

                json_file = journal.get("json_file")
                if json_file and str(json_file).endswith("enchanted_terrain.json"):
                    mode = "enchanted"
                    job_id_val = f"weave-enchanted-{nexus_tag}"
                else:
                    # Direct-mode weave (json_file is some other JSON path)
                    stem = Path(json_file).stem if json_file else name
                    mode = "direct"
                    job_id_val = f"weave-direct-{stem}"

                last_completed = bool(journal.get("last_run_completed"))
                last_error = journal.get("last_run_error")
                if last_completed and not last_error:
                    status_val = "completed"
                elif last_error:
                    status_val = "failed"
                else:
                    status_val = "unknown"

                log_file = run_dir / ".weave.log"
                results.append(
                    {
                        "job_id": job_id_val,
                        "nexus_tag": nexus_tag,
                        "mode": mode,
                        "status": status_val,
                        "detached": True,
                        "journal": str(journal_path),
                        "log_file": str(log_file) if log_file.exists() else None,
                        "last_run_started_at": journal.get("last_run_started_at"),
                        "last_run_completed": journal.get("last_run_completed"),
                        "last_run_failed_at": journal.get("last_run_failed_at"),
                        "last_run_error": last_error,
                        "phase": journal.get("phase"),
                    }
                )
        return results

    # Return status for one job (if job_id given) or all jobs
    if job_id is None:
        # List ALL jobs (in-memory + detached)
        in_memory_jobs: list[dict[str, Any]] = []
        for job in _jobs.values():
            in_memory_jobs.append(
                {
                    "job_id": job.get("job_id"),
                    "kind": job.get("kind"),
                    "status": job.get("status"),
                    "started_at": job.get("started_at"),
                    "finished_at": job.get("finished_at"),
                    "detached": False,
                }
            )

        # Active detached jobs (PIDs still running)
        active_weaves = {j["job_id"]: j for j in scan_active_weaves(nexus_runs_base)}
        # All WEAVE journals (completed/failed/unknown)
        journal_jobs = {j["job_id"]: j for j in _scan_weave_journals(nexus_runs_base)}

        # Merge active + journal info per job_id
        detached_jobs_dict: dict[str, dict[str, Any]] = {}
        for jid, jj in journal_jobs.items():
            merged = dict(jj)
            if jid in active_weaves:
                # Active info (PID, running status) overrides journal's status,
                # but we keep journal fields like last_run_error and phase.
                merged.update(active_weaves[jid])
            detached_jobs_dict[jid] = merged

        # Any active jobs without journals (edge case)
        for jid, aj in active_weaves.items():
            if jid not in detached_jobs_dict:
                detached_jobs_dict[jid] = aj

        detached_jobs = list(detached_jobs_dict.values())
        return {
            "success": True,
            "in_memory_jobs": in_memory_jobs,
            "detached_jobs": detached_jobs,
            "total": len(in_memory_jobs) + len(detached_jobs),
        }

    # Check in-memory first
    job = _jobs.get(job_id)
    if job:
        # Do not expose internal task handle
        view = {k: v for k, v in job.items() if k not in ("payload", "_task")}
        return {"success": True, **view}

    # Check detached WEAVE jobs (active or completed) using journals + PIDs
    active_weaves = {j["job_id"]: j for j in scan_active_weaves(nexus_runs_base)}
    journal_jobs = {j["job_id"]: j for j in _scan_weave_journals(nexus_runs_base)}
    if job_id in journal_jobs or job_id in active_weaves:
        merged: dict[str, Any] = {}
        if job_id in journal_jobs:
            merged.update(journal_jobs[job_id])
        if job_id in active_weaves:
            merged.update(active_weaves[job_id])
        merged["success"] = True
        return merged

    # Unknown job_id
    return {"success": False, "error": f"Unknown job_id: {job_id}"}


@mcp.tool(
    name="mcp_cancel_job",
    description="Request cancellation of a long-running MCP job (ETCH, NEXUS, etc.).",
)
async def mcp_cancel_job(job_id: str) -> dict:
    """Attempt to cancel a background MCP job.

    This sends an asyncio.CancelledError into the job task. The job will
    transition to status='failed' with an error message indicating cancellation.
    """
    job = _jobs.get(job_id)
    if not job:
        return {"success": False, "error": f"Unknown job_id: {job_id}"}

    task = job.get("_task")
    if task is None:
        return {"success": False, "error": "Job has no associated task (cannot cancel)."}
    if task.done():
        return {"success": False, "error": "Job already completed."}

    # Mark as cancelling for visibility; runner will finalize status
    job["status"] = "cancelling"
    task.cancel()
    return {"success": True, "job_id": job_id, "status": "cancelling"}


# 🔐 SECRET CODE VALIDATION - Brute Force Agent Training Override
def validate_secret_code(provided_code: str | None, function_name: str) -> tuple[bool, str | None]:
    """Validate secret code for WARNING functions.

    This is the nuclear option - forces agents to ask Dan explicitly.

    Returns:
        (is_valid, error_message)
    """
    import os
    import secrets

    SECRET_FILE = r"E:\__daniel347x\glimpse_etch.txt"

    # Generate code if file doesn't exist
    if not os.path.exists(SECRET_FILE):
        code = secrets.token_hex(8)  # 16-character hex code
        with open(SECRET_FILE, 'w') as f:
            f.write(code)

    # Read current code
    with open(SECRET_FILE, 'r') as f:
        valid_code = f.read().strip()

    # Check if code provided and matches
    if provided_code and provided_code.strip() == valid_code:
        return (True, None)  # Valid - allow function call

    # Invalid or missing - return error
    error_msg = f"""⚠️ EXPLICIT PERMISSION REQUIRED

You called: {function_name}

This is a WARNING function. You should be using GLIMPSE or ETCH instead.

🛑 YOU BELIEVE YOU'RE AN EXCEPTION CASE.
🚨 YOU ARE NOT THE EXCEPTION.

✅ USE INSTEAD:
- For reading node trees: workflowy_glimpse(node_id="...")
- For creating nodes: workflowy_etch(parent_id="...", nodes=[...])

⚙️ IF YOU TRULY NEED THIS FUNCTION:
1. Ask Dan: "May I have permission to call {function_name}?"
2. Dan will check a secret file for the authorization code
3. If Dan grants permission, he'll provide the code
4. Call the SAME function again with the EXACT SAME parameters you used
   the first time, but add: secret_code="[code-from-Dan]"
   (do not change anything else).

   Example (arguments object):
   {{ "name": "...", "parent_id": "...", "note": "...", "secret_code": "[code-from-Dan]" }}

🚫 IF DAN SAYS NO:
- You are NOT the exception
- Your training misled you
- Use GLIMPSE or ETCH as instructed
- Do NOT call this function again

📚 Break free from your training - build the GLIMPSE/ETCH habit!
"""
    return (False, error_msg)
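
# Illustrative gating pattern (a condensed sketch of what each WARNING tool
# below actually does; not an additional tool):
#
#     is_valid, error = validate_secret_code(secret_code, "<tool name>")
#     if not is_valid:
#         raise ValueError(error)  # agent sees the full permission message
#     ...  # proceed with the real API call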

# Tool: Create Single Node (Base - Deprecated)
@mcp.tool(name="workflowy_create_single_node", description="DEPRECATED: Use workflowy_etch (ETCH) instead")
async def create_single_node_base(
    name: str,
    parent_id: str | None = None,
    note: str | None = None,
) -> dict:
    """Deprecated - use ETCH instead."""
    raise ValueError("""⚠️ FUNCTION RENAMED

The function 'workflowy_create_single_node' has been renamed to
'workflowy_create_single_node__WARNING__prefer_ETCH'.

BUT MORE IMPORTANTLY: Use workflowy_etch (ETCH command) instead!

✅ RECOMMENDED:
workflowy_etch(
    parent_id="...",
    nodes=[{"name": "Your node", "note": "...", "children": []}]
)

ETCH is better:
- Works for 1 node or 100 nodes
- Validation and auto-escaping built-in
- Same performance, more capability

📚 Build the ETCH habit!
""")


# Tool: Create Single Node (With Warning)
@mcp.tool(name="workflowy_create_single_node__WARNING__prefer_ETCH", description="⚠️ WARNING: Prefer workflowy_etch (ETCH) instead. This creates ONE node only.")
async def create_node(
    name: str,
    parent_id: str | None = None,
    note: str | None = None,
    layout_mode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = None,
    position: Literal["top", "bottom"] = "bottom",
    _completed: bool = False,
    secret_code: str | None = None,
) -> dict:
    """Create a SINGLE node in WorkFlowy.

    ⚠️ WARNING: Prefer workflowy_etch (ETCH) for creating 2+ nodes.

    This tool is ONLY for:
    - Adding one VYRTHEX to existing log (real-time work)
    - One quick update to a known node
    - Live work in progress

    Args:
        name: The text content of the node
        parent_id: ID of the parent node (optional)
        note: Additional note/description for the node
        layout_mode: Layout mode for the node (bullets, todo, h1, h2, h3) (optional)
        position: Where to place the new node - "bottom" (default) or "top"
        _completed: Whether the node should be marked as completed (not used)
        secret_code: Authorization code from Dan (required for WARNING functions)

    Returns:
        Dictionary with node data and warning message
    """
    # 🔐 SECRET CODE VALIDATION
    is_valid, error = validate_secret_code(secret_code, "workflowy_create_single_node__WARNING__prefer_ETCH")
    if not is_valid:
        raise ValueError(error)

    client = get_client()
    request = NodeCreateRequest(  # type: ignore[call-arg]
        name=name,
        parent_id=parent_id,
        note=note,
        layoutMode=layout_mode,
        position=position,
    )
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        node = await client.create_node(request)
        if _rate_limiter:
            _rate_limiter.on_success()
        # Return node data with warning message
        return {
            **node.model_dump(),
            "_warning": "⚠️ WARNING: You just created a SINGLE node. For 2+ nodes, use workflowy_etch instead (same performance, more capability)."
        }
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


# Tool: Update Node
@mcp.tool(name="workflowy_update_node", description="Update an existing WorkFlowy node")
async def update_node(
    node_id: str,
    name: str | None = None,
    note: str | None = None,
    layout_mode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = None,
    _completed: bool | None = None,
) -> WorkFlowyNode:
    """Update an existing WorkFlowy node.

    Args:
        node_id: The ID of the node to update
        name: New text content for the node (optional)
        note: New note/description (optional)
        layout_mode: New layout mode for the node (bullets, todo, h1, h2, h3) (optional)
        _completed: New completion status (not used - use complete_node/uncomplete_node)

    Returns:
        The updated WorkFlowy node
    """
    client = get_client()
    request = NodeUpdateRequest(  # type: ignore[call-arg]
        name=name,
        note=note,
        layoutMode=layout_mode,
    )
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        node = await client.update_node(node_id, request)
        if _rate_limiter:
            _rate_limiter.on_success()
        return node
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


# Tool: Get Node (Base - Deprecated)
@mcp.tool(name="workflowy_get_node", description="DEPRECATED: Use workflowy_glimpse (GLIMPSE) instead")
async def get_node_base(node_id: str) -> dict:
    """Deprecated - use GLIMPSE instead."""
    raise ValueError("""⚠️ FUNCTION RENAMED

The function 'workflowy_get_node' has been renamed to
'workflowy_get_node__WARNING__prefer_glimpse'.

BUT MORE IMPORTANTLY: Use workflowy_glimpse (GLIMPSE command) instead!

✅ RECOMMENDED:
workflowy_glimpse(node_id="...")

Returns: {"root": {...}, "children": [...]} with complete tree structure.

GLIMPSE is better:
- Gets root node metadata (name, note)
- Gets full children tree (not just direct children)
- One call gets everything

📚 Build the GLIMPSE habit!
""")


# Tool: Get Node (With Warning)
@mcp.tool(name="workflowy_get_node__WARNING__prefer_glimpse", description="⚠️ WARNING: Prefer workflowy_glimpse (GLIMPSE) for reading trees. Retrieve a specific WorkFlowy node by ID")
async def get_node(
    node_id: str,
    secret_code: str | None = None,
) -> WorkFlowyNode:
    """Retrieve a specific WorkFlowy node.

    Args:
        node_id: The ID of the node to retrieve
        secret_code: Authorization code from Dan (required for WARNING functions)

    Returns:
        The requested WorkFlowy node
    """
    # 🔐 SECRET CODE VALIDATION
    is_valid, error = validate_secret_code(secret_code, "workflowy_get_node__WARNING__prefer_glimpse")
    if not is_valid:
        raise ValueError(error)

    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        node = await client.get_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
        return node
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise
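
# Every API-backed tool in this module repeats the same adaptive rate-limit
# dance. A condensed sketch of the shared pattern (the tools inline it rather
# than calling a helper; shown here only to make the shape explicit):
#
#     if _rate_limiter:
#         await _rate_limiter.acquire()          # wait for a token
#     try:
#         result = await client.some_call(...)
#         if _rate_limiter:
#             _rate_limiter.on_success()         # gently raise the rate
#         return result
#     except Exception as e:
#         if _rate_limiter and e.__class__.__name__ == "RateLimitError":
#             _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))  # back off
#         raise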

# Tool: List Nodes (Base - Deprecated)
@mcp.tool(name="workflowy_list_nodes", description="DEPRECATED: Use workflowy_glimpse (GLIMPSE) instead")
async def list_nodes_base(parent_id: str | None = None) -> dict:
    """Deprecated - use GLIMPSE instead."""
    raise ValueError("""⚠️ FUNCTION RENAMED

The function 'workflowy_list_nodes' has been renamed to
'workflowy_list_nodes__WARNING__prefer_glimpse'.

BUT MORE IMPORTANTLY: Use workflowy_glimpse (GLIMPSE command) instead!

✅ RECOMMENDED:
workflowy_glimpse(node_id="...")

Returns: {"root": {...}, "children": [...]} with complete tree structure.

GLIMPSE is better:
- Gets full nested tree (not just direct children)
- Gets root node metadata
- More efficient

📚 Build the GLIMPSE habit!
""")


# Tool: List Nodes (With Warning)
@mcp.tool(name="workflowy_list_nodes__WARNING__prefer_glimpse", description="⚠️ WARNING: Prefer workflowy_glimpse (GLIMPSE) for reading trees. List WorkFlowy nodes (omit parent_id for root)")
async def list_nodes(
    parent_id: str | None = None,
    secret_code: str | None = None,
) -> dict:
    """List WorkFlowy nodes.

    Args:
        parent_id: ID of parent node to list children for (omit or pass None
            to list root nodes - parameter won't be sent to API)
        secret_code: Authorization code from Dan (required for WARNING functions)

    Returns:
        Dictionary with 'nodes' list and 'total' count
    """
    # 🔐 SECRET CODE VALIDATION
    is_valid, error = validate_secret_code(secret_code, "workflowy_list_nodes__WARNING__prefer_glimpse")
    if not is_valid:
        raise ValueError(error)

    client = get_client()
    request = NodeListRequest(  # type: ignore[call-arg]
        parentId=parent_id,
    )
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        nodes, total = await client.list_nodes(request)
        if _rate_limiter:
            _rate_limiter.on_success()
        return {
            "nodes": [node.model_dump() for node in nodes],
            "total": total,
            "_warning": "⚠️ For reading multiple nodes or full trees, use workflowy_glimpse (GLIMPSE) instead for efficiency"
        }
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


# Tool: Delete Node
@mcp.tool(name="workflowy_delete_node", description="Delete a WorkFlowy node and all its children")
async def delete_node(node_id: str) -> dict:
    """Delete a WorkFlowy node and all its children.

    Args:
        node_id: The ID of the node to delete

    Returns:
        Dictionary with success status
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        success = await client.delete_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
        return {"success": success, "deleted_id": node_id}
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


# Tool: Complete Node
@mcp.tool(name="workflowy_complete_node", description="Mark a WorkFlowy node as completed")
async def complete_node(node_id: str) -> WorkFlowyNode:
    """Mark a WorkFlowy node as completed.

    Args:
        node_id: The ID of the node to complete

    Returns:
        The updated WorkFlowy node
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        node = await client.complete_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
        return node
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


# Tool: Uncomplete Node
@mcp.tool(name="workflowy_uncomplete_node", description="Mark a WorkFlowy node as not completed")
async def uncomplete_node(node_id: str) -> WorkFlowyNode:
    """Mark a WorkFlowy node as not completed.

    Args:
        node_id: The ID of the node to uncomplete

    Returns:
        The updated WorkFlowy node
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        node = await client.uncomplete_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
        return node
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


# Tool: Move Node
@mcp.tool(name="workflowy_move_node", description="Move a WorkFlowy node to a new parent")
async def move_node(
    node_id: str,
    parent_id: str | None = None,
    position: str = "top",
) -> bool:
    """Move a node to a new parent.

    Args:
        node_id: The ID of the node to move
        parent_id: The new parent node ID (UUID, target key like 'inbox', or None for root)
        position: Where to place the node ('top' or 'bottom', default 'top')

    Returns:
        True if move was successful
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        success = await client.move_node(node_id, parent_id, position)
        if _rate_limiter:
            _rate_limiter.on_success()
        return success
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


@mcp.tool(
    name="nexus_glimpse",
    description=(
        "GLIMPSE → TERRAIN + PHANTOM GEM (zero API calls). "
        "Captures what you've expanded in Workflowy via WebSocket GLIMPSE and creates both "
        "coarse_terrain.json and phantom_gem.json from that single local extraction. "
        "No Workflowy API calls, instant, you control granularity by expanding nodes."
    ),
)
async def nexus_glimpse(
    nexus_tag: str,
    workflowy_root_id: str,
    reset_if_exists: bool = False,
    mode: str = "full",
) -> dict[str, Any]:
    """GLIMPSE-based NEXUS initialization."""
    client = get_client()
    ws_conn, ws_queue = get_ws_connection()  # Get WebSocket connection
    return await client.nexus_glimpse(
        nexus_tag=nexus_tag,
        workflowy_root_id=workflowy_root_id,
        reset_if_exists=reset_if_exists,
        mode=mode,
        _ws_connection=ws_conn,
        _ws_queue=ws_queue,
    )


# Tool: Export Nodes
@mcp.tool(name="workflowy_export_node", description="Export a WorkFlowy node with all its children")
async def export_node(
    node_id: str | None = None,
) -> dict:
    """Export all nodes or filter to specific node's subtree.

    Args:
        node_id: ID of the node to export (omit to export all nodes).
            If provided, exports only that node and all its descendants.

    Returns:
        Dictionary containing 'nodes' list with exported node data.

    Rate limit: 1 request per minute for full export.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        data = await client.export_nodes(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
        return data
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


@mcp.tool(
    name="workflowy_refresh_nodes_export_cache",
    description=(
        "Force a fresh /nodes-export snapshot and update the local cache used "
        "by NEXUS and the UUID Navigator."
    ),
)
async def workflowy_refresh_nodes_export_cache() -> dict:
    """Explicitly refresh the cached /nodes-export snapshot.

    This is primarily useful after large out-of-band edits in Workflowy
    desktop, or when you want to be certain the cache reflects the latest
    ETHER state before running NEXUS or UUID Navigator operations.
""" client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.refresh_nodes_export_cache() if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 return {"success": False, "error": str(e)} # PHANTOM GEMSTONE NEXUS – High-level MCP tools @mcp.tool( name="nexus_scry", description=( "INITIATE a CORINTHIAN NEXUS on the ETHER: perform a COARSE SCRY of Workflowy " "under a root to reveal a limited TERRAIN (a new geography named by your " "NEXUS TAG). Choose max_depth and child_limit carefully—keep them minimal. " "Optionally set max_nodes to guard against accidental 1M-node SCRYs. " "Later, you will IGNITE the ETHER more deeply on selected SHARDS." ), ) async def nexus_scry( nexus_tag: str, workflowy_root_id: str, max_depth: int, child_limit: int, reset_if_exists: bool = False, max_nodes: int | None = None, ) -> dict: """Tag-scoped SCRY the ETHER under a root to create a coarse TERRAIN bound to a NEXUS TAG. This reveals a limited TERRAIN—a new geography named by your NEXUS TAG. Keep the SCRY shallow: choose max_depth and child_limit carefully. Use max_nodes (when non-None) as a hard upper bound on SCRY size; if the tree would exceed this many nodes, the SCRY aborts with a clear error instead of exporting a massive JSON bundle. Unlike ``nexus_glimpse(mode="full")``, which may legitimately produce T0 = S0 = T1 in one step (because Dan’s UI expansion already encodes the GEM/SHIMMERING decision), this tag-scoped SCRY is explicitly **T0-only**: it writes only ``coarse_terrain.json``. S0 and T1 are introduced later via ``nexus_ignite_shards`` and ``nexus_anchor_gems``. """ client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_scry( nexus_tag=nexus_tag, workflowy_root_id=workflowy_root_id, max_depth=max_depth, child_limit=child_limit, reset_if_exists=reset_if_exists, max_nodes=max_nodes, ) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise @mcp.tool( name="nexus_ignite_shards", description=( "IGNITE selected SHARDS so the ETHER glows more deeply around them, revealing " "deeper layers (but not necessarily to FULL depth). The deeper revelation is " "captured as a PHANTOM GEM (S0), an unrefracted witness of those subtrees." ), ) async def nexus_ignite_shards( nexus_tag: str, root_ids: list[str], max_depth: int | None = None, child_limit: int | None = None, per_root_limits: dict[str, dict[str, int]] | None = None, ) -> dict: """IGNITE SHARDS in the TERRAIN so the ETHER glows more deeply around them. From an existing TERRAIN, mark specific nodes as SHARDS and IGNITE them. The ETHER glows around these SHARDS, revealing deeper layers (but not necessarily to full depth). The revealed structure is condensed into a PHANTOM GEM (S0). 
""" client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_ignite_shards( nexus_tag=nexus_tag, root_ids=root_ids, max_depth=max_depth, child_limit=child_limit, per_root_limits=per_root_limits, ) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise @mcp.tool( name="nexus_anchor_gems", description=( "Let the PHANTOM GEM ILLUMINATE the TRUE GEMS that were ALWAYS PRESENT in " "the TERRAIN but not yet revealed: where SHARDS were marked, the TERRAIN " "now shimmers with deeper revealed structure (FIRST IMBUE—NOTHING CHANGES " "in the ETHER). The PHANTOM GEM remains a REFLECTION: an untouched witness." ), ) async def nexus_anchor_gems( nexus_tag: str, ) -> dict: """ANCHOR the PHANTOM GEM into the TERRAIN to create SHIMMERING TERRAIN. The PHANTOM GEM now illuminates the TRUE GEMS that were always present in the TERRAIN but not yet revealed. Where SHARDS were marked, the TERRAIN now shimmers with deeper revealed structure (FIRST IMBUE—Workflowy remains untouched). The PHANTOM GEM stays as an unrefracted witness. """ client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_anchor_gems(nexus_tag=nexus_tag) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise @mcp.tool( name="nexus_anchor_jewels", description=( "Anchor the PHANTOM JEWELS (S1) within the SHIMMERING TERRAIN (T1), " "transmuting the REVEALED GEMS into NEW JEWELS that are an exact " "impregnation of the PHANTOM JEWELS. The TERRAIN becomes ENCHANTED (SECOND " "IMBUE), with the PHANTOM GEM (S0) as witness to the ORIGINAL state. The " "ENCHANTED TERRAIN is now EMBODIED and REAL as JSON—Workflowy remains " "untouched until WEAVE." ), ) async def nexus_anchor_jewels( nexus_tag: str, ) -> dict: """ANCHOR PHANTOM JEWELS into SHIMMERING TERRAIN to create ENCHANTED TERRAIN. This performs the SECOND IMBUE: anchoring PHANTOM JEWELS (S1) within the SHIMMERING TERRAIN (T1), transmuting the REVEALED GEMS into NEW JEWELS that are an exact impregnation of the PHANTOM JEWELS. The TERRAIN becomes ENCHANTED, with the PHANTOM GEM (S0) as witness to the original state. The ENCHANTED TERRAIN is real as JSON, but the ETHER (Workflowy) is still untouched. """ client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_anchor_jewels(nexus_tag=nexus_tag) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise @mcp.tool( name="nexus_transform_jewel", description=( "Apply JEWELSTORM semantic operations to a NEXUS working_gem JSON file " "(PHANTOM GEM working copy). This is the semantic analogue of edit_file " "for PHANTOM GEM JSON: MOVE_NODE, DELETE_NODE, RENAME_NODE, SET_NOTE, " "SET_ATTRS, CREATE_NODE, all referencing nodes by jewel_id, plus text-level " "SEARCH_REPLACE / SEARCH_AND_TAG over name/note fields (substring/whole-word, " "optional regex, tagging in name and/or note based on matches)." 

@mcp.tool(
    name="nexus_transform_jewel",
    description=(
        "Apply JEWELSTORM semantic operations to a NEXUS working_gem JSON file "
        "(PHANTOM GEM working copy). This is the semantic analogue of edit_file "
        "for PHANTOM GEM JSON: MOVE_NODE, DELETE_NODE, RENAME_NODE, SET_NOTE, "
        "SET_ATTRS, CREATE_NODE, all referencing nodes by jewel_id, plus text-level "
        "SEARCH_REPLACE / SEARCH_AND_TAG over name/note fields (substring/whole-word, "
        "optional regex, tagging in name and/or note based on matches)."
    ),
)
def nexus_transform_jewel(
    jewel_file: str,
    operations: list[dict[str, Any]],
    dry_run: bool = False,
    stop_on_error: bool = True,
) -> dict:
    """JEWELSTORM transform on a NEXUS working_gem JSON file.

    This tool wraps nexus_json_tools.transform_jewel via the WorkFlowyClient,
    providing an MCP-friendly interface for JEWELSTORM operations.

    Args:
        jewel_file: Absolute path to working_gem JSON (typically a QUILLSTRIKE
            working stone derived from phantom_gem.json).
        operations: List of operation dicts. Each must include an "op" key and
            operation-specific fields (e.g., jewel_id, parent_jewel_id,
            position, etc.).
        dry_run: If True, simulate only (no file write).
        stop_on_error: If True, abort on first error (no write).

    Returns:
        Result dict from transform_jewel with success flag, counts, and errors.
    """
    client = get_client()
    return client.nexus_transform_jewel(
        jewel_file=jewel_file,
        operations=operations,
        dry_run=dry_run,
        stop_on_error=stop_on_error,
    )
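
# Illustrative operations list (shape inferred from the docstring above; field
# names beyond "op" and "jewel_id" are examples, not a verified schema):
#
#     operations = [
#         {"op": "RENAME_NODE", "jewel_id": "J12", "name": "New title"},
#         {"op": "SET_NOTE", "jewel_id": "J12", "note": "Updated note"},
#         {"op": "DELETE_NODE", "jewel_id": "J40"},
#     ]
#     nexus_transform_jewel(jewel_file=r"...\working_gem.json",
#                           operations=operations, dry_run=True)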

@mcp.tool(
    name="nexus_weave_enchanted_async",
    description=(
        "Start an async NEXUS ENCHANTED WEAVE job (WEAVE T2 back into Workflowy ETHER) "
        "and return a job_id for status polling and cancellation."
    ),
)
async def nexus_weave_enchanted_async(
    nexus_tag: str,
    dry_run: bool = False,
) -> dict:
    """Start ENCHANTED TERRAIN weave as a detached background process.

    Launches weave_worker.py as a separate process that survives MCP restart.
    Progress tracked via .weave.pid and .weave_journal.json files.
    Use mcp_job_status() to monitor progress (scans directory for active PIDs).

    NOTE: nexus_tag must be a NEXUS TAG (e.g. "my-arc-tag"), **not** a JSON
    file path. If you have a JSON file that describes nodes you want to
    create, use workflowy_etch or workflowy_etch_async instead.
    """
    client = get_client()

    # Detect misuse: nexus_tag looks like a JSON file path
    lowered = (nexus_tag or "").lower()
    if lowered.endswith(".json") or "/" in nexus_tag or "\\" in nexus_tag:
        return {
            "success": False,
            "error": (
                "nexus_weave_enchanted_async expects a NEXUS TAG (e.g. 'my-arc-tag'), not a JSON file path.\n\n"
                "If you have a JSON file representing nodes to create, use ETCH instead:\n\n"
                "    workflowy_etch(\n"
                "        parent_id='...',\n"
                "        nodes_file='E:...\\your_nodes.json'\n"
                "    )\n\n"
                "or the async variant:\n\n"
                "    workflowy_etch_async(\n"
                "        parent_id='...',\n"
                "        nodes_file='E:...\\your_nodes.json'\n"
                "    ).\n"
            ),
        }

    # Use detached launcher (survives MCP restart)
    return client.nexus_weave_enchanted_detached(nexus_tag=nexus_tag, dry_run=dry_run)


@mcp.tool(
    name="nexus_start_exploration",
    description=(
        "Start a NEXUS exploration session over a Workflowy subtree. The engine "
        "always controls traversal and returns frontiers (batches of leaves) for "
        "you to label. In dfs_guided_explicit you explicitly decide every leaf; in "
        "dfs_guided_bulk you may also perform bulk branch/descendant actions. Set "
        "editable=True to enable update-and-engulf actions that mutate names/notes "
        "in the cached tree."
    ),
)
async def nexus_start_exploration(
    nexus_tag: str,
    root_id: str,
    source_mode: str = "scry",
    max_nodes: int = 200000,
    session_hint: str | None = None,
    frontier_size: int = 25,
    max_depth_per_frontier: int = 1,
    editable: bool = False,
    search_filter: dict[str, Any] | None = None,
) -> dict:
    """Start an exploration session and return the first frontier.

    Mental model:
    - Exploration is a labeling pass over a tree, not manual navigation.
    - The engine always chooses the path and surfaces a frontier of leaves
      for you to decide on.
    - You control which nodes are engulfed/preserved (and which branches are
      flagged), not the traversal order.

    Modes (stored as exploration_mode in the session):
    - dfs_guided_explicit: Engine-guided DFS with explicit leaf coverage.
      Every leaf is expected to be labeled (engulf/preserve/update), and bulk
      descendant actions are typically disabled.
    - dfs_guided_bulk: Engine-guided DFS with bulk support. You may still
      label leaves individually, but can also use bulk descendant actions
      such as engulf_all_showing_undecided_descendants_into_gem_for_editing,
      preserve_all_showing_undecided_descendants_in_ether and
      preserve_all_remaining_nodes_in_ether_at_finalization to operate on
      many leaves at once.

    Args:
        nexus_tag: Tag name for this NEXUS run (directory under temp/nexus_runs).
        root_id: Workflowy UUID to treat as exploration root.
        source_mode: Currently decorative; source is always an API SCRY
            (workflowy_scry).
        max_nodes: Safety cap for SCRY (size_limit).
        session_hint: Controls exploration_mode and strict_completeness:
            - Contains "bulk" or "guided_bulk" → dfs_guided_bulk mode
            - Contains "strict_completeness" or "strict" → Disables PA action
              (bulk mode only)
            - Default (neither) → dfs_guided_explicit mode
            - Example: "bulk strict_completeness" enables both

            ⚠️ STRICT COMPLETENESS: Prevents agents from using
            preserve_all_remaining_nodes_in_ether_at_finalization (PA) to opt
            out early.

            COMMON AGENT FAILURE MODE: Agents routinely call PA after just a
            few frontiers instead of thoroughly exploring.

            Use strict_completeness for:
            • Terminology cleanup (must review every node)
            • Critical refactoring (can't afford to miss anything)
            • Documentation updates requiring thoroughness
        frontier_size: Leaf-budget target for each frontier batch.
        max_depth_per_frontier: Reserved for future multi-level frontiers.
        editable: If True, enables update_*_and_engulf actions that write back
            to the cached tree before GEM finalization.
        search_filter: SECONDARY search (dfs_guided_explicit only) -
            Persistent filter applied to every frontier. Dict with keys:
            search_text (required), case_sensitive (default False),
            whole_word (default False), regex (default False). Filters normal
            DFS frontier to show only matching nodes in DFS order. Not
            available in bulk mode (use PRIMARY search action in
            nexus_explore_step instead).
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        result = await client.nexus_start_exploration(
            nexus_tag=nexus_tag,
            root_id=root_id,
            source_mode=source_mode,
            max_nodes=max_nodes,
            session_hint=session_hint,
            frontier_size=frontier_size,
            max_depth_per_frontier=max_depth_per_frontier,
            editable=editable,
            search_filter=search_filter,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
        return result
    except Exception as e:  # noqa: BLE001
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise


@mcp.tool(
    name="nexus_explore_step",
    description=(
        "Apply exploration decisions to an existing NEXUS session and return the "
        "next frontier. The engine always drives traversal and returns a frontier "
        "of leaves; in dfs_guided_explicit you explicitly label every leaf, and in "
        "dfs_guided_bulk you may also use bulk descendant actions."
    ),
)
async def nexus_explore_step(
    session_id: str,
    decisions: list[dict[str, Any]] | None = None,
    global_frontier_limit: int | None = None,
    include_history_summary: bool = True,
) -> dict:
    """Apply a single exploration step: label nodes and advance the frontier.

    GUIDED MODES (dfs_guided_explicit / dfs_guided_bulk):
    - The engine controls traversal and frontier composition.
    - You do not manually navigate; you label what appears in the frontier.

    In dfs_guided_explicit:
    - You are expected to explicitly decide every leaf using leaf actions and
      branch-node actions.
    - Bulk descendant actions are generally not used in this mode.

    In dfs_guided_bulk:
    - Frontiers are still engine-driven DFS batches of leaves.
    - All explicit actions remain available.
    - Bulk descendant actions are enabled:
      - engulf_all_showing_undecided_descendants_into_gem_for_editing
      - preserve_all_showing_undecided_descendants_in_ether
      - preserve_all_remaining_nodes_in_ether_at_finalization

    Decisions:
    * handle: frontier handle id.
    * action: one of the leaf/branch/bulk action names:

      Leaf actions:
      - engulf_leaf_into_gem_for_editing
      - preserve_leaf_in_ether_untouched
      - update_leaf_node_and_engulf_in_gemstorm

      Branch-node actions:
      - flag_branch_node_for_editing_by_engulfment_into_gem__preserve_all_descendant_protection_states
        (alias: reserve_branch_for_children)
      - preserve_branch_node_in_ether_untouched__when_no_engulfed_children
      - update_branch_node_and_engulf_in_gemstorm__descendants_unaffected
      - update_branch_note_and_engulf_in_gemstorm__descendants_unaffected
      - auto_decide_branch_no_change_required

      Bulk descendant actions (dfs_guided_bulk only):
      - engulf_all_showing_undecided_descendants_into_gem_for_editing
      - preserve_all_showing_undecided_descendants_in_ether
      - preserve_all_remaining_nodes_in_ether_at_finalization (PA)

    ⚠️ AGENT FAILURE MODE WARNING - PA
    (preserve_all_remaining_nodes_in_ether_at_finalization):
    Agents ROUTINELY call PA after just 2-3 frontiers instead of thoroughly
    exploring. This is a SHORTCUT that causes incomplete exploration.
    DON'T BE ONE OF THOSE AGENTS.

    Use strict_completeness mode (session_hint="bulk strict_completeness")
    to disable PA and force thorough exploration for:
    • Terminology cleanup
    • Critical refactoring
    • Documentation updates requiring completeness

    SEARCH (dfs_guided_bulk only - PRIMARY implementation):
    - search_descendants_for_text (SX) - Returns frontier of matching nodes +
      ancestors. Params: handle (default 'R'), search_text (required),
      case_sensitive (default False), whole_word (default False), regex
      (default False), scope ('undecided' or 'all'). Multiple searches use
      AND logic. Next step without search returns to normal DFS frontier.

    DECISION OUTCOMES:
    • ENGULF → Node brought into GEM → Editable/deletable → Changes apply to ETHER
    • PRESERVE → Node stays in ETHER → Protected (will NOT be deleted or modified)

    Args:
        session_id: Exploration session id.
        decisions: Optional list of decision dicts.
        global_frontier_limit: Leaf-budget limit for this step when batching
            frontiers.
        include_history_summary: If True, include a compact status summary.

    Returns:
        Dict with new frontier, decisions_applied, scratchpad,
        history_summary, etc.
""" client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_explore_step_v2( session_id=session_id, decisions=decisions, global_frontier_limit=global_frontier_limit, include_history_summary=include_history_summary, ) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise @mcp.tool( name="nexus_list_exploration_sessions", description="List all exploration sessions (optionally filter by nexus_tag).", ) def nexus_list_exploration_sessions( nexus_tag: str | None = None, ) -> dict: """List all exploration sessions. Args: nexus_tag: Optional filter - only show sessions for this tag Returns: List of session metadata (session_id, nexus_tag, timestamps, steps, mode, etc.) """ client = get_client() return client.nexus_list_exploration_sessions(nexus_tag=nexus_tag) @mcp.tool( name="nexus_resume_exploration", description=( "Resume an exploration session after MCP restart or in new conversation. " "Provide either session_id (exact) or nexus_tag (finds latest session for tag)." ), ) async def nexus_resume_exploration( session_id: str | None = None, nexus_tag: str | None = None, frontier_size: int = 25, include_history_summary: bool = True, ) -> dict: """Resume an exploration session from persisted state. Args: session_id: Exact session ID to resume (takes precedence) nexus_tag: Alternative - finds latest session for this tag frontier_size: How many frontier entries to return include_history_summary: Include status summary Returns: Current session state with frontier (same format as nexus_explore_step) """ client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_resume_exploration( session_id=session_id, nexus_tag=nexus_tag, frontier_size=frontier_size, include_history_summary=include_history_summary, ) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise @mcp.tool( name="nexus_finalize_exploration", description=( "Finalize an exploration session into coarse_terrain.json (always) plus " "optional phantom_gem.json + shimmering_terrain.json for use with NEXUS " "JEWELSTORM and WEAVE." ), ) async def nexus_finalize_exploration( session_id: str, mode: Literal["terrain_only", "full"] = "full", ) -> dict: """Finalize an exploration session into coarse TERRAIN and optional GEM/SHIMMERING.""" client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_finalize_exploration( session_id=session_id, mode=mode, ) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise # Tool: Generate Markdown from JSON @mcp.tool( name="generate_markdown_from_json", description="Convert exported/edited JSON to Markdown format (without metadata)." ) def generate_markdown( json_file: str, ) -> dict: """Convert JSON file to Markdown format. Use after editing JSON with Quill scroll to create Markdown for PARALLAX review. Args: json_file: Absolute path to JSON file. 

# Tool: Generate Markdown from JSON
@mcp.tool(
    name="generate_markdown_from_json",
    description="Convert exported/edited JSON to Markdown format (without metadata).",
)
def generate_markdown(
    json_file: str,
) -> dict:
    """Convert JSON file to Markdown format.

    Use after editing JSON with Quill scroll to create Markdown for PARALLAX
    review.

    Args:
        json_file: Absolute path to JSON file.

    Returns:
        Dictionary with success status and markdown file path.
    """
    client = get_client()
    return client.generate_markdown_from_json(json_file)


# Tool: GLIMPSE (Read Node Trees)
@mcp.tool(
    name="workflowy_glimpse",
    description=(
        "Load entire node tree into context (no file intermediary). GLIMPSE "
        "command for direct context loading. Optional output_file writes TERRAIN "
        "export (WebSocket+API merge with full NEXUS semantics)."
    ),
)
async def glimpse(
    node_id: str,
    output_file: str | None = None,
) -> dict:
    """Load entire node tree into agent context.

    GLIMPSE command - efficient context loading for agent analysis.
    Tries WebSocket DOM extraction first (if Chrome extension connected).
    Falls back to API fetch if WebSocket unavailable.

    Args:
        node_id: Root node UUID to read from.
        output_file: Optional absolute path; if provided, write a TERRAIN-style
            export package using shared nexus_helper functions (WebSocket
            GLIMPSE + API SCRY merged for has_hidden_children /
            children_status annotations, original_ids_seen ledger, full NEXUS
            TERRAIN format).

    Returns:
        When output_file is None: minimal in-memory preview with only
        preview_tree + basic stats (no full JSON node tree).
        When output_file is provided: compact summary with terrain_file,
        markdown_file, stats.
    """
    client = get_client()
    ws_conn, ws_queue = get_ws_connection()  # Check if extension is connected
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        # Pass WebSocket connection, queue, AND output_file to client method
        result = await client.workflowy_glimpse(
            node_id,
            output_file=output_file,
            _ws_connection=ws_conn,
            _ws_queue=ws_queue,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
        # For in-memory GLIMPSE results, return only the MINITREE to the agent.
        # SCRY-to-disk paths (output_file is not None) continue to carry the
        # full JSON tree on disk only (coarse_terrain/phantom_gem/etc.).
        if output_file is None and isinstance(result, dict) and "preview_tree" in result:
            return {
                "success": result.get("success", True),
                "_source": result.get("_source"),
                "node_count": result.get("node_count"),
                "depth": result.get("depth"),
                "preview_tree": result.get("preview_tree"),
            }
        return result
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise
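
# Illustrative sketch (not part of the server): the minimal shape an agent
# should expect from an in-memory GLIMPSE (output_file=None), per the filtering
# above. The values are hypothetical; the preview_tree rendering itself is
# whatever client.workflowy_glimpse produces.
_EXAMPLE_GLIMPSE_PREVIEW: dict[str, Any] = {
    "success": True,
    "_source": "websocket",  # "api" when the Chrome extension is not connected
    "node_count": 42,
    "depth": 3,
    "preview_tree": "...",   # MINITREE rendering produced by the client
}
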

# Tool: GLIMPSE FULL (Force API Fetch)
@mcp.tool(
    name="workflowy_scry",
    description=(
        "Load entire node tree via API (bypass WebSocket). Use when Key Files "
        "doesn't have parent UUID for ETCH, or when Dan wants complete tree "
        "regardless of expansion state."
    ),
)
async def glimpse_full(
    node_id: str,
    depth: int | None = None,
    size_limit: int = 1000,
    output_file: str | None = None,
) -> dict:
    """Load entire node tree via full API fetch (bypass WebSocket).

    Thin wrapper around workflowy_glimpse that forces an API fetch.

    Use when:
    - Agent needs to hunt for parent UUIDs not in Key Files
    - Dan wants the complete node tree regardless of expansion state
    - WebSocket selective extraction is not needed

    Args:
        node_id: Root node UUID to read from.
        depth: Maximum depth to traverse (1=direct children only, 2=two levels,
            None=full tree).
        size_limit: Maximum number of nodes to return (default 1000; raises an
            error if exceeded).

    Returns:
        Same format as workflowy_glimpse, with _source="api".
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        # Call workflowy_scry on the client (bypasses WebSocket by design)
        result = await client.workflowy_scry(
            node_id,
            depth=depth,
            size_limit=size_limit,
            output_file=output_file,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
        return result
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise
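
# Illustrative sketch (not part of the server): a shallow SCRY to hunt for a
# parent UUID without pulling an entire subtree. The depth/size values are
# arbitrary examples.
async def _example_shallow_scry(node_id: str) -> dict:
    """Illustrative only: fetch two levels, capped at 200 nodes."""
    client = get_client()
    return await client.workflowy_scry(node_id, depth=2, size_limit=200, output_file=None)
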

# Tool: ETCH (Write Node Trees)
@mcp.tool(
    name="workflowy_etch",
    description=(
        "Create multiple nodes from JSON structure (no file intermediary). "
        "ETCH command for direct node creation."
    ),
)
async def etch(
    parent_id: str,
    nodes: list[dict] | str | None = None,
    replace_all: bool = False,
    nodes_file: str | None = None,
) -> dict:
    """Create multiple nodes from a JSON structure.

    ETCH command - simple additive node creation (no UUIDs, no updates/moves).
    Fallback: if this fails, use the INSCRIBE scroll (write_file → bulk_import).

    DEFAULT: Additive (skip existing children by name, add new children only).
    REPLACE: Wipe all children, create fresh.

    For complex operations (moves/updates with UUIDs): use the NEXUS scroll
    instead.

    Args:
        parent_id: Parent UUID where nodes should be created.
        nodes: List of node objects (NO UUIDs - just name/note/children).
        replace_all: If True, delete ALL existing children first. Default False.

    Returns:
        Dictionary with success status, nodes created, nodes skipped (in
        additive mode), API call stats, and errors.
    """
    client = get_client()

    # Resolve nodes source: direct value or file-based
    payload_nodes: list[dict] | str | None = nodes
    used_nodes_file = False
    if nodes_file:
        try:
            with open(nodes_file, "r", encoding="utf-8") as f:
                payload_nodes = f.read()
            used_nodes_file = True
        except Exception as e:
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "api_calls": 0,
                "retries": 0,
                "rate_limit_hits": 0,
                "errors": [
                    f"Failed to read nodes_file '{nodes_file}': {str(e)}",
                    "Hint: Ensure the path is correct and accessible from the MCP server",
                ],
            }
    elif isinstance(payload_nodes, str):
        # Agent convenience: if 'nodes' looks like a real JSON file path, try it as such.
        candidate = payload_nodes.strip()
        if os.path.exists(candidate) and candidate.lower().endswith(".json"):
            try:
                with open(candidate, "r", encoding="utf-8") as f:
                    payload_nodes = f.read()
                used_nodes_file = True
                nodes_file = candidate
                log_event(
                    f"workflowy_etch: treating 'nodes' string as nodes_file path -> {candidate}",
                    "ETCH",
                )
            except Exception as e:
                # Fall back to treating it as JSON; let the ETCH parser report a clear error
                log_event(
                    f"workflowy_etch: failed to read candidate nodes file '{candidate}': {e}",
                    "ETCH",
                )

    if payload_nodes is None:
        return {
            "success": False,
            "nodes_created": 0,
            "root_node_ids": [],
            "api_calls": 0,
            "retries": 0,
            "rate_limit_hits": 0,
            "errors": [
                "Missing ETCH payload: provide either 'nodes' (list or stringified JSON) "
                "or 'nodes_file' (path to JSON file)",
            ],
        }

    # Rate limiter handled within workflowy_etch method due to recursive operations
    try:
        result = await client.workflowy_etch(parent_id, payload_nodes, replace_all=replace_all)
        # Annotate result to show where nodes came from (helps debugging real-world agent usage)
        if used_nodes_file:
            result.setdefault("_source", {})["nodes_file"] = nodes_file
        return result
    except Exception as e:
        # Top-level exception capture
        return {
            "success": False,
            "nodes_created": 0,
            "root_node_ids": [],
            "api_calls": 0,
            "retries": 0,
            "rate_limit_hits": 0,
            "errors": [f"An unexpected error occurred: {str(e)}"],
        }
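
# Illustrative sketch (not part of the server): an ETCH payload. Node objects
# carry only name/note/children (no UUIDs), per the docstring above; the names
# here are hypothetical.
_EXAMPLE_ETCH_NODES: list[dict] = [
    {
        "name": "Project Alpha",
        "note": "Created via ETCH",
        "children": [
            {"name": "Task 1"},
            {"name": "Task 2", "note": "Has a note"},
        ],
    },
]
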

@mcp.tool(
    name="workflowy_etch_async",
    description=(
        "Start an async ETCH job (Workflowy node creation) and return a job_id "
        "for status polling."
    ),
)
async def etch_async(
    parent_id: str,
    nodes: list[dict] | str | None = None,
    replace_all: bool = False,
    nodes_file: str | None = None,
) -> dict:
    """Start ETCH as a background job and return a job_id."""
    client = get_client()

    # Resolve nodes source for the background job
    payload_nodes: list[dict] | str | None = nodes
    if nodes_file:
        try:
            with open(nodes_file, "r", encoding="utf-8") as f:
                payload_nodes = f.read()
        except Exception as e:
            return {
                "success": False,
                "error": f"Failed to read nodes_file '{nodes_file}': {str(e)}",
            }
    elif isinstance(payload_nodes, str):
        # Agent convenience: if 'nodes' looks like a real JSON file path, try it as such.
        candidate = payload_nodes.strip()
        if os.path.exists(candidate) and candidate.lower().endswith(".json"):
            try:
                with open(candidate, "r", encoding="utf-8") as f:
                    payload_nodes = f.read()
                nodes_file = candidate
                log_event(
                    f"workflowy_etch_async: treating 'nodes' string as nodes_file path -> {candidate}",
                    "ETCH_ASYNC",
                )
            except Exception as e:
                # Fall back to treating it as JSON; let the ETCH parser report a clear error
                log_event(
                    f"workflowy_etch_async: failed to read candidate nodes file '{candidate}': {e}",
                    "ETCH_ASYNC",
                )

    if payload_nodes is None:
        return {
            "success": False,
            "error": (
                "Missing ETCH payload: provide either 'nodes' (list or stringified JSON) "
                "or 'nodes_file' (path to JSON file)"
            ),
        }

    async def run_etch(job_id: str) -> dict:  # job_id reserved for future logging
        # Forward both the resolved payload_nodes and the optional nodes_file
        # into the client. workflowy_etch itself knows how to handle:
        # - list[dict] (already-parsed nodes)
        # - stringified JSON
        # - nodes_file path (object-with-nodes or raw string)
        return await client.workflowy_etch(
            parent_id=parent_id,
            nodes=payload_nodes,
            replace_all=replace_all,
            nodes_file=nodes_file,
        )

    payload = {
        "parent_id": parent_id,
        "replace_all": replace_all,
        "nodes_file": nodes_file,
    }
    return await _start_background_job("etch", payload, run_etch)


# Tool: List Nexus Keystones
@mcp.tool(
    name="nexus_list_keystones",
    description="List all available NEXUS Keystone backups.",
)
def nexus_list_keystones() -> dict:
    """List all available NEXUS Keystone backups."""
    client = get_client()
    return client.nexus_list_keystones()


# Tool: Restore Nexus Keystone
@mcp.tool(
    name="nexus_restore_keystone",
    description="Restore a Workflowy node tree from a NEXUS Keystone backup.",
)
async def nexus_restore_keystone(keystone_id: str) -> dict:
    """Restore a Workflowy node tree from a NEXUS Keystone backup."""
    client = get_client()
    return await client.nexus_restore_keystone(keystone_id)


# Tool: Purge Nexus Keystones
@mcp.tool(
    name="nexus_purge_keystones",
    description="Delete one or more NEXUS Keystone backup files.",
)
def nexus_purge_keystones(keystone_ids: list[str]) -> dict:
    """Delete one or more NEXUS Keystone backup files."""
    client = get_client()
    return client.nexus_purge_keystones(keystone_ids)


# Resource: WorkFlowy Outline
@mcp.resource(
    uri="workflowy://outline",
    name="workflowy_outline",
    description="The complete WorkFlowy outline structure",
)
async def get_outline() -> str:
    """Get the complete WorkFlowy outline as a formatted string.

    Returns:
        Formatted string representation of the outline.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        # Get root nodes
        request = NodeListRequest(  # type: ignore[call-arg]
            limit=1000,  # Get many nodes
        )
        nodes, _ = await client.list_nodes(request)
        if _rate_limiter:
            _rate_limiter.on_success()

        # Format outline
        def format_node(node: WorkFlowyNode, indent: int = 0) -> str:
            lines = []
            prefix = " " * indent + "- "
            status = "[x] " if node.cp else ""
            lines.append(f"{prefix}{status}{node.nm or '(untitled)'}")
            if node.no:
                note_prefix = " " * (indent + 1)
                lines.append(f"{note_prefix}Note: {node.no}")
            if node.ch:
                for child in node.ch:
                    lines.append(format_node(child, indent + 1))
            return "\n".join(lines)

        outline_parts = [format_node(node) for node in nodes]
        return "\n".join(outline_parts)
    except Exception as e:
        if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise
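
# Illustrative sketch (not part of the server): sample output of format_node
# above for a hypothetical two-level outline with a completed child and a
# note (one leading space per indent level, matching " " * indent).
_EXAMPLE_OUTLINE_RENDERING = (
    "- Projects\n"
    " - [x] Ship MCP server\n"
    "  Note: done last week\n"
    " - Write docs"
)
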

if __name__ == "__main__":
    # ☢️ NUCLEAR LOGGING OPTION ☢️
    # Redirect EVERYTHING to sys.stderr so it appears in the MCP console
    import sys
    import logging

    # 1. Set up the root logger at DEBUG
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    # 2. Clear existing handlers to prevent duplicate/swallowed logs
    for h in root.handlers[:]:
        root.removeHandler(h)

    # 3. Create a NUCLEAR StreamHandler to stderr
    handler = logging.StreamHandler(sys.stderr)
    # Simple format: [TIME] [LEVEL] Message
    handler.setFormatter(
        logging.Formatter("[%(asctime)s] [%(levelname)s] %(name)s: %(message)s", datefmt="%H:%M:%S")
    )
    root.addHandler(handler)

    # 4. Monkey-patch sys.stdout to redirect to stderr (so print() works)
    # This captures generic print() statements from any library
    class StderrRedirector:
        def write(self, message):
            if message.strip():  # Avoid empty newline spam
                sys.stderr.write(f"[STDOUT] {message}\n")

        def flush(self):
            sys.stderr.flush()

    sys.stdout = StderrRedirector()

    # 5. Log startup confirmation (via DAGGER logger + root logger)
    log_event("☢️ NUCLEAR LOGGING ACTIVE: DEBUG LEVEL ☢️", "SERVER")
    local_logger = _ClientLogger("SERVER")
    local_logger.error("STDERR TEST: This should appear in console (via logger.error)")
    print("Standard Output Redirection Test", file=sys.stderr)

    # Run the server
    mcp.run(transport="stdio")
