"""WorkFlowy MCP server implementation using FastMCP."""
import sys
from datetime import datetime
# Also log to file to debug deployment/environment
# Startup breadcrumb: record which copy of this module was loaded, first to a
# local debug file (best-effort) and then to stderr.
try:
    with open(
        r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\reconcile_debug.log",
        "a",
        encoding="utf-8",
    ) as _debug_log:
        _debug_log.write(
            f"[{datetime.now().isoformat()}] DEBUG: Workflowy MCP Server loaded from {__file__}\n"
        )
except Exception:
    # The file log is optional; a missing drive or directory must not block import.
    pass
print("DEBUG: Workflowy MCP Server loaded from " + __file__, file=sys.stderr)
import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Literal, Any, Awaitable, Callable
from fastmcp import FastMCP
from .client import AdaptiveRateLimiter, WorkFlowyClient
from .config import ServerConfig, setup_logging
from .models import (
NodeCreateRequest,
NodeListRequest,
NodeUpdateRequest,
WorkFlowyNode,
)
class _ClientLogger:
    """Lightweight logger that delegates to _log / log_event.

    Used in place of logging.getLogger(...) so existing logger.info /
    logger.warning / logger.error calls feed into the DAGGER logger
    instead of Python's logging module.

    Positional ``*args`` are interpolated into the message with ``%``,
    mirroring the stdlib logging call convention. Keyword arguments
    (``exc_info``, ``extra``, ...) are accepted for signature
    compatibility and ignored.
    """

    def __init__(self, component: str = "SERVER") -> None:
        # Tag passed as the ``component`` argument of every _log call.
        self._component = component

    def _msg(self, msg: object, args: tuple[object, ...] = ()) -> str:
        """Render ``msg`` as text, applying %-style ``args`` when given.

        Args:
            msg: Any object; converted with ``str`` (``repr`` as fallback).
            args: Positional logging arguments. A single non-empty dict is
                used as a mapping for ``%(name)s`` placeholders.

        Returns:
            The formatted text. If the placeholders do not match ``args``,
            the raw arguments are appended instead of being dropped.
        """
        try:
            text = str(msg)
        except Exception:
            text = repr(msg)
        if not args:
            # No interpolation requested: literal '%' characters stay untouched.
            return text
        fmt_args: object = args
        if len(args) == 1 and isinstance(args[0], dict) and args[0]:
            fmt_args = args[0]
        try:
            return text % fmt_args
        except (TypeError, ValueError, KeyError):
            # A logging call must never raise because of a bad format string.
            return f"{text} {args!r}"

    def _emit(self, prefix: str, msg: object, args: tuple[object, ...]) -> None:
        """Format the message and forward it to _log with the level prefix."""
        _log(f"{prefix}{self._msg(msg, args)}", self._component)

    def info(self, msg: object, *args: object, **kwargs: object) -> None:
        self._emit("", msg, args)

    def warning(self, msg: object, *args: object, **kwargs: object) -> None:
        self._emit("WARNING: ", msg, args)

    def error(self, msg: object, *args: object, **kwargs: object) -> None:
        self._emit("ERROR: ", msg, args)

    def debug(self, msg: object, *args: object, **kwargs: object) -> None:
        self._emit("DEBUG: ", msg, args)

    def exception(self, msg: object, *args: object, **kwargs: object) -> None:
        self._emit("EXCEPTION: ", msg, args)
# Module-wide logger; routes through _log / log_event instead of the logging module.
logger = _ClientLogger("SERVER")
# Global client instance and rate limiter.
# Both are created in lifespan() at startup and reset to None at shutdown.
_client: WorkFlowyClient | None = None
_rate_limiter: AdaptiveRateLimiter | None = None
# Global WebSocket state for the DOM cache.
# _ws_connection and _ws_message_queue are set by websocket_handler() for each
# new connection; _ws_server_task is the background task created in lifespan().
_ws_connection = None
_ws_server_task = None
_ws_message_queue = None # asyncio.Queue for message routing
# In-memory job registry for long-running operations (ETCH, NEXUS, etc.)
# Keys are job ids of the form "<kind>-<counter>"; _job_lock guards id allocation.
_jobs: dict[str, dict[str, Any]] = {}
_job_counter: int = 0
_job_lock: asyncio.Lock = asyncio.Lock()
# AI Dagger root (top-level for MCP workflows)
DAGGER_ROOT_ID = "b49affa1-3930-95e3-b2fe-ad9b881285e2"
def get_client() -> WorkFlowyClient:
    """Return the shared WorkFlowy client.

    Raises:
        RuntimeError: If the module-level client has not been set yet.
    """
    client = _client
    if client is not None:
        return client
    raise RuntimeError("WorkFlowy client not initialized. Server not started properly.")
def get_ws_connection():
    """Return the active WebSocket connection and its message queue.

    Both elements of the returned tuple are None when no client is connected.
    """
    connection, queue = _ws_connection, _ws_message_queue
    return connection, queue
