Skip to main content
Glama
write.py9.71 kB
"""mcp_write tool implementation.""" from ..copyq_client import client from ..errors import ( MCPCopyQError, InvalidModeError, TabNotFoundError, PermissionDeniedError, MissingParamError, ) from ..models import ( WriteMode, EditMode, FieldType, Intent, MCP_ROOT, ALLOWED_ROOT_TABS, TABS_WITH_SUBTABS, PREVIEW_LENGTH, ) async def mcp_write( mode: str, tab: str | None = None, index: int | None = None, text: str | None = None, tags: list[str] | None = None, note: str | None = None, field: str | None = None, edit_mode: str = "replace", match: str | None = None, to_tab: str | None = None, path: str | None = None, intent: str = "execute", ) -> str: """ Universal write operation for CopyQ MCP. Modes: - add: Add new item to tab - update: Update existing item - delete: Delete item - move: Move item to another tab - tab_create: Create new subtab (workspace only) - tab_delete: Delete subtab (workspace only) intent: - execute: Perform the operation - preview: Show what would happen without executing Returns compact pipe-separated format. """ try: # Validate mode valid_modes = [m.value for m in WriteMode] if mode not in valid_modes: raise InvalidModeError(mode, valid_modes) is_preview = intent == "preview" if mode == "add": return await _write_add(tab, text, tags, note, is_preview) elif mode == "update": return await _write_update( tab, index, field, edit_mode, text, tags, note, match, is_preview ) elif mode == "delete": return await _write_delete(tab, index, is_preview) elif mode == "move": return await _write_move(tab, index, to_tab, is_preview) elif mode == "tab_create": return await _write_tab_create(path, is_preview) elif mode == "tab_delete": return await _write_tab_delete(path, is_preview) return "error|INVALID_MODE|Unknown mode" except MCPCopyQError as e: return e.to_response() except Exception as e: return f"error|INTERNAL|{str(e)}" def _check_tab_permission(tab: str, operation: str) -> None: """Check if operation is allowed on tab.""" # Tab operations (create/delete) only allowed in workspace if operation in ("tab_create", "tab_delete"): if not tab.startswith("workspace"): raise PermissionDeniedError(operation, tab) async def _write_add( tab: str | None, text: str | None, tags: list[str] | None, note: str | None, is_preview: bool, ) -> str: """Add new item to tab.""" if not tab: raise MissingParamError("tab", "add") if not text: raise MissingParamError("text", "add") # Check tab exists (will be created if not) if not await client.tab_exists(tab): # Auto-create only for existing root tabs root = tab.split("/")[0] if root not in ALLOWED_ROOT_TABS: raise TabNotFoundError(tab) if is_preview: text_preview = text[:PREVIEW_LENGTH] + "..." if len(text) > PREVIEW_LENGTH else text tags_str = f"[{','.join(tags)}]" if tags else "[]" note_preview = (note[:50] + "...") if note and len(note) > 50 else (note or "") return ( f"preview|mode:add\n" f"tab:{tab}\n" f"will_add:\n" f" text:\"{text_preview}\"\n" f" tags:{tags_str}\n" f" note:\"{note_preview}\"" ) new_index = await client.add_item(tab, text, tags, note) return ( f"ok|mode:add|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{new_index}|" f"text_len:{len(text)}|tags:{len(tags) if tags else 0}|note:{bool(note)}" ) async def _write_update( tab: str | None, index: int | None, field: str | None, edit_mode: str, text: str | None, tags: list[str] | None, note: str | None, match: str | None, is_preview: bool, ) -> str: """Update existing item.""" if not tab: raise MissingParamError("tab", "update") if index is None: raise MissingParamError("index", "update") if not field: raise MissingParamError("field", "update") # Validate field valid_fields = [f.value for f in FieldType] if field not in valid_fields: from ..errors import MCPCopyQError, ErrorCode raise MCPCopyQError(ErrorCode.INVALID_PARAM, f"Invalid field '{field}'. Valid: {', '.join(valid_fields)}") # Check tab exists if not await client.tab_exists(tab): raise TabNotFoundError(tab) # Get current item for preview current = await client.read_item_full(tab, index) if is_preview: if field == "text": old_val = current["text"][:100] + "..." if len(current["text"]) > 100 else current["text"] new_val = text[:100] + "..." if text and len(text) > 100 else text elif field == "tags": old_val = str(current["tags"]) new_val = str(tags) elif field == "note": old_val = current["note"][:100] + "..." if len(current["note"]) > 100 else current["note"] new_val = note[:100] + "..." if note and len(note) > 100 else note else: old_val = "?" new_val = "?" return ( f"preview|mode:update\n" f"tab:{tab}|index:{index}|field:{field}|edit_mode:{edit_mode}\n" f"old:{old_val}\n" f"new:{new_val}" ) # Execute update if field == "text": if text is None: raise MissingParamError("text", "update (field=text)") await client.update_text(tab, index, text, edit_mode, match) elif field == "tags": if tags is None: raise MissingParamError("tags", "update (field=tags)") await client.update_tags(tab, index, tags, edit_mode) elif field == "note": if note is None: raise MissingParamError("note", "update (field=note)") await client.update_note(tab, index, note, edit_mode, match) return f"ok|mode:update|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{index}|field:{field}" async def _write_delete( tab: str | None, index: int | None, is_preview: bool, ) -> str: """Delete item.""" if not tab: raise MissingParamError("tab", "delete") if index is None: raise MissingParamError("index", "delete") # Check tab exists if not await client.tab_exists(tab): raise TabNotFoundError(tab) # Get item for preview item = await client.read_item_full(tab, index) if is_preview: text_preview = item["text"][:100] + "..." if len(item["text"]) > 100 else item["text"] tags_str = f"[{','.join(item['tags'])}]" if item["tags"] else "[]" note_str = "+note" if item["note"] else "" return ( f"preview|mode:delete\n" f"tab:{tab}|index:{index}\n" f"will_delete:\n" f" text:\"{text_preview}\"\n" f" tags:{tags_str} {note_str}" ) await client.remove_item(tab, index) return f"ok|mode:delete|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{index}" async def _write_move( tab: str | None, index: int | None, to_tab: str | None, is_preview: bool, ) -> str: """Move item to another tab.""" if not tab: raise MissingParamError("tab", "move") if index is None: raise MissingParamError("index", "move") if not to_tab: raise MissingParamError("to_tab", "move") # Check tabs exist if not await client.tab_exists(tab): raise TabNotFoundError(tab) # Get item for preview item = await client.read_item_full(tab, index) if is_preview: text_preview = item["text"][:80] + "..." if len(item["text"]) > 80 else item["text"] return ( f"preview|mode:move\n" f"from:{tab}[{index}] -> to:{to_tab}[0]\n" f"item:\"{text_preview}\"" ) new_index = await client.move_item(tab, index, to_tab) return f"ok|mode:move|from:{MCP_ROOT}/{tab}|to:{MCP_ROOT}/{to_tab}|new_index:{new_index}" async def _write_tab_create( path: str | None, is_preview: bool, ) -> str: """Create new subtab.""" if not path: raise MissingParamError("path", "tab_create") # Check permission _check_tab_permission(path, "tab_create") # Check if already exists if await client.tab_exists(path): return f"ok|mode:tab_create|path:{path}|full_path:{MCP_ROOT}/{path}|status:already_exists" if is_preview: return ( f"preview|mode:tab_create\n" f"will_create:{MCP_ROOT}/{path}" ) await client.create_tab(path) return f"ok|mode:tab_create|path:{path}|full_path:{MCP_ROOT}/{path}|status:created" async def _write_tab_delete( path: str | None, is_preview: bool, ) -> str: """Delete subtab.""" if not path: raise MissingParamError("path", "tab_delete") # Check permission _check_tab_permission(path, "tab_delete") # Check exists if not await client.tab_exists(path): raise TabNotFoundError(path) # Get count for preview count = await client.get_count(path) if is_preview: return ( f"preview|mode:tab_delete\n" f"will_delete:{MCP_ROOT}/{path}\n" f"items_count:{count}" ) await client.remove_tab(path) return f"ok|mode:tab_delete|path:{path}|full_path:{MCP_ROOT}/{path}|items_removed:{count}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/list91/mcp-copyq'

If you have feedback or need assistance with the MCP directory API, please join our Discord server