write.py•9.71 kB
"""mcp_write tool implementation."""
from ..copyq_client import client
from ..errors import (
MCPCopyQError,
InvalidModeError,
TabNotFoundError,
PermissionDeniedError,
MissingParamError,
)
from ..models import (
WriteMode,
EditMode,
FieldType,
Intent,
MCP_ROOT,
ALLOWED_ROOT_TABS,
TABS_WITH_SUBTABS,
PREVIEW_LENGTH,
)
async def mcp_write(
    mode: str,
    tab: str | None = None,
    index: int | None = None,
    text: str | None = None,
    tags: list[str] | None = None,
    note: str | None = None,
    field: str | None = None,
    edit_mode: str = "replace",
    match: str | None = None,
    to_tab: str | None = None,
    path: str | None = None,
    intent: str = "execute",
) -> str:
    """
    Universal write operation for CopyQ MCP.

    Modes:
    - add: Add new item to tab
    - update: Update existing item
    - delete: Delete item
    - move: Move item to another tab
    - tab_create: Create new subtab (workspace only)
    - tab_delete: Delete subtab (workspace only)

    intent:
    - execute: Perform the operation
    - preview: Show what would happen without executing

    Both ``mode`` and ``intent`` are validated before anything is
    dispatched. An unrecognised intent is rejected instead of being treated
    as "execute", so a misspelled "preview" can never trigger a real write.

    The remaining parameters are forwarded to the handler for the selected
    mode; each handler reports its own missing required parameters.

    Returns compact pipe-separated format. Exceptions raised while handling
    the request are converted to an error response string rather than
    propagated to the caller.
    """
    try:
        # Validate mode
        valid_modes = [m.value for m in WriteMode]
        if mode not in valid_modes:
            raise InvalidModeError(mode, valid_modes)

        # Validate intent: anything that is not a known intent must fail
        # loudly, otherwise a typo would silently execute the operation.
        valid_intents = [i.value for i in Intent]
        if intent not in valid_intents:
            from ..errors import ErrorCode

            raise MCPCopyQError(
                ErrorCode.INVALID_PARAM,
                f"Invalid intent '{intent}'. Valid: {', '.join(valid_intents)}",
            )
        is_preview = intent == "preview"

        if mode == "add":
            return await _write_add(tab, text, tags, note, is_preview)
        elif mode == "update":
            return await _write_update(
                tab, index, field, edit_mode, text, tags, note, match, is_preview
            )
        elif mode == "delete":
            return await _write_delete(tab, index, is_preview)
        elif mode == "move":
            return await _write_move(tab, index, to_tab, is_preview)
        elif mode == "tab_create":
            return await _write_tab_create(path, is_preview)
        elif mode == "tab_delete":
            return await _write_tab_delete(path, is_preview)

        # Only reachable if WriteMode declares a mode with no handler above.
        return "error|INVALID_MODE|Unknown mode"
    except MCPCopyQError as e:
        return e.to_response()
    except Exception as e:
        return f"error|INTERNAL|{str(e)}"
def _check_tab_permission(tab: str, operation: str) -> None:
"""Check if operation is allowed on tab."""
# Tab operations (create/delete) only allowed in workspace
if operation in ("tab_create", "tab_delete"):
if not tab.startswith("workspace"):
raise PermissionDeniedError(operation, tab)
async def _write_add(
    tab: str | None,
    text: str | None,
    tags: list[str] | None,
    note: str | None,
    is_preview: bool,
) -> str:
    """Add a new item to a tab, or describe the addition when previewing.

    ``tab`` and ``text`` are required and must be non-empty. A tab that does
    not exist yet is accepted only when its root segment is listed in
    ALLOWED_ROOT_TABS; otherwise TabNotFoundError is raised.
    """
    # Required parameters, checked in the same order as they are reported.
    for param_name, param_value in (("tab", tab), ("text", text)):
        if not param_value:
            raise MissingParamError(param_name, "add")

    # A missing tab is tolerated only underneath an allowed root tab.
    tab_present = await client.tab_exists(tab)
    if not tab_present and tab.split("/")[0] not in ALLOWED_ROOT_TABS:
        raise TabNotFoundError(tab)

    if not is_preview:
        new_index = await client.add_item(tab, text, tags, note)
        tag_count = len(tags) if tags else 0
        has_note = bool(note)
        return (
            f"ok|mode:add|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{new_index}|"
            f"text_len:{len(text)}|tags:{tag_count}|note:{has_note}"
        )

    # Preview: shorten long values so the response stays compact.
    if len(text) > PREVIEW_LENGTH:
        shown_text = text[:PREVIEW_LENGTH] + "..."
    else:
        shown_text = text

    shown_tags = "[" + ",".join(tags) + "]" if tags else "[]"

    if note and len(note) > 50:
        shown_note = note[:50] + "..."
    else:
        shown_note = note or ""

    return "\n".join(
        [
            "preview|mode:add",
            f"tab:{tab}",
            "will_add:",
            f" text:\"{shown_text}\"",
            f" tags:{shown_tags}",
            f" note:\"{shown_note}\"",
        ]
    )