def log_event(message: str, component: str = "SERVER") -> None:
    """Write one timestamped, component-tagged line to stderr.

    Args:
        message: Text to log.
        component: Tag shown in square brackets after the dagger marker.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    # The 🗡️ marker makes these lines easy to grep/spot in the console.
    line = f"[{stamp}] 🗡️ [{component}] {message}"
    print(line, file=sys.stderr, flush=True)
def _log(message: str, component: str = "SERVER") -> None:
    """Single logging entry point for this server.

    Forwards to log_event so every line carries the DAGGER+DATETIME+TAG
    prefix and is emitted with plain print(..., file=sys.stderr), which
    reliably surfaces in the MCP connector console (FastMCP tends to
    swallow standard logging output).
    """
    log_event(message=message, component=component)
async def _resolve_uuid_path_and_respond(target_uuid: str | None, websocket, format_mode: str = "f3") -> None:
"""Resolve full ancestor path for target_uuid and send result back to extension.
Implementation note (v3):
- Uses /nodes-export cache to build parent map (all nodes in one call)
- Walks parent_id chain from leaf to root using cached data
- O(1) API call + O(depth) traversal = fast and reliable
- Decodes HTML entities (< > &) in node names for display
format_mode:
- "f3" (default): Compact mode. One line per node.
- "f2": Full/Verbose mode. Node name, then UUID on next line, then blank line.
"""
import html
client = get_client()
target = (target_uuid or "").strip()
if not target:
await websocket.send(json.dumps({
"action": "uuid_path_result",
"success": False,
"target_uuid": target_uuid,
"error": "No target_uuid provided",
}))
return
# Handle virtual UUIDs (generated by GLIMPSE for mirrored nodes)
# Format: virtual_<actual-uuid>_<parent-uuid>_<hash>
# For mirrors: show BOTH paths (actual node + mirror location context)
is_mirrored = False
mirror_parent_uuid = None
if target.startswith("virtual_"):
parts = target.split("_")
# parts[0] = "virtual", parts[1] = actual UUID, parts[2] = parent UUID, parts[3+] = hash
if len(parts) >= 3:
actual_uuid = parts[1]
mirror_parent_uuid = parts[2]
is_mirrored = True
log_event(f"Virtual UUID detected (mirrored node): actual={actual_uuid}, mirror_parent={mirror_parent_uuid}", "UUID_RES")
target = actual_uuid
elif len(parts) >= 2:
actual_uuid = parts[1]
log_event(f"Virtual UUID detected: {target} -> extracting actual UUID: {actual_uuid}", "UUID_RES")
target = actual_uuid
else:
await websocket.send(json.dumps({
"action": "uuid_path_result",
"success": False,
"target_uuid": target_uuid,
"error": f"Malformed virtual UUID: {target}",
}))
return
try:
# Fetch full account export (uses cache if available)
export_data = await client.export_nodes(node_id=None, use_cache=True)
all_nodes = export_data.get("nodes", []) or []
# Build node lookup by ID
nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}
# Check if target node's ancestor chain intersects dirty set
# If so, refresh cache before proceeding (lazy refresh on demand)
dirty_ids = client._nodes_export_dirty_ids
if dirty_ids and ("*" in dirty_ids or target in dirty_ids):
# Quick check: is target itself dirty or global dirty flag set?
needs_refresh = True
elif dirty_ids:
# Walk ancestor chain from target to see if any ancestor is dirty
needs_refresh = False
current = target
visited_check: set[str] = set()
while current and current not in visited_check:
visited_check.add(current)
if current in dirty_ids:
needs_refresh = True
break
node_dict = nodes_by_id.get(current)
if not node_dict:
break
current = node_dict.get("parent_id") or node_dict.get("parentId")
if needs_refresh:
log_event(f"Path from {target} intersects dirty IDs; refreshing cache", "UUID_RES")
export_data = await client.export_nodes(node_id=None, use_cache=False, force_refresh=True)
all_nodes = export_data.get("nodes", []) or []
nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}
# DEBUG: Log cache stats
log_event(f"Resolving path for target_uuid: {target} (cache: {len(nodes_by_id)} nodes)", "UUID_RES")
# If node not found in cache, try refreshing once before giving up
if target not in nodes_by_id:
log_event(f"Node {target} not found in cache; attempting refresh", "UUID_RES")
try:
export_data = await client.export_nodes(node_id=None, use_cache=False, force_refresh=True)
all_nodes = export_data.get("nodes", []) or []
nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}
log_event(f"Cache refreshed: {len(nodes_by_id)} nodes", "UUID_RES")
except Exception as refresh_err:
log_event(f"Cache refresh failed: {refresh_err}", "UUID_RES")
# Check again after refresh
if target not in nodes_by_id:
await websocket.send(json.dumps({
"action": "uuid_path_result",
"success": False,
"target_uuid": target_uuid,
"error": f"Node {target} not found (tried cache + refresh)",
}))
return
# Walk parent chain from target to root
path_nodes = []
visited: set[str] = set()
current_id: str | None = target
max_hops = 512
hops = 0
while current_id and current_id not in visited and hops < max_hops:
visited.add(current_id)
node_dict = nodes_by_id.get(current_id)
if not node_dict:
break
# DEBUG: Log each node found
parent_id = node_dict.get("parent_id") or node_dict.get("parentId")
raw_name = node_dict.get("nm") or node_dict.get("name") or "Untitled"
node_name = html.unescape(raw_name) # Decode < > & etc.
log_event(f"Found node: {current_id} (name: {node_name}), parent: {parent_id}", "UUID_RES")
# Store decoded name in dict for later use
node_dict["_decoded_name"] = node_name
path_nodes.append(node_dict)
current_id = parent_id
hops += 1
path_nodes.reverse()
# DEBUG: Log final path length
log_event(f"Resolved path length: {len(path_nodes)}", "UUID_RES")
if not path_nodes:
log_event(f"Path resolution returned empty for {target_uuid}", "UUID_RES")
await websocket.send(json.dumps({
"action": "uuid_path_result",
"success": False,
"target_uuid": target_uuid,
"error": f"Could not build path for node {target_uuid} (empty path after traversal)",
}))
return
lines: list[str] = []
# Determine which nodes are leaves vs ancestors for bullet selection
leaf_index = len(path_nodes) - 1
if format_mode == "f2":
# FULL / VERBOSE MODE (F2-style)
# Render name, then UUID, then blank line for every node
for depth, node_dict in enumerate(path_nodes):
# Use decoded name (HTML entities already unescaped)
name = node_dict.get("_decoded_name") or "Untitled"
node_id = node_dict.get("id") or ""
prefix = "#" * (depth + 1)
# Bullet: ⦿ for ancestors, • for leaf
bullet = "•" if depth == leaf_index else "⦿"
lines.append(f"{prefix} {bullet} {name}")
lines.append(f"`{node_id}`")
lines.append("")
# Ending block
lines.append("→ Use Leaf UUID:")
lines.append(f"`{target}`")
else:
# COMPACT MODE (F3-style / Default)
for depth, node_dict in enumerate(path_nodes):
# Use decoded name (HTML entities already unescaped)
name = node_dict.get("_decoded_name") or "Untitled"
prefix = "#" * (depth + 1)
# Bullet: ⦿ for ancestors, • for leaf
bullet = "•" if depth == leaf_index else "⦿"
lines.append(f"{prefix} {bullet} {name}")
lines.append("")
lines.append(f"→ `{target}`")
markdown = "\n".join(lines)
# If this was a mirrored node, append the mirror location context
if is_mirrored and mirror_parent_uuid:
# Build path for mirror parent to show where the mirror appears
mirror_path_nodes = []
current_id = mirror_parent_uuid
visited_mirror: set[str] = set()
hops_mirror = 0
while current_id and current_id not in visited_mirror and hops_mirror < 512:
visited_mirror.add(current_id)
node_dict = nodes_by_id.get(current_id)
if not node_dict:
break
# Decode name and skip "Untitled" nodes (Workflowy mirror detritus)
raw_name = node_dict.get("nm") or node_dict.get("name") or "Untitled"
decoded_name = html.unescape(raw_name)
if decoded_name.strip() != "Untitled":
# Store decoded name for later use
node_dict["_decoded_name"] = decoded_name
mirror_path_nodes.append(node_dict)
parent_id = node_dict.get("parent_id") or node_dict.get("parentId")
current_id = parent_id
hops_mirror += 1
mirror_path_nodes.reverse()
if mirror_path_nodes:
# Add separator and header
markdown += "\n\n---\n\n"
markdown += "**Mirror appears under:**\n\n"
# Generate mirror path markdown (same format as main path)
mirror_lines = []
leaf_index_mirror = len(mirror_path_nodes) - 1
if format_mode == "f2":
for depth, node_dict in enumerate(mirror_path_nodes):
name = node_dict.get("_decoded_name") or "Untitled"
node_id = node_dict.get("id") or ""
prefix = "#" * (depth + 1)
bullet = "•" if depth == leaf_index_mirror else "⦿"
mirror_lines.append(f"{prefix} {bullet} {name}")
mirror_lines.append(f"`{node_id}`")
mirror_lines.append("")
else:
for depth, node_dict in enumerate(mirror_path_nodes):
name = node_dict.get("_decoded_name") or "Untitled"
prefix = "#" * (depth + 1)
bullet = "•" if depth == leaf_index_mirror else "⦿"
mirror_lines.append(f"{prefix} {bullet} {name}")
markdown += "\n".join(mirror_lines)
# DEBUG: Log the complete markdown being sent (acts as historical log)
log_event(f"Markdown output:\n{markdown}", "UUID_RES")
# Also append to persistent UUID Explorer log file
try:
from datetime import datetime
log_path = r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\uuid_and_glimpse_explorer\uuid_explorer.md"
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"## {timestamp}\n\n")
f.write(f"**Target UUID:** `{target}`\n\n")
f.write(markdown)
f.write("\n\n---\n\n")
except Exception as log_err:
# Never let logging failures affect UUID resolution
log_event(f"Failed to write UUID Explorer log: {log_err}", "UUID_RES")
await websocket.send(json.dumps({
"action": "uuid_path_result",
"success": True,
"target_uuid": target_uuid,
"markdown": markdown,
"path": [
{
"id": node_dict.get("id"),
"name": node_dict.get("_decoded_name") or "Untitled",
"parent_id": node_dict.get("parent_id") or node_dict.get("parentId"),
}
for node_dict in path_nodes
],
}))
except Exception as e: # noqa: BLE001
log_event(f"UUID path resolution error for {target_uuid}: {e}", "UUID_RES")
await websocket.send(json.dumps({
"action": "uuid_path_result",
"success": False,
"target_uuid": target_uuid,
"error": str(e),
}))
async def websocket_handler(websocket):
    """Serve one WebSocket connection from the Chrome extension.

    Uses a message-queue pattern to avoid recv() conflicts: this coroutine
    is the only reader of the socket. It answers ``ping``,
    ``resolve_uuid_path``, ``refresh_nodes_export_cache`` and
    ``notify_node_mutated`` itself and puts every other message on the
    module-level queue for workflowy_glimpse() to consume.

    Extension sends requests such as:
    {"action": "extract_dom", "node_id": "uuid"}
    and receives responses with DOM tree data.
    """
    global _ws_connection, _ws_message_queue

    log_event(f"WebSocket client connected from {websocket.remote_address}", "WS_HANDLER")
    _ws_connection = websocket
    _ws_message_queue = asyncio.Queue()  # Fresh queue for this connection

    try:
        # Keep connection alive - wait for messages indefinitely
        async for raw_message in websocket:
            try:
                payload = json.loads(raw_message)
                action = payload.get('action')
                log_event(f"WebSocket message received: {action}", "WS_HANDLER")

                if action == 'ping':
                    # Keep-alive round trip
                    await websocket.send(json.dumps({"action": "pong"}))
                    log_event("Sent pong response", "WS_HANDLER")

                elif action == 'resolve_uuid_path':
                    # UUID Navigator request; compact f3 mode unless told otherwise
                    requested_uuid = payload.get('target_uuid') or payload.get('uuid')
                    requested_format = payload.get('format', 'f3')
                    await _resolve_uuid_path_and_respond(requested_uuid, websocket, requested_format)

                elif action == 'refresh_nodes_export_cache':
                    # Explicit cache refresh request from GLIMPSE client.
                    try:
                        api_client = get_client()
                        refresh_result = await api_client.refresh_nodes_export_cache()
                        await websocket.send(json.dumps({
                            "action": "refresh_nodes_export_cache_result",
                            **refresh_result,
                        }))
                        log_event(f"Refreshed /nodes-export cache via WebSocket request: {refresh_result.get('node_count')} nodes", "WS_HANDLER")
                    except Exception as refresh_exc:
                        log_event(f"Failed to refresh /nodes-export cache from WebSocket request: {refresh_exc}", "WS_HANDLER")
                        try:
                            await websocket.send(json.dumps({
                                "action": "refresh_nodes_export_cache_result",
                                "success": False,
                                "error": str(refresh_exc),
                            }))
                        except Exception:
                            # Best-effort only; don't crash handler
                            pass

                elif action == 'notify_node_mutated':
                    # Mutation notifications from Workflowy desktop.
                    # Preferred payload (v3.9.2+): "nodes" = [{"id": "...", "name": "..."}, ...]
                    # Legacy payloads: "node_ids" (list[str]) or "node_id" (str).
                    structured = payload.get('nodes') or []
                    mutated_ids = payload.get('node_ids') or payload.get('node_id') or []
                    if isinstance(mutated_ids, str):
                        mutated_ids = [mutated_ids]

                    labels: list[str] = []
                    if isinstance(structured, list) and structured:
                        # Structured entries win: derive both ids and log labels from them.
                        mutated_ids = []
                        for item in structured:
                            if not isinstance(item, dict):
                                continue
                            item_id = item.get('id') or item.get('uuid')
                            if not item_id:
                                continue
                            item_name = item.get('name')
                            mutated_ids.append(item_id)
                            labels.append(
                                str(item_id) if item_name is None else f"{item_id} ({item_name!r})"
                            )
                    else:
                        # Fallback: no structured nodes payload; log bare IDs only.
                        labels = [str(bare_id) for bare_id in mutated_ids]

                    try:
                        api_client = get_client()
                        api_client._mark_nodes_export_dirty(mutated_ids)
                        log_event(
                            "Marked mutated nodes as dirty in /nodes-export cache: "
                            + ", ".join(labels),
                            "WS_HANDLER",
                        )
                        # NOTE: Cache refresh is LAZY - only triggered when UUID path is
                        # requested AND the path intersects a dirty ID. This prevents rate
                        # limiting from eager refreshes.
                    except Exception as mark_exc:
                        log_event(
                            f"Failed to mark /nodes-export cache dirty from WebSocket notification: {mark_exc}",
                            "WS_HANDLER",
                        )

                else:
                    # Everything else is queued for workflowy_glimpse() to consume
                    await _ws_message_queue.put(payload)
                    log_event(f"Message queued for processing: {action}", "WS_HANDLER")

            except json.JSONDecodeError as decode_exc:
                log_event(f"Invalid JSON from WebSocket: {decode_exc}", "WS_HANDLER")
            except Exception as message_exc:
                log_event(f"WebSocket message error: {message_exc}", "WS_HANDLER")

        # If loop exits naturally, connection was closed by client
        log_event("WebSocket client disconnected (connection closed by client)", "WS_HANDLER")

    except Exception as connection_exc:
        log_event(f"WebSocket connection closed with error: {connection_exc}", "WS_HANDLER")
    finally:
        # Only clear the globals if they still refer to this connection.
        if _ws_connection == websocket:
            _ws_connection = None
            _ws_message_queue = None
        log_event("WebSocket client cleaned up", "WS_HANDLER")
async def start_websocket_server():
    """Run the WebSocket server used by the Chrome extension on localhost:8765.

    Returns immediately (after logging a hint) when the optional
    ``websockets`` package is not installed. Otherwise serves until the
    surrounding task is cancelled; startup failures are logged, not raised.
    """
    try:
        import websockets
    except ImportError:
        for hint in (
            "websockets library not installed. WebSocket cache unavailable.",
            "Install with: pip install websockets",
        ):
            log_event(hint, "WS_INIT")
        return

    log_event("Starting WebSocket server on ws://localhost:8765", "WS_INIT")

    try:
        async with websockets.serve(websocket_handler, "localhost", 8765):
            log_event("✅ WebSocket server listening on port 8765", "WS_INIT")
            log_event("WebSocket server will accept connections indefinitely...", "WS_INIT")
            # An event that is never set parks this coroutine forever.
            never_set = asyncio.Event()
            await never_set.wait()
    except Exception as exc:
        log_event(f"WebSocket server failed to start: {exc}", "WS_INIT")
        log_event("GLIMPSE will fall back to API fetching", "WS_INIT")
@asynccontextmanager
async def lifespan(_app: FastMCP):  # type: ignore[no-untyped-def]
    """Manage server lifecycle.

    Startup: loads ServerConfig, creates the rate limiter and the WorkFlowy
    client, and starts the WebSocket server as a background task.
    Shutdown: cancels the WebSocket task, closes the client and clears the
    module-level globals. Shutdown runs in a ``finally`` block so it also
    happens when the served application exits with an exception.
    """
    global _client, _rate_limiter, _ws_server_task

    # Setup
    log_event("Starting WorkFlowy MCP server", "LIFESPAN")

    # Load configuration
    config = ServerConfig()  # type: ignore[call-arg]
    api_config = config.get_api_config()

    # Initialize rate limiter (default 10 req/s)
    _rate_limiter = AdaptiveRateLimiter(
        initial_rate=10.0,
        min_rate=1.0,
        max_rate=100.0,
    )

    # Initialize client
    _client = WorkFlowyClient(api_config)
    log_event(f"WorkFlowy client initialized with base URL: {api_config.base_url}", "LIFESPAN")

    # Start WebSocket server in background task
    _ws_server_task = asyncio.create_task(start_websocket_server())
    log_event("WebSocket server task created", "LIFESPAN")

    try:
        yield
    finally:
        # Cleanup (also reached when an exception is thrown in at the yield)
        log_event("Shutting down WorkFlowy MCP server", "LIFESPAN")

        # Cancel WebSocket server
        if _ws_server_task:
            _ws_server_task.cancel()
            try:
                await _ws_server_task
            except asyncio.CancelledError:
                pass
            _ws_server_task = None  # drop the finished task handle
            log_event("WebSocket server stopped", "LIFESPAN")

        if _client:
            await _client.close()
            _client = None

        _rate_limiter = None
# Initialize FastMCP server.
# This instance is what the @mcp.tool(...) decorators in this module register
# tools on; lifespan() is passed as its startup/shutdown hook.
mcp = FastMCP(
    "WorkFlowy MCP Server",
    version="0.1.0",
    instructions="MCP server for managing WorkFlowy outlines and nodes",
    lifespan=lifespan,
)
# In-memory job management for long-running operations (ETCH, NEXUS, etc.)
async def _start_background_job(
    kind: str,
    payload: dict[str, Any],
    coro_factory: Callable[[str], Awaitable[dict]],
) -> dict:
    """Start a background job and return a lightweight handle.

    Args:
        kind: Job category; used as the prefix of the generated job id.
        payload: Request data stored with the job record.
        coro_factory: Called with the new job id; the awaitable it returns
            is run in a background task and should produce a result dict.

    Returns:
        ``{"success": True, "job_id": ..., "status": "started", "kind": ...}``.
        Progress is tracked in the ``_jobs`` registry: the record ends as
        "completed" when the result's ``success`` flag is true or absent,
        and as "failed" otherwise, on exception, or on cancellation.
    """
    global _job_counter, _jobs
    from datetime import timezone  # local import: only ``datetime`` is imported at module level

    def _utc_stamp() -> str:
        # Same naive-UTC ISO string that datetime.utcnow().isoformat() produced,
        # without calling the deprecated utcnow().
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async with _job_lock:
        _job_counter += 1
        job_id = f"{kind}-{_job_counter}"
        _jobs[job_id] = {
            "job_id": job_id,
            "kind": kind,
            "status": "pending",  # pending | running | completed | failed | cancelling
            "payload": payload,
            "result": None,
            "error": None,
            "started_at": _utc_stamp(),
            "finished_at": None,
            "_task": None,  # internal field, not exposed in status
        }

    async def runner() -> None:
        try:
            _jobs[job_id]["status"] = "running"
            result = await coro_factory(job_id)
            _jobs[job_id]["result"] = result
            _jobs[job_id]["status"] = "completed" if result.get("success", True) else "failed"
        except asyncio.CancelledError:
            # Explicit cancellation via mcp_cancel_job; recorded, not re-raised
            _jobs[job_id]["error"] = "CancelledError: Job was cancelled by user request"
            _jobs[job_id]["status"] = "failed"
        except Exception as e:  # noqa: BLE001
            _jobs[job_id]["error"] = f"{type(e).__name__}: {e}"
            _jobs[job_id]["status"] = "failed"
        finally:
            _jobs[job_id]["finished_at"] = _utc_stamp()

    # The registry keeps a reference to the task so it can be cancelled later.
    task = asyncio.create_task(runner())
    _jobs[job_id]["_task"] = task

    return {
        "success": True,
        "job_id": job_id,
        "status": "started",
        "kind": kind,
    }
@mcp.tool(
    name="mcp_job_status",
    description="Get status/result for long-running MCP jobs (ETCH, NEXUS, etc.).",
)
async def mcp_job_status(job_id: str | None = None) -> dict:
    """Report the status of background jobs.

    Looks in two places:
    - the in-memory asyncio job registry (``_jobs``)
    - detached WEAVE processes reported by scan_active_weaves() for the
      nexus_runs directory

    Args:
        job_id: Job to look up. When omitted, a summary of all in-memory
            and detached jobs is returned instead.

    Returns:
        A dict with ``success`` plus either the job listing, the job's
        record (without ``payload`` and the internal task handle), or an
        ``error`` message when the id is unknown.
    """
    from .client.api_client import scan_active_weaves

    # Determine nexus_runs directory
    # TODO: make this configurable or derive from client
    nexus_runs_base = r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\nexus_runs"

    if job_id is None:
        # No id given: list ALL jobs (in-memory + detached)
        in_memory_jobs = [
            {
                "job_id": record.get("job_id"),
                "kind": record.get("kind"),
                "status": record.get("status"),
                "started_at": record.get("started_at"),
                "finished_at": record.get("finished_at"),
                "detached": False
            }
            for record in _jobs.values()
        ]
        detached_jobs = scan_active_weaves(nexus_runs_base)
        return {
            "success": True,
            "in_memory_jobs": in_memory_jobs,
            "detached_jobs": detached_jobs,
            "total": len(in_memory_jobs) + len(detached_jobs)
        }

    # In-memory registry takes precedence
    record = _jobs.get(job_id)
    if record:
        # Do not expose the payload or the internal task handle
        view = {key: value for key, value in record.items() if key not in ("payload", "_task")}
        return {"success": True, **view}

    # Only weave-prefixed ids can refer to detached WEAVE jobs
    if not job_id.startswith(("weave-enchanted-", "weave-direct-")):
        return {"success": False, "error": f"Unknown job_id: {job_id}"}

    for detached in scan_active_weaves(nexus_runs_base):
        if detached.get("job_id") == job_id:
            return {"success": True, **detached}

    # Not found in nexus_runs - might be direct mode with PID elsewhere
    # For now, return helpful error
    return {
        "success": False,
        "error": f"Job {job_id} not found in memory or nexus_runs/. "
                 "It may have completed (check .weave_journal.json), been killed, or is a direct-mode job. "
                 "Call mcp_job_status() with no arguments to list all detached jobs."
    }
@mcp.tool(
    name="mcp_cancel_job",
    description="Request cancellation of a long-running MCP job (ETCH, NEXUS, etc.).",
)
async def mcp_cancel_job(job_id: str) -> dict:
    """Request cancellation of an in-memory background job.

    Calls ``cancel()`` on the job's asyncio task and marks the record as
    "cancelling"; the job's own runner then records the final
    status='failed' with a cancellation error message.

    Returns:
        ``{"success": True, "job_id": ..., "status": "cancelling"}`` when the
        request was issued, otherwise ``{"success": False, "error": ...}``
        (unknown id, no task attached, or task already finished).
    """
    def _refused(reason: str) -> dict:
        return {"success": False, "error": reason}

    record = _jobs.get(job_id)
    if not record:
        return _refused(f"Unknown job_id: {job_id}")

    handle = record.get("_task")
    if handle is None:
        return _refused("Job has no associated task (cannot cancel).")
    if handle.done():
        return _refused("Job already completed.")

    # Mark as cancelling for visibility; runner will finalize status
    record["status"] = "cancelling"
    handle.cancel()
    return {"success": True, "job_id": job_id, "status": "cancelling"}
# 🔐 SECRET CODE VALIDATION - Brute Force Agent Training Override
def validate_secret_code(provided_code: str | None, function_name: str) -> tuple[bool, str | None]:
"""Validate secret code for WARNING functions.
This is the nuclear option - forces agents to ask Dan explicitly.
Returns:
(is_valid, error_message)
"""
import os
import secrets
SECRET_FILE = r"E:\__daniel347x\glimpse_etch.txt"
# Generate code if file doesn't exist
if not os.path.exists(SECRET_FILE):
code = secrets.token_hex(8) # 16-character hex code
with open(SECRET_FILE, 'w') as f:
f.write(code)
# Read current code
with open(SECRET_FILE, 'r') as f:
valid_code = f.read().strip()
# Check if code provided and matches
if provided_code and provided_code.strip() == valid_code:
return (True, None) # Valid - allow function call
# Invalid or missing - return error
error_msg = f"""⚠️ EXPLICIT PERMISSION REQUIRED
You called: {function_name}
This is a WARNING function. You should be using GLIMPSE or ETCH instead.
🛑 YOU BELIEVE YOU'RE AN EXCEPTION CASE.
🚨 YOU ARE NOT THE EXCEPTION.
✅ USE INSTEAD:
- For reading node trees: workflowy_glimpse(node_id="...")
- For creating nodes: workflowy_etch(parent_id="...", nodes=[...])
⚙️ IF YOU TRULY NEED THIS FUNCTION:
1. Ask Dan: "May I have permission to call {function_name}?"
2. Dan will check a secret file for the authorization code
3. If Dan grants permission, he'll provide the code
4. Call the SAME function again with the EXACT SAME parameters you used the first time, but add: secret_code="[code-from-Dan]" (do not change anything else).
Example (arguments object): {{ "name": "...", "parent_id": "...", "note": "...", "secret_code": "[code-from-Dan]" }}
🚫 IF DAN SAYS NO:
- You are NOT the exception
- Your training misled you
- Use GLIMPSE or ETCH as instructed
- Do NOT call this function again
📚 Break free from your training - build the GLIMPSE/ETCH habit!
"""
return (False, error_msg)
# Tool: Create Single Node (Base - Deprecated)
@mcp.tool(name="workflowy_create_single_node", description="DEPRECATED: Use workflowy_etch (ETCH) instead")
async def create_single_node_base(
    name: str,
    parent_id: str | None = None,
    note: str | None = None,
) -> dict:
    """Deprecated stub: always raises ValueError pointing callers to ETCH.

    The arguments are accepted for signature compatibility and never used.
    """
    rename_notice = """⚠️ FUNCTION RENAMED
The function 'workflowy_create_single_node' has been renamed to 'workflowy_create_single_node__WARNING__prefer_ETCH'.
BUT MORE IMPORTANTLY: Use workflowy_etch (ETCH command) instead!
✅ RECOMMENDED:
workflowy_etch(
parent_id="...",
nodes=[{"name": "Your node", "note": "...", "children": []}]
)
ETCH is better:
- Works for 1 node or 100 nodes
- Validation and auto-escaping built-in
- Same performance, more capability
📚 Build the ETCH habit!
"""
    raise ValueError(rename_notice)
# Tool: Create Single Node (With Warning)
@mcp.tool(name="workflowy_create_single_node__WARNING__prefer_ETCH", description="⚠️ WARNING: Prefer workflowy_etch (ETCH) instead. This creates ONE node only.")
async def create_node(
    name: str,
    parent_id: str | None = None,
    note: str | None = None,
    layout_mode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = None,
    position: Literal["top", "bottom"] = "bottom",
    _completed: bool = False,
    secret_code: str | None = None,
) -> dict:
    """Create a SINGLE node in WorkFlowy.

    ⚠️ WARNING: Prefer workflowy_etch (ETCH) for creating 2+ nodes.

    This tool is ONLY for:
    - Adding one VYRTHEX to existing log (real-time work)
    - One quick update to a known node
    - Live work in progress

    Args:
        name: The text content of the node
        parent_id: ID of the parent node (optional)
        note: Additional note/description for the node
        layout_mode: Layout mode for the node (bullets, todo, h1, h2, h3) (optional)
        position: Where to place the new node - "bottom" (default) or "top"
        _completed: Whether the node should be marked as completed (not used)
        secret_code: Authorization code from Dan (required for WARNING functions)

    Returns:
        Dictionary with the created node's data plus a ``_warning`` message

    Raises:
        ValueError: When the secret code is missing or wrong.
    """
    # 🔐 Gate: refuse unless the caller supplied the authorization code
    authorized, denial = validate_secret_code(secret_code, "workflowy_create_single_node__WARNING__prefer_ETCH")
    if not authorized:
        raise ValueError(denial)

    api_client = get_client()
    create_request = NodeCreateRequest(  # type: ignore[call-arg]
        name=name,
        parent_id=parent_id,
        note=note,
        layoutMode=layout_mode,
        position=position,
    )

    if _rate_limiter:
        await _rate_limiter.acquire()

    try:
        created = await api_client.create_node(create_request)
        if _rate_limiter:
            _rate_limiter.on_success()
        # Node data first, then the nudge towards ETCH
        response = dict(created.model_dump())
        response["_warning"] = "⚠️ WARNING: You just created a SINGLE node. For 2+ nodes, use workflowy_etch instead (same performance, more capability)."
        return response
    except Exception as exc:
        # Matched by class name; feeds the limiter before the error propagates.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
# Tool: Update Node
@mcp.tool(name="workflowy_update_node", description="Update an existing WorkFlowy node")
async def update_node(
    node_id: str,
    name: str | None = None,
    note: str | None = None,
    layout_mode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = None,
    _completed: bool | None = None,
) -> WorkFlowyNode:
    """Apply changes to an existing WorkFlowy node.

    Args:
        node_id: The ID of the node to update
        name: New text content for the node (optional)
        note: New note/description (optional)
        layout_mode: New layout mode for the node (bullets, todo, h1, h2, h3) (optional)
        _completed: New completion status (not used - use complete_node/uncomplete_node)

    Returns:
        The updated WorkFlowy node
    """
    api_client = get_client()
    update_request = NodeUpdateRequest(  # type: ignore[call-arg]
        name=name,
        note=note,
        layoutMode=layout_mode,
    )

    if _rate_limiter:
        await _rate_limiter.acquire()

    try:
        updated = await api_client.update_node(node_id, update_request)
        if _rate_limiter:
            _rate_limiter.on_success()
        return updated
    except Exception as exc:
        # Matched by class name; feeds the limiter before the error propagates.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
# Tool: Get Node (Base - Deprecated)
@mcp.tool(name="workflowy_get_node", description="DEPRECATED: Use workflowy_glimpse (GLIMPSE) instead")
async def get_node_base(node_id: str) -> dict:
    """Deprecated stub: always raises ValueError pointing callers to GLIMPSE.

    ``node_id`` is accepted for signature compatibility and never used.
    """
    rename_notice = """⚠️ FUNCTION RENAMED