async def _write_update(
    tab: str | None,
    index: int | None,
    field: str | None,
    edit_mode: str,
    text: str | None,
    tags: list[str] | None,
    note: str | None,
    match: str | None,
    is_preview: bool,
) -> str:
    """Update one field of an existing item, or preview that update.

    ``tab``, ``index`` and ``field`` are required, ``field`` must be one of
    the FieldType values, and the new value for the chosen field (``text``,
    ``tags`` or ``note``) must be supplied. The new value is validated
    before the preview/execute split, so a preview reports the same
    MissingParamError that the real update would.

    ``edit_mode`` and ``match`` are passed through to the client unchanged;
    ``match`` is only forwarded for the text and note fields.

    Raises:
        MissingParamError: a required parameter or the new value is missing.
        MCPCopyQError: ``field`` is not a valid FieldType value.
        TabNotFoundError: ``tab`` does not exist.
    """
    if not tab:
        raise MissingParamError("tab", "update")
    if index is None:
        raise MissingParamError("index", "update")
    if not field:
        raise MissingParamError("field", "update")

    # Validate field
    valid_fields = [f.value for f in FieldType]
    if field not in valid_fields:
        from ..errors import ErrorCode

        raise MCPCopyQError(ErrorCode.INVALID_PARAM, f"Invalid field '{field}'. Valid: {', '.join(valid_fields)}")

    # Check tab exists
    if not await client.tab_exists(tab):
        raise TabNotFoundError(tab)

    # Read the current item; the preview shows its old value.
    current = await client.read_item_full(tab, index)

    # The new value is required for preview and execute alike. Checking it
    # here keeps a preview from printing "new:None" for a request that the
    # real update would reject.
    new_values = {"text": text, "tags": tags, "note": note}
    if field in new_values and new_values[field] is None:
        raise MissingParamError(field, f"update (field={field})")

    if is_preview:
        if field == "text":
            # "or ''" guards against a stored value of None.
            old_text = current["text"] or ""
            old_val = old_text[:100] + "..." if len(old_text) > 100 else old_text
            new_val = text[:100] + "..." if len(text) > 100 else text
        elif field == "tags":
            old_val = str(current["tags"])
            new_val = str(tags)
        elif field == "note":
            old_note = current["note"] or ""
            old_val = old_note[:100] + "..." if len(old_note) > 100 else old_note
            new_val = note[:100] + "..." if len(note) > 100 else note
        else:
            # A FieldType value without dedicated preview formatting.
            old_val = "?"
            new_val = "?"
        return (
            f"preview|mode:update\n"
            f"tab:{tab}|index:{index}|field:{field}|edit_mode:{edit_mode}\n"
            f"old:{old_val}\n"
            f"new:{new_val}"
        )

    # Execute update
    if field == "text":
        await client.update_text(tab, index, text, edit_mode, match)
    elif field == "tags":
        await client.update_tags(tab, index, tags, edit_mode)
    elif field == "note":
        await client.update_note(tab, index, note, edit_mode, match)

    return f"ok|mode:update|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{index}|field:{field}"
async def _write_delete(
    tab: str | None,
    index: int | None,
    is_preview: bool,
) -> str:
    """Delete one item from a tab, or describe the item when previewing.

    ``tab`` and ``index`` are required and the tab must exist. The item is
    read before anything else happens, in both preview and execute mode.
    """
    if not tab:
        raise MissingParamError("tab", "delete")
    if index is None:
        raise MissingParamError("index", "delete")

    tab_present = await client.tab_exists(tab)
    if not tab_present:
        raise TabNotFoundError(tab)

    target = await client.read_item_full(tab, index)

    if not is_preview:
        await client.remove_item(tab, index)
        return f"ok|mode:delete|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{index}"

    # Preview: shortened text, the tag list, and a marker when a note exists.
    body = target["text"]
    if len(body) > 100:
        shown_text = body[:100] + "..."
    else:
        shown_text = body

    item_tags = target["tags"]
    shown_tags = "[" + ",".join(item_tags) + "]" if item_tags else "[]"
    note_marker = "+note" if target["note"] else ""

    return "\n".join(
        [
            "preview|mode:delete",
            f"tab:{tab}|index:{index}",
            "will_delete:",
            f" text:\"{shown_text}\"",
            f" tags:{shown_tags} {note_marker}",
        ]
    )