The function 'workflowy_get_node' has been renamed to 'workflowy_get_node__WARNING__prefer_glimpse'.
BUT MORE IMPORTANTLY: Use workflowy_glimpse (GLIMPSE command) instead!
✅ RECOMMENDED:
workflowy_glimpse(node_id="...")
Returns: {"root": {...}, "children": [...]} with complete tree structure.
GLIMPSE is better:
- Gets root node metadata (name, note)
- Gets full children tree (not just direct children)
- One call gets everything
📚 Build the GLIMPSE habit!
"""
    raise ValueError(rename_notice)
# Tool: Get Node (With Warning)
@mcp.tool(name="workflowy_get_node__WARNING__prefer_glimpse", description="⚠️ WARNING: Prefer workflowy_glimpse (GLIMPSE) for reading trees. Retrieve a specific WorkFlowy node by ID")
async def get_node(
    node_id: str,
    secret_code: str | None = None,
) -> WorkFlowyNode:
    """Fetch one WorkFlowy node by id.

    Args:
        node_id: The ID of the node to retrieve
        secret_code: Authorization code from Dan (required for WARNING functions)

    Returns:
        The requested WorkFlowy node

    Raises:
        ValueError: When the secret code is missing or wrong.
    """
    # 🔐 Gate: refuse unless the caller supplied the authorization code
    authorized, denial = validate_secret_code(secret_code, "workflowy_get_node__WARNING__prefer_glimpse")
    if not authorized:
        raise ValueError(denial)

    api_client = get_client()

    if _rate_limiter:
        await _rate_limiter.acquire()

    try:
        fetched = await api_client.get_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
        return fetched
    except Exception as exc:
        # Matched by class name; feeds the limiter before the error propagates.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
# Tool: List Nodes (Base - Deprecated)
@mcp.tool(name="workflowy_list_nodes", description="DEPRECATED: Use workflowy_glimpse (GLIMPSE) instead")
async def list_nodes_base(parent_id: str | None = None) -> dict:
    """Deprecated stub: always raises ValueError pointing callers to GLIMPSE.

    ``parent_id`` is accepted for signature compatibility and never used.
    """
    rename_notice = """⚠️ FUNCTION RENAMED
The function 'workflowy_list_nodes' has been renamed to 'workflowy_list_nodes__WARNING__prefer_glimpse'.
BUT MORE IMPORTANTLY: Use workflowy_glimpse (GLIMPSE command) instead!
✅ RECOMMENDED:
workflowy_glimpse(node_id="...")
Returns: {"root": {...}, "children": [...]} with complete tree structure.
GLIMPSE is better:
- Gets full nested tree (not just direct children)
- Gets root node metadata
- More efficient
📚 Build the GLIMPSE habit!
"""
    raise ValueError(rename_notice)
# Tool: List Nodes (With Warning)
@mcp.tool(
    name="workflowy_list_nodes__WARNING__prefer_glimpse",
    description="⚠️ WARNING: Prefer workflowy_glimpse (GLIMPSE) for reading trees. List WorkFlowy nodes (omit parent_id for root)",
)
async def list_nodes(
    parent_id: str | None = None,
    secret_code: str | None = None,
) -> dict:
    """List the children of a WorkFlowy node, gated behind a secret code.

    Args:
        parent_id: ID of the parent whose children are listed. Omit (or pass
            None) to list root nodes.
        secret_code: Authorization code from Dan; checked with
            ``validate_secret_code`` before any request is made.

    Returns:
        Dictionary with the dumped ``nodes``, the ``total`` count reported by
        the client, and a ``_warning`` hint recommending GLIMPSE.

    Raises:
        ValueError: If the secret code is rejected.
    """
    # 🔐 Refuse to go any further without a valid secret code.
    authorized, reason = validate_secret_code(
        secret_code, "workflowy_list_nodes__WARNING__prefer_glimpse"
    )
    if not authorized:
        raise ValueError(reason)

    client = get_client()
    listing_request = NodeListRequest(parentId=parent_id)  # type: ignore[call-arg]

    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        found, total = await client.list_nodes(listing_request)
        if _rate_limiter:
            _rate_limiter.on_success()
        payload = {
            "nodes": [entry.model_dump() for entry in found],
            "total": total,
            "_warning": "⚠️ For reading multiple nodes or full trees, use workflowy_glimpse (GLIMPSE) instead for efficiency",
        }
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    return payload
# Tool: Delete Node
@mcp.tool(
    name="workflowy_delete_node",
    description="Delete a WorkFlowy node and all its children",
)
async def delete_node(node_id: str) -> dict:
    """Delete a WorkFlowy node together with its children.

    Args:
        node_id: ID of the node to delete.

    Returns:
        Dictionary with the client's ``success`` value and the ``deleted_id``.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        deleted = await client.delete_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return {"success": deleted, "deleted_id": node_id}