async def _write_move(
    tab: str | None,
    index: int | None,
    to_tab: str | None,
    is_preview: bool,
) -> str:
    """Move an item to another tab, or preview the move.

    ``tab``, ``index`` and ``to_tab`` are required. The source tab must
    exist. The destination follows the same rule `_write_add` applies to its
    target tab: a destination that does not exist yet is accepted only when
    its root segment is listed in ALLOWED_ROOT_TABS.

    Raises:
        MissingParamError: a required parameter is missing.
        TabNotFoundError: the source tab does not exist, or the destination
            does not exist and is not under an allowed root tab.
    """
    if not tab:
        raise MissingParamError("tab", "move")
    if index is None:
        raise MissingParamError("index", "move")
    if not to_tab:
        raise MissingParamError("to_tab", "move")

    # Check tabs exist: the source must be present ...
    if not await client.tab_exists(tab):
        raise TabNotFoundError(tab)

    # ... and a missing destination is tolerated only under an allowed root,
    # so a move cannot target an arbitrary tab outside the managed tree.
    if not await client.tab_exists(to_tab):
        to_root = to_tab.split("/")[0]
        if to_root not in ALLOWED_ROOT_TABS:
            raise TabNotFoundError(to_tab)

    # Read the item; the preview shows its (shortened) text.
    item = await client.read_item_full(tab, index)

    if is_preview:
        text_preview = item["text"][:80] + "..." if len(item["text"]) > 80 else item["text"]
        return (
            f"preview|mode:move\n"
            f"from:{tab}[{index}] -> to:{to_tab}[0]\n"
            f"item:\"{text_preview}\""
        )

    new_index = await client.move_item(tab, index, to_tab)
    return f"ok|mode:move|from:{MCP_ROOT}/{tab}|to:{MCP_ROOT}/{to_tab}|new_index:{new_index}"
async def _write_tab_create(
    path: str | None,
    is_preview: bool,
) -> str:
    """Create a new subtab, or describe the creation when previewing.

    ``path`` is required and must pass the "tab_create" permission check.
    A tab that already exists is reported as ``status:already_exists``,
    regardless of ``is_preview``, and nothing is created.
    """
    if not path:
        raise MissingParamError("path", "tab_create")

    # Permission is verified before the client is contacted.
    _check_tab_permission(path, "tab_create")

    full_path = f"{MCP_ROOT}/{path}"

    already_present = await client.tab_exists(path)
    if already_present:
        return f"ok|mode:tab_create|path:{path}|full_path:{full_path}|status:already_exists"

    if is_preview:
        return "\n".join(["preview|mode:tab_create", f"will_create:{full_path}"])

    await client.create_tab(path)
    return f"ok|mode:tab_create|path:{path}|full_path:{full_path}|status:created"
async def _write_tab_delete(
    path: str | None,
    is_preview: bool,
) -> str:
    """Delete a subtab, or describe the deletion when previewing.

    ``path`` is required, must pass the "tab_delete" permission check, and
    must name an existing tab. The tab's item count is read before removal
    and included in the response.
    """
    if not path:
        raise MissingParamError("path", "tab_delete")

    # Permission is verified before the client is contacted.
    _check_tab_permission(path, "tab_delete")

    tab_present = await client.tab_exists(path)
    if not tab_present:
        raise TabNotFoundError(path)

    # Counted up front: used by the preview and by the final report.
    item_total = await client.get_count(path)
    full_path = f"{MCP_ROOT}/{path}"

    if is_preview:
        return "\n".join(
            [
                "preview|mode:tab_delete",
                f"will_delete:{full_path}",
                f"items_count:{item_total}",
            ]
        )

    await client.remove_tab(path)
    return f"ok|mode:tab_delete|path:{path}|full_path:{full_path}|items_removed:{item_total}"