# Tool: Complete Node
@mcp.tool(
    name="workflowy_complete_node",
    description="Mark a WorkFlowy node as completed",
)
async def complete_node(node_id: str) -> WorkFlowyNode:
    """Mark a WorkFlowy node as completed.

    Args:
        node_id: ID of the node to complete.

    Returns:
        The updated node as returned by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        updated = await client.complete_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return updated
# Tool: Uncomplete Node
@mcp.tool(
    name="workflowy_uncomplete_node",
    description="Mark a WorkFlowy node as not completed",
)
async def uncomplete_node(node_id: str) -> WorkFlowyNode:
    """Clear the completed state of a WorkFlowy node.

    Args:
        node_id: ID of the node to uncomplete.

    Returns:
        The updated node as returned by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        updated = await client.uncomplete_node(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return updated
# Tool: Move Node
@mcp.tool(
    name="workflowy_move_node",
    description="Move a WorkFlowy node to a new parent",
)
async def move_node(
    node_id: str,
    parent_id: str | None = None,
    position: str = "top",
) -> bool:
    """Re-parent a WorkFlowy node.

    Args:
        node_id: ID of the node to move.
        parent_id: Destination parent (UUID, a target key such as 'inbox',
            or None for root).
        position: Placement under the new parent, 'top' or 'bottom'
            (default 'top').

    Returns:
        The success flag reported by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        moved = await client.move_node(node_id, parent_id, position)
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return moved
@mcp.tool(
    name="nexus_glimpse",
    description=(
        "GLIMPSE → TERRAIN + PHANTOM GEM (zero API calls). "
        "Captures what you've expanded in Workflowy via WebSocket GLIMPSE and creates both "
        "coarse_terrain.json and phantom_gem.json from that single local extraction. "
        "No Workflowy API calls, instant, you control granularity by expanding nodes."
    ),
)
async def nexus_glimpse(
    nexus_tag: str,
    workflowy_root_id: str,
    reset_if_exists: bool = False,
    mode: str = "full",
) -> dict[str, Any]:
    """Initialize a NEXUS from a WebSocket GLIMPSE.

    All arguments are forwarded unchanged to ``client.nexus_glimpse``,
    together with the current WebSocket connection and queue. No rate-limiter
    slot is taken here.

    Args:
        nexus_tag: NEXUS tag to initialize.
        workflowy_root_id: Root node the GLIMPSE is taken from.
        reset_if_exists: Forwarded to the client (default False).
        mode: Forwarded to the client (default "full").

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    # The client needs the live WebSocket connection and its message queue.
    connection, queue = get_ws_connection()
    outcome = await client.nexus_glimpse(
        nexus_tag=nexus_tag,
        workflowy_root_id=workflowy_root_id,
        reset_if_exists=reset_if_exists,
        mode=mode,
        _ws_connection=connection,
        _ws_queue=queue,
    )
    return outcome
# Tool: Export Nodes
@mcp.tool(
    name="workflowy_export_node",
    description="Export a WorkFlowy node with all its children",
)
async def export_node(
    node_id: str | None = None,
) -> dict:
    """Export every node, or only the subtree under one node.

    Args:
        node_id: ID of the node to export along with its descendants. Omit to
            export all nodes.

    Returns:
        Dictionary containing a 'nodes' list with the exported node data.

    Rate limit: 1 request per minute for full export.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        exported = await client.export_nodes(node_id)
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return exported
@mcp.tool(
    name="workflowy_refresh_nodes_export_cache",
    description=(
        "Force a fresh /nodes-export snapshot and update the local cache used "
        "by NEXUS and the UUID Navigator."
    ),
)
async def workflowy_refresh_nodes_export_cache() -> dict:
    """Explicitly refresh the cached /nodes-export snapshot.

    This is primarily useful after large out-of-band edits in Workflowy
    desktop, or when you want to be certain the cache reflects the latest
    ETHER state before running NEXUS or UUID Navigator operations.

    Returns:
        The client's result on success. If the refresh raises, a dict of the
        form ``{"success": False, "error": "<message>"}`` is returned instead
        of propagating the exception.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        result = await client.refresh_nodes_export_cache()
        if _rate_limiter:
            _rate_limiter.on_success()
        return result
    except Exception as e:  # noqa: BLE001
        # Failures are reported as a result dict rather than raised, but the
        # limiter still has to hear about rate-limit hits so it can back off,
        # as every other rate-limited tool in this module does.
        if _rate_limiter and type(e).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        return {"success": False, "error": str(e)}
# PHANTOM GEMSTONE NEXUS – High-level MCP tools
@mcp.tool(
    name="nexus_scry",
    description=(
        "INITIATE a CORINTHIAN NEXUS on the ETHER: perform a COARSE SCRY of Workflowy "
        "under a root to reveal a limited TERRAIN (a new geography named by your "
        "NEXUS TAG). Choose max_depth and child_limit carefully—keep them minimal. "
        "Optionally set max_nodes to guard against accidental 1M-node SCRYs. "
        "Later, you will IGNITE the ETHER more deeply on selected SHARDS."
    ),
)
async def nexus_scry(
    nexus_tag: str,
    workflowy_root_id: str,
    max_depth: int,
    child_limit: int,
    reset_if_exists: bool = False,
    max_nodes: int | None = None,
) -> dict:
    """Run a tag-scoped coarse SCRY under a root to create a TERRAIN.

    The SCRY is meant to stay shallow, so pick max_depth and child_limit
    conservatively. When max_nodes is not None it is a hard ceiling on SCRY
    size: a tree that would exceed it makes the SCRY abort with a clear error
    rather than export a massive JSON bundle.

    In contrast to ``nexus_glimpse(mode="full")``, which may legitimately
    yield T0 = S0 = T1 in a single step, this SCRY is explicitly **T0-only**
    and writes only ``coarse_terrain.json``. S0 and T1 arrive later through
    ``nexus_ignite_shards`` and ``nexus_anchor_gems``.

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        terrain = await client.nexus_scry(
            nexus_tag=nexus_tag,
            workflowy_root_id=workflowy_root_id,
            max_depth=max_depth,
            child_limit=child_limit,
            reset_if_exists=reset_if_exists,
            max_nodes=max_nodes,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return terrain
@mcp.tool(
    name="nexus_ignite_shards",
    description=(
        "IGNITE selected SHARDS so the ETHER glows more deeply around them, revealing "
        "deeper layers (but not necessarily to FULL depth). The deeper revelation is "
        "captured as a PHANTOM GEM (S0), an unrefracted witness of those subtrees."
    ),
)
async def nexus_ignite_shards(
    nexus_tag: str,
    root_ids: list[str],
    max_depth: int | None = None,
    child_limit: int | None = None,
    per_root_limits: dict[str, dict[str, int]] | None = None,
) -> dict:
    """IGNITE chosen SHARDS of an existing TERRAIN.

    Specific TERRAIN nodes are marked as SHARDS and ignited, revealing deeper
    layers around them (not necessarily to full depth). What is revealed is
    condensed into a PHANTOM GEM (S0).

    Args:
        nexus_tag: NEXUS tag whose TERRAIN is being ignited.
        root_ids: Node IDs to treat as SHARDS.
        max_depth: Forwarded to the client (default None).
        child_limit: Forwarded to the client (default None).
        per_root_limits: Forwarded to the client (default None).

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        ignition = await client.nexus_ignite_shards(
            nexus_tag=nexus_tag,
            root_ids=root_ids,
            max_depth=max_depth,
            child_limit=child_limit,
            per_root_limits=per_root_limits,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return ignition
@mcp.tool(
    name="nexus_anchor_gems",
    description=(
        "Let the PHANTOM GEM ILLUMINATE the TRUE GEMS that were ALWAYS PRESENT in "
        "the TERRAIN but not yet revealed: where SHARDS were marked, the TERRAIN "
        "now shimmers with deeper revealed structure (FIRST IMBUE—NOTHING CHANGES "
        "in the ETHER). The PHANTOM GEM remains a REFLECTION: an untouched witness."
    ),
)
async def nexus_anchor_gems(
    nexus_tag: str,
) -> dict:
    """ANCHOR the PHANTOM GEM into the TERRAIN, producing SHIMMERING TERRAIN.

    This is the FIRST IMBUE: where SHARDS were marked, the TERRAIN gains the
    deeper structure the PHANTOM GEM revealed. Workflowy itself is not
    touched, and the PHANTOM GEM stays as an unrefracted witness.

    Args:
        nexus_tag: NEXUS tag to operate on.

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        anchored = await client.nexus_anchor_gems(nexus_tag=nexus_tag)
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return anchored
@mcp.tool(
    name="nexus_anchor_jewels",
    description=(
        "Anchor the PHANTOM JEWELS (S1) within the SHIMMERING TERRAIN (T1), "
        "transmuting the REVEALED GEMS into NEW JEWELS that are an exact "
        "impregnation of the PHANTOM JEWELS. The TERRAIN becomes ENCHANTED (SECOND "
        "IMBUE), with the PHANTOM GEM (S0) as witness to the ORIGINAL state. The "
        "ENCHANTED TERRAIN is now EMBODIED and REAL as JSON—Workflowy remains "
        "untouched until WEAVE."
    ),
)
async def nexus_anchor_jewels(
    nexus_tag: str,
) -> dict:
    """ANCHOR PHANTOM JEWELS into SHIMMERING TERRAIN, producing ENCHANTED TERRAIN.

    This is the SECOND IMBUE: the PHANTOM JEWELS (S1) are anchored within the
    SHIMMERING TERRAIN (T1), turning the REVEALED GEMS into NEW JEWELS that
    mirror them exactly. The PHANTOM GEM (S0) remains as witness to the
    original state. The ENCHANTED TERRAIN exists as JSON only; the ETHER
    (Workflowy) is still untouched.

    Args:
        nexus_tag: NEXUS tag to operate on.

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        enchanted = await client.nexus_anchor_jewels(nexus_tag=nexus_tag)
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return enchanted
@mcp.tool(
    name="nexus_transform_jewel",
    description=(
        "Apply JEWELSTORM semantic operations to a NEXUS working_gem JSON file "
        "(PHANTOM GEM working copy). This is the semantic analogue of edit_file "
        "for PHANTOM GEM JSON: MOVE_NODE, DELETE_NODE, RENAME_NODE, SET_NOTE, "
        "SET_ATTRS, CREATE_NODE, all referencing nodes by jewel_id."
    ),
)
def nexus_transform_jewel(
    jewel_file: str,
    operations: list[dict[str, Any]],
    dry_run: bool = False,
    stop_on_error: bool = True,
) -> dict:
    """Apply a JEWELSTORM transform to a NEXUS working_gem JSON file.

    MCP-facing wrapper that hands the request to
    ``client.nexus_transform_jewel`` (which wraps
    nexus_json_tools.transform_jewel).

    Args:
        jewel_file: Absolute path to the working_gem JSON (typically a
            QUILLSTRIKE working stone derived from phantom_gem.json).
        operations: Operation dicts; each needs an "op" key plus its
            operation-specific fields (e.g. jewel_id, parent_jewel_id,
            position).
        dry_run: When True, simulate only (no file write).
        stop_on_error: When True, abort on the first error (no write).

    Returns:
        Result dict from transform_jewel with success flag, counts, and errors.
    """
    client = get_client()
    outcome = client.nexus_transform_jewel(
        jewel_file=jewel_file,
        operations=operations,
        dry_run=dry_run,
        stop_on_error=stop_on_error,
    )
    return outcome
@mcp.tool(
    name="nexus_weave_enchanted_async",
    description=(
        "Start an async NEXUS ENCHANTED WEAVE job (WEAVE T2 back into Workflowy ETHER) "
        "and return a job_id for status polling and cancellation."
    ),
)
async def nexus_weave_enchanted_async(
    nexus_tag: str,
    dry_run: bool = False,
) -> dict:
    """Kick off the ENCHANTED TERRAIN weave as a detached background process.

    weave_worker.py is launched as its own process so the weave survives an
    MCP restart. Progress is tracked through the .weave.pid and
    .weave_journal.json files; use mcp_job_status() to monitor it (that tool
    scans the directory for active PIDs).

    Args:
        nexus_tag: NEXUS tag whose ENCHANTED TERRAIN is woven.
        dry_run: Forwarded to the detached launcher (default False).

    Returns:
        Whatever the client's detached launcher returns.
    """
    client = get_client()
    # The detached launcher is used so the job outlives an MCP restart.
    launch_info = client.nexus_weave_enchanted_detached(
        nexus_tag=nexus_tag,
        dry_run=dry_run,
    )
    return launch_info
@mcp.tool(
    name="nexus_start_exploration",
    description=(
        "Initialize an exploration session over a Workflowy subtree and return "
        "an initial frontier of handles for agent-driven navigation. Set editable=True "
        "to enable in-session note/tag edits that will be reflected in the phantom gem."
    ),
)
async def nexus_start_exploration(
    nexus_tag: str,
    root_id: str,
    source_mode: str = "glimpse_full",
    max_nodes: int = 200000,
    session_hint: str | None = None,
    frontier_size: int = 25,
    max_depth_per_frontier: int = 1,
    editable: bool = False,
) -> dict:
    """Open an exploration session over a Workflowy subtree.

    Every argument is forwarded by keyword, unchanged, to
    ``client.nexus_start_exploration``.

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        session = await client.nexus_start_exploration(
            nexus_tag=nexus_tag,
            root_id=root_id,
            source_mode=source_mode,
            max_nodes=max_nodes,
            session_hint=session_hint,
            frontier_size=frontier_size,
            max_depth_per_frontier=max_depth_per_frontier,
            editable=editable,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return session
@mcp.tool(
    name="nexus_explore_step",
    description=(
        "Apply exploration decisions and multi-ray walks to an exploration session "
        "and return the next frontiers. Use 'decisions' for accept/reject/finalize/etc "
        "and 'walks' for multi-origin ray expansion."
    ),
)
async def nexus_explore_step(
    session_id: str,
    decisions: list[dict[str, Any]] | None = None,
    walks: list[dict[str, Any]] | None = None,
    max_parallel_walks: int = 4,
    global_frontier_limit: int = 80,
    include_history_summary: bool = True,
) -> dict:
    """v2 Exploration API: DECISIONS and WALKs are separate, multi-ray steps allowed.

    Request:
        - decisions: [{ "handle": "H_1", "action": "accept_leaf", ... }, ...]
        - walks: [{ "origin": "H_10", "max_steps": 2 }, ...]
        - max_parallel_walks: cap on how many rays are walked in this call
        - global_frontier_limit: total nodes across all rays

    Response:
        {
            "session_id": ...,
            "walks": [
                {
                    "origin": "H_10",
                    "requested_max_steps": 2,
                    "complete": false,
                    "frontier": [...]
                },
                ...
            ],
            "skipped_walks": [...],
            "decisions_applied": [...],
            "scratchpad": "...",
            "history_summary": {...}
        }
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        step_result = await client.nexus_explore_step_v2(
            session_id=session_id,
            decisions=decisions,
            walks=walks,
            max_parallel_walks=max_parallel_walks,
            global_frontier_limit=global_frontier_limit,
            include_history_summary=include_history_summary,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return step_result
@mcp.tool(
    name="nexus_list_exploration_sessions",
    description="List all exploration sessions (optionally filter by nexus_tag).",
)
def nexus_list_exploration_sessions(
    nexus_tag: str | None = None,
) -> dict:
    """Enumerate exploration sessions.

    Args:
        nexus_tag: Optional filter; when given, only sessions for this tag
            are shown.

    Returns:
        List of session metadata (session_id, nexus_tag, timestamps, steps,
        mode, etc.) as produced by the client.
    """
    client = get_client()
    sessions = client.nexus_list_exploration_sessions(nexus_tag=nexus_tag)
    return sessions
@mcp.tool(
    name="nexus_resume_exploration",
    description=(
        "Resume an exploration session after MCP restart or in new conversation. "
        "Provide either session_id (exact) or nexus_tag (finds latest session for tag)."
    ),
)
async def nexus_resume_exploration(
    session_id: str | None = None,
    nexus_tag: str | None = None,
    frontier_size: int = 25,
    include_history_summary: bool = True,
) -> dict:
    """Pick an exploration session back up from its persisted state.

    Args:
        session_id: Exact session ID to resume (takes precedence).
        nexus_tag: Alternative lookup; finds the latest session for this tag.
        frontier_size: How many frontier entries to return.
        include_history_summary: Whether to include the status summary.

    Returns:
        Current session state with frontier (same format as
        nexus_explore_step).
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        resumed = await client.nexus_resume_exploration(
            session_id=session_id,
            nexus_tag=nexus_tag,
            frontier_size=frontier_size,
            include_history_summary=include_history_summary,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return resumed
@mcp.tool(
    name="nexus_finalize_exploration",
    description=(
        "Finalize an exploration session into coarse_terrain.json (always) plus "
        "optional phantom_gem.json + shimmering_terrain.json for use with NEXUS "
        "JEWELSTORM and WEAVE."
    ),
)
async def nexus_finalize_exploration(
    session_id: str,
    mode: Literal["terrain_only", "full"] = "full",
) -> dict:
    """Finalize an exploration session into coarse TERRAIN and optional GEM/SHIMMERING.

    Args:
        session_id: Session to finalize.
        mode: "terrain_only" or "full" (default "full"); forwarded to the
            client.

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        finalized = await client.nexus_finalize_exploration(
            session_id=session_id,
            mode=mode,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:  # noqa: BLE001
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return finalized
# Tool: Generate Markdown from JSON
@mcp.tool(
    name="generate_markdown_from_json",
    description="Convert exported/edited JSON to Markdown format (without metadata).",
)
def generate_markdown(
    json_file: str,
) -> dict:
    """Turn a JSON file into Markdown.

    Intended for use after editing JSON with Quill scroll, to create Markdown
    for PARALLAX review.

    Args:
        json_file: Absolute path to the JSON file.

    Returns:
        Dictionary with success status and markdown file path.
    """
    client = get_client()
    conversion = client.generate_markdown_from_json(json_file)
    return conversion
# Tool: GLIMPSE (Read Node Trees)
@mcp.tool(
    name="workflowy_glimpse",
    description="Load entire node tree into context (no file intermediary). GLIMPSE command for direct context loading. Optional output_file writes TERRAIN export (WebSocket+API merge with full NEXUS semantics).",
)
async def glimpse(
    node_id: str,
    output_file: str | None = None,
) -> dict:
    """GLIMPSE: load a whole node tree into agent context.

    WebSocket DOM extraction is tried first (if the Chrome extension is
    connected), with an API fetch as fallback when the WebSocket is
    unavailable.

    Args:
        node_id: Root node UUID to read from.
        output_file: Optional absolute path. When provided, a TERRAIN-style
            export package is written using shared nexus_helper functions
            (WebSocket GLIMPSE + API SCRY merged for has_hidden_children /
            children_status annotations, original_ids_seen ledger, full NEXUS
            TERRAIN format).

    Returns:
        When output_file is None: dictionary with root metadata, children
        tree, node count, depth, source.
        When output_file is provided: compact summary with terrain_file,
        markdown_file, stats.
    """
    client = get_client()
    # Grab the extension's WebSocket connection and queue for the client.
    connection, queue = get_ws_connection()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        tree = await client.workflowy_glimpse(
            node_id,
            output_file=output_file,
            _ws_connection=connection,
            _ws_queue=queue,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return tree
# Tool: GLIMPSE FULL (Force API Fetch)
@mcp.tool(
    name="workflowy_scry",
    description="Load entire node tree via API (bypass WebSocket). Use when Key Files doesn't have parent UUID for ETCH, or when Dan wants complete tree regardless of expansion state.",
)
async def glimpse_full(
    node_id: str,
    depth: int | None = None,
    size_limit: int = 1000,
    output_file: str | None = None,
) -> dict:
    """Load a whole node tree through a full API fetch, bypassing the WebSocket.

    Use when:
    - Agent needs to hunt for parent UUIDs not in Key Files
    - Dan wants the complete node tree regardless of expansion
    - WebSocket selective extraction is not needed

    Args:
        node_id: Root node UUID to read from.
        depth: Maximum depth to traverse (1=direct children only, 2=two
            levels, None=full tree).
        size_limit: Maximum number of nodes to return (default 1000, raises
            error if exceeded).
        output_file: Forwarded unchanged to ``client.workflowy_scry``.
            Presumably an export path as in workflowy_glimpse; confirm
            against the client.

    Returns:
        Same format as workflowy_glimpse with _source="api".
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        # No WebSocket connection is passed to the client on this path.
        tree = await client.workflowy_scry(
            node_id,
            depth=depth,
            size_limit=size_limit,
            output_file=output_file,
        )
        if _rate_limiter:
            _rate_limiter.on_success()
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
    else:
        return tree
# Tool: ETCH (Write Node Trees)
@mcp.tool(
    name="workflowy_etch",
    description="Create multiple nodes from JSON structure (no file intermediary). ETCH command for direct node creation.",
)
async def etch(
    parent_id: str,
    nodes: list[dict] | str,
    replace_all: bool = False,
) -> dict:
    """ETCH: create several nodes from a JSON structure.

    Simple additive node creation (no UUIDs, no updates/moves).
    Fallback if this fails: INSCRIBE scroll (write_file → bulk_import).

    DEFAULT: Additive (skip existing by name, add new children only)
    REPLACE: Wipe all children, create fresh

    For complex operations (moves/updates with UUIDs) use NEXUS scroll.

    Args:
        parent_id: Parent UUID where nodes should be created.
        nodes: List of node objects (NO UUIDs - just name/note/children).
        replace_all: If True, delete ALL existing children first. Default
            False.

    Returns:
        Dictionary with success status, nodes created, skipped (if
        append_only), API call stats, and errors. If the client call raises,
        a zeroed failure dict carrying the error message is returned instead
        of propagating the exception.
    """
    client = get_client()
    # No limiter calls here: rate limiting is handled inside the client's
    # workflowy_etch because of its recursive operations.
    try:
        return await client.workflowy_etch(parent_id, nodes, replace_all=replace_all)
    except Exception as exc:
        # Top-level capture: report the failure as data rather than raising.
        failure_report = {
            "success": False,
            "nodes_created": 0,
            "root_node_ids": [],
            "api_calls": 0,
            "retries": 0,
            "rate_limit_hits": 0,
            "errors": [f"An unexpected error occurred: {exc!s}"],
        }
        return failure_report
@mcp.tool(
    name="workflowy_etch_async",
    description="Start an async ETCH job (Workflowy node creation) and return a job_id for status polling.",
)
async def etch_async(
    parent_id: str,
    nodes: list[dict] | str,
    replace_all: bool = False,
) -> dict:
    """Launch ETCH as a background job and hand back its job descriptor.

    Args:
        parent_id: Parent UUID where nodes should be created.
        nodes: Node structure passed through to the client's ETCH.
        replace_all: Forwarded to the client's ETCH (default False).

    Returns:
        Whatever ``_start_background_job`` returns for the new "etch" job.
    """
    client = get_client()

    # Only parent_id and replace_all go into the job payload; the node
    # structure itself is captured by the closure below.
    job_payload = {"parent_id": parent_id, "replace_all": replace_all}

    async def run_etch(job_id: str) -> dict:  # job_id reserved for future logging
        etched = await client.workflowy_etch(parent_id, nodes, replace_all=replace_all)
        return etched

    return await _start_background_job("etch", job_payload, run_etch)
# Tool: List Nexus Keystones
@mcp.tool(
    name="nexus_list_keystones",
    description="List all available NEXUS Keystone backups.",
)
def nexus_list_keystones() -> dict:
    """Return the client's listing of available NEXUS Keystone backups."""
    client = get_client()
    keystones = client.nexus_list_keystones()
    return keystones
# Tool: Restore Nexus Keystone
@mcp.tool(
    name="nexus_restore_keystone",
    description="Restore a Workflowy node tree from a NEXUS Keystone backup.",
)
async def nexus_restore_keystone(keystone_id: str) -> dict:
    """Restore a Workflowy node tree from the given NEXUS Keystone backup.

    Args:
        keystone_id: Identifier of the Keystone backup to restore.

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    restored = await client.nexus_restore_keystone(keystone_id)
    return restored
# Tool: Purge Nexus Keystones
@mcp.tool(
    name="nexus_purge_keystones",
    description="Delete one or more NEXUS Keystone backup files.",
)
def nexus_purge_keystones(keystone_ids: list[str]) -> dict:
    """Delete the listed NEXUS Keystone backup files.

    Args:
        keystone_ids: Identifiers of the Keystone backups to delete.

    Returns:
        The result dict produced by the client.
    """
    client = get_client()
    purge_report = client.nexus_purge_keystones(keystone_ids)
    return purge_report
# Resource: WorkFlowy Outline
@mcp.resource(
    uri="workflowy://outline",
    name="workflowy_outline",
    description="The complete WorkFlowy outline structure",
)
async def get_outline() -> str:
    """Render the WorkFlowy outline as an indented bullet list.

    Root nodes are listed with a limit of 1000 and each one is formatted
    recursively: a "- " bullet per node, "[x] " for completed nodes, a
    "Note:" line for notes, and children one indent level deeper.

    Returns:
        Formatted string representation of the outline.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        # Ask for the root level with a generous limit.
        root_request = NodeListRequest(limit=1000)  # type: ignore[call-arg]
        roots, _ = await client.list_nodes(root_request)
        if _rate_limiter:
            _rate_limiter.on_success()

        def format_node(node: WorkFlowyNode, indent: int = 0) -> str:
            """Render one node and, recursively, its children."""
            bullet = " " * indent + "- "
            done_marker = "[x] " if node.cp else ""
            rendered = [f"{bullet}{done_marker}{node.nm or '(untitled)'}"]
            if node.no:
                # Notes sit one indent level below their node.
                note_indent = " " * (indent + 1)
                rendered.append(f"{note_indent}Note: {node.no}")
            rendered.extend(format_node(child, indent + 1) for child in node.ch or ())
            return "\n".join(rendered)

        return "\n".join([format_node(root) for root in roots])
    except Exception as exc:
        # Report rate-limit hits to the limiter, then let the error propagate.
        if _rate_limiter and type(exc).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
if __name__ == "__main__":
    # ☢️ NUCLEAR LOGGING OPTION ☢️
    # Send every log record and every stray print() to stderr so it shows up
    # in the MCP console.
    import logging
    import sys

    # 1. Root logger at DEBUG, with existing handlers stripped so records are
    #    neither duplicated nor swallowed.
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    for stale_handler in list(root.handlers):
        root.removeHandler(stale_handler)

    # 2. A single stderr handler using a compact "[TIME] [LEVEL] name: message" layout.
    formatter = logging.Formatter(
        '[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
        datefmt='%H:%M:%S',
    )
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(formatter)
    root.addHandler(handler)

    # 3. Stand-in for sys.stdout so plain print() calls from any library are
    #    tagged and forwarded to stderr.
    class StderrRedirector:
        """Minimal write/flush object that forwards non-blank text to stderr."""

        def write(self, message):
            # Whitespace-only writes (e.g. print()'s trailing newline) are dropped.
            if not message.strip():
                return
            sys.stderr.write(f"[STDOUT] {message}\n")

        def flush(self):
            sys.stderr.flush()

    # NOTE(review): sys.stdout is replaced before mcp.run(transport="stdio");
    # confirm the stdio transport does not rely on sys.stdout (for example
    # sys.stdout.buffer) for protocol output.
    sys.stdout = StderrRedirector()

    # 4. Startup confirmation through the DAGGER logger and a direct stderr print.
    log_event("☢️ NUCLEAR LOGGING ACTIVE: DEBUG LEVEL ☢️", "SERVER")
    local_logger = _ClientLogger("SERVER")
    local_logger.error("STDERR TEST: This should appear in console (via logger.error)")
    print("Standard Output Redirection Test", file=sys.stderr)

    # 5. Start serving over stdio.
    mcp.run(transport="stdio